diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7e9b74e..f207b26 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -97,3 +97,17 @@ jobs: run: | cd applications/sveltekit pnpm install + + application-mydata: + runs-on: ubuntu-latest + steps: + - name: Check out repository code + uses: actions/checkout@v4 + - name: Set Go env + uses: actions/setup-go@v4 + with: + go-version: '1.20' + - name: Build and run sample + run: | + cd applications/mydata + make && ./dist/mydata exp -h diff --git a/applications/mydata/.gitignore b/applications/mydata/.gitignore new file mode 100644 index 0000000..6010986 --- /dev/null +++ b/applications/mydata/.gitignore @@ -0,0 +1,3 @@ +.idea/ +testdata/ +vendor/ \ No newline at end of file diff --git a/applications/mydata/LICENSE b/applications/mydata/LICENSE new file mode 100644 index 0000000..9e32cde --- /dev/null +++ b/applications/mydata/LICENSE @@ -0,0 +1,127 @@ + 木兰宽松许可证, 第2版 + + 木兰宽松许可证, 第2版 + 2020年1月 http://license.coscl.org.cn/MulanPSL2 + + + 您对“软件”的复制、使用、修改及分发受木兰宽松许可证,第2版(“本许可证”)的如下条款的约束: + + 0. 定义 + + “软件”是指由“贡献”构成的许可在“本许可证”下的程序和相关文档的集合。 + + “贡献”是指由任一“贡献者”许可在“本许可证”下的受版权法保护的作品。 + + “贡献者”是指将受版权法保护的作品许可在“本许可证”下的自然人或“法人实体”。 + + “法人实体”是指提交贡献的机构及其“关联实体”。 + + “关联实体”是指,对“本许可证”下的行为方而言,控制、受控制或与其共同受控制的机构,此处的控制是指有受控方或共同受控方至少50%直接或间接的投票权、资金或其他有价证券。 + + 1. 授予版权许可 + + 每个“贡献者”根据“本许可证”授予您永久性的、全球性的、免费的、非独占的、不可撤销的版权许可,您可以复制、使用、修改、分发其“贡献”,不论修改与否。 + + 2. 授予专利许可 + + 每个“贡献者”根据“本许可证”授予您永久性的、全球性的、免费的、非独占的、不可撤销的(根据本条规定撤销除外)专利许可,供您制造、委托制造、使用、许诺销售、销售、进口其“贡献”或以其他方式转移其“贡献”。前述专利许可仅限于“贡献者”现在或将来拥有或控制的其“贡献”本身或其“贡献”与许可“贡献”时的“软件”结合而将必然会侵犯的专利权利要求,不包括对“贡献”的修改或包含“贡献”的其他结合。如果您或您的“关联实体”直接或间接地,就“软件”或其中的“贡献”对任何人发起专利侵权诉讼(包括反诉或交叉诉讼)或其他专利维权行动,指控其侵犯专利权,则“本许可证”授予您对“软件”的专利许可自您提起诉讼或发起维权行动之日终止。 + + 3. 无商标许可 + + “本许可证”不提供对“贡献者”的商品名称、商标、服务标志或产品名称的商标许可,但您为满足第4条规定的声明义务而必须使用除外。 + + 4. 分发限制 + + 您可以在任何媒介中将“软件”以源程序形式或可执行形式重新分发,不论修改与否,但您必须向接收者提供“本许可证”的副本,并保留“软件”中的版权、商标、专利及免责声明。 + + 5. 
免责声明与责任限制 + + “软件”及其中的“贡献”在提供时不带任何明示或默示的担保。在任何情况下,“贡献者”或版权所有者不对任何人因使用“软件”或其中的“贡献”而引发的任何直接或间接损失承担责任,不论因何种原因导致或者基于何种法律理论,即使其曾被建议有此种损失的可能性。 + + 6. 语言 + “本许可证”以中英文双语表述,中英文版本具有同等法律效力。如果中英文版本存在任何冲突不一致,以中文版为准。 + + 条款结束 + + 如何将木兰宽松许可证,第2版,应用到您的软件 + + 如果您希望将木兰宽松许可证,第2版,应用到您的新软件,为了方便接收者查阅,建议您完成如下三步: + + 1, 请您补充如下声明中的空白,包括软件名、软件的首次发表年份以及您作为版权人的名字; + + 2, 请您在软件包的一级目录下创建以“LICENSE”为名的文件,将整个许可证文本放入该文件中; + + 3, 请将如下声明文本放入每个源文件的头部注释中。 + + Copyright (c) [Year] [name of copyright holder] + [Software Name] is licensed under Mulan PSL v2. + You can use this software according to the terms and conditions of the Mulan PSL v2. + You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 + THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + See the Mulan PSL v2 for more details. + + + Mulan Permissive Software License,Version 2 + + Mulan Permissive Software License,Version 2 (Mulan PSL v2) + January 2020 http://license.coscl.org.cn/MulanPSL2 + + Your reproduction, use, modification and distribution of the Software shall be subject to Mulan PSL v2 (this License) with the following terms and conditions: + + 0. Definition + + Software means the program and related documents which are licensed under this License and comprise all Contribution(s). + + Contribution means the copyrightable work licensed by a particular Contributor under this License. + + Contributor means the Individual or Legal Entity who licenses its copyrightable work under this License. + + Legal Entity means the entity making a Contribution and all its Affiliates. 
+ + Affiliates means entities that control, are controlled by, or are under common control with the acting entity under this License, ‘control’ means direct or indirect ownership of at least fifty percent (50%) of the voting power, capital or other securities of controlled or commonly controlled entity. + + 1. Grant of Copyright License + + Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable copyright license to reproduce, use, modify, or distribute its Contribution, with modification or not. + + 2. Grant of Patent License + + Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable (except for revocation under this Section) patent license to make, have made, use, offer for sale, sell, import or otherwise transfer its Contribution, where such patent license is only limited to the patent claims owned or controlled by such Contributor now or in future which will be necessarily infringed by its Contribution alone, or by combination of the Contribution with the Software to which the Contribution was contributed. The patent license shall not apply to any modification of the Contribution, and any other combination which includes the Contribution. If you or your Affiliates directly or indirectly institute patent litigation (including a cross claim or counterclaim in a litigation) or other patent enforcement activities against any individual or entity by alleging that the Software or any Contribution in it infringes patents, then any patent license granted to you under this License for the Software shall terminate as of the date such litigation or activity is filed or taken. + + 3. 
No Trademark License + + No trademark license is granted to use the trade names, trademarks, service marks, or product names of Contributor, except as required to fulfill notice requirements in Section 4. + + 4. Distribution Restriction + + You may distribute the Software in any medium with or without modification, whether in source or executable forms, provided that you provide recipients with a copy of this License and retain copyright, patent, trademark and disclaimer statements in the Software. + + 5. Disclaimer of Warranty and Limitation of Liability + + THE SOFTWARE AND CONTRIBUTION IN IT ARE PROVIDED WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED. IN NO EVENT SHALL ANY CONTRIBUTOR OR COPYRIGHT HOLDER BE LIABLE TO YOU FOR ANY DAMAGES, INCLUDING, BUT NOT LIMITED TO ANY DIRECT, OR INDIRECT, SPECIAL OR CONSEQUENTIAL DAMAGES ARISING FROM YOUR USE OR INABILITY TO USE THE SOFTWARE OR THE CONTRIBUTION IN IT, NO MATTER HOW IT’S CAUSED OR BASED ON WHICH LEGAL THEORY, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. + + 6. Language + + THIS LICENSE IS WRITTEN IN BOTH CHINESE AND ENGLISH, AND THE CHINESE VERSION AND ENGLISH VERSION SHALL HAVE THE SAME LEGAL EFFECT. IN THE CASE OF DIVERGENCE BETWEEN THE CHINESE AND ENGLISH VERSIONS, THE CHINESE VERSION SHALL PREVAIL. + + END OF THE TERMS AND CONDITIONS + + How to Apply the Mulan Permissive Software License,Version 2 (Mulan PSL v2) to Your Software + + To apply the Mulan PSL v2 to your work, for easy identification by recipients, you are suggested to complete following three steps: + + i Fill in the blanks in following statement, including insert your software name, the year of the first publication of your software, and your name identified as the copyright owner; + + ii Create a file named “LICENSE” which contains the whole context of this License in the first directory of your software package; + + iii Attach the statement to the appropriate annotated syntax at the beginning of each source file. 
+ + + Copyright (c) [Year] [name of copyright holder] + [Software Name] is licensed under Mulan PSL v2. + You can use this software according to the terms and conditions of the Mulan PSL v2. + You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 + THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + See the Mulan PSL v2 for more details. diff --git a/applications/mydata/Makefile b/applications/mydata/Makefile new file mode 100644 index 0000000..f324e31 --- /dev/null +++ b/applications/mydata/Makefile @@ -0,0 +1,29 @@ + + +REPO := mydata + +GOOS := $(if $(GOOS),$(GOOS),$(shell go env GOOS)) +GOARCH := $(if $(GOARCH),$(GOARCH),$(shell go env GOARCH)) +GOENV := GO111MODULE=on CGO_ENABLED=1 GOOS=$(GOOS) GOARCH=$(GOARCH) +GO := $(GOENV) go +GOBUILD := $(GO) build -trimpath +GORUN := $(GO) run +SHELL := /usr/bin/env bash + +COMMIT := $(shell git describe --always --no-match --tags --dirty="-dev") +BUILDTS := $(shell date '+%Y-%m-%d %H:%M:%S') +GITHASH := $(shell git rev-parse HEAD) +GITREF := $(shell git rev-parse --abbrev-ref HEAD) +GOVER := $(shell go version) + +LDFLAGS := -w -s +LDFLAGS += -X "$(REPO)/internal/version.ReleaseVersion=$(COMMIT)" +LDFLAGS += -X "$(REPO)/internal/version.BuildTS=$(BUILDTS)" +LDFLAGS += -X "$(REPO)/internal/version.GitHash=$(GITHASH)" +LDFLAGS += -X "$(REPO)/internal/version.GitBranch=$(GITREF)" +LDFLAGS += -X "$(REPO)/internal/version.GoVersion=$(GOVER)" + +all: mydata + +mydata: + $(GOBUILD) -ldflags '$(LDFLAGS)' -o ./dist/mydata main.go diff --git a/applications/mydata/README.md b/applications/mydata/README.md new file mode 100644 index 0000000..deaa0ba --- /dev/null +++ b/applications/mydata/README.md @@ -0,0 +1,62 @@ +# A Fast and Flexible export data from remote mysql server + +### how to use +`go run main.go` or you can `go build -o mydata main.go && 
./mydata -h` + +### cmd help + +```text +Usage: + mydata [flags] + mydata [command] + +Available Commands: + completion Generate the autocompletion script for the specified shell + export Export data from remote database server + help Help about any command + hex Convert hex string to byte + import Import data to remote database server + version Print the version number of tool + +Flags: + --debug print stack log + -h, --help help for mydata + +Use "mydata [command] --help" for more information about a command. +``` + +### How to export data +pls type help: +`./mydata exp -h` + +```text +Export data from remote database server + +Usage: + mydata export [flags] + +Aliases: + export, exp + +Flags: + -a, --addr string mysql database addr, format: ip:port + -u, --username string username for connect database + -p, --password string password for connect database + -D, --dbname string default database name + -e, --query-sql string select sql + -o, --output string output filename + --fields-terminated string fields terminated (default ",") + --fields-enclosed string fields enclosed + --fields-escaped string fields escaped (default "\\") + --lines-terminated string lines terminated (default "\n") + --enclosed-optionally fields enclosed optionally + --params string connection Params (default "timeout=3s") + --buf-size int buf size for write outfile (default 32768) + --concurrency int concurrency number (default 5) + --not-merge merge chunks to one file +``` +### example +export data by `select * from bigdata` from 127.0.0.1 +```shell +./mydata exp -a 127.0.0.1:3306 -u username@tenant#clustername:clusterid -p xxx -D test -e "select * from bigdata" -o bigdata +``` diff --git a/applications/mydata/app/export/export.go b/applications/mydata/app/export/export.go new file mode 100644 index 0000000..5185905 --- /dev/null +++ b/applications/mydata/app/export/export.go @@ -0,0 +1,116 @@ +package export + +import ( + "context" + "log" + + "github.com/pkg/errors" + + 
// Exporter holds everything ExpData needs to run one query and write its
// result rows to an output file.
type Exporter struct {
	ChunkSeq    int             // chunk sequence number; tags this exporter's log lines
	DbConf      dbinfo.DbConf   // database configuration passed to dbinfo.NewDB
	Query       string          // SQL text executed for this chunk
	EnclosedOps bool            // enclosure flag passed to dbinfo.MakeColFlags
	OutFile     *file.MyOutfile // output file the formatted rows are written to
}
e.OutFile.Write(file.LinesTerminated) + + // 统一处理错误异常 + if e.OutFile.Err != nil { + return errors.WithStack(e.OutFile.Err) + } + + count++ + } + + if err := rows.Err(); err != nil { + return errors.WithStack(err) + } + + log.Printf("[chunk:%02d]finished successfully, records:%d", e.ChunkSeq, count) + + return nil +} diff --git a/applications/mydata/cmd/export.go b/applications/mydata/cmd/export.go new file mode 100644 index 0000000..abba0c8 --- /dev/null +++ b/applications/mydata/cmd/export.go @@ -0,0 +1,145 @@ +package cmd + +import ( + "fmt" + "log" + "os" + "time" + + "github.com/pkg/errors" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" + + "mydata/app/export" + "mydata/internal/dbinfo" + "mydata/internal/file" +) + +var defaultFormat = file.FileFormat{ + FieldsTerminated: ",", + FieldsEnclosed: "", + EnclosedOptFlag: false, + FieldsEscaped: "\\", + LinesTerminated: "\n", +} + +var ( + ff = defaultFormat + dbc = dbinfo.DbConf{} + + fileName string + querySql string + bufSize int + concNum int + notMerge bool +) + +var expCmd = &cobra.Command{ + Use: "export", + Aliases: []string{"exp"}, + Short: "Export data from remote database server", + RunE: RunExpE, +} + +func init() { + expCmd.Flags().StringVarP(&dbc.Addr, "addr", "a", "", "mysql database addr, format: ip:port") + expCmd.Flags().StringVarP(&dbc.Username, "username", "u", "", "username for connect database") + expCmd.Flags().StringVarP(&dbc.Password, "password", "p", "", "password for connect database") + expCmd.Flags().StringVarP(&dbc.Dbname, "dbname", "D", "", "default database name") + expCmd.Flags().StringVarP(&querySql, "query-sql", "e", "", "select sql") + expCmd.Flags().StringVarP(&fileName, "output", "o", "", "output filename") + + expCmd.Flags().StringVar(&ff.FieldsTerminated, "fields-terminated", defaultFormat.FieldsTerminated, "fields terminated") + expCmd.Flags().StringVar(&ff.FieldsEnclosed, "fields-enclosed", defaultFormat.FieldsEnclosed, "fields enclosed") + 
expCmd.Flags().StringVar(&ff.FieldsEscaped, "fields-escaped", defaultFormat.FieldsEscaped, "fields escaped") + expCmd.Flags().StringVar(&ff.LinesTerminated, "lines-terminated", defaultFormat.LinesTerminated, "lines terminated") + expCmd.Flags().BoolVar(&ff.EnclosedOptFlag, "enclosed-optionally", defaultFormat.EnclosedOptFlag, "fields enclosed optionally") + + expCmd.Flags().StringVar(&dbc.Params, "params", "timeout=3s", "connection Params") + expCmd.Flags().IntVar(&bufSize, "buf-size", 1024*32, "buf size for write outfile") + expCmd.Flags().IntVar(&concNum, "concurrency", 5, "concurrency number") + expCmd.Flags().BoolVar(¬Merge, "not-merge", false, "merge chunks to one file") + + _ = expCmd.MarkFlagRequired("output") + _ = expCmd.MarkFlagRequired("query-sql") + _ = expCmd.MarkFlagRequired("username") + _ = expCmd.MarkFlagRequired("password") + _ = expCmd.MarkFlagRequired("addr") + _ = expCmd.MarkFlagRequired("dbname") + + expCmd.Flags().SortFlags = false // 禁止flag排序 + + rootCmd.AddCommand(expCmd) +} + +func RunExpE(*cobra.Command, []string) error { + defer func(startAt time.Time) { + log.Printf("execute cmd elapse time(s):%.3f", time.Since(startAt).Seconds()) + }(time.Now()) + + // 处理输入参数 + if err := ff.AdjustAndSetFlags(); err != nil { + return err + } + + // 判断输入fileName必须为文件 + stat, err := os.Stat(fileName) + if err == nil && stat.IsDir() { + return errors.Errorf("input:%s is directory, must include filename", fileName) + } + + // 默认分块并发,获取query range, + queryWithRanges, err := dbinfo.GetChunkQuery(dbc, querySql, concNum) + if err != nil { + // 获取chunk失败就用原始sql单并发执行 + if debug { + log.Printf("%v", err) + } + queryWithRanges = []string{querySql} + } + + // 获取输入sql的range信息 + wg := errgroup.Group{} + pid := os.Getpid() + + outFiles := make([]string, len(queryWithRanges)) + for i, q := range queryWithRanges { + rangeSeq := i + // filename = input prefix+rangeSeq+pid + outFiles[rangeSeq] = fmt.Sprintf("%s.%d.%d", fileName, rangeSeq, pid) + query := q + wg.Go(func() 
error { + out, err := file.NewMyOutFile(outFiles[rangeSeq], bufSize) + if err != nil { + return err + } + defer out.Close() + + exp := export.Exporter{ChunkSeq: rangeSeq, DbConf: dbc, Query: query, EnclosedOps: ff.EnclosedOptFlag, OutFile: out} + + return exp.ExpData() + }) + } + log.Printf("chunk:%d, outfiles:%v", len(queryWithRanges), outFiles) + + // 等待所有并发任务结束 + if err := wg.Wait(); err != nil { + return errors.WithStack(err) + } + + if notMerge { // 不合并文件,直接退出 + return nil + } + + // 合并临时文件 + if err := file.MergeAndCleanN(outFiles[0], outFiles[1:]); err != nil { + return err + } + + // 重命名目标文件 + if err := os.Rename(outFiles[0], fileName); err != nil { + return errors.WithStack(err) + } + + return nil +} diff --git a/applications/mydata/cmd/hex.go b/applications/mydata/cmd/hex.go new file mode 100644 index 0000000..102f80f --- /dev/null +++ b/applications/mydata/cmd/hex.go @@ -0,0 +1,50 @@ +package cmd + +import ( + "encoding/hex" + "log" + "os" + + "github.com/spf13/cobra" +) + +var hexCmd = &cobra.Command{ + Use: "hex", + Short: "Convert hex string to byte", + RunE: writeE, +} + +var in string +var out string + +func init() { + hexCmd.Flags().StringVarP(&in, "in", "i", "", "hex string") + hexCmd.Flags().StringVarP(&out, "out", "o", "", "out file name") + + hexCmd.MarkFlagRequired("in") + hexCmd.MarkFlagRequired("out") + + rootCmd.AddCommand(hexCmd) +} + +func writeE(cmd *cobra.Command, args []string) error { + log.Printf(">>> %s", in) + + decodeString, err := hex.DecodeString(in) + if err != nil { + return err + } + + f, err := os.Create(out) + if err != nil { + return err + } + defer f.Close() + + _, err = f.Write(decodeString) + if err != nil { + return err + } + + return nil +} diff --git a/applications/mydata/cmd/import.go b/applications/mydata/cmd/import.go new file mode 100644 index 0000000..9c77d36 --- /dev/null +++ b/applications/mydata/cmd/import.go @@ -0,0 +1,74 @@ +package cmd + +import ( + "bytes" + "log" + "os" + + "github.com/pkg/errors" + 
"github.com/spf13/cobra" + + "mydata/internal/file" +) + +var impCmd = &cobra.Command{ + Use: "import", + Aliases: []string{"imp"}, + Short: "Import data to remote database server", + RunE: RunImpE, +} + +func init() { + impCmd.Flags().StringVarP(&dbc.Addr, "addr", "a", "", "mysql database addr, format: ip:port") + impCmd.Flags().StringVarP(&dbc.Username, "username", "u", "", "username for connect database") + impCmd.Flags().StringVarP(&dbc.Password, "password", "p", "", "password for connect database") + impCmd.Flags().StringVarP(&dbc.Dbname, "dbname", "D", "", "default database name") + impCmd.Flags().StringVarP(&querySql, "query-sql", "e", "", "select sql") + impCmd.Flags().StringVarP(&fileName, "file-name", "o", "", "output filename") + + impCmd.Flags().StringVar(&ff.FieldsTerminated, "fields-terminated", defaultFormat.FieldsTerminated, "fields terminated") + impCmd.Flags().StringVar(&ff.FieldsEnclosed, "fields-enclosed", defaultFormat.FieldsEnclosed, "fields enclosed") + impCmd.Flags().StringVar(&ff.FieldsEscaped, "fields-escaped", defaultFormat.FieldsEscaped, "fields escaped") + impCmd.Flags().StringVar(&ff.LinesTerminated, "lines-terminated", defaultFormat.LinesTerminated, "lines terminated") + impCmd.Flags().BoolVar(&ff.EnclosedOptFlag, "enclosed-optionally", defaultFormat.EnclosedOptFlag, "fields enclosed optionally") + + impCmd.Flags().StringVar(&dbc.Params, "params", "", "connection Params") + impCmd.Flags().IntVar(&bufSize, "buf-size", 1024*32, "buf size for write outfile") + impCmd.Flags().IntVar(&concNum, "concurrency", 5, "concurrency number") + impCmd.Flags().BoolVar(¬Merge, "not-merge", false, "merge chunks to one file") + + _ = impCmd.MarkFlagRequired("file-name") + _ = impCmd.MarkFlagRequired("query-sql") + _ = impCmd.MarkFlagRequired("username") + _ = impCmd.MarkFlagRequired("password") + _ = impCmd.MarkFlagRequired("addr") + _ = impCmd.MarkFlagRequired("dbname") + + impCmd.Flags().SortFlags = false // 禁止flag排序 + + // TODO: import + 
//rootCmd.AddCommand(impCmd) +} + +func RunImpE(*cobra.Command, []string) error { + log.Printf("filename:%s", fileName) + // 根据行、列分隔符,解析文件 + srcFile, err := os.Open(fileName) + if err != nil { + return errors.WithStack(err) + } + defer srcFile.Close() + + scanner := file.NewScannerDelim(srcFile, bufSize) + for scanner.Scan() { + row := bytes.Split(scanner.Bytes(), file.FieldsTerminated) + log.Printf(">>> row:%v", row) + } + + if scanner.Err() != nil { + return errors.WithStack(scanner.Err()) + } + + // 目标库执行 + return nil +} diff --git a/applications/mydata/cmd/root.go b/applications/mydata/cmd/root.go new file mode 100644 index 0000000..1a38087 --- /dev/null +++ b/applications/mydata/cmd/root.go @@ -0,0 +1,43 @@ +package cmd + +import ( + "fmt" + "log" + "os" + + "github.com/spf13/cobra" +) + +var rootCmd = &cobra.Command{ + Use: "mydata", + Short: "A Fast and Flexible export data from remote mysql server", + Long: `A Fast and Flexible export data from remote mysql server`, + Run: func(cmd *cobra.Command, args []string) { + if len(args) == 0 { + _ = cmd.Help() + } + }, + SilenceUsage: true, +} + +var debug bool + +func init() { + log.SetFlags(log.LstdFlags | log.Lshortfile | log.Lmicroseconds) + log.SetOutput(os.Stdout) + + rootCmd.PersistentFlags().BoolVar(&debug, "debug", false, "print stack log") + + rootCmd.Flags().SortFlags = false // 禁止flag排序 +} + +func Execute() { + rootCmd.SilenceUsage = true + + if err := rootCmd.Execute(); err != nil { + if debug { + fmt.Printf("%+v\n", err) + } + os.Exit(1) + } +} diff --git a/applications/mydata/cmd/version.go b/applications/mydata/cmd/version.go new file mode 100644 index 0000000..f7bf9de --- /dev/null +++ b/applications/mydata/cmd/version.go @@ -0,0 +1,22 @@ +package cmd + +import ( + "fmt" + + "github.com/spf13/cobra" + + "mydata/internal/version" +) + +var versionCmd = &cobra.Command{ + Use: "version", + Short: "Print the version number of tool", + Long: "Print the version number of tool", + Run: func(cmd 
*cobra.Command, args []string) { + fmt.Println(version.GetRawInfo()) + }, +} + +func init() { + rootCmd.AddCommand(versionCmd) +} diff --git a/applications/mydata/go.mod b/applications/mydata/go.mod new file mode 100644 index 0000000..5fe1bc5 --- /dev/null +++ b/applications/mydata/go.mod @@ -0,0 +1,29 @@ +module mydata + +go 1.20 + +require ( + github.com/djimenez/iconv-go v0.0.0-20160305225143-8960e66bd3da + github.com/go-sql-driver/mysql v1.8.1 + github.com/pingcap/tidb/pkg/parser v0.0.0-20240409135851-ab90c771a215 + github.com/pkg/errors v0.9.1 + github.com/spf13/cobra v1.8.0 + golang.org/x/sync v0.7.0 + golang.org/x/text v0.14.0 +) + +require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb // indirect + github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect + github.com/pingcap/log v1.1.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/spf13/pflag v1.0.5 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect +) diff --git a/applications/mydata/go.sum b/applications/mydata/go.sum new file mode 100644 index 0000000..4ed325e --- /dev/null +++ b/applications/mydata/go.sum @@ -0,0 +1,91 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod 
h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1vaoKsclOGD3ADKpshg3SRtYBbwso= +github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/djimenez/iconv-go v0.0.0-20160305225143-8960e66bd3da h1:0qwwqQCLOOXPl58ljnq3sTJR7yRuMolM02vjxDh4ZVE= +github.com/djimenez/iconv-go v0.0.0-20160305225143-8960e66bd3da/go.mod h1:ns+zIWBBchgfRdxNgIJWn2x6U95LQchxeqiN5Cgdgts= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb h1:3pSi4EDG6hg0orE1ndHkXvX6Qdq2cZn8gAPir8ymKZk= +github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= +github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= +github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod 
h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= +github.com/pingcap/log v1.1.0 h1:ELiPxACz7vdo1qAvvaWJg1NrYFoY6gqAh/+Uo6aXdD8= +github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= +github.com/pingcap/tidb/pkg/parser v0.0.0-20240409135851-ab90c771a215 h1:+JkFpvs9wyvq/iuyRpE0vdrsCvMtE0+KRDQS7ZbvBuU= +github.com/pingcap/tidb/pkg/parser v0.0.0-20240409135851-ab90c771a215/go.mod h1:c/4la2yfv1vBYvtIG8WCDyDinLMDIUC5+zLRHiafY+Y= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 h1:ESSUROHIBHg7USnszlcdmjBEwdMj9VUvU+OPk4yl2mc= +golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 
// unpad strips the padding appended by pad and returns the original payload.
//
// The last byte of src holds the number of padding bytes. An error is
// returned when src is empty, or when that count is zero or larger than
// len(src); each of these means the input is corrupted or was decrypted with
// the wrong key. The returned slice shares its backing array with src.
func unpad(src []byte) ([]byte, error) {
	length := len(src)
	if length == 0 {
		// Nothing to inspect: guard the src[length-1] access below.
		return nil, errors.New("unpad error. This could happen when incorrect encryption key is used")
	}

	padding := int(src[length-1])
	// pad always appends at least one byte, so a count of zero can never come
	// from a message produced by Encrypt.
	if padding == 0 || padding > length {
		return nil, errors.New("unpad error. This could happen when incorrect encryption key is used")
	}

	return src[:length-padding], nil
}
+ {name: "case1", input: "abcdefghigklmnopqrstuvwxyz"}, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + decPass, err := Encrypt(tt.input) + if err != nil { + t.Errorf("err:%+v", err) + } + + tt.want, err = Decrypt(decPass) + if err != nil { + t.Errorf("err:%v", err) + } + + if tt.input != tt.want { + t.Errorf("input:%s, want:%s, decPass:%s, err:%v", tt.input, tt.want, decPass, err) + } else { + t.Logf("decPass:%s", decPass) + t.Logf("input:%s", tt.input) + t.Logf("want :%s", tt.want) + } + }) + } +} diff --git a/applications/mydata/internal/dbinfo/database.go b/applications/mydata/internal/dbinfo/database.go new file mode 100644 index 0000000..68632a8 --- /dev/null +++ b/applications/mydata/internal/dbinfo/database.go @@ -0,0 +1,225 @@ +package dbinfo + +import ( + "database/sql" + "fmt" + "log" + "regexp" + "strings" + + "github.com/go-sql-driver/mysql" + "github.com/pkg/errors" +) + +type DbConf struct { + Addr string + Username string + Password string + Dbname string + Params string +} + +var reg = regexp.MustCompile("[!-~]") + +func (c DbConf) String() string { + return fmt.Sprintf("{Addr:%s, Username:%s, Passwd:%s, DbName:%s, Params:%s}", + c.Addr, c.Username, reg.ReplaceAllString(c.Password, "*"), c.Dbname, c.Params) +} + +func NewDB(conf DbConf) (*sql.DB, error) { + //dsn := "root:root@tcp(192.168.56.128:3306)/testdb?param=value" + if strings.Count(conf.Addr, ":") != 1 { + return nil, errors.Errorf("database addr invalid, format ip:port, but got [%s]", conf.Addr) + } + + dbc := mysql.NewConfig() + dbc.Addr = conf.Addr + dbc.User = conf.Username + dbc.Passwd = conf.Password + dbc.DBName = conf.Dbname + + // 处理输入连接参数 + if err := parseDSNParams(dbc, conf.Params); err != nil { + return nil, err + } + + connector, err := mysql.NewConnector(dbc) + if err != nil { + return nil, errors.Wrapf(err, "create connector faild:[%s:%s@tcp(%s)/%s]", + conf.Username, reg.ReplaceAllString(conf.Password, "*"), conf.Addr, conf.Dbname) + } + + conn := 
sql.OpenDB(connector) + if err := conn.Ping(); err != nil { + return nil, errors.Wrapf(err, "ping db faild:[%s:%s@tcp(%s)/%s]", conf.Username, + reg.ReplaceAllString(conf.Password, "*"), conf.Addr, conf.Dbname) + } + + return conn, nil +} + +func MakeScanBuf(rows *sql.Rows) ([]interface{}, []sql.RawBytes, error) { + cols, err := rows.Columns() + if err != nil { + return nil, nil, errors.WithStack(err) + } + + colSize := len(cols) + scanArgs := make([]interface{}, colSize) + scanValues := make([]sql.RawBytes, colSize) + + for i := range scanValues { + scanArgs[i] = &scanValues[i] + } + + return scanArgs, scanValues, nil +} + +func MakeColFlags(rows *sql.Rows, enclosedFlag bool) ([]bool, []bool, error) { + columnTypes, err := rows.ColumnTypes() + if err != nil { + return nil, nil, errors.WithStack(err) + } + size := len(columnTypes) + + isCharFlag := make([]bool, size) + isEnclosed := make([]bool, size) + for i, ct := range columnTypes { + // 字符类型 + isCharFlag[i] = isCharFunc(ct.DatabaseTypeName()) + // 当opt=true时,只有字符类型添加enclosed + // 当opt=false时,所有字符类型都添加enclosed + + if enclosedFlag { // 设置opt的话,只有字符类型被包裹 + if isCharFlag[i] { + isEnclosed[i] = true + } + } else { // 默认不设置opt,所有字段都被包裹 + isEnclosed[i] = true + } + } + + return isCharFlag, isEnclosed, nil +} + +func isCharFunc(s string) bool { + return s == "CHAR" || s == "VARCHAR" || s == "TIMESTAMP" || s == "TEXT" || s == "LONGTEXT" || s == "TINYTEXT" || + s == "MEDIUMTEXT" || s == "LONGBLOB" || s == "BLOB" || s == "TINYBLOB" +} + +func GetKeyName(db *sql.DB, schema, table string) (string, error) { + keys := make([]string, 0) + + // TODO:适配ob的weak模式查询 + sqlKeyName := fmt.Sprintf("select /*+READ_CONSISTENCY(WEAK) */ COLUMN_NAME from information_schema.columns where TABLE_SCHEMA='%s' and TABLE_NAME='%s' and COLUMN_KEY ='PRI' order by ORDINAL_POSITION asc", + schema, table) + log.Printf("get key name:[%s]", sqlKeyName) + rows, err := db.Query(sqlKeyName) + if err != nil { + return "", errors.WithStack(err) + } + defer 
func() { + _ = rows.Close() + }() + + for rows.Next() { + var s string + if err := rows.Scan(&s); err != nil { + return "", errors.WithStack(err) + } + keys = append(keys, s) + } + + if err := rows.Err(); err != nil { + return "", errors.WithStack(err) + } + + if len(keys) == 0 { + return "", errors.Errorf("not found keys") + } + + return strings.Join(keys, ","), nil +} + +func GetKeyValueByStep(db *sql.DB, origWhere string, schema, table string, keyNames string, stepSize int64) ([]string, error) { + if stepSize <= 0 { + return nil, errors.New("stepSize is zero") + } + + // TODO:根据输入条件,计算主键的范围 + var sqlKeyValue string + if len(origWhere) > 0 { + sqlKeyValue = fmt.Sprintf("select /*+READ_CONSISTENCY(WEAK) */ %s from %s.%s where %s order by %s", + keyNames, schema, table, origWhere, keyNames) + } else { + sqlKeyValue = fmt.Sprintf("select /*+READ_CONSISTENCY(WEAK) */ %s from %s.%s order by %s", + keyNames, schema, table, keyNames) + } + log.Printf("get key value by step:[%s]", sqlKeyValue) + + rows, err := db.Query(sqlKeyValue) + if err != nil { + return nil, errors.WithStack(err) + } + defer rows.Close() + + colTypes, err := rows.ColumnTypes() + if err != nil { + return nil, errors.WithStack(err) + } + + isChars := make([]bool, 0) + for _, v := range colTypes { + isChars = append(isChars, isCharFunc(v.DatabaseTypeName())) + } + + scanArgs, scanVals, err := MakeScanBuf(rows) + if err != nil { + return nil, err + } + + var count int64 = 0 + keyVals := make([]string, 0) + for rows.Next() { + if err := rows.Scan(scanArgs...); err != nil { + return nil, errors.WithStack(err) + } + + if count%stepSize == 0 { + keyVals = append(keyVals, getKeyValueCond(scanVals, isChars)) + } + + count++ + } + + if count%stepSize != 0 { // 处理尾部数据 + keyVals = append(keyVals, getKeyValueCond(scanVals, isChars)) + } + + if err := rows.Err(); err != nil { + return nil, errors.WithStack(err) + } + + return keyVals, nil +} + +func getKeyValueCond(scanVals []sql.RawBytes, isChars []bool) string { 
+ var buf strings.Builder + buf.WriteString("(") + for i, v := range scanVals { + if i > 0 { + buf.WriteString(",") + } + if isChars[i] { + buf.WriteString("'") + buf.Write(v) + buf.WriteString("'") + } else { + buf.Write(v) + } + } + + buf.WriteString(")") + + return buf.String() +} diff --git a/applications/mydata/internal/dbinfo/keyrange.go b/applications/mydata/internal/dbinfo/keyrange.go new file mode 100644 index 0000000..f90ed39 --- /dev/null +++ b/applications/mydata/internal/dbinfo/keyrange.go @@ -0,0 +1,112 @@ +package dbinfo + +import ( + "fmt" + "log" + + "github.com/pkg/errors" + "mydata/internal/parser" +) + +type RangeVals struct { + Start string + End string +} + +func GetKeyRange(keyVals []string) ([]RangeVals, error) { + var lenKeyVals = len(keyVals) + if lenKeyVals < 2 { + return nil, errors.New("key values only small") + } + + tmpRange := make([]RangeVals, lenKeyVals-1) + var start = 0 + var end = 1 + for end < lenKeyVals { + tmpRange[start].Start = keyVals[start] + tmpRange[start].End = keyVals[end] + start++ + end++ + } + + return tmpRange, nil +} + +func GetChunkQuery(dbc DbConf, queryOrig string, concNum int) ([]string, error) { + if concNum <= 1 { + log.Printf("concurrency num set too small:%d", concNum) + return []string{queryOrig}, nil + } + + conn, err := NewDB(dbc) + if err != nil { + return nil, err + } + defer func() { + _ = conn.Close() + }() + + stmt, err := parser.NewSelectStmt(queryOrig) + if err != nil { + return nil, err + } + + tableOrig, isOneTable := parser.ExtractTable(stmt) + if isOneTable == false { + return nil, err + } + + if isSimpleSql := parser.JudgeSimpleSQL(stmt); !isSimpleSql { + return nil, errors.Errorf("not one simple query sql") + } + + schema, table, err := parser.GetTabAndSchema(tableOrig, dbc.Dbname) + if err != nil { + return nil, err + } + + keyNameStr, err := GetKeyName(conn, schema, table) + if err != nil { + return nil, err + } + + // 根据explain和并发读计算step大小 + var step int64 + if IsOB(conn) { + step, err 
= GetStep4OB(conn, queryOrig, concNum) + if err != nil { + return nil, err + } + } else { + step, err = GetStep4MySQL(conn, queryOrig, concNum) + if err != nil { + return nil, err + } + } + + keyValues, err := GetKeyValueByStep(conn, parser.GetOrigWhere(stmt), schema, table, keyNameStr, step) + if err != nil { + return nil, err + } + + keyRanges, err := GetKeyRange(keyValues) + if err != nil { + return nil, err + } + + rewriterSql := make([]string, 0) + maxChunk := len(keyRanges) + for i, vv := range keyRanges { + subWhere := "" + if i == maxChunk-1 { + subWhere = fmt.Sprintf("(%s)>=%s", keyNameStr, vv.Start) + } else { + subWhere = fmt.Sprintf("(%s)>=%s and (%s)<%s", keyNameStr, vv.Start, keyNameStr, vv.End) + } + + newSql := parser.AddSubWhereSql(stmt, subWhere) + rewriterSql = append(rewriterSql, newSql) + } + + return rewriterSql, nil +} diff --git a/applications/mydata/internal/dbinfo/keyrange_test.go b/applications/mydata/internal/dbinfo/keyrange_test.go new file mode 100644 index 0000000..fcf4894 --- /dev/null +++ b/applications/mydata/internal/dbinfo/keyrange_test.go @@ -0,0 +1,19 @@ +package dbinfo + +import ( + "log" + "testing" +) + +func TestGetChunkQuery(t *testing.T) { + dbc := DbConf{Addr: "192.168.111.3:3306", Username: "root", Password: "root", Dbname: "testdb"} + for _, v := range []int{2, 3, 4, 5, 10} { + query, err := GetChunkQuery(dbc, "select * from t1", v) + if err != nil { + t.Fatal(err) + } + for ii, vv := range query { + log.Printf(">>>v:%d, %d, sql:%s", v, ii, vv) + } + } +} diff --git a/applications/mydata/internal/dbinfo/mysql.go b/applications/mydata/internal/dbinfo/mysql.go new file mode 100644 index 0000000..71e7ca0 --- /dev/null +++ b/applications/mydata/internal/dbinfo/mysql.go @@ -0,0 +1,48 @@ +package dbinfo + +import ( + "database/sql" + "log" + "strconv" + + "github.com/pkg/errors" +) + +func GetStep4MySQL(db *sql.DB, query string, conc int) (int64, error) { + explain := "explain " + query + + log.Printf("get step by 
conc:[%s]", explain) + rows, err := db.Query(explain) + if err != nil { + return 0, errors.WithStack(err) + } + defer rows.Close() + + scanArgs, scanVals, err := MakeScanBuf(rows) + if err != nil { + return 0, err + } + + if len(scanArgs) != 12 { + return 0, errors.New("not mysql v8.x") + } + + for rows.Next() { + if err := rows.Scan(scanArgs...); err != nil { + return 0, errors.WithStack(err) + } + // only scan first row + break + } + + // for mysql 8.x + parseInt, err := strconv.ParseInt(string(scanVals[9]), 10, 64) + if err != nil { + return 0, errors.WithStack(err) + } + + step := parseInt/int64(conc) + 1 + log.Printf("explain want to result size, rows:%s=>int:%d, step:%d", scanVals[9], parseInt, step) + + return step, nil +} diff --git a/applications/mydata/internal/dbinfo/ob.go b/applications/mydata/internal/dbinfo/ob.go new file mode 100644 index 0000000..9d64483 --- /dev/null +++ b/applications/mydata/internal/dbinfo/ob.go @@ -0,0 +1,71 @@ +package dbinfo + +import ( + "database/sql" + "encoding/json" + "log" + "strings" + + "github.com/pkg/errors" +) + +func IsOB(conn *sql.DB) bool { + rows, err := conn.Query("show variables like 'version'") + if err != nil { + log.Printf("err:%v", err) + return false + } + defer rows.Close() + + scanArgs, scanVals, err := MakeScanBuf(rows) + if err != nil { + return false + } + + for rows.Next() { + if err := rows.Scan(scanArgs...); err != nil { + log.Printf("err:%v", err) + return false + } + // only scan first row + break + } + + return strings.Contains(string(scanVals[1]), "OceanBase") +} + +func GetStep4OB(db *sql.DB, query string, conc int) (int64, error) { + explain := "explain format=json " + query + + log.Printf("get step by conc:[%s]", explain) + rows, err := db.Query(explain) + if err != nil { + return 0, errors.WithStack(err) + } + defer rows.Close() + + scanArgs, scanVals, err := MakeScanBuf(rows) + if err != nil { + return 0, err + } + + for rows.Next() { + if err := rows.Scan(scanArgs...); err != nil { + 
return 0, errors.WithStack(err) + } + // only scan first row + break + } + + var obExplainRst struct { + Rows int64 `json:"EST.ROWS"` + } + if err := json.Unmarshal(scanVals[0], &obExplainRst); err != nil { + return 0, errors.WithStack(err) + } + + step := obExplainRst.Rows/int64(conc) + 1 + log.Printf("explain want to result size, rows:%d, step:%d", obExplainRst.Rows, step) + + return step, nil +} diff --git a/applications/mydata/internal/dbinfo/params.go b/applications/mydata/internal/dbinfo/params.go new file mode 100644 index 0000000..1645532 --- /dev/null +++ b/applications/mydata/internal/dbinfo/params.go @@ -0,0 +1,237 @@ +package dbinfo + +import ( + "errors" + "fmt" + "net/url" + "strconv" + "strings" + "time" + + "github.com/go-sql-driver/mysql" +) + +// parseDSNParams parses the DSN "query string" +// Values must be url.QueryEscape'ed +func parseDSNParams(cfg *mysql.Config, params string) (err error) { + for _, v := range strings.Split(params, "&") { + key, value, found := strings.Cut(v, "=") + if !found { + continue + } + + // cfg params + switch key { + // Disable INFILE allowlist / enable all files + case "allowAllFiles": + var isBool bool + cfg.AllowAllFiles, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } + + // Use cleartext authentication mode (MySQL 5.5.10+) + case "allowCleartextPasswords": + var isBool bool + cfg.AllowCleartextPasswords, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } + + // Allow fallback to unencrypted connection if server does not support TLS + case "allowFallbackToPlaintext": + var isBool bool + cfg.AllowFallbackToPlaintext, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } + + // Use native password authentication + case "allowNativePasswords": + var isBool bool + cfg.AllowNativePasswords, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } 
+ + // Use old authentication mode (pre MySQL 4.1) + case "allowOldPasswords": + var isBool bool + cfg.AllowOldPasswords, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } + + // Check connections for Liveness before using them + case "checkConnLiveness": + var isBool bool + cfg.CheckConnLiveness, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } + + // Switch "rowsAffected" mode + case "clientFoundRows": + var isBool bool + cfg.ClientFoundRows, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } + + // Collation + case "collation": + cfg.Collation = value + + case "columnsWithAlias": + var isBool bool + cfg.ColumnsWithAlias, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } + + // Compression + case "compress": + return errors.New("compression not implemented yet") + + // Enable client side placeholder substitution + case "interpolateParams": + var isBool bool + cfg.InterpolateParams, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } + + // Time Location + case "loc": + if value, err = url.QueryUnescape(value); err != nil { + return + } + cfg.Loc, err = time.LoadLocation(value) + if err != nil { + return + } + + // multiple statements in one query + case "multiStatements": + var isBool bool + cfg.MultiStatements, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } + + // time.Time parsing + case "parseTime": + var isBool bool + cfg.ParseTime, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } + + // time.Time truncation + //case "timeTruncate": + // cfg.timeTruncate, err = time.ParseDuration(value) + // if err != nil { + // return fmt.Errorf("invalid timeTruncate value: %v, error: %w", value, err) + // } + + // I/O read Timeout + case "readTimeout": + 
cfg.ReadTimeout, err = time.ParseDuration(value) + if err != nil { + return + } + + // Reject read-only connections + case "rejectReadOnly": + var isBool bool + cfg.RejectReadOnly, isBool = readBool(value) + if !isBool { + return errors.New("invalid bool value: " + value) + } + + // Server public key + case "serverPubKey": + name, err := url.QueryUnescape(value) + if err != nil { + return fmt.Errorf("invalid value for server pub key name: %v", err) + } + cfg.ServerPubKey = name + + // Strict mode + case "strict": + panic("strict mode has been removed. See https://github.com/go-sql-driver/mysql/wiki/strict-mode") + + // Dial Timeout + case "timeout": + cfg.Timeout, err = time.ParseDuration(value) + if err != nil { + return + } + + // TLS-Encryption + case "tls": + boolValue, isBool := readBool(value) + if isBool { + if boolValue { + cfg.TLSConfig = "true" + } else { + cfg.TLSConfig = "false" + } + } else if vl := strings.ToLower(value); vl == "skip-verify" || vl == "preferred" { + cfg.TLSConfig = vl + } else { + name, err := url.QueryUnescape(value) + if err != nil { + return fmt.Errorf("invalid value for TLS config name: %v", err) + } + cfg.TLSConfig = name + } + + // I/O write Timeout + case "writeTimeout": + cfg.WriteTimeout, err = time.ParseDuration(value) + if err != nil { + return + } + case "maxAllowedPacket": + cfg.MaxAllowedPacket, err = strconv.Atoi(value) + if err != nil { + return + } + + // Connection attributes + case "connectionAttributes": + connectionAttributes, err := url.QueryUnescape(value) + if err != nil { + return fmt.Errorf("invalid connectionAttributes value: %v", err) + } + cfg.ConnectionAttributes = connectionAttributes + + default: + // lazy init + if cfg.Params == nil { + cfg.Params = make(map[string]string) + } + + if cfg.Params[key], err = url.QueryUnescape(value); err != nil { + return + } + } + } + + return +} + +// Returns the bool value of the input. 
// readBool interprets input as a boolean literal.
//
// value is the parsed boolean. valid reports whether input was one of the
// accepted spellings: "1", "true", "TRUE", "True", "0", "false", "FALSE" or
// "False". For any other input both results are false.
func readBool(input string) (value bool, valid bool) {
	truthy := input == "1" || input == "true" || input == "TRUE" || input == "True"
	if truthy {
		return true, true
	}

	falsy := input == "0" || input == "false" || input == "FALSE" || input == "False"
	return false, falsy
}
hex.DecodeString(f.LinesTerminated[2:]) + if err != nil { + return errors.Wrapf(err, "%v", f.LinesTerminated) + } + LinesTerminated = decodeString + } else { + LinesTerminated = []byte(f.LinesTerminated) + } + + if len(LinesTerminated) == 0 { + return errors.New("LINES_TERMINATED is null") + } + + if len(f.FieldsEscaped) > 0 { + FieldsEscaped = []byte(f.FieldsEscaped)[0] + } + + if len(f.FieldsEnclosed) > 0 { + FieldsEnclosed = []byte(f.FieldsEnclosed)[0] + } + + return nil +} diff --git a/applications/mydata/internal/file/infile.go b/applications/mydata/internal/file/infile.go new file mode 100644 index 0000000..be016bd --- /dev/null +++ b/applications/mydata/internal/file/infile.go @@ -0,0 +1,59 @@ +package file + +import ( + "bufio" + "bytes" + "io" + "os" + + "github.com/pkg/errors" +) + +type MyInfile struct { + f *os.File + scanner *bufio.Scanner + Err error +} + +func NewMyInFile(fileName string, bufSize int) (*MyInfile, error) { + inFile, err := os.Open(fileName) + if err != nil { + return nil, errors.WithStack(err) + } + + return &MyInfile{ + f: inFile, + scanner: NewScannerDelim(inFile, bufSize), + }, nil +} + +var LineDelim []byte + +func NewScannerDelim(r io.Reader, bufSize int) *bufio.Scanner { + scanner := bufio.NewScanner(r) + LineDelim = LinesTerminated // 设置行分割符 + scanner.Split(ScanLines) // 设置行分割符函数 + + return scanner +} + +func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + if i := bytes.Index(data, LineDelim); i >= 0 { + return i + len(LineDelim), dropCR(data[0:i]), nil + } + if atEOF { + return len(data), dropCR(data), nil + } + + return 0, nil, nil +} + +func dropCR(data []byte) []byte { + if len(data) > 0 && data[len(data)-1] == '\r' { + return data[0 : len(data)-1] + } + return data +} diff --git a/applications/mydata/internal/file/merge.go b/applications/mydata/internal/file/merge.go new file mode 100644 index 0000000..77c8e72 --- /dev/null +++ 
b/applications/mydata/internal/file/merge.go @@ -0,0 +1,44 @@ +package file + +import ( + "io" + "log" + "os" + + "github.com/pkg/errors" +) + +func MergeAndCleanN(dstName string, srcNames []string) error { + buf := make([]byte, 64*1024) + dst, err := os.OpenFile(dstName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, os.ModePerm) + if err != nil { + return errors.WithStack(err) + } + defer dst.Close() + + for _, srcName := range srcNames { + if err := func() error { + src, err := os.OpenFile(srcName, os.O_RDONLY, os.ModePerm) + if err != nil { + return errors.WithStack(err) + } + defer src.Close() + + written, err := io.CopyBuffer(dst, src, buf) + if err != nil { + return errors.WithStack(err) + } + log.Printf("merge file:%s, byte size:%d", srcName, written) + + return nil + }(); err != nil { + return err + } + + if err := os.Remove(srcName); err != nil { + return errors.WithStack(err) + } + } + + return nil +} diff --git a/applications/mydata/internal/file/outfile.go b/applications/mydata/internal/file/outfile.go new file mode 100644 index 0000000..abbabd9 --- /dev/null +++ b/applications/mydata/internal/file/outfile.go @@ -0,0 +1,50 @@ +package file + +import ( + "bufio" + "os" + + "github.com/pkg/errors" +) + +type MyOutfile struct { + f *os.File + w *bufio.Writer + Err error +} + +func NewMyOutFile(fileName string, bufSize int) (*MyOutfile, error) { + outFile, err := os.Create(fileName) + if err != nil { + return nil, errors.Wrapf(err, "file:%s", fileName) + } + + return &MyOutfile{ + f: outFile, + w: bufio.NewWriterSize(outFile, bufSize), + }, nil +} + +func (my *MyOutfile) WriteSingleByte(c byte) { + if my.Err == nil { + my.Err = my.w.WriteByte(c) + } +} + +func (my *MyOutfile) Write(p []byte) { + if my.Err == nil { + if n, err1 := my.w.Write(p); err1 != nil || n != len(p) { + my.Err = errors.Wrapf(err1, "wrote %d, want %d", n, len(p)) + } + } +} + +func (my *MyOutfile) Close() { + if my.Err == nil { + my.Err = my.w.Flush() + } + + if my.Err == nil { + my.Err = 
my.f.Close() + } +} diff --git a/applications/mydata/internal/parser/parser.go b/applications/mydata/internal/parser/parser.go new file mode 100644 index 0000000..7a52907 --- /dev/null +++ b/applications/mydata/internal/parser/parser.go @@ -0,0 +1,199 @@ +package parser + +import ( + "fmt" + "log" + "strings" + + "github.com/pkg/errors" + + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/format" + _ "github.com/pingcap/tidb/pkg/parser/test_driver" +) + +func init() { + log.SetFlags(log.LstdFlags) +} + +type tabX struct { + tabNames []string +} + +func (v *tabX) Enter(in ast.Node) (ast.Node, bool) { + if name, ok := in.(*ast.TableName); ok { + if len(name.Schema.O) > 0 { + v.tabNames = append(v.tabNames, fmt.Sprintf("%s.%s", name.Schema.O, name.Name.O)) + } else { + v.tabNames = append(v.tabNames, name.Name.O) + } + + } + return in, false +} + +func (v *tabX) Leave(in ast.Node) (ast.Node, bool) { + return in, true +} + +func ExtractTable(rootNode *ast.SelectStmt) (string, bool) { + v := &tabX{} + (*rootNode).Accept(v) + + if len(v.tabNames) != 1 { + return "", false + } + + return v.tabNames[0], true +} + +// NewSelectStmt 创建select stmt +func NewSelectStmt(query string) (*ast.SelectStmt, error) { + p := parser.New() + + stmt, err := p.ParseOneStmt(query, "", "") + if err != nil { + return nil, errors.Errorf("[%s] parse failed", query) + } + + s, ok := stmt.(*ast.SelectStmt) + if !ok { + return nil, errors.Errorf("[%s] is not select sql", query) + } + + return s, nil +} + +// JudgeSimpleSQL 判断query-sql中是否是否符合并发分块要求, +// 当符合要求时,需要按照分块chunk处理where条件拼接chunk range范围 +func JudgeSimpleSQL(s *ast.SelectStmt) bool { + if s.StraightJoin { + return false + } + + if s.GroupBy != nil { + return false + } + + if s.Having != nil { + return false + } + + if s.OrderBy != nil { + return false + } + + if s.Limit != nil { + return false + } + + return true +} + +func GetOrigWhere(s *ast.SelectStmt) string { + var sb 
// AddSubWhereSql returns the text of s with cond added to its WHERE clause.
//
// When s already has a WHERE clause, the result is the original text up to
// the position of the WHERE expression, followed by cond, " and (", the
// restored original expression and ") ". When s has no WHERE, GROUP BY,
// ORDER BY or LIMIT clause, the result is the original text followed by
// " where " and cond. GROUP BY, ORDER BY and LIMIT clauses, when present, are
// restored from the AST and appended in that order.
//
// If restoring any clause fails, the unmodified original text is returned.
//
// NOTE(review): when s.Where is nil but a GROUP BY / ORDER BY / LIMIT clause
// is present, nothing from the original text is written before " where ", so
// the result appears to lack the select list and FROM clause — confirm
// whether callers can reach this path.
// NOTE(review): s.Having is never written to the result — confirm that
// statements with HAVING are always filtered out before this is called.
func AddSubWhereSql(s *ast.SelectStmt, cond string) string {
	var sb strings.Builder
	var ff = format.RestoreStringWithoutCharset | format.RestoreStringSingleQuotes | format.RestoreSpacesAroundBinaryOperation

	origTxt := s.OriginalText()
	// isWhere counts whether a WHERE keyword (original or added) has been
	// emitted; notOnlyWhere records that sb already holds a rewritten query.
	var isWhere = 0
	var notOnlyWhere = false
	if s.Where != nil {
		// Keep everything up to the start of the existing condition, then
		// put cond in front of it and parenthesise the original condition.
		sb.WriteString(origTxt[:s.Where.OriginTextPosition()])
		sb.WriteString(cond)
		sb.WriteString(" and (")
		if err := s.Where.Restore(format.NewRestoreCtx(ff, &sb)); err != nil {
			return origTxt
		}
		sb.WriteString(") ")
		isWhere++
		notOnlyWhere = true
	}

	if s.GroupBy != nil {
		if isWhere == 0 {
			sb.WriteString(" where ")
			sb.WriteString(cond)
			isWhere++
		}
		sb.WriteString(" ")
		if err := s.GroupBy.Restore(format.NewRestoreCtx(ff, &sb)); err != nil {
			return origTxt
		}

		notOnlyWhere = true
	}

	if s.OrderBy != nil {
		if isWhere == 0 {
			sb.WriteString(" where ")
			sb.WriteString(cond)
			isWhere++
		}
		sb.WriteString(" ")
		if err := s.OrderBy.Restore(format.NewRestoreCtx(ff, &sb)); err != nil {
			return origTxt
		}

		notOnlyWhere = true
	}

	if s.Limit != nil {
		if isWhere == 0 {
			sb.WriteString(" where ")
			sb.WriteString(cond)
			isWhere++
		}
		sb.WriteString(" ")
		if err := s.Limit.Restore(format.NewRestoreCtx(ff, &sb)); err != nil {
			return origTxt
		}

		notOnlyWhere = true
	}

	// Plain "select ... from t": append the condition to the original text.
	if !notOnlyWhere {
		if isWhere == 0 {
			sb.WriteString(origTxt)
			sb.WriteString(" where ")
			sb.WriteString(cond)
			isWhere++
		}
	}

	return sb.String()
}
dbname string) (string, string, error) { + if strings.Contains(tab, ".") { + s := strings.Split(tab, ".") + if len(s) != 2 { + return "", "", errors.Errorf("tabname format failed:%s", tab) + } + return s[0], s[1], nil + } + + if len(dbname) == 0 { + return "", "", errors.Errorf("not dbname and no schema") + } + + return dbname, tab, nil +} diff --git a/applications/mydata/internal/parser/parser_test.go b/applications/mydata/internal/parser/parser_test.go new file mode 100644 index 0000000..22e7990 --- /dev/null +++ b/applications/mydata/internal/parser/parser_test.go @@ -0,0 +1,122 @@ +package parser + +import ( + "fmt" + "log" + "testing" + + "mydata/internal/dbinfo" +) + +func init() { + log.SetFlags(log.LstdFlags | log.Lshortfile) +} + +func TestJudgeSampleSQL(t *testing.T) { + cases := []struct { + in string + isSimple bool + }{ + {"select * from t1", true}, + {"select /*+ query_time(1000000) */ * from a.t1", true}, + {"select /*+ query_time(1000000) */ * from t1 as a", true}, + {"select /*+ query_time(1000000) */ * from t2 where id > 1", true}, + {"select /*+ query_time(1000000) */ * from t1 where id > 1 or age > 20", true}, + {"select /*+ query_time(1000000) */ * from t1 where id > 1 or age > 20 or name = 'yueyt'", true}, + {"select /*+ query_time(1000000) */ * from t1 where id in ('A', 'B')", true}, + {"select * from testdb.t1 where id in ('A', 'B')", true}, + + {"select /*+ query_time(1000000) */ * from t1 order by name desc", false}, + {"select /*+ query_time(1000000) */ id,name,age from t1 group by id,name,age ", false}, + {"select id,name,age,sum(1) as count from t1 group by id,name,age having count > 1", false}, + {"select a.*,b.* from t1 left join t2 on t1.id = t2.id where t1.id > 10", false}, + {"select a.*,b.* from t1,t2 where t1.id = t2.id", false}, + {"select a.*,b.* from t1,t2", false}, + {"select a.*,b.* from t1 where id in (select id from t2)", false}, + {"select a.*,b.* from t1 as a where id in (select id from t2)", false}, + {"select a.*,b.* 
from t1 as a where id in (select id from t2)", false}, + {"select a.*,b.*", false}, + {"with a as (select id from t1) select * from a", false}, + {"with a as (select id from t1) select * from a where a.id > 10", false}, + + {"insert into tab values(1,2,'3')", false}, + {"delete from tab where id > 1 or name = 'yueyt'", false}, + {"update t1 set name = 'yueyt', addr = 10", false}, + } + + dbc := dbinfo.DbConf{Dbname: "testdb", Username: "root", Password: "root", Addr: "127.0.0.1:3306"} + conn, err := dbinfo.NewDB(dbc) + if err != nil { + panic(err) + } + defer conn.Close() + + for i, c := range cases { + stmt, err := NewSelectStmt(c.in) + if err != nil { + t.Error(err) + continue + } + + tableOrig, isOneTable := ExtractTable(stmt) + if isOneTable == false { + log.Printf("case %02d 复杂:[%s]", i, c.in) + continue + } + + if isSimpleSql := JudgeSimpleSQL(stmt); !isSimpleSql { + log.Printf("case %02d 复杂:[%s]", i, c.in) + continue + } + + schema, table, err := GetTabAndSchema(tableOrig, dbc.Dbname) + if err != nil { + log.Printf("err:%v", err) + continue + } + + keyNameStr, err := dbinfo.GetKeyName(conn, schema, table) + if err != nil { + log.Printf("err:%v", err) + continue + } + + keyVals, err := dbinfo.GetKeyValueByStep(conn, GetOrigWhere(stmt), schema, table, keyNameStr, 1) + if err != nil { + log.Printf("err:%+v", err) + continue + } + + keyRanges, err := dbinfo.GetKeyRange(keyVals) + if err != nil { + log.Printf("err:%v", err) + continue + } + + log.Printf("case %02d 简单:[%s], 单表:%s", i, c.in, table) + for _, vv := range keyRanges { + subWhere := fmt.Sprintf("(%s)>=%s and (%s)<%s", keyNameStr, vv.Start, keyNameStr, vv.End) + newSql := AddSubWhereSql(stmt, subWhere) + log.Printf("case %02d 改写:[%s]", i, newSql) + } + } +} + +func TestParse(t *testing.T) { + for i, v := range []string{ + "select * from t1 where id > 1 and name < 10 order by name", + "select * from t1", + "select /*+ query_timeout(1000) */ id from t1", + } { + log.Printf(">>> [%s]", v) + stmt, err := 
NewSelectStmt(v) + if err != nil { + t.Error(err) + } + + if stmt.TableHints != nil { + log.Printf("%d->%s", i, v) + } + + } +} diff --git a/applications/mydata/internal/transfor/stdiconv.go b/applications/mydata/internal/transfor/stdiconv.go new file mode 100644 index 0000000..006f179 --- /dev/null +++ b/applications/mydata/internal/transfor/stdiconv.go @@ -0,0 +1,40 @@ +package transfor + +import ( + "bytes" + "log" + + "github.com/djimenez/iconv-go" + "github.com/pkg/errors" +) + +func StdIconv(inByte, outByte []byte, fromCode, toCode string) error { + converter, err := iconv.NewConverter(fromCode, toCode) + if err != nil { + return errors.WithStack(err) + } + defer converter.Close() + + read, written, err := converter.Convert(inByte, outByte) + if err != nil { + return errors.WithStack(err) + } + + log.Printf("read:%d, written:%d", read, written) + return nil +} + +func StdIconv2() { + reader, err := iconv.NewReader(bytes.NewReader([]byte("我是中国人")), "utf-8", "gbk") + if err != nil { + panic(err) + } + + buf := make([]byte, 10) + n, err := reader.Read(buf) + if err != nil { + panic(err) + } + + log.Printf("read:%d, buf:%X", n, buf) +} diff --git a/applications/mydata/internal/transfor/stdiconv_test.go b/applications/mydata/internal/transfor/stdiconv_test.go new file mode 100644 index 0000000..162f5f8 --- /dev/null +++ b/applications/mydata/internal/transfor/stdiconv_test.go @@ -0,0 +1,65 @@ +package transfor + +import ( + "bufio" + "encoding/hex" + "log" + "os" + "testing" + + "github.com/djimenez/iconv-go" +) + +func TestIconvFunc(t *testing.T) { + dst := make([]byte, 3) + srcFile, err := os.Open("/Users/abc/workspaces/mydata/gbk.file") + if err != nil { + t.Fatal(err) + } + defer srcFile.Close() + + dstFile, err := os.Create("utf8") + if err != nil { + t.Fatal(err) + } + defer dstFile.Close() + + writer := bufio.NewWriter(dstFile) + defer writer.Flush() + + bufReader := bufio.NewReader(srcFile) + + reader, err := iconv.NewReader(bufReader, "gbk", "utf8") + if 
err != nil { + t.Error(err) + } + + for { + //dst := make([]byte, 3) + n, err := reader.Read(dst) + if err != nil { + t.Error(err) + break + } + log.Printf("read:%d, buf:\n%s", n, hex.Dump(dst)) + nn, err := writer.Write(dst) + if err != nil { + t.Error(err) + break + } + log.Printf("written:%d", nn) + + } + + //conv, err := myconv.NewConverter("utf-8", "gbk") + //if err != nil { + // t.Error(err) + //} + //defer conv.Close() + // + //read, written, err := conv.Convert(src, dst) + //if err != nil { + // t.Error(err) + //} + //log.Printf("read:%d, written:%d", read, written) +} diff --git a/applications/mydata/internal/transfor/transfor.go b/applications/mydata/internal/transfor/transfor.go new file mode 100644 index 0000000..6861900 --- /dev/null +++ b/applications/mydata/internal/transfor/transfor.go @@ -0,0 +1,23 @@ +package transfor + +import ( + "github.com/pkg/errors" + "golang.org/x/text/encoding/simplifiedchinese" +) + +func ConvStr2GBK(b []byte) ([]byte, error) { + bs, err := simplifiedchinese.GBK.NewEncoder().Bytes(b) + if err != nil { + return nil, errors.WithStack(err) + } + return bs, nil +} + +func ConvGBK2Str(gb []byte) ([]byte, error) { + bs, err := simplifiedchinese.GBK.NewDecoder().Bytes(gb) + if err != nil { + return nil, errors.WithStack(err) + } + + return bs, nil +} diff --git a/applications/mydata/internal/transfor/transfor_test.go b/applications/mydata/internal/transfor/transfor_test.go new file mode 100644 index 0000000..bc082db --- /dev/null +++ b/applications/mydata/internal/transfor/transfor_test.go @@ -0,0 +1,27 @@ +package transfor + +import ( + "bytes" + "log" + "testing" +) + +func TestConvGBK2Str(t *testing.T) { + srcByte := []byte("中国人") + gbkByte, err := ConvStr2GBK(srcByte) + if err != nil { + t.Error(err) + } + log.Printf(">>>%x", gbkByte) + + utf8Byte, err := ConvGBK2Str(gbkByte) + if err != nil { + t.Error(err) + } + + if bytes.Compare(srcByte, utf8Byte) != 0 { + log.Printf("srcByte:%X, utf8Byte:%X", srcByte, utf8Byte) + } + + 
// Build metadata reported by this package. Every value defaults to "None".
// NOTE(review): presumably these are overridden at link time (for example via
// -ldflags "-X ..."); confirm against the build scripts.
var (
	ReleaseVersion = "None"
	BuildTS        = "None"
	GitHash        = "None"
	GitBranch      = "None"
	GoVersion      = "None"
)

// LogVersionInfo writes a single welcome line containing all build metadata
// to the standard logger.
func LogVersionInfo() {
	log.Printf("welcome to use the command, release-version:%s, git-hash:%s, git-branch:%s, utc-build-time:%s, go-version:%s",
		ReleaseVersion, GitHash, GitBranch, BuildTS, GoVersion)
}

// GetRawInfo returns the build metadata as a multi-line, human-readable
// string: one "label : value" pair per line, each line terminated by "\n".
func GetRawInfo() string {
	// A single format call replaces five Sprintf calls glued together with
	// +=, producing the same text with one allocation.
	return fmt.Sprintf(
		"Release version : %s\n"+
			"Git Commit Hash : %s\n"+
			"Git Branch : %s\n"+
			"Build Time : %s\n"+
			"Go Version : %s\n",
		ReleaseVersion, GitHash, GitBranch, BuildTS, GoVersion,
	)
}