Skip to content

Commit

Permalink
fix(Logging): Add examples for providing logs (#74)
Browse files Browse the repository at this point in the history
* log example in source connector java

* fix

* fix the stdout

* log example in python source connector

* java destination

* python destination

* nodejs source connector changes

* golang changes

* add log level constants

* renaming method

* golang config changes

* readme changes

---------

Co-authored-by: Satvik Patil <[email protected]>
  • Loading branch information
fivetran-satvikpatil and Satvik Patil authored Oct 30, 2024
1 parent 6bbc065 commit dccea0e
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 36 deletions.
2 changes: 1 addition & 1 deletion examples/connector/golang/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ Run all commands from the golang folder root
> scripts/copy_protos.sh
> scripts/compile_protos.sh
> scripts/build.sh
> main
> ./main
```
15 changes: 8 additions & 7 deletions examples/connector/golang/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ module fivetran.com/fivetran_sdk
go 1.21

require (
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.35.1
)

require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
github.com/golang/protobuf v1.5.4 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 // indirect
)
17 changes: 17 additions & 0 deletions examples/connector/golang/go.sum
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 h1:F29+wU6Ee6qgu9TddPgooOdaqsxTMunOoj8KA5yuS5A=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1/go.mod h1:5KF+wpkbTSbGcR9zteSqZV6fqFOWBl4Yde8En8MryZA=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
24 changes: 23 additions & 1 deletion examples/connector/golang/golang_connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"google.golang.org/grpc"
)

const INFO = "INFO"
const WARNING = "WARNING"
const SEVERE = "SEVERE"

var port = flag.Int("port", 50051, "The server port")

type MyState struct {
Expand All @@ -30,7 +34,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
state := MyState{}
json.Unmarshal([]byte(state_json), &state)

log.Println("config: ", config, "selection: ", selection, "state_json: ", state_json, "mystate: ", state)
message := fmt.Sprintf("config: %s, selection: %s, state_json: %s, mystate: %s", config, selection, state_json, state)
LogMessage(INFO, message)

// -- Send a log message
stream.Send(&pb.UpdateResponse{
Expand Down Expand Up @@ -66,6 +71,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
state.Cursor++
}

LogMessage(INFO, "Completed sending upsert records")

// -- Send UPDATE record
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Expand All @@ -86,6 +93,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
})
state.Cursor++

LogMessage(INFO, "Completed sending update records")

// -- Send DELETE record
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Expand All @@ -105,6 +114,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
})
state.Cursor++

LogMessage(WARNING, "Sample warning message: Completed sending delete records")

// Serialize state from struct to JSON string
new_state_json, _ := json.Marshal(state)
new_state := string(new_state_json)
Expand Down Expand Up @@ -133,6 +144,7 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
},
})

LogMessage(SEVERE, "Sample severe message: Update call completed")
// End the RPC call
return nil
}
Expand Down Expand Up @@ -258,6 +270,16 @@ func (s *server) Test(ctx context.Context, in *pb.TestRequest) (*pb.TestResponse
}, nil
}

func LogMessage(level string, message string) {
log := map[string]interface{}{
"level": level,
"message": message,
"message-origin": "sdk_connector",
}
logJSON, _ := json.Marshal(log)
fmt.Println(string(logJSON))
}

func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
Expand Down
2 changes: 1 addition & 1 deletion examples/connector/golang/scripts/compile_protos.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
protoc \
PATH="${PATH}:${HOME}/go/bin" protoc \
--proto_path=proto \
--go_out=proto \
--go_opt=paths=source_relative \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
import java.util.*;

public class ConnectorServiceImpl extends ConnectorGrpc.ConnectorImplBase {
private final String INFO = "INFO";
private final String WARNING = "WARNING";
private final String SEVERE = "SEVERE";
@Override
public void configurationForm(ConfigurationFormRequest request, StreamObserver<ConfigurationFormResponse> responseObserver) {
logMessage(INFO, "Started fetching configuration form");
responseObserver.onNext(
ConfigurationFormResponse.newBuilder()
.setSchemaSelectionSupported(true)
Expand Down Expand Up @@ -41,23 +45,28 @@ public void configurationForm(ConfigurationFormRequest request, StreamObserver<C
ConfigurationTest.newBuilder().setName("select").setLabel("Tests selection").build()))
.build());

logMessage(INFO, "Fetching configuration form completed");
responseObserver.onCompleted();
}

@Override
public void test(TestRequest request, StreamObserver<TestResponse> responseObserver) {

Map<String, String> configuration = request.getConfigurationMap();

// Name of the test to be run
String testName = request.getName();
System.out.println("test name: " + testName);
String message = String.format("test name: %s", testName);
logMessage(INFO, message);

responseObserver.onNext(TestResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
}

@Override
public void schema(SchemaRequest request, StreamObserver<SchemaResponse> responseObserver) {

logMessage(WARNING, "Sample warning message while fetching schema");
Map<String, String> configuration = request.getConfigurationMap();

TableList tableList = TableList.newBuilder()
Expand Down Expand Up @@ -177,10 +186,16 @@ public void update(UpdateRequest request, StreamObserver<UpdateResponse> respons
.build())
.build());
} catch (JsonProcessingException e) {
String message = e.getMessage();
logMessage(SEVERE, message);
responseObserver.onError(e);
}

// End the streaming RPC call
responseObserver.onCompleted();
}

private void logMessage(String level, String message) {
System.out.println(String.format("{\"level\":\"%s\", \"message\": \"%s\", \"message-origin\": \"sdk_connector\"}", level, message));
}
}
25 changes: 13 additions & 12 deletions examples/connector/nodejs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ const options = {
};
var packageDefinitionConnector = protoLoader.loadSync(PROTO_PATH_CONNECTOR, options);

const INFO = "INFO";
const WARNING = "WARNING";
const SEVERE = "SEVERE";

const protoDescriptor = grpc.loadPackageDefinition(packageDefinitionConnector);

const connectorSdkProto = protoDescriptor.fivetran_sdk;

const server = new grpc.Server();

const configurationForm = (call, callback) => {
logMessage(INFO, "Fetching configuration form")
callback(null, {
schema_selection_supported: true,
table_selection_supported: true,
Expand All @@ -39,12 +44,13 @@ const configurationForm = (call, callback) => {
const test = (call, callback) => {
const configuration = call.request.configuration;
const testName = call.request.name;
console.log(`Test name: ${testName}`);
logMessage(INFO, `Test name: ${testName}`)
callback(null, { success: true });
};

// Implement the Schema RPC method
const schema = (call, callback) => {
logMessage(INFO, "Fetching the schema from the implemented method")
const tableList = {
tables: [
{
Expand Down Expand Up @@ -77,15 +83,6 @@ const configurationForm = (call, callback) => {
call.write(response);
};

const sendLogEntry = (message) => {
sendResponse({
log_entry: {
level: "INFO",
message: message
}
});
};

const sendOperation = (operation) => {
sendResponse({
operation: operation
Expand All @@ -94,7 +91,7 @@ const configurationForm = (call, callback) => {

try {
// Send a log message
sendLogEntry("Sync STARTING");
logMessage(WARNING, "Sample Warning message: Sync STARTING");

// Send UPSERT records
for (let i = 0; i < 3; i++) {
Expand Down Expand Up @@ -145,7 +142,7 @@ const configurationForm = (call, callback) => {
});

// Send a log message
sendLogEntry("Sync DONE");
logMessage(SEVERE, "Sample severe message: Sync done")

} catch (error) {
callback(error);
Expand All @@ -155,6 +152,10 @@ const configurationForm = (call, callback) => {
call.end();
};

function logMessage(level, message) {
console.log(`{"level":"${level}", "message": "${message}", "message-origin": "sdk_connector"}`);
}


server.addService(connectorSdkProto.Connector.service, {configurationForm, test, schema, update})

Expand Down
19 changes: 16 additions & 3 deletions examples/connector/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
from sdk_pb2 import common_pb2
from sdk_pb2 import connector_sdk_pb2

INFO = "INFO"
WARNING = "WARNING"
SEVERE = "SEVERE"

class ConnectorService(connector_sdk_pb2_grpc.ConnectorServicer):
def ConfigurationForm(self, request, context):
log_message(INFO, "Fetching configuration form")
form_fields = common_pb2.ConfigurationFormResponse(schema_selection_supported=True,
table_selection_supported=True)
form_fields.fields.add(name="apiKey", label="API Key", required=True, text_field=common_pb2.TextField.PlainText)
Expand All @@ -35,8 +39,9 @@ def Test(self, request, context):
configuration = request.configuration
# Name of the test to be run
test_name = request.name
print("Configuration: ", configuration)
print("Test name: ", test_name)

log_message(INFO, "Test Name: " + str(test_name))

return common_pb2.TestResponse(success=True)

def Schema(self, request, context):
Expand Down Expand Up @@ -114,6 +119,8 @@ def Update(self, request, context):
yield connector_sdk_pb2.UpdateResponse(operation=operation)
state["cursor"] += 1

log_message(WARNING, "Completed sending update records")

# -- Send DELETE record
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
Expand All @@ -139,6 +146,12 @@ def Update(self, request, context):
log.message = "Sync Done"
yield connector_sdk_pb2.UpdateResponse(log_entry=log)

log_message(SEVERE, "Sending severe log: Completed Update method")


def log_message(level, message):
print(f'{{"level":"{level}", "message": "{message}", "message-origin": "sdk_connector"}}')


def start_server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
Expand All @@ -152,4 +165,4 @@ def start_server():

if __name__ == '__main__':
print("Starting the server...")
start_server()
start_server()
Loading

0 comments on commit dccea0e

Please sign in to comment.