From dbe3d7e4d422be9cd1265e1c3d8f98f095883d8b Mon Sep 17 00:00:00 2001 From: carlvine500 Date: Tue, 14 Jan 2025 15:52:32 +0800 Subject: [PATCH] add aof_writer cmd_writer json_writer (#914) --- cmd/redis-shake/main.go | 8 ++ docs/src/en/writer/file_writer.md | 58 ++++++++++++++ docs/src/zh/writer/file_writer.md | 57 +++++++++++++ internal/writer/file_writer.go | 129 ++++++++++++++++++++++++++++++ shake.toml | 4 + 5 files changed, 256 insertions(+) create mode 100644 docs/src/en/writer/file_writer.md create mode 100644 docs/src/zh/writer/file_writer.md create mode 100644 internal/writer/file_writer.go diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index 199237aa..b72ac35e 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -139,6 +139,14 @@ func main() { // create writer var theWriter writer.Writer switch { + case v.IsSet("file_writer"): + opts := new(writer.FileWriterOptions) + defaults.SetDefaults(opts) + err := v.UnmarshalKey("file_writer", opts) + if err != nil { + log.Panicf("failed to read the FileWriter config entry. err: %v", err) + } + theWriter = writer.NewFileWriter(ctx, opts) case v.IsSet("redis_writer"): opts := new(writer.RedisWriterOptions) defaults.SetDefaults(opts) diff --git a/docs/src/en/writer/file_writer.md b/docs/src/en/writer/file_writer.md new file mode 100644 index 00000000..2871a7ce --- /dev/null +++ b/docs/src/en/writer/file_writer.md @@ -0,0 +1,58 @@ +# file_writer + +## Introduction + +Use `file_writer` to write data to a file in CMD, JSON, or AOF format. +It is commonly used to extract, migrate, or fix data through files. + +## Configuration + +```toml +[file_writer] +filepath = "/tmp/cmd.txt" +type = "cmd" #cmd,aof,json (default cmd) +``` + +* An absolute filepath should be passed in. +## Application scenarios +- Share data between two systems: one system writes an AOF file to disk/S3/OSS, and the other system reads the file from there.
+- Partially migrate data by business prefix: extract the data with prefix "XXX:" from system A as an AOF file, then import it into system B with the command `redis-cli --pipe < XXX.aof`. +- Fix data with a cmd file: export cmd data from one system, fix the wrong data, and then import the cmd file with the command `redis-cli < cmd.txt`. +- Analyze data with JSON: export a JSON file, and then import it into MongoDB or a BI tool for analysis. + +## Example output +### cmd_writer output: +``` +SELECT 0 +set key1 1 +set key2 2 +set key3 3 +sadd key4 1 2 3 4 +lpush key5 1 2 3 4 5 +zadd key6 1 2 3 4 5 6 +``` +### json_writer output: +``` +{"DbId":0,"Argv":["SELECT","0"],"CmdName":"SELECT","Group":"CONNECTION","Keys":null,"KeyIndexes":null,"Slots":[],"SerializedSize":23} +{"DbId":0,"Argv":["set","key1","1"],"CmdName":"SET","Group":"STRING","Keys":["key1"],"KeyIndexes":[2],"Slots":[9189],"SerializedSize":30} +{"DbId":0,"Argv":["set","key2","2"],"CmdName":"SET","Group":"STRING","Keys":["key2"],"KeyIndexes":[2],"Slots":[4998],"SerializedSize":30} +{"DbId":0,"Argv":["set","key3","3"],"CmdName":"SET","Group":"STRING","Keys":["key3"],"KeyIndexes":[2],"Slots":[935],"SerializedSize":30} +{"DbId":0,"Argv":["sadd","key4","1","2","3","4"],"CmdName":"SADD","Group":"SET","Keys":["key4"],"KeyIndexes":[2],"Slots":[13120],"SerializedSize":52} +{"DbId":0,"Argv":["lpush","key5","1","2","3","4","5"],"CmdName":"LPUSH","Group":"LIST","Keys":["key5"],"KeyIndexes":[2],"Slots":[9057],"SerializedSize":60} +{"DbId":0,"Argv":["zadd","key6","1","2","3","4","5","6"],"CmdName":"ZADD","Group":"SORTED_SET","Keys":["key6"],"KeyIndexes":[2],"Slots":[4866],"SerializedSize":66} +``` +### aof_writer output: +``` +*2 +$6 +SELECT +$1 +0 +*3 +$3 +set +$4 +key1 +$1 +1 +``` diff --git a/docs/src/zh/writer/file_writer.md b/docs/src/zh/writer/file_writer.md new file mode 100644 index 00000000..b36ecaaa --- /dev/null +++ b/docs/src/zh/writer/file_writer.md @@ -0,0 +1,57 @@ +# file_writer + +## 介绍 + +可以使用 ` file_writer` 写文件, 可写的格式有 CMD/JSON/AOF, 常用于通过文件介质抽取/迁移/订正数据.
+## 配置 + +```toml +[file_writer] +filepath = "/tmp/cmd.txt" +type = "cmd" #cmd,aof,json (default cmd) +``` + +* 绝对路径 filepath 是必填的. + +## 应用场景 +- 两个系统共享数据: 一个系统把文件写到 disk/s3/oss, 另一系统从中读取. +- 跨系统局部迁移带指定前缀的数据: 从A系统迁出带前缀"XXX:"的数据, B系统通过命令 `redis-cli --pipe < XXX.aof` 导入这些数据. +- 通过命令文件订正数据: 从一个系统中导出数据成cmd格式, 订正后再通过命令 `redis-cli < cmd.txt` 导入. +- 通过json格式做数据分析: 导出成json文件, 导入到mongodb/bi做分析. + +## 示例输出 +### cmd_writer 输出: +``` +SELECT 0 +set key1 1 +set key2 2 +set key3 3 +sadd key4 1 2 3 4 +lpush key5 1 2 3 4 5 +zadd key6 1 2 3 4 5 6 +``` +### json_writer 输出: +``` +{"DbId":0,"Argv":["SELECT","0"],"CmdName":"SELECT","Group":"CONNECTION","Keys":null,"KeyIndexes":null,"Slots":[],"SerializedSize":23} +{"DbId":0,"Argv":["set","key1","1"],"CmdName":"SET","Group":"STRING","Keys":["key1"],"KeyIndexes":[2],"Slots":[9189],"SerializedSize":30} +{"DbId":0,"Argv":["set","key2","2"],"CmdName":"SET","Group":"STRING","Keys":["key2"],"KeyIndexes":[2],"Slots":[4998],"SerializedSize":30} +{"DbId":0,"Argv":["set","key3","3"],"CmdName":"SET","Group":"STRING","Keys":["key3"],"KeyIndexes":[2],"Slots":[935],"SerializedSize":30} +{"DbId":0,"Argv":["sadd","key4","1","2","3","4"],"CmdName":"SADD","Group":"SET","Keys":["key4"],"KeyIndexes":[2],"Slots":[13120],"SerializedSize":52} +{"DbId":0,"Argv":["lpush","key5","1","2","3","4","5"],"CmdName":"LPUSH","Group":"LIST","Keys":["key5"],"KeyIndexes":[2],"Slots":[9057],"SerializedSize":60} +{"DbId":0,"Argv":["zadd","key6","1","2","3","4","5","6"],"CmdName":"ZADD","Group":"SORTED_SET","Keys":["key6"],"KeyIndexes":[2],"Slots":[4866],"SerializedSize":66} +``` +### aof_writer 输出: +``` +*2 +$6 +SELECT +$1 +0 +*3 +$3 +set +$4 +key1 +$1 +1 +``` diff --git a/internal/writer/file_writer.go b/internal/writer/file_writer.go new file mode 100644 index 00000000..7efbe706 --- /dev/null +++ b/internal/writer/file_writer.go @@ -0,0 +1,129 @@ +package writer + +import ( + "RedisShake/internal/entry" + "RedisShake/internal/log" + "bufio" + "context" + "encoding/json" + "fmt"
"os" + "path/filepath" + "strings" + "sync" + "time" +) + +type FileType string + +const ( + AOF FileType = "aof" + CMD FileType = "cmd" + JSON FileType = "json" +) + +var FileTypes = []FileType{CMD, AOF, JSON} + +type FileWriterOptions struct { + Filepath string `mapstructure:"filepath" default:""` + FileType string `mapstructure:"type" default:"cmd"` +} + +type fileWriter struct { + fileType FileType + path string + DbId int + ch chan *entry.Entry + chWg sync.WaitGroup + stat struct { + EntryCount int `json:"entry_count"` + } +} + +func (w *fileWriter) Write(e *entry.Entry) { + w.ch <- e +} + +func (w *fileWriter) Close() { + close(w.ch) + w.chWg.Wait() +} + +func (w *fileWriter) Status() interface{} { + return w.stat +} + +func (w *fileWriter) StatusString() string { + return fmt.Sprintf("exported entry count=%d", w.stat.EntryCount) +} + +func (w *fileWriter) StatusConsistent() bool { + return true +} + +func NewFileWriter(ctx context.Context, opts *FileWriterOptions) Writer { + absolutePath, err := filepath.Abs(opts.Filepath) + if err != nil { + log.Panicf("NewFileWriter path=[%s]: filepath.Abs error: %s", opts.Filepath, err.Error()) + } + log.Infof("NewFileWriter absolute path=[%s],type=[%s]", absolutePath, opts.FileType) + w := &fileWriter{ + fileType: FileType(opts.FileType), + DbId: 0, + path: absolutePath, + ch: make(chan *entry.Entry), + } + w.stat.EntryCount = 0 + return w +} + +func (w *fileWriter) StartWrite(ctx context.Context) (ch chan *entry.Entry) { + w.chWg = sync.WaitGroup{} + w.chWg.Add(1) + go w.processWrite(ctx) + return w.ch + +} + +func (w *fileWriter) processWrite(ctx context.Context) { + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + file, err := os.Create(w.path) + if err != nil { + log.Panicf("create file failed:", err) + return + } + defer file.Close() + writer := bufio.NewWriter(file) + for { + select { + case <-ctx.Done(): + // do nothing until w.ch is closed + case <-ticker.C: + writer.Flush() + case e, ok := 
<-w.ch: + if !ok { + w.chWg.Done() + writer.Flush() + return + } + w.stat.EntryCount++ + w.writeEntry(writer, e) + } + } +} + +func (w *fileWriter) writeEntry(writer *bufio.Writer, e *entry.Entry) { + switch w.fileType { + case CMD: + writer.WriteString(strings.Join(e.Argv, " ") + "\n") + case AOF: + writer.Write(e.Serialize()) + case JSON: + // compute SerializeSize for json result + e.Serialize() + json, _ := json.Marshal(e) + writer.Write(json) + writer.WriteString("\n") + } +} diff --git a/shake.toml b/shake.toml index 2be71747..b4abc8e3 100644 --- a/shake.toml +++ b/shake.toml @@ -35,6 +35,10 @@ password = "" # keep empty if no authentication is required tls = false off_reply = false # turn off the server reply +# [file_writer] +# filepath = "/tmp/cmd.txt" +# type = "cmd" #cmd,aof,json (default cmd) + [filter] # Allow keys with specific prefixes or suffixes # Examples: