Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Create path uniform with Update/Delete #187

Merged
merged 2 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/canonical/k8s-dqlite/pkg/kine/prepared"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -519,7 +518,7 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i
return rev.Int64, id, err
}

func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int64) (rev int64, err error) {
func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int64) (rev int64, succeeded bool, err error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Create", otelName))

defer func() {
Expand All @@ -541,16 +540,17 @@ func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int6
result, err := d.execute(ctx, "create_sql", d.CreateSQL, key, ttl, value, key)
if err != nil {
logrus.WithError(err).Error("failed to create key")
return 0, err
return 0, false, err
}

if insertCount, err := result.RowsAffected(); err != nil {
return 0, err
return 0, false, err
} else if insertCount == 0 {
return 0, server.ErrKeyExists
return 0, false, nil
}
return result.LastInsertId()
rev, err = result.LastInsertId()
return rev, true, err
}

func (d *Generic) Update(ctx context.Context, key string, value []byte, preRev, ttl int64) (rev int64, updated bool, err error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Update", otelName))
defer func() {
Expand Down
12 changes: 6 additions & 6 deletions pkg/kine/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type Log interface {
Wait()
CurrentRevision(ctx context.Context) (int64, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, updateRet bool, errRet error)
Delete(ctx context.Context, key string, revision int64) (revRet int64, deleted bool, errRet error)
Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, created bool, err error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (rev int64, updated bool, err error)
Delete(ctx context.Context, key string, revision int64) (rev int64, deleted bool, err error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Watch(ctx context.Context, prefix string) <-chan []*server.Event
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Expand Down Expand Up @@ -137,10 +137,10 @@ func (l *LogStructured) adjustRevision(ctx context.Context, rev *int64) {
}
}

func (l *LogStructured) Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, err error) {
rev, err = l.log.Create(ctx, key, value, lease)
func (l *LogStructured) Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, created bool, err error) {
rev, created, err = l.log.Create(ctx, key, value, lease)
logrus.Debugf("CREATE %s, size=%d, lease=%d => rev=%d, err=%v", key, len(value), lease, rev, err)
return rev, err
return rev, created, err
}

func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, deleted bool, errRet error) {
Expand Down
17 changes: 9 additions & 8 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Dialect interface {
CurrentRevision(ctx context.Context) (int64, error)
AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
After(ctx context.Context, rev, limit int64) (*sql.Rows, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, bool, error)
Update(ctx context.Context, key string, value []byte, prevRev, lease int64) (int64, bool, error)
Delete(ctx context.Context, key string, revision int64) (int64, bool, error)
DeleteRevision(ctx context.Context, revision int64) error
Expand Down Expand Up @@ -106,7 +106,7 @@ func (s *SQLLog) compactStart(ctx context.Context) error {
}

if len(events) == 0 {
_, err := s.Create(ctx, "compact_rev_key", []byte(""), 0)
_, _, err := s.Create(ctx, "compact_rev_key", []byte(""), 0)
return err
} else if len(events) == 1 {
return nil
Expand Down Expand Up @@ -493,14 +493,15 @@ func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision in
return s.d.Count(ctx, prefix, startKey, revision)
}

func (s *SQLLog) Create(ctx context.Context, key string, value []byte, lease int64) (rev int64, err error) {
rev, err = s.d.Create(ctx, key, value, lease)
func (s *SQLLog) Create(ctx context.Context, key string, value []byte, lease int64) (int64, bool, error) {
rev, created, err := s.d.Create(ctx, key, value, lease)
if err != nil {
return 0, err
return 0, false, err
}

s.notifyWatcherPoll(rev)
return rev, nil
if created {
s.notifyWatcherPoll(rev)
}
return rev, created, nil
}

func (s *SQLLog) Delete(ctx context.Context, key string, revision int64) (rev int64, deleted bool, err error) {
Expand Down
12 changes: 7 additions & 5 deletions pkg/kine/server/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func isCreate(txn *etcdserverpb.TxnRequest) *etcdserverpb.PutRequest {
return nil
}

func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest, txn *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) {
func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest) (*etcdserverpb.TxnResponse, error) {
var err error
createCnt.Add(ctx, 1)
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.create", otelName))
Expand All @@ -42,16 +42,18 @@ func (l *LimitedServer) create(ctx context.Context, put *etcdserverpb.PutRequest
return nil, unsupported("prevKv")
}

rev, err := l.backend.Create(ctx, string(put.Key), put.Value, put.Lease)
rev, created, err := l.backend.Create(ctx, string(put.Key), put.Value, put.Lease)
if err != nil {
return nil, err
}

span.SetAttributes(attribute.Int64("revision", rev))
if err == ErrKeyExists {
if !created {
span.AddEvent("key exists")
return &etcdserverpb.TxnResponse{
Header: txnHeader(rev),
Succeeded: false,
}, nil
} else if err != nil {
return nil, err
}

return &etcdserverpb.TxnResponse{
Expand Down
2 changes: 1 addition & 1 deletion pkg/kine/server/limited.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func txnHeader(rev int64) *etcdserverpb.ResponseHeader {

func (l *LimitedServer) Txn(ctx context.Context, txn *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) {
if put := isCreate(txn); put != nil {
return l.create(ctx, put, txn)
return l.create(ctx, put)
}
if rev, key, ok := isDelete(txn); ok {
return l.delete(ctx, key, rev)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kine/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Backend interface {
Start(ctx context.Context) error
Wait()
Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (int64, *KeyValue, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, bool, error)
Delete(ctx context.Context, key string, revision int64) (int64, bool, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Expand Down
24 changes: 8 additions & 16 deletions pkg/kine/server/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func isUpdate(txn *etcdserverpb.TxnRequest) (int64, string, []byte, int64, bool)

func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value []byte, lease int64) (*etcdserverpb.TxnResponse, error) {
var (
kv *KeyValue
updated bool
err error
kv *KeyValue
succeeded bool
err error
)
updateCnt.Add(ctx, 1)

Expand All @@ -45,29 +45,21 @@ func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value
)

if rev == 0 {
rev, err = l.backend.Create(ctx, key, value, lease)
if err == ErrKeyExists {
return &etcdserverpb.TxnResponse{
Header: txnHeader(rev),
Succeeded: false,
}, nil
} else {
updated = true
}
rev, succeeded, err = l.backend.Create(ctx, key, value, lease)
} else {
rev, updated, err = l.backend.Update(ctx, key, value, rev, lease)
rev, succeeded, err = l.backend.Update(ctx, key, value, rev, lease)
}
if err != nil {
return nil, err
}
span.SetAttributes(attribute.Bool("updated", updated), attribute.Int64("revision", rev))
span.SetAttributes(attribute.Bool("updated", succeeded), attribute.Int64("revision", rev))

resp := &etcdserverpb.TxnResponse{
Header: txnHeader(rev),
Succeeded: updated,
Succeeded: succeeded,
}

if updated {
if succeeded {
resp.Responses = []*etcdserverpb.ResponseOp{
{
Response: &etcdserverpb.ResponseOp_ResponsePut{
Expand Down
Loading