Skip to content

Commit

Permalink
v0.3
Browse files Browse the repository at this point in the history
  • Loading branch information
Abraham Leal committed Aug 30, 2020
1 parent 7dd4977 commit a4bc65e
Show file tree
Hide file tree
Showing 12 changed files with 504 additions and 37 deletions.
81 changes: 81 additions & 0 deletions cmd/integrationTests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
hostname: zookeeper
container_name: zookeeper-testcontainers
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

broker:
image: confluentinc/cp-server:5.5.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

schema-registry-src:
image: confluentinc/cp-schema-registry:5.5.1
hostname: schema-registry-src
container_name: schema-registry-testcontainers-src
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
volumes:
- "./testingBasicAuth:/testing"
environment:
SCHEMA_REGISTRY_CONFLUENT_LICENSE: ""
SCHEMA_REGISTRY_HOST_NAME: schema-registry-src
SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: schema-src
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_AUTHENTICATION_METHOD: "BASIC"
SCHEMA_REGISTRY_AUTHENTICATION_ROLES: "onerole"
SCHEMA_REGISTRY_AUTHENTICATION_REALM: "test"
SCHEMA_REGISTRY_OPTS: "-Djava.security.auth.login.config=/testing/jaas"
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: "_schemas"
SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 1
SCHEMA_REGISTRY_MODE_MUTABILITY: "true"

schema-registry-dst:
image: confluentinc/cp-schema-registry:5.5.1
hostname: schema-registry-dst
container_name: schema-registry-testcontainers-dst
depends_on:
- zookeeper
- broker
ports:
- "8082:8082"
volumes:
- "./testingBasicAuth:/testing"
environment:
SCHEMA_REGISTRY_CONFLUENT_LICENSE: ""
SCHEMA_REGISTRY_HOST_NAME: schema-registry-dst
SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: schema-dst
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8082"
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_AUTHENTICATION_METHOD: "BASIC"
SCHEMA_REGISTRY_AUTHENTICATION_ROLES: "onerole"
SCHEMA_REGISTRY_AUTHENTICATION_REALM: "test"
SCHEMA_REGISTRY_OPTS: "-Djava.security.auth.login.config=/testing/jaas"
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: "_schemas-dst"
SCHEMA_REGISTRY_MODE_MUTABILITY: "true"
SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 1
248 changes: 248 additions & 0 deletions cmd/integrationTests/exporter-integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
package integration

import (
client "github.com/abraham-leal/ccloud-schema-exporter/cmd/internals"
"github.com/stretchr/testify/assert"
"github.com/testcontainers/testcontainers-go"
"io/ioutil"
"log"
"os"
"path/filepath"
"reflect"
"testing"
"time"
)

var composeEnv *testcontainers.LocalDockerCompose
var testClientSrc *client.SchemaRegistryClient
var testClientDst *client.SchemaRegistryClient

func TestMain(m *testing.M) {
setup()
code := m.Run()
tearDown()
os.Exit(code)
}

func setup () {
composeEnv = testcontainers.NewLocalDockerCompose([]string{"docker-compose.yml"},"integrationtests")
composeEnv.WithCommand([]string{"up","-d"}).Invoke()
time.Sleep(time.Duration(15) * time.Second) // give services time to set up
client.ScrapeInterval = 3
client.SyncDeletes = true

testClientSrc = client.NewSchemaRegistryClient("http://localhost:8081","testUser", "testPass", "src")
testClientDst = client.NewSchemaRegistryClient("http://localhost:8082","testUser", "testPass", "dst")

if testClientSrc.IsReachable() && testClientDst.IsReachable() {
// Set up our source registry
subjects := []string{"someSubject-value","someSubject-key"}
id := 1
versions := []int{1, 2, 3}

err := testClientSrc.SetMode("IMPORT")
if err == false {
log.Println("Could not set source registry to IMPORT ModeRecord.")
os.Exit(0)
}

schema := "{\"type\": \"record\",\"namespace\": \"com.mycorp.mynamespace\",\"name\": \"value_newnew\",\"doc\": \"Sample schema to help you get started.\"," +
"\"fields\": [{\"name\": \"this\",\"type\":\"int\",\"doc\": \"The int type is a 32-bit signed integer.\"},{\"name\": \"that\",\"type\": \"double\"," +
"\"doc\": \"The double type is a double precision (64-bit) IEEE 754 floating-point number.\"},{\"name\": \"too\",\"type\": \"string\",\"doc\": \"The string is a unicode character sequence.\"}]}"

schema2 := "{\"type\": \"record\",\"namespace\": \"com.mycorp.mynamespace\",\"name\": \"value_newnew\",\"doc\": \"Sample schema to help you get started.\"," +
"\"fields\": [{\"name\": \"this\",\"type\":\"int\",\"doc\": \"The int type is a 32-bit signed integer.\"},{\"name\": \"that\",\"type\": \"double\"," +
"\"doc\": \"The double type is a double precision (64-bit) IEEE 754 floating-point number.\"}]}"

schema3 := "{\"type\": \"record\",\"namespace\": \"com.mycorp.mynamespace\",\"name\": \"value_newnew\",\"doc\": \"Sample schema to help you get started.\"," +
"\"fields\": [{\"name\": \"this\",\"type\":\"int\",\"doc\": \"The int type is a 32-bit signed integer.\"}]}"

schema4 := "{\"type\": \"record\",\"namespace\": \"com.mycorp.wassup\",\"name\": \"value_newnew\",\"doc\": \"Sample schema to help you get started.\"," +
"\"fields\": [{\"name\": \"this\",\"type\":\"int\",\"doc\": \"The int type is a 32-bit signed integer.\"},{\"name\": \"that\",\"type\": \"double\"," +
"\"doc\": \"The double type is a double precision (64-bit) IEEE 754 floating-point number.\"},{\"name\": \"too\",\"type\": \"string\",\"doc\": \"The string is a unicode character sequence.\"}]}"

schema5 := "{\"type\": \"record\",\"namespace\": \"com.mycorp.wassup\",\"name\": \"value_newnew\",\"doc\": \"Sample schema to help you get started.\"," +
"\"fields\": [{\"name\": \"this\",\"type\":\"int\",\"doc\": \"The int type is a 32-bit signed integer.\"},{\"name\": \"that\",\"type\": \"double\"," +
"\"doc\": \"The double type is a double precision (64-bit) IEEE 754 floating-point number.\"}]}"

schema6 := "{\"type\": \"record\",\"namespace\": \"com.mycorp.wassup\",\"name\": \"value_newnew\",\"doc\": \"Sample schema to help you get started.\"," +
"\"fields\": [{\"name\": \"this\",\"type\":\"int\",\"doc\": \"The int type is a 32-bit signed integer.\"}]}"

schemas := []string{schema, schema2, schema3, schema4, schema5, schema6}

for _, subject := range subjects {
for _ , version := range versions {

currentRecord := client.SchemaRecord{
Subject: subject,
Schema: schemas[id-1],
SType: "AVRO",
Version: int64(version),
Id: int64(id),
}

testClientSrc.RegisterSchemaBySubjectAndIDAndVersion(
currentRecord.Schema,
currentRecord.Subject,
int(currentRecord.Id),
int(currentRecord.Version),
"AVRO")

log.Printf("Registering schema with subject %s, version %d, and id %d", currentRecord.Subject, currentRecord.Version, currentRecord.Id)
id = id + 1
}
}

} else {
panic("Could not find Schema Registries")
}

time.Sleep(time.Duration(5) * time.Second) // Allow registration catch up
}

func tearDown () {
composeEnv.WithCommand([]string{"down","-v"}).Invoke()
}

func TestExportMode(t *testing.T) {

err := testClientDst.SetMode("IMPORT")
if err == false {
log.Println("Could not set destination registry to IMPORT ModeRecord.")
os.Exit(0)
}

client.BatchExport(testClientSrc,testClientDst)

srcChan := make(chan map[string][]int)
go testClientDst.GetSubjectsWithVersions(srcChan)
srcSubjects := <- srcChan

dstChan := make(chan map[string][]int)
go testClientDst.GetSubjectsWithVersions(dstChan)
dstSubjects := <- dstChan

assert.True(t,reflect.DeepEqual(srcSubjects,dstSubjects))
}

func TestLocalMode(t *testing.T) {
currentPath, _ := os.Executable()

// Test Relative Paths
err := os.Mkdir(filepath.Dir(currentPath)+"/testingLocalBackupRelativePath", 0755)
if err != nil {
panic(err)
}

defer os.RemoveAll(filepath.Dir(currentPath)+"/testingLocalBackupRelativePath")
client.WriteToFS(testClientSrc, "testingLocalBackupRelativePath")

files, err2 := ioutil.ReadDir(filepath.Dir(currentPath)+"/testingLocalBackupRelativePath")
if err2 != nil {
panic(err2)
}
assert.Equal(t,6,len(files))

// Test Absolute Paths
_ = os.Mkdir("/tmp/testingLocalBackupAbsPath", 0755)
defer os.RemoveAll("/tmp/testingLocalBackupAbsPath")
client.WriteToFS(testClientSrc, "/tmp/testingLocalBackupAbsPath")

files2,_ := ioutil.ReadDir("/tmp/testingLocalBackupAbsPath")
assert.Equal(t,6,len(files2))

}


func TestSyncMode(t *testing.T) {

// Set mode
err := testClientDst.SetMode("IMPORT")
if err == false {
log.Println("Could not set destination registry to IMPORT ModeRecord.")
os.Exit(0)
}

// Start sync in another goroutine
go client.Sync(testClientSrc,testClientDst)
time.Sleep(time.Duration(5) * time.Second) // Give time for sync

// Assert schemas in dest deep equal schemas in src
srcSubjects := make (map[string][]int)
destSubjects := make (map[string][]int)

srcChan := make(chan map[string][]int)
destChan := make(chan map[string][]int)

go testClientSrc.GetSubjectsWithVersions(srcChan)
go testClientDst.GetSubjectsWithVersions(destChan)

srcSubjects = <- srcChan
destSubjects = <- destChan

log.Printf("Source: %v", srcSubjects)
log.Printf("Destination: %v", destSubjects)

assert.True(t, reflect.DeepEqual(srcSubjects, destSubjects))

// inject a new write
schema := "{\"type\":\"record\",\"name\":\"value_newnew\",\"namespace\":\"com.mycorp.mynamespace\",\"doc\":\"Sample schema to help you get started.\",\"fields\":[{\"name\":\"this\",\"type\":\"int\",\"doc\":\"The int type is a 32-bit signed integer.\"},{\"default\": null,\"name\": \"onefield\",\"type\": [\"null\",\"string\"]}]}"

newRegister := client.SchemaRecord{
Subject: "someSubject-value",
Schema: schema,
SType: "AVRO",
Version: 4,
Id: 7,
}
testClientSrc.RegisterSchemaBySubjectAndIDAndVersion(newRegister.Schema,
newRegister.Subject,int(newRegister.Id),int(newRegister.Version),newRegister.SType)
time.Sleep(time.Duration(5) * time.Second) // Give time for sync

// Assert schemas in dest deep equal schemas in src
srcSubjects = make (map[string][]int)
destSubjects = make (map[string][]int)

srcChan = make(chan map[string][]int)
destChan = make(chan map[string][]int)


go testClientSrc.GetSubjectsWithVersions(srcChan)
go testClientDst.GetSubjectsWithVersions(destChan)

srcSubjects = <- srcChan
destSubjects = <- destChan

log.Printf("Source: %v", srcSubjects)
log.Printf("Destination: %v", destSubjects)

assert.True(t, reflect.DeepEqual(srcSubjects, destSubjects))

// inject a soft delete
testClientSrc.PerformSoftDelete("someSubject-value",1)
time.Sleep(time.Duration(5) * time.Second) // Give time for sync

// Assert schemas in dest deep equal schemas in src

srcSubjects = make (map[string][]int)
destSubjects = make (map[string][]int)

srcChan = make(chan map[string][]int)
destChan = make(chan map[string][]int)

go testClientSrc.GetSubjectsWithVersions(srcChan)
go testClientDst.GetSubjectsWithVersions(destChan)

srcSubjects = <- srcChan
destSubjects = <- destChan

log.Printf("Source: %v", srcSubjects)
log.Printf("Destination: %v", destSubjects)

assert.True(t, reflect.DeepEqual(srcSubjects, destSubjects))

log.Println("Killing sync goroutine")
client.TestHarnessRun = true
time.Sleep(time.Duration(3) * time.Second) // Give thread time to die

}
10 changes: 10 additions & 0 deletions cmd/integrationTests/testingBasicAuth/jaas
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
test {
org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
file="/testing/testUsers"
debug="false";
};





1 change: 1 addition & 0 deletions cmd/integrationTests/testingBasicAuth/testUsers
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
testUser:testPass,onerole
2 changes: 1 addition & 1 deletion cmd/internals/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func GetFlags() {
flag.Parse()

if *syncDeletesFlag {
syncDeletes = true
SyncDeletes = true
}

if *versionFlag {
Expand Down
5 changes: 3 additions & 2 deletions cmd/internals/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

var httpCallTimeout int
var ScrapeInterval int
var Version = "0.3-SNAPSHOT"
var Version = "0.3"
var httpClient http.Client

var SrcSRUrl string
Expand All @@ -21,5 +21,6 @@ var DestSRUrl string
var DestSRKey string
var DestSRSecret string
var RunMode string
var syncDeletes bool
var SyncDeletes bool
var PathToWrite string
var TestHarnessRun bool
Loading

0 comments on commit a4bc65e

Please sign in to comment.