From bc2f767d37f1ec147d53aeb62765fdcd84a12207 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?And=C5=BEej=20Maciusovi=C4=8D?= Date: Mon, 3 Feb 2025 09:04:38 +0200 Subject: [PATCH] Enable streams compression (#464) --- cmd/agent/daemon/state/castai_events_exporter.go | 3 ++- cmd/agent/daemon/state/castai_netflow_exporter.go | 3 ++- cmd/agent/daemon/state/castai_process_tree_exporter.go | 3 ++- cmd/agent/daemon/state/castai_stats_exporter.go | 3 ++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cmd/agent/daemon/state/castai_events_exporter.go b/cmd/agent/daemon/state/castai_events_exporter.go index f4a9d2dc..df891dcc 100644 --- a/cmd/agent/daemon/state/castai_events_exporter.go +++ b/cmd/agent/daemon/state/castai_events_exporter.go @@ -10,6 +10,7 @@ import ( "github.com/castai/kvisor/pkg/logging" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" ) func NewCastaiEventsExporter(log *logging.Logger, apiClient *castai.Client, queueSize int) *CastaiEventsExporter { @@ -38,7 +39,7 @@ func (c *CastaiEventsExporter) Run(ctx context.Context) error { defer c.log.Info("export loop done") ws := castai.NewWriteStream[*castpb.Event, *castpb.WriteStreamResponse](ctx, func(ctx context.Context) (grpc.ClientStream, error) { - return c.apiClient.GRPC.EventsWriteStream(ctx) + return c.apiClient.GRPC.EventsWriteStream(ctx, grpc.UseCompressor(gzip.Name)) }) defer ws.Close() ws.ReopenDelay = c.writeStreamCreateRetryDelay diff --git a/cmd/agent/daemon/state/castai_netflow_exporter.go b/cmd/agent/daemon/state/castai_netflow_exporter.go index f25dfa78..2602b70e 100644 --- a/cmd/agent/daemon/state/castai_netflow_exporter.go +++ b/cmd/agent/daemon/state/castai_netflow_exporter.go @@ -9,6 +9,7 @@ import ( "github.com/castai/kvisor/pkg/castai" "github.com/castai/kvisor/pkg/logging" "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" ) func NewCastaiNetflowExporter(log *logging.Logger, apiClient *castai.Client, queueSize int) *CastaiNetflowExporter { @@ -32,7 +33,7 @@ func (c *CastaiNetflowExporter) Run(ctx context.Context) error { defer c.log.Info("export loop done") ws := castai.NewWriteStream[*castpb.Netflow, *castpb.WriteStreamResponse](ctx, func(ctx context.Context) (grpc.ClientStream, error) { - return c.apiClient.GRPC.NetflowWriteStream(ctx) + return c.apiClient.GRPC.NetflowWriteStream(ctx, grpc.UseCompressor(gzip.Name)) }) defer ws.Close() ws.ReopenDelay = c.writeStreamCreateRetryDelay diff --git a/cmd/agent/daemon/state/castai_process_tree_exporter.go b/cmd/agent/daemon/state/castai_process_tree_exporter.go index bcde695c..92cfbe4c 100644 --- a/cmd/agent/daemon/state/castai_process_tree_exporter.go +++ b/cmd/agent/daemon/state/castai_process_tree_exporter.go @@ -11,6 +11,7 @@ import ( "github.com/castai/kvisor/pkg/processtree" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" ) const ( @@ -51,7 +52,7 @@ func (c *CastaiProcessTreeExporter) Run(ctx context.Context) error { defer c.log.Info("export process tree loop done") ws := castai.NewWriteStream[*castpb.ProcessTreeEvent, *castpb.WriteStreamResponse](ctx, func(ctx context.Context) (grpc.ClientStream, error) { - return c.apiClient.GRPC.ProcessEventsWriteStream(ctx) + return c.apiClient.GRPC.ProcessEventsWriteStream(ctx, grpc.UseCompressor(gzip.Name)) }) defer ws.Close() ws.ReopenDelay = c.writeStreamCreateRetryDelay diff --git a/cmd/agent/daemon/state/castai_stats_exporter.go b/cmd/agent/daemon/state/castai_stats_exporter.go index d49ed6a7..3e8f286d 100644 --- a/cmd/agent/daemon/state/castai_stats_exporter.go +++ b/cmd/agent/daemon/state/castai_stats_exporter.go @@ -9,6 +9,7 @@ import ( "github.com/castai/kvisor/pkg/castai" "github.com/castai/kvisor/pkg/logging" "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" ) func NewCastaiStatsExporter(log *logging.Logger, apiClient *castai.Client, queueSize int) *CastaiStatsExporter { @@ -32,7 +33,7 @@ func (c *CastaiStatsExporter) Run(ctx context.Context) error { defer c.log.Info("export loop done") ws := castai.NewWriteStream[*castpb.StatsBatch, *castpb.WriteStreamResponse](ctx, func(ctx context.Context) (grpc.ClientStream, error) { - return c.apiClient.GRPC.StatsWriteStream(ctx) + return c.apiClient.GRPC.StatsWriteStream(ctx, grpc.UseCompressor(gzip.Name)) }) defer ws.Close() ws.ReopenDelay = c.writeStreamCreateRetryDelay