Skip to content

Commit

Permalink
Merge pull request #81 from batchcorp/blinktag/msg_size_fix
Browse files Browse the repository at this point in the history
Increasing max record size for relay payloads
  • Loading branch information
blinktag committed Mar 19, 2021
2 parents a47e340 + 35735b9 commit 2f1302b
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 7 deletions.
2 changes: 1 addition & 1 deletion relay/aws-sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (r *Relay) handleSQS(ctx context.Context, conn *grpc.ClientConn, messages [
r.CallWithRetry(ctx, "AddSQSRecord", func(ctx context.Context) error {
_, err := client.AddSQSRecord(ctx, &services.SQSRecordRequest{
Records: sinkRecords,
})
}, grpc.MaxCallRecvMsgSize(MaxGRPCMessageSize))
return err
})

Expand Down
2 changes: 1 addition & 1 deletion relay/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (r *Relay) handleAzure(ctx context.Context, conn *grpc.ClientConn, messages
return r.CallWithRetry(ctx, "AddAzureRecord", func(ctx context.Context) error {
_, err := client.AddAzureRecord(ctx, &services.AzureRecordRequest{
Records: sinkRecords,
})
}, grpc.MaxCallRecvMsgSize(MaxGRPCMessageSize))
return err
})
}
Expand Down
2 changes: 1 addition & 1 deletion relay/gcp-pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (r *Relay) handleGCP(ctx context.Context, conn *grpc.ClientConn, messages [
return r.CallWithRetry(ctx, "AddGCPRecord", func(ctx context.Context) error {
_, err := client.AddGCPRecord(ctx, &services.GCPRecordRequest{
Records: sinkRecords,
})
}, grpc.MaxCallRecvMsgSize(MaxGRPCMessageSize))
return err
})
}
Expand Down
2 changes: 1 addition & 1 deletion relay/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (r *Relay) handleKafka(ctx context.Context, conn *grpc.ClientConn, messages
return r.CallWithRetry(ctx, "AddKafkaRecord", func(ctx context.Context) error {
_, err := client.AddKafkaRecord(ctx, &services.KafkaSinkRecordRequest{
Records: sinkRecords,
})
}, grpc.MaxCallRecvMsgSize(MaxGRPCMessageSize))
return err
})
}
Expand Down
2 changes: 1 addition & 1 deletion relay/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (r *Relay) handleRabbit(ctx context.Context, conn *grpc.ClientConn, message
return r.CallWithRetry(ctx, "AddAMQPRecord", func(ctx context.Context) error {
_, err := client.AddAMQPRecord(ctx, &services.AMQPRecordRequest{
Records: sinkRecords,
})
}, grpc.MaxCallRecvMsgSize(MaxGRPCMessageSize))
return err
})
}
Expand Down
8 changes: 6 additions & 2 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ const (
DefaultBatchSize = 100 // number of messages to batch

MaxGRPCRetries = 5
GRPCRetrySleep = time.Second * 5

// Maximum message size for GRPC client in bytes
MaxGRPCMessageSize = 1024 * 1024 * 100 // 100MB
GRPCRetrySleep = time.Second * 5
)

type Relay struct {
Expand Down Expand Up @@ -259,7 +262,8 @@ func (r *Relay) CallWithRetry(ctx context.Context, method string, publish func(c
var err error

for i := 1; i <= MaxGRPCRetries; i++ {
if err := publish(ctx); err != nil {
err = publish(ctx)
if err != nil {
r.log.Debugf("unable to complete %s call [retry %d/%d]", method, i, 5)
time.Sleep(GRPCRetrySleep)
continue
Expand Down

0 comments on commit 2f1302b

Please sign in to comment.