diff --git a/examples/connector/golang/README.md b/examples/connector/golang/README.md index c5dd67f..4068114 100644 --- a/examples/connector/golang/README.md +++ b/examples/connector/golang/README.md @@ -7,5 +7,5 @@ Run all commands from the golang folder root > scripts/copy_protos.sh > scripts/compile_protos.sh > scripts/build.sh -> main +> ./main ``` \ No newline at end of file diff --git a/examples/connector/golang/go.mod b/examples/connector/golang/go.mod index 2465455..7e86d9a 100644 --- a/examples/connector/golang/go.mod +++ b/examples/connector/golang/go.mod @@ -3,14 +3,15 @@ module fivetran.com/fivetran_sdk go 1.21 require ( - google.golang.org/grpc v1.59.0 - google.golang.org/protobuf v1.31.0 + google.golang.org/grpc v1.65.0 + google.golang.org/protobuf v1.35.1 ) require ( - github.com/golang/protobuf v1.5.3 // indirect - golang.org/x/net v0.14.0 // indirect - golang.org/x/sys v0.11.0 // indirect - golang.org/x/text v0.12.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect + github.com/golang/protobuf v1.5.4 // indirect + golang.org/x/net v0.30.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect + google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 // indirect ) diff --git a/examples/connector/golang/go.sum b/examples/connector/golang/go.sum index 16f3bb4..9bfa0a3 100644 --- a/examples/connector/golang/go.sum +++ b/examples/connector/golang/go.sum @@ -1,21 +1,38 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 h1:F29+wU6Ee6qgu9TddPgooOdaqsxTMunOoj8KA5yuS5A= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1/go.mod h1:5KF+wpkbTSbGcR9zteSqZV6fqFOWBl4Yde8En8MryZA= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= diff --git a/examples/connector/golang/golang_connector/main.go b/examples/connector/golang/golang_connector/main.go index e26eb6d..c0597cb 100644 --- a/examples/connector/golang/golang_connector/main.go +++ b/examples/connector/golang/golang_connector/main.go @@ -13,6 +13,10 @@ import ( "google.golang.org/grpc" ) +const INFO = "INFO" +const WARNING = "WARNING" +const SEVERE = "SEVERE" + var port = flag.Int("port", 50051, "The server port") type MyState struct { @@ -30,7 +34,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer) state := MyState{} json.Unmarshal([]byte(state_json), &state) - log.Println("config: ", config, "selection: ", selection, "state_json: ", state_json, "mystate: ", state) + message := fmt.Sprintf("config: %s, selection: %s, state_json: %s, mystate: %s", config, selection, state_json, state) + LogMessage(INFO, message) // -- Send a log message stream.Send(&pb.UpdateResponse{ @@ -66,6 +71,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer) state.Cursor++ } + LogMessage(INFO, "Completed sending upsert records") + // -- Send UPDATE record stream.Send(&pb.UpdateResponse{ Response: &pb.UpdateResponse_Operation{ @@ -86,6 +93,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer) }) state.Cursor++ + LogMessage(INFO, "Completed sending update records") + // -- Send DELETE record stream.Send(&pb.UpdateResponse{ Response: &pb.UpdateResponse_Operation{ @@ -105,6 +114,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer) }) state.Cursor++ + LogMessage(WARNING, "Sample warning message: Completed sending delete records") + // Serialize state from struct to JSON string new_state_json, _ := json.Marshal(state) new_state := string(new_state_json) @@ -133,6 +144,7 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer) }, }) + LogMessage(SEVERE, "Sample severe message: Update call completed") // End the RPC call return nil } @@ -258,6 +270,16 @@ func (s *server) Test(ctx context.Context, in *pb.TestRequest) (*pb.TestResponse }, nil } +func LogMessage(level string, message string) { + log := map[string]interface{}{ + "level": level, + "message": message, + "message-origin": "sdk_connector", + } + logJSON, _ := json.Marshal(log) + fmt.Println(string(logJSON)) +} + func main() { flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) diff --git a/examples/connector/golang/scripts/compile_protos.sh b/examples/connector/golang/scripts/compile_protos.sh index 9c4df7a..ef651d6 100755 --- a/examples/connector/golang/scripts/compile_protos.sh +++ b/examples/connector/golang/scripts/compile_protos.sh @@ -1,4 +1,4 @@ -protoc \ +PATH="${PATH}:${HOME}/go/bin" protoc \ --proto_path=proto \ --go_out=proto \ --go_opt=paths=source_relative \ diff --git a/examples/connector/java/src/main/java/connector/ConnectorServiceImpl.java b/examples/connector/java/src/main/java/connector/ConnectorServiceImpl.java index f822398..9cbb5aa 100644 --- a/examples/connector/java/src/main/java/connector/ConnectorServiceImpl.java +++ b/examples/connector/java/src/main/java/connector/ConnectorServiceImpl.java @@ -9,8 +9,12 @@ import java.util.*; public class ConnectorServiceImpl extends ConnectorGrpc.ConnectorImplBase { + private final String INFO = "INFO"; + private final String WARNING = "WARNING"; + private final String SEVERE = "SEVERE"; @Override public void configurationForm(ConfigurationFormRequest request, StreamObserver responseObserver) { + logMessage(INFO, "Started fetching configuration form"); responseObserver.onNext( ConfigurationFormResponse.newBuilder() .setSchemaSelectionSupported(true) @@ -41,16 +45,19 @@ public void configurationForm(ConfigurationFormRequest request, StreamObserver responseObserver) { + Map configuration = request.getConfigurationMap(); // Name of the test to be run String testName = request.getName(); - System.out.println("test name: " + testName); + String message = String.format("test name: %s", testName); + logMessage(INFO, message); responseObserver.onNext(TestResponse.newBuilder().setSuccess(true).build()); responseObserver.onCompleted(); @@ -58,6 +65,8 @@ public void test(TestRequest request, StreamObserver responseObser @Override public void schema(SchemaRequest request, StreamObserver responseObserver) { + + logMessage(WARNING, "Sample warning message while fetching schema"); Map configuration = request.getConfigurationMap(); TableList tableList = TableList.newBuilder() @@ -177,10 +186,16 @@ public void update(UpdateRequest request, StreamObserver respons .build()) .build()); } catch (JsonProcessingException e) { + String message = e.getMessage(); + logMessage(SEVERE, message); responseObserver.onError(e); } // End the streaming RPC call responseObserver.onCompleted(); } + + private void logMessage(String level, String message) { + System.out.println(String.format("{\"level\":\"%s\", \"message\": \"%s\", \"message-origin\": \"sdk_connector\"}", level, message)); + } } diff --git a/examples/connector/nodejs/src/index.js b/examples/connector/nodejs/src/index.js index f224f31..992946c 100644 --- a/examples/connector/nodejs/src/index.js +++ b/examples/connector/nodejs/src/index.js @@ -11,6 +11,10 @@ const options = { }; var packageDefinitionConnector = protoLoader.loadSync(PROTO_PATH_CONNECTOR, options); +const INFO = "INFO"; +const WARNING = "WARNING"; +const SEVERE = "SEVERE"; + const protoDescriptor = grpc.loadPackageDefinition(packageDefinitionConnector); const connectorSdkProto = protoDescriptor.fivetran_sdk; @@ -18,6 +22,7 @@ const connectorSdkProto = protoDescriptor.fivetran_sdk; const server = new grpc.Server(); const configurationForm = (call, callback) => { + logMessage(INFO, "Fetching configuration form") callback(null, { schema_selection_supported: true, table_selection_supported: true, @@ -39,12 +44,13 @@ const configurationForm = (call, callback) => { const test = (call, callback) => { const configuration = call.request.configuration; const testName = call.request.name; - console.log(`Test name: ${testName}`); + logMessage(INFO, `Test name: ${testName}`) callback(null, { success: true }); }; // Implement the Schema RPC method const schema = (call, callback) => { + logMessage(INFO, "Fetching the schema from the implemented method") const tableList = { tables: [ { @@ -77,15 +83,6 @@ const configurationForm = (call, callback) => { call.write(response); }; - const sendLogEntry = (message) => { - sendResponse({ - log_entry: { - level: "INFO", - message: message - } - }); - }; - const sendOperation = (operation) => { sendResponse({ operation: operation @@ -94,7 +91,7 @@ const configurationForm = (call, callback) => { try { // Send a log message - sendLogEntry("Sync STARTING"); + logMessage(WARNING, "Sample Warning message: Sync STARTING"); // Send UPSERT records for (let i = 0; i < 3; i++) { @@ -145,7 +142,7 @@ const configurationForm = (call, callback) => { }); // Send a log message - sendLogEntry("Sync DONE"); + logMessage(SEVERE, "Sample severe message: Sync done") } catch (error) { callback(error); @@ -155,6 +152,10 @@ const configurationForm = (call, callback) => { call.end(); }; + function logMessage(level, message) { + console.log(`{"level":"${level}", "message": "${message}", "message-origin": "sdk_connector"}`); + } + server.addService(connectorSdkProto.Connector.service, {configurationForm, test, schema, update}) diff --git a/examples/connector/python/main.py b/examples/connector/python/main.py index ddb9016..9367b52 100644 --- a/examples/connector/python/main.py +++ b/examples/connector/python/main.py @@ -8,9 +8,13 @@ from sdk_pb2 import common_pb2 from sdk_pb2 import connector_sdk_pb2 +INFO = "INFO" +WARNING = "WARNING" +SEVERE = "SEVERE" class ConnectorService(connector_sdk_pb2_grpc.ConnectorServicer): def ConfigurationForm(self, request, context): + log_message(INFO, "Fetching configuration form") form_fields = common_pb2.ConfigurationFormResponse(schema_selection_supported=True, table_selection_supported=True) form_fields.fields.add(name="apiKey", label="API Key", required=True, text_field=common_pb2.TextField.PlainText) @@ -35,8 +39,9 @@ def Test(self, request, context): configuration = request.configuration # Name of the test to be run test_name = request.name - print("Configuration: ", configuration) - print("Test name: ", test_name) + + log_message(INFO, "Test Name: " + str(test_name)) + return common_pb2.TestResponse(success=True) def Schema(self, request, context): @@ -114,6 +119,8 @@ def Update(self, request, context): yield connector_sdk_pb2.UpdateResponse(operation=operation) state["cursor"] += 1 + log_message(WARNING, "Completed sending update records") + # -- Send DELETE record operation = connector_sdk_pb2.Operation() val1 = common_pb2.ValueType() @@ -139,6 +146,12 @@ def Update(self, request, context): log.message = "Sync Done" yield connector_sdk_pb2.UpdateResponse(log_entry=log) + log_message(SEVERE, "Sending severe log: Completed Update method") + + +def log_message(level, message): + print(f'{{"level":"{level}", "message": "{message}", "message-origin": "sdk_connector"}}') + def start_server(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) @@ -152,4 +165,4 @@ def start_server(): if __name__ == '__main__': print("Starting the server...") - start_server() + start_server() \ No newline at end of file diff --git a/examples/destination/java/src/main/java/destination/DestinationServiceImpl.java b/examples/destination/java/src/main/java/destination/DestinationServiceImpl.java index 738e748..54c40b6 100644 --- a/examples/destination/java/src/main/java/destination/DestinationServiceImpl.java +++ b/examples/destination/java/src/main/java/destination/DestinationServiceImpl.java @@ -7,8 +7,14 @@ import java.util.Map; public class DestinationServiceImpl extends DestinationGrpc.DestinationImplBase { + + private final String INFO = "INFO"; + private final String WARNING = "WARNING"; + private final String SEVERE = "SEVERE"; + @Override public void configurationForm(ConfigurationFormRequest request, StreamObserver responseObserver) { + logMessage(INFO, "Fetching configuration form"); responseObserver.onNext( ConfigurationFormResponse.newBuilder() .setSchemaSelectionSupported(true) @@ -45,7 +51,8 @@ public void configurationForm(ConfigurationFormRequest request, StreamObserver responseObserver) { Map configuration = request.getConfigurationMap(); String testName = request.getName(); - System.out.println("test name: " + testName); + String message = String.format("Test Name: %s", testName); + logMessage(INFO, message); responseObserver.onNext(TestResponse.newBuilder().setSuccess(true).build()); responseObserver.onCompleted(); @@ -66,6 +73,7 @@ public void describeTable(DescribeTableRequest request, StreamObserver responseObserver) { Map configuration = request.getConfigurationMap(); - System.out.println("[CreateTable]: " - + request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList()); + String message = "[CreateTable]: " + + request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList(); + logMessage(INFO, message); responseObserver.onNext(CreateTableResponse.newBuilder().setSuccess(true).build()); responseObserver.onCompleted(); } @@ -83,8 +92,9 @@ public void createTable(CreateTableRequest request, StreamObserver responseObserver) { Map configuration = request.getConfigurationMap(); - System.out.println("[AlterTable]: " + - request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList()); + String message = "[AlterTable]: " + + request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList(); + logMessage(INFO, message); responseObserver.onNext(AlterTableResponse.newBuilder().setSuccess(true).build()); responseObserver.onCompleted(); } @@ -99,7 +109,8 @@ public void truncate(TruncateRequest request, StreamObserver r @Override public void writeBatch(WriteBatchRequest request, StreamObserver responseObserver) { - System.out.println("[WriteBatch]: " + request.getSchemaName() + " | " + request.getTable().getName()); + String message = "[WriteBatch]: " + request.getSchemaName() + " | " + request.getTable().getName(); + logMessage(WARNING, String.format("Sample severe message: %s", message)); for (String file : request.getReplaceFilesList()) { System.out.println("Replace files: " + file); } @@ -112,4 +123,8 @@ public void writeBatch(WriteBatchRequest request, StreamObserver