From 58c2308fc66162d49370c0046eaadffaeecd209c Mon Sep 17 00:00:00 2001 From: Greg Date: Tue, 16 Jul 2024 19:29:31 +0900 Subject: [PATCH] add request count in ConsumedCapacity --- batchget.go | 1 + batchwrite.go | 1 + delete.go | 1 + put.go | 1 + query.go | 4 ++++ scan.go | 2 ++ table.go | 14 ++++++++++++++ table_test.go | 2 +- tx.go | 3 +++ update.go | 1 + 10 files changed, 29 insertions(+), 1 deletion(-) diff --git a/batchget.go b/batchget.go index 0a16d6b..58251dd 100644 --- a/batchget.go +++ b/batchget.go @@ -341,6 +341,7 @@ redo: itr.err = itr.bg.batch.table.db.retry(ctx, func() error { var err error itr.output, err = itr.bg.batch.table.db.client.BatchGetItem(ctx, itr.input) + itr.bg.cc.incRequests() return err }) if itr.err != nil { diff --git a/batchwrite.go b/batchwrite.go index adf5021..b2d0704 100644 --- a/batchwrite.go +++ b/batchwrite.go @@ -139,6 +139,7 @@ func (bw *BatchWrite) Run(ctx context.Context) (wrote int, err error) { err := bw.batch.table.db.retry(ctx, func() error { var err error res, err = bw.batch.table.db.client.BatchWriteItem(ctx, req) + bw.cc.incRequests() return err }) if err != nil { diff --git a/delete.go b/delete.go index 67a3710..d29f91d 100644 --- a/delete.go +++ b/delete.go @@ -108,6 +108,7 @@ func (d *Delete) run(ctx context.Context) (*dynamodb.DeleteItemOutput, error) { err := d.table.db.retry(ctx, func() error { var err error output, err = d.table.db.client.DeleteItem(ctx, input) + d.cc.incRequests() return err }) if d.cc != nil && output != nil { diff --git a/put.go b/put.go index 08fc277..1c26a95 100644 --- a/put.go +++ b/put.go @@ -81,6 +81,7 @@ func (p *Put) run(ctx context.Context) (output *dynamodb.PutItemOutput, err erro req := p.input() p.table.db.retry(ctx, func() error { output, err = p.table.db.client.PutItem(ctx, req) + p.cc.incRequests() return err }) if p.cc != nil && output != nil { diff --git a/query.go b/query.go index 8e49d5c..61c28fa 100644 --- a/query.go +++ b/query.go @@ -221,6 +221,7 @@ func (q *Query) One(ctx context.Context, out interface{}) error { err := q.table.db.retry(ctx, func() error { var err error res, err = q.table.db.client.GetItem(ctx, req) + q.cc.incRequests() if err != nil { return err } @@ -246,6 +247,7 @@ func (q *Query) One(ctx context.Context, out interface{}) error { err := q.table.db.retry(ctx, func() error { var err error res, err = q.table.db.client.Query(ctx, req) + q.cc.incRequests() if err != nil { return err } @@ -288,6 +290,7 @@ func (q *Query) Count(ctx context.Context) (int, error) { err := q.table.db.retry(ctx, func() error { var err error res, err = q.table.db.client.Query(ctx, input) + q.cc.incRequests() if err != nil { return err } @@ -392,6 +395,7 @@ func (itr *queryIter) Next(ctx context.Context, out interface{}) bool { itr.err = itr.query.table.db.retry(ctx, func() error { var err error itr.output, err = itr.query.table.db.client.Query(ctx, itr.input) + itr.query.cc.incRequests() return err }) diff --git a/scan.go b/scan.go index f649a7f..1069713 100644 --- a/scan.go +++ b/scan.go @@ -254,6 +254,7 @@ func (s *Scan) Count(ctx context.Context) (int, error) { err := s.table.db.retry(ctx, func() error { var err error out, err = s.table.db.client.Scan(ctx, input) + s.cc.incRequests() return err }) if err != nil { @@ -399,6 +400,7 @@ redo: itr.err = itr.scan.table.db.retry(ctx, func() error { var err error itr.output, err = itr.scan.table.db.client.Scan(ctx, itr.input) + itr.scan.cc.incRequests() return err }) diff --git a/table.go b/table.go index 056e991..4c54682 100644 --- a/table.go +++ b/table.go @@ -207,6 +207,7 @@ type ConsumedCapacity struct { // Write is the total number of write capacity units consumed during this operation. // This seems to be only set for transactions. Write float64 + // GSI is a map of Global Secondary Index names to total consumed capacity units. GSI map[string]float64 // GSIRead is a map of Global Secondary Index names to consumed read capacity units. @@ -215,6 +216,7 @@ type ConsumedCapacity struct { // GSIWrite is a map of Global Secondary Index names to consumed write capacity units. // This seems to be only set for transactions. GSIWrite map[string]float64 + // LSI is a map of Local Secondary Index names to total consumed capacity units. LSI map[string]float64 // LSIRead is a map of Local Secondary Index names to consumed read capacity units. @@ -223,6 +225,7 @@ type ConsumedCapacity struct { // LSIWrite is a map of Local Secondary Index names to consumed write capacity units. // This seems to be only set for transactions. LSIWrite map[string]float64 + // Table is the amount of total throughput consumed by the table. Table float64 // TableRead is the amount of read throughput consumed by the table. @@ -233,6 +236,9 @@ type ConsumedCapacity struct { TableWrite float64 // TableName is the name of the table affected by this operation. TableName string + + // Requests is the number of SDK requests made against DynamoDB's API. + Requests int } func addConsumedCapacity(cc *ConsumedCapacity, raw *types.ConsumedCapacity) { @@ -302,6 +308,13 @@ func addConsumedCapacity(cc *ConsumedCapacity, raw *types.ConsumedCapacity) { } } +func (cc *ConsumedCapacity) incRequests() { + if cc == nil { + return + } + cc.Requests++ +} + func mergeConsumedCapacity(dst, src *ConsumedCapacity) { if dst == nil || src == nil { return @@ -363,4 +376,5 @@ func mergeConsumedCapacity(dst, src *ConsumedCapacity) { if dst.TableName == "" && src.TableName != "" { dst.TableName = src.TableName } + dst.Requests += src.Requests } diff --git a/table_test.go b/table_test.go index ca356cc..99a2f3b 100644 --- a/table_test.go +++ b/table_test.go @@ -170,7 +170,7 @@ func TestAddConsumedCapacity(t *testing.T) { } var cc = new(ConsumedCapacity) - addConsumedCapacity(cc, raw) + cc.add(raw) if !reflect.DeepEqual(cc, expected) { t.Error("bad ConsumedCapacity:", cc, "≠", expected) diff --git a/tx.go b/tx.go index 1caecd5..1df1597 100644 --- a/tx.go +++ b/tx.go @@ -70,6 +70,7 @@ func (tx *GetTx) Run(ctx context.Context) error { err = tx.db.retry(ctx, func() error { var err error resp, err = tx.db.client.TransactGetItems(ctx, input) + tx.cc.incRequests() if tx.cc != nil && resp != nil { for _, cc := range resp.ConsumedCapacity { addConsumedCapacity(tx.cc, &cc) @@ -110,6 +111,7 @@ func (tx *GetTx) All(ctx context.Context, out interface{}) error { err = tx.db.retry(ctx, func() error { var err error resp, err = tx.db.client.TransactGetItems(ctx, input) + tx.cc.incRequests() if tx.cc != nil && resp != nil { for _, cc := range resp.ConsumedCapacity { addConsumedCapacity(tx.cc, &cc) @@ -256,6 +258,7 @@ func (tx *WriteTx) Run(ctx context.Context) error { } err = tx.db.retry(ctx, func() error { out, err := tx.db.client.TransactWriteItems(ctx, input) + tx.cc.incRequests() if tx.cc != nil && out != nil { for _, cc := range out.ConsumedCapacity { addConsumedCapacity(tx.cc, &cc) diff --git a/update.go b/update.go index d8e6d0d..286171d 100644 --- a/update.go +++ b/update.go @@ -347,6 +347,7 @@ func (u *Update) run(ctx context.Context) (*dynamodb.UpdateItemOutput, error) { err := u.table.db.retry(ctx, func() error { var err error output, err = u.table.db.client.UpdateItem(ctx, input) + u.cc.incRequests() return err }) if u.cc != nil && output != nil {