-
Couldn't load subscription status.
- Fork 233
feat: live backups evm and ev-node #2758
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8cf0fed
8abdd83
9663554
05fe34a
bf29d4e
c76b16d
dfaed99
078015f
aa2845e
32578cb
53907a5
097f3ce
97d49c6
0cf5f9d
f8f5af1
a49a4c2
15401d8
7a31192
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,146 @@ | ||
| package cmd | ||
|
|
||
| import ( | ||
| "bufio" | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "os" | ||
| "path/filepath" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/spf13/cobra" | ||
|
|
||
| clientrpc "github.com/evstack/ev-node/pkg/rpc/client" | ||
| pb "github.com/evstack/ev-node/types/pb/evnode/v1" | ||
| ) | ||
|
|
||
| // NewBackupCmd creates a cobra command that streams a datastore backup via the RPC client. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit, lets define flags as variables |
||
| func NewBackupCmd() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "backup", | ||
| Short: "Stream a datastore backup to a local file via RPC", | ||
| SilenceUsage: true, | ||
| RunE: func(cmd *cobra.Command, args []string) error { | ||
| nodeConfig, err := ParseConfig(cmd) | ||
| if err != nil { | ||
| return fmt.Errorf("error parsing config: %w", err) | ||
| } | ||
|
|
||
| rpcAddress := strings.TrimSpace(nodeConfig.RPC.Address) | ||
| if rpcAddress == "" { | ||
| return fmt.Errorf("RPC address not found in node configuration") | ||
| } | ||
|
|
||
| baseURL := rpcAddress | ||
| if !strings.HasPrefix(baseURL, "http://") && !strings.HasPrefix(baseURL, "https://") { | ||
| baseURL = fmt.Sprintf("http://%s", baseURL) | ||
| } | ||
|
|
||
| outputPath, err := cmd.Flags().GetString("output") | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if outputPath == "" { | ||
| timestamp := time.Now().UTC().Format("20060102-150405") | ||
| outputPath = fmt.Sprintf("evnode-backup-%s.badger", timestamp) | ||
| } | ||
|
|
||
| outputPath, err = filepath.Abs(outputPath) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to resolve output path: %w", err) | ||
| } | ||
|
|
||
| if err := os.MkdirAll(filepath.Dir(outputPath), 0o755); err != nil { | ||
| return fmt.Errorf("failed to create output directory: %w", err) | ||
| } | ||
|
|
||
| force, err := cmd.Flags().GetBool("force") | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if !force { | ||
| if _, statErr := os.Stat(outputPath); statErr == nil { | ||
| return fmt.Errorf("output file %s already exists (use --force to overwrite)", outputPath) | ||
| } else if !errors.Is(statErr, os.ErrNotExist) { | ||
| return fmt.Errorf("failed to inspect output file: %w", statErr) | ||
| } | ||
| } | ||
|
|
||
| file, err := os.OpenFile(outputPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to open output file: %w", err) | ||
| } | ||
| defer file.Close() | ||
|
|
||
| writer := bufio.NewWriterSize(file, 1<<20) // 1 MiB buffer for fewer syscalls. | ||
| bytesCount := &countingWriter{} | ||
| streamWriter := io.MultiWriter(writer, bytesCount) | ||
|
|
||
| sinceVersion, err := cmd.Flags().GetUint64("since-version") | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| ctx := cmd.Context() | ||
| if ctx == nil { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not necessary. |
||
| ctx = context.Background() | ||
| } | ||
|
|
||
| client := clientrpc.NewClient(baseURL) | ||
|
|
||
| metadata, backupErr := client.Backup(ctx, &pb.BackupRequest{ | ||
| SinceVersion: sinceVersion, | ||
| }, streamWriter) | ||
| if backupErr != nil { | ||
| // Remove the partial file on failure to avoid keeping corrupt snapshots. | ||
| _ = writer.Flush() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use defer and remove them from all those error handling |
||
| _ = file.Close() | ||
| _ = os.Remove(outputPath) | ||
| return fmt.Errorf("backup failed: %w", backupErr) | ||
| } | ||
|
|
||
| if err := writer.Flush(); err != nil { | ||
| _ = file.Close() | ||
| _ = os.Remove(outputPath) | ||
| return fmt.Errorf("failed to flush backup data: %w", err) | ||
| } | ||
|
|
||
| if !metadata.GetCompleted() { | ||
| _ = file.Close() | ||
| _ = os.Remove(outputPath) | ||
| return fmt.Errorf("backup stream ended without completion metadata") | ||
| } | ||
|
|
||
| cmd.Printf("Backup saved to %s (%d bytes)\n", outputPath, bytesCount.Bytes()) | ||
| cmd.Printf("Current height: %d\n", metadata.GetCurrentHeight()) | ||
| cmd.Printf("Since version: %d\n", metadata.GetSinceVersion()) | ||
| cmd.Printf("Last version: %d\n", metadata.GetLastVersion()) | ||
|
|
||
| return nil | ||
| }, | ||
| } | ||
|
|
||
| cmd.Flags().String("output", "", "Path to the backup file (defaults to ./evnode-backup-<timestamp>.badger)") | ||
| cmd.Flags().Uint64("since-version", 0, "Generate an incremental backup starting from the provided version") | ||
| cmd.Flags().Bool("force", false, "Overwrite the output file if it already exists") | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| type countingWriter struct { | ||
| total int64 | ||
| } | ||
|
|
||
| func (c *countingWriter) Write(p []byte) (int, error) { | ||
| c.total += int64(len(p)) | ||
| return len(p), nil | ||
| } | ||
|
|
||
| func (c *countingWriter) Bytes() int64 { | ||
| return c.total | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| package cmd | ||
|
|
||
| import ( | ||
| "io" | ||
| "net/http" | ||
| "net/http/httptest" | ||
| "os" | ||
| "path/filepath" | ||
| "strings" | ||
| "testing" | ||
|
|
||
| "github.com/rs/zerolog" | ||
| "github.com/spf13/cobra" | ||
| "github.com/stretchr/testify/mock" | ||
| "github.com/stretchr/testify/require" | ||
| "golang.org/x/net/http2" | ||
| "golang.org/x/net/http2/h2c" | ||
|
|
||
| "github.com/evstack/ev-node/pkg/config" | ||
| "github.com/evstack/ev-node/pkg/rpc/server" | ||
| "github.com/evstack/ev-node/test/mocks" | ||
| "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect" | ||
| ) | ||
|
|
||
| func TestBackupCmd_Success(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| mockStore := mocks.NewMockStore(t) | ||
|
|
||
| mockStore.On("Height", mock.Anything).Return(uint64(15), nil) | ||
| mockStore.On("Backup", mock.Anything, mock.Anything, uint64(9)).Run(func(args mock.Arguments) { | ||
| writer := args.Get(1).(io.Writer) | ||
| _, _ = writer.Write([]byte("chunk-1")) | ||
| _, _ = writer.Write([]byte("chunk-2")) | ||
| }).Return(uint64(21), nil) | ||
|
|
||
| logger := zerolog.Nop() | ||
| storeServer := server.NewStoreServer(mockStore, logger) | ||
| mux := http.NewServeMux() | ||
| storePath, storeHandler := v1connect.NewStoreServiceHandler(storeServer) | ||
| mux.Handle(storePath, storeHandler) | ||
|
|
||
| httpServer := httptest.NewServer(h2c.NewHandler(mux, &http2.Server{})) | ||
| defer httpServer.Close() | ||
|
|
||
| tempDir, err := os.MkdirTemp("", "evnode-backup-*") | ||
| require.NoError(t, err) | ||
| t.Cleanup(func() { | ||
| _ = os.RemoveAll(tempDir) | ||
| }) | ||
|
|
||
| backupCmd := NewBackupCmd() | ||
| config.AddFlags(backupCmd) | ||
|
|
||
| rootCmd := &cobra.Command{Use: "root"} | ||
| config.AddGlobalFlags(rootCmd, "test") | ||
| rootCmd.AddCommand(backupCmd) | ||
|
|
||
| outPath := filepath.Join(tempDir, "snapshot.badger") | ||
| rpcAddr := strings.TrimPrefix(httpServer.URL, "http://") | ||
|
|
||
| output, err := executeCommandC( | ||
| rootCmd, | ||
| "backup", | ||
| "--home="+tempDir, | ||
| "--evnode.rpc.address="+rpcAddr, | ||
| "--output", outPath, | ||
| "--since-version", "9", | ||
| ) | ||
|
|
||
| require.NoError(t, err, "command failed: %s", output) | ||
|
|
||
| data, readErr := os.ReadFile(outPath) | ||
| require.NoError(t, readErr) | ||
| require.Equal(t, "chunk-1chunk-2", string(data)) | ||
|
|
||
| require.Contains(t, output, "Backup saved to") | ||
| require.Contains(t, output, "Current height: 15") | ||
| require.Contains(t, output, "Since version: 9") | ||
| require.Contains(t, output, "Last version: 21") | ||
|
|
||
| mockStore.AssertExpectations(t) | ||
| } | ||
|
|
||
| func TestBackupCmd_ExistingFileWithoutForce(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| mockStore := mocks.NewMockStore(t) | ||
| logger := zerolog.Nop() | ||
| storeServer := server.NewStoreServer(mockStore, logger) | ||
| mux := http.NewServeMux() | ||
| storePath, storeHandler := v1connect.NewStoreServiceHandler(storeServer) | ||
| mux.Handle(storePath, storeHandler) | ||
|
|
||
| httpServer := httptest.NewServer(h2c.NewHandler(mux, &http2.Server{})) | ||
| defer httpServer.Close() | ||
|
|
||
| tempDir, err := os.MkdirTemp("", "evnode-backup-existing-*") | ||
| require.NoError(t, err) | ||
| t.Cleanup(func() { | ||
| _ = os.RemoveAll(tempDir) | ||
| }) | ||
|
|
||
| outPath := filepath.Join(tempDir, "snapshot.badger") | ||
| require.NoError(t, os.WriteFile(outPath, []byte("existing"), 0o600)) | ||
|
|
||
| backupCmd := NewBackupCmd() | ||
| config.AddFlags(backupCmd) | ||
|
|
||
| rootCmd := &cobra.Command{Use: "root"} | ||
| config.AddGlobalFlags(rootCmd, "test") | ||
| rootCmd.AddCommand(backupCmd) | ||
|
|
||
| rpcAddr := strings.TrimPrefix(httpServer.URL, "http://") | ||
|
|
||
| output, err := executeCommandC( | ||
| rootCmd, | ||
| "backup", | ||
| "--home="+tempDir, | ||
| "--evnode.rpc.address="+rpcAddr, | ||
| "--output", outPath, | ||
| ) | ||
|
|
||
| require.Error(t, err) | ||
| require.Contains(t, output, "already exists (use --force to overwrite)") | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.