Skip to content

Commit

Permalink
Fix review issues
Browse files Browse the repository at this point in the history
  • Loading branch information
UgnineSirdis committed Oct 2, 2024
1 parent 1a67a9e commit 70fe1c4
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config
)

return &Client{
clock: config.Clock(),
config: config,
service: Ydb_Table_V1.NewTableServiceClient(cc),
cc: cc,
clock: config.Clock(),
config: config,
client: Ydb_Table_V1.NewTableServiceClient(cc),
cc: cc,
build: func(ctx context.Context) (s *session, err error) {
return newSession(ctx, cc, config)
},
Expand Down Expand Up @@ -89,13 +89,13 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config
// A Client is safe for use by multiple goroutines simultaneously.
type Client struct {
// read-only fields
config *config.Config
service Ydb_Table_V1.TableServiceClient
build sessionBuilder
cc grpc.ClientConnInterface
clock clockwork.Clock
pool sessionPool
done chan struct{}
config *config.Config
client Ydb_Table_V1.TableServiceClient
build sessionBuilder
cc grpc.ClientConnInterface
clock clockwork.Clock
pool sessionPool
done chan struct{}
}

func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ table.ClosableSession, err error) {
Expand Down Expand Up @@ -289,21 +289,25 @@ func (c *Client) BulkUpsert(
onDone(finalErr, attempts)
}()

return retryBackoff(ctx, c.pool, func(ctx context.Context, s table.Session) (err error) {
attempts++
request := Ydb_Table.BulkUpsertRequest{
Table: tableName,
}
finalErr = data.ApplyBulkUpsertRequest(a, (*table.BulkUpsertRequest)(&request))
if finalErr != nil {
return finalErr
}

req := Ydb_Table.BulkUpsertRequest{
Table: tableName,
}
err = data.ApplyBulkUpsertRequest(a, (*table.BulkUpsertRequest)(&req))
if err != nil {
return err
}
finalErr = retry.Retry(ctx,
func(ctx context.Context) (err error) {
attempts++
_, err = c.client.BulkUpsert(ctx, &request, config.CallOptions...)

_, err = c.service.BulkUpsert(ctx, &req, config.CallOptions...)
return err
},
config.RetryOptions...,
)

return err
}, config.RetryOptions...)
return xerrors.WithStackTrace(finalErr)
}

func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) {
Expand Down

0 comments on commit 70fe1c4

Please sign in to comment.