Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: Dont depend on image state to issue resync #4076

Merged
merged 2 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cephfs/util/mountinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/container-storage-interface/spec/lib/go/csi"
// google.golang.org/protobuf/encoding doesn't offer MessageV2().
"github.com/golang/protobuf/proto" //nolint:staticcheck // See comment above.
"github.com/golang/protobuf/proto" //nolint:all // See comment above.
"google.golang.org/protobuf/encoding/protojson"
)

Expand Down
121 changes: 102 additions & 19 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ const (
// imageMirrorModeJournal uses journaling to propagate RBD images between
// ceph clusters.
imageMirrorModeJournal imageMirroringMode = "journal"

// imageCreationTimeKey is the key used to get/set the image creation
// timestamp in the image metadata. The key starts with `.rbd` so that it
// is not replicated to the remote cluster.
imageCreationTimeKey = ".rbd.image.creation_time"
)

const (
Expand Down Expand Up @@ -480,6 +485,14 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,

return nil, err
}

creationTime, err := rbdVol.GetImageCreationTime()
if err != nil {
log.ErrorLog(ctx, err.Error())

return nil, status.Error(codes.Internal, err.Error())
}

mirroringInfo, err := rbdVol.GetImageMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
Expand All @@ -497,6 +510,17 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,

// demote image to secondary
if mirroringInfo.Primary {
// store the image creation time for resync
_, err = rbdVol.GetMetadata(imageCreationTimeKey)
if err != nil && errors.Is(err, librbd.ErrNotFound) {
err = rbdVol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime))
}
if err != nil {
log.ErrorLog(ctx, err.Error())

return nil, status.Error(codes.Internal, err.Error())
}

err = rbdVol.DemoteImage()
if err != nil {
log.ErrorLog(ctx, err.Error())
Expand Down Expand Up @@ -538,6 +562,8 @@ func checkRemoteSiteStatus(ctx context.Context, mirrorStatus *librbd.GlobalMirro
// ResyncVolume extracts the RBD volume information from the volumeID, If the
// image is present, mirroring is enabled and the image is in demoted state.
// If yes it will resync the image to correct the split-brain.
//
//nolint:gocyclo,cyclop // TODO: reduce complexity
func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
req *replication.ResyncVolumeRequest,
) (*replication.ResyncVolumeResponse, error) {
Expand Down Expand Up @@ -578,7 +604,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
// it takes time for this operation.
log.ErrorLog(ctx, err.Error())

return nil, status.Error(codes.Aborted, err.Error())
return nil, status.Errorf(codes.Aborted, err.Error())
}

if mirroringInfo.State != librbd.MirrorImageEnabled {
Expand Down Expand Up @@ -637,14 +663,40 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
ready = checkRemoteSiteStatus(ctx, mirrorStatus)
}

err = rbdVol.ResyncVol(localStatus, req.Force)
creationTime, err := rbdVol.GetImageCreationTime()
if err != nil {
return nil, getGRPCError(err)
return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error())
}

err = checkVolumeResyncStatus(localStatus)
// image creation time is stored in the image metadata. it looks like
// `"seconds:1692879841 nanos:631526669"`
savedImageTime, err := rbdVol.GetMetadata(imageCreationTimeKey)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return nil, status.Errorf(codes.Internal,
"failed to get %s key from image metadata for %s: %s",
imageCreationTimeKey,
rbdVol,
err.Error())
}

st, err := timestampFromString(savedImageTime)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", err.Error())
}
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime())

if req.Force && st.Equal(creationTime.AsTime()) {
err = rbdVol.ResyncVol(localStatus)
if err != nil {
return nil, getGRPCError(err)
}
}

if !ready {
err = checkVolumeResyncStatus(localStatus)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}

err = rbdVol.RepairResyncedImageID(ctx, ready)
Expand All @@ -659,6 +711,40 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return resp, nil
}

// timestampToString renders a protobuf timestamp as the string
// "seconds:<seconds> nanos:<nanos>", using the Seconds and Nanos fields of st.
func timestampToString(st *timestamppb.Timestamp) string {
	secs := strconv.FormatInt(st.Seconds, 10)
	nanos := strconv.FormatInt(int64(st.Nanos), 10)

	return "seconds:" + secs + " nanos:" + nanos
}

// timestampFromString parses a timestamp of the form
// "seconds:<int64> nanos:<int32>" (the format written by timestampToString)
// and returns the corresponding time.Time built with time.Unix.
//
// The input must consist of exactly two whitespace-separated fields, the
// first keyed "seconds" and the second keyed "nanos". On any failure the zero
// time.Time and a non-nil error are returned; errors from strconv are wrapped
// so callers can inspect them with errors.Is/errors.As.
func timestampFromString(timestamp string) (time.Time, error) {
	parts := strings.Fields(timestamp)
	if len(parts) != 2 {
		return time.Time{}, fmt.Errorf("failed to parse image creation time: %s", timestamp)
	}

	// Each field must be "<key>:<value>" with the expected key, so that
	// swapped or unrelated fields are rejected instead of silently parsed.
	secondsKey, secondsStr, okSeconds := strings.Cut(parts[0], ":")
	nanosKey, nanosStr, okNanos := strings.Cut(parts[1], ":")
	if !okSeconds || !okNanos || secondsKey != "seconds" || nanosKey != "nanos" {
		return time.Time{}, fmt.Errorf("failed to parse image creation time: %s", timestamp)
	}

	seconds, err := strconv.ParseInt(secondsStr, 10, 64)
	if err != nil {
		return time.Time{}, fmt.Errorf("failed to parse image creation time seconds: %w", err)
	}

	// nanos is limited to 32 bits to match the width of the protobuf field.
	nanos, err := strconv.ParseInt(nanosStr, 10, 32)
	if err != nil {
		return time.Time{}, fmt.Errorf("failed to parse image creation time nanos: %w", err)
	}

	return time.Unix(seconds, nanos), nil
}

func getGRPCError(err error) error {
if err == nil {
return status.Error(codes.OK, codes.OK.String())
Expand Down Expand Up @@ -854,20 +940,17 @@ func getLastSyncInfo(description string) (*replication.GetVolumeReplicationInfoR
}

func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error {
// we are considering 2 states to check resync started and resync completed
// as below. all other states will be considered as an error state so that
// cephCSI can return error message and volume replication operator can
// mark the VolumeReplication status as not resyncing for the volume.

// If the state is Replaying means the resync is going on.
// Once the volume on remote cluster is demoted and resync
// is completed the image state will be moved to UNKNOWN.
// RBD mirror daemon should be always running on the primary cluster.
if !localStatus.Up || (localStatus.State != librbd.MirrorImageStatusStateReplaying &&
localStatus.State != librbd.MirrorImageStatusStateUnknown) {
return fmt.Errorf(
"not resyncing. Local status: daemon up=%t image is in %q state",
localStatus.Up, localStatus.State)
// We use the local snapshot timestamp to check whether the resync has
// started. If local_snapshot_timestamp is not found in the description of
// localStatus, we return an error; if it is found, the resync is
// considered started.
description := localStatus.Description
resp, err := getLastSyncInfo(description)
if err != nil {
return fmt.Errorf("failed to get last sync info: %w", err)
}
if resp.LastSyncTime == nil {
return errors.New("last sync time is nil")
}

return nil
Expand Down
127 changes: 70 additions & 57 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,74 +225,26 @@ func TestCheckVolumeResyncStatus(t *testing.T) {
wantErr bool
}{
{
name: "test when rbd mirror daemon is not running",
name: "test when local_snapshot_timestamp is non zero",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateUnknown,
Up: false,
},
wantErr: true,
},
{
name: "test for unknown state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateUnknown,
Up: true,
},
wantErr: false,
},
{
name: "test for error state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateError,
Up: true,
},
wantErr: true,
},
{
name: "test for syncing state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateSyncing,
Up: true,
},
wantErr: true,
},
{
name: "test for starting_replay state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateStartingReplay,
Up: true,
},
wantErr: true,
},
{
name: "test for replaying state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateReplaying,
Up: true,
//nolint:lll // sample output cannot be split into multiple lines.
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
},
wantErr: false,
},
{
name: "test for stopping_replay state",
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateStoppingReplay,
Up: true,
},
wantErr: true,
},
{
name: "test for stopped state",
name: "test when local_snapshot_timestamp is zero",
//nolint:lll // sample output cannot be split into multiple lines.
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusStateStopped,
Up: true,
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":0,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
},
wantErr: true,
},
{
name: "test for invalid state",
name: "test when local_snapshot_timestamp is not present",
//nolint:lll // sample output cannot be split into multiple lines.
args: librbd.SiteMirrorImageStatus{
State: librbd.MirrorImageStatusState(100),
Up: true,
Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
},
wantErr: true,
},
Expand Down Expand Up @@ -644,3 +596,64 @@ func TestGetGRPCError(t *testing.T) {
})
}
}

// Test_timestampFromString checks that timestampFromString round-trips the
// output of timestampToString and rejects malformed inputs, returning the
// zero time.Time alongside the error.
func Test_timestampFromString(t *testing.T) {
	t.Parallel()

	now := timestamppb.Now()
	zero := time.Time{}

	cases := []struct {
		name      string
		timestamp string
		want      time.Time
		wantErr   bool
	}{
		{name: "valid timestamp", timestamp: timestampToString(now), want: now.AsTime().Local()},
		{name: "invalid timestamp", timestamp: "invalid", want: zero, wantErr: true},
		{name: "empty timestamp", timestamp: "", want: zero, wantErr: true},
		{name: "invalid format", timestamp: "seconds:%d nanos:%d", want: zero, wantErr: true},
		{name: "missing nanos", timestamp: "seconds:10", want: zero, wantErr: true},
		{name: "missing seconds", timestamp: "nanos:0", want: zero, wantErr: true},
	}

	for _, tc := range cases {
		tc := tc // capture the range variable for the parallel subtest
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			got, err := timestampFromString(tc.timestamp)
			if gotErr := err != nil; gotErr != tc.wantErr {
				t.Errorf("timestampFromString() error = %v, wantErr %v", err, tc.wantErr)
			}
			if !reflect.DeepEqual(got, tc.want) {
				t.Errorf("timestampFromString() = %v, want %v", got, tc.want)
			}
		})
	}
}
13 changes: 13 additions & 0 deletions internal/rbd/rbd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,19 @@ func (rv *rbdVolume) setImageOptions(ctx context.Context, options *librbd.ImageO
return nil
}

// GetImageCreationTime returns the image's CreatedAt timestamp. When
// CreatedAt is nil it first calls getImageInfo and returns that call's error,
// if any; otherwise it returns whatever CreatedAt holds afterwards
// (presumably populated by getImageInfo, whose body is not shown here).
func (ri *rbdImage) GetImageCreationTime() (*timestamppb.Timestamp, error) {
	if ri.CreatedAt == nil {
		// Creation time not cached yet: query the image info.
		if err := ri.getImageInfo(); err != nil {
			return nil, err
		}
	}

	return ri.CreatedAt, nil
}

// getImageInfo queries rbd about the given image and returns its metadata, and returns
// ErrImageNotFound if provided image is not found.
func (ri *rbdImage) getImageInfo() error {
Expand Down
Loading