Skip to content

Commit

Permalink
Enable streams compression (#464)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Feb 3, 2025
1 parent f2d4706 commit bc2f767
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 4 deletions.
3 changes: 2 additions & 1 deletion cmd/agent/daemon/state/castai_events_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/castai/kvisor/pkg/logging"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
)

func NewCastaiEventsExporter(log *logging.Logger, apiClient *castai.Client, queueSize int) *CastaiEventsExporter {
Expand Down Expand Up @@ -38,7 +39,7 @@ func (c *CastaiEventsExporter) Run(ctx context.Context) error {
defer c.log.Info("export loop done")

ws := castai.NewWriteStream[*castpb.Event, *castpb.WriteStreamResponse](ctx, func(ctx context.Context) (grpc.ClientStream, error) {
return c.apiClient.GRPC.EventsWriteStream(ctx)
return c.apiClient.GRPC.EventsWriteStream(ctx, grpc.UseCompressor(gzip.Name))
})
defer ws.Close()
ws.ReopenDelay = c.writeStreamCreateRetryDelay
Expand Down
3 changes: 2 additions & 1 deletion cmd/agent/daemon/state/castai_netflow_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/castai/kvisor/pkg/castai"
"github.com/castai/kvisor/pkg/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
)

func NewCastaiNetflowExporter(log *logging.Logger, apiClient *castai.Client, queueSize int) *CastaiNetflowExporter {
Expand All @@ -32,7 +33,7 @@ func (c *CastaiNetflowExporter) Run(ctx context.Context) error {
defer c.log.Info("export loop done")

ws := castai.NewWriteStream[*castpb.Netflow, *castpb.WriteStreamResponse](ctx, func(ctx context.Context) (grpc.ClientStream, error) {
return c.apiClient.GRPC.NetflowWriteStream(ctx)
return c.apiClient.GRPC.NetflowWriteStream(ctx, grpc.UseCompressor(gzip.Name))
})
defer ws.Close()
ws.ReopenDelay = c.writeStreamCreateRetryDelay
Expand Down
3 changes: 2 additions & 1 deletion cmd/agent/daemon/state/castai_process_tree_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/castai/kvisor/pkg/processtree"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
)

const (
Expand Down Expand Up @@ -51,7 +52,7 @@ func (c *CastaiProcessTreeExporter) Run(ctx context.Context) error {
defer c.log.Info("export process tree loop done")

ws := castai.NewWriteStream[*castpb.ProcessTreeEvent, *castpb.WriteStreamResponse](ctx, func(ctx context.Context) (grpc.ClientStream, error) {
return c.apiClient.GRPC.ProcessEventsWriteStream(ctx)
return c.apiClient.GRPC.ProcessEventsWriteStream(ctx, grpc.UseCompressor(gzip.Name))
})
defer ws.Close()
ws.ReopenDelay = c.writeStreamCreateRetryDelay
Expand Down
3 changes: 2 additions & 1 deletion cmd/agent/daemon/state/castai_stats_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/castai/kvisor/pkg/castai"
"github.com/castai/kvisor/pkg/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
)

func NewCastaiStatsExporter(log *logging.Logger, apiClient *castai.Client, queueSize int) *CastaiStatsExporter {
Expand All @@ -32,7 +33,7 @@ func (c *CastaiStatsExporter) Run(ctx context.Context) error {
defer c.log.Info("export loop done")

ws := castai.NewWriteStream[*castpb.StatsBatch, *castpb.WriteStreamResponse](ctx, func(ctx context.Context) (grpc.ClientStream, error) {
return c.apiClient.GRPC.StatsWriteStream(ctx)
return c.apiClient.GRPC.StatsWriteStream(ctx, grpc.UseCompressor(gzip.Name))
})
defer ws.Close()
ws.ReopenDelay = c.writeStreamCreateRetryDelay
Expand Down

0 comments on commit bc2f767

Please sign in to comment.