Skip to content

Commit

Permalink
improvements to local write
Browse files Browse the repository at this point in the history
  • Loading branch information
Abraham Leal committed Aug 27, 2020
1 parent 5b129d5 commit 7dd4977
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 18 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Exporter scripts for Confluent Schema Registry.

A tool to export all schemas from a Confluent Cloud Schema Registry to another.
This app supports three modes: `batchExport`, `sync`, and `getLocalCopy`
This app supports three modes: `batchExport`, `sync`, and `getLocalCopy`.

- `batchExport` will do a one time migration between schema registries, then it will reset the destination registry to `READWRTIE` mode.
- `sync` will continuously sync newly registered schemas into the destination registry.
Expand Down Expand Up @@ -43,7 +43,7 @@ docker run \

A sample docker-compose is also provided at the root of this directory.

The docker image is built to handle the case of `-sync` continuous sync. For a one time export, it is recommended
The docker image handles `-sync` continuous sync. For a one time export, it is recommended
to use a release binary.

For Docker, the `latest` tag will build directly from master. The master branch of this project is kept non-breaking;
Expand All @@ -54,10 +54,10 @@ However, for stable images tag a release.
- `./ccloud-schema-exporter -sync` : Running the app with this flag will start a continuous sync
between source and destination schema registries.
- `./ccloud-schema-exporter -getLocalCopy` : Running the app with this flag will get a snapshot of your Schema Registry
into local files with naming structure subjectName-version-id.json per schema. The default directory is
currentPath/SchemaRegistryBackup/.
into local files with naming structure subjectName-version-id per schema. The default directory is
{currentPath}/SchemaRegistryBackup/.

When both flags are applied, `sync` mode prevails.
When multiple flags are applied, prevalence is `sync` -> `batchExport` -> `getLocalCopy`

### Options

Expand All @@ -75,9 +75,9 @@ Usage of ./ccloud-schema-exporter:
-dest-sr-url string
Url to the Destination Schema Registry Cluster
-getLocalCopy
Perform a local back-up of all schemas in the source registry. Defaults to a folder (SchemaRegistryBackup) in the current path, but can be overridden by passing in a desired path with -getLocalCopyPath.
Perform a local back-up of all schemas in the source registry. Defaults to a folder (SchemaRegistryBackup) in the current path.
-getLocalCopyPath string
Optional custom path for local copy. This must be an absolute path.
Optional custom path for local copy. This must be an existing directory.
-scrapeInterval int
Amount of time ccloud-schema-exporter will delay between schema sync checks in seconds (default 60)
-src-sr-key string
Expand Down
2 changes: 1 addition & 1 deletion cmd/ccloud-schema-exporter/ccloud-schema-exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func main() {
os.Exit(0)
}

if client.LocalCopy {
if client.RunMode == "LOCAL" {
client.WriteToFS(srcClient, client.PathToWrite)

log.Println("-----------------------------------------------")
Expand Down
11 changes: 7 additions & 4 deletions cmd/internals/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ func GetFlags() {
flag.StringVar(&DestSRSecret, "dest-sr-secret", "", "API SECRET for the Destination Schema Registry Cluster")
flag.IntVar(&httpCallTimeout, "timeout", 60, "Timeout, in seconds, to use for all REST calls with the Schema Registries")
flag.IntVar(&ScrapeInterval, "scrapeInterval", 60, "Amount of time ccloud-schema-exporter will delay between schema sync checks in seconds")
flag.BoolVar(&LocalCopy, "getLocalCopy", false,
"Perform a local back-up of all schemas in the source registry. Defaults to a folder (SchemaRegistryBackup) in the current path, but can be overridden by passing in a desired path with -getLocalCopyPath.")
flag.StringVar(&PathToWrite, "getLocalCopyPath", "",
"Optional custom path for local copy. This must be an absolute path with existing directory structure.")
"Optional custom path for local copy. This must be an existing directory structure.")
versionFlag := flag.Bool("version", false, "Print the current version and exit")
usageFlag := flag.Bool("usage", false, "Print the usage of this tool")
batchExportFlag := flag.Bool("batchExport", false, "Perform a one-time export of all schemas")
syncFlag := flag.Bool("sync", false, "Sync schemas continuously")
localCopyFlag := flag.Bool("getLocalCopy", false, "Perform a local back-up of all schemas in the source registry. Defaults to a folder (SchemaRegistryBackup) in the current path")
deleteFlag := flag.Bool("deleteAllFromDestination", false, "Setting this will run a delete on all schemas written to the destination registry")
syncDeletesFlag := flag.Bool("syncDeletes", false, "Setting this will sync soft deletes from the source cluster to the destination")

Expand All @@ -55,14 +54,18 @@ func GetFlags() {
os.Exit(0)
}

if !*syncFlag && !*batchExportFlag && !LocalCopy {
if !*syncFlag && !*batchExportFlag && !*localCopyFlag {
fmt.Println("You must specify a mode to run on.")
fmt.Println("Usage:")
fmt.Println("")
flag.PrintDefaults()
os.Exit(0)
}

if *localCopyFlag {
RunMode = "LOCAL"
}

if *batchExportFlag {
RunMode = "BATCH"
}
Expand Down
1 change: 0 additions & 1 deletion cmd/internals/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,3 @@ var DestSRSecret string
var RunMode string
var syncDeletes bool
var PathToWrite string
var LocalCopy bool
26 changes: 21 additions & 5 deletions cmd/internals/writeToLocalFS.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,24 @@ import (

func WriteToFS (srcClient *SchemaRegistryClient, definedPath string) {

abspath, _ := os.Executable()
currentPath, _ := os.Executable()

if definedPath == "" {
log.Println("Path not defined, writing to new local folder SchemaRegistryBackup")
_ = os.Mkdir("SchemaRegistryBackup", 0755)
definedPath = fmt.Sprintf("%s%s",filepath.Dir(abspath),"/SchemaRegistryBackup")
definedPath = filepath.Join(filepath.Dir(currentPath), "SchemaRegistryBackup")
} else {
if filepath.IsAbs(definedPath){
if _, err := os.Stat(definedPath); os.IsNotExist(err) {
log.Fatalln("The directory specified does not exist.")
}
} else {
definedPath = filepath.Join(filepath.Dir(currentPath),definedPath)
_, err := os.Stat(definedPath)
if os.IsNotExist(err) {
log.Fatalln("The directory specified does not exist.")
}
}
}

srcChan := make(chan map[string][]int)
Expand All @@ -35,10 +49,12 @@ func writeSchema (srcClient *SchemaRegistryClient, pathToWrite string, subject s
rawSchema := srcClient.GetSchema(subject,int64(version))


log.Printf("Writing schema: %s with version: %d and ID: %d and Type: %s",
rawSchema.Subject, rawSchema.Version, rawSchema.Id, rawSchema.SType)
log.Printf("Writing schema: %s with version: %d and ID: %d",
rawSchema.Subject, rawSchema.Version, rawSchema.Id)

filename := fmt.Sprintf("%s-%d-%d",rawSchema.Subject, rawSchema.Version, rawSchema.Id)
f , err := os.Create(filepath.Join(pathToWrite,filename))

f , err := os.Create(fmt.Sprintf("%s/%s-%d-%d.json",pathToWrite,rawSchema.Subject,rawSchema.Version,rawSchema.Id))
check(err)
defer f.Close()

Expand Down

0 comments on commit 7dd4977

Please sign in to comment.