Skip to content

Commit

Permalink
Redis metrics - send metrics to Data Warehouse
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 722641184
  • Loading branch information
shapirojoseph authored and copybara-github committed Feb 3, 2025
1 parent a84e9f2 commit a05edcf
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 5,036 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "workloadagentplatform"]
path = workloadagentplatform
url = https://github.com/GoogleCloudPlatform/workloadagentplatform
6 changes: 4 additions & 2 deletions internal/daemon/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/workloadagent/internal/redismetrics"
"github.com/GoogleCloudPlatform/workloadagent/internal/servicecommunication"
"github.com/GoogleCloudPlatform/workloadagent/internal/usagemetrics"
"github.com/GoogleCloudPlatform/workloadagent/internal/workloadmanager"
configpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration"
"github.com/GoogleCloudPlatform/workloadagentplatform/integration/common/shared/gce"
"github.com/GoogleCloudPlatform/workloadagentplatform/integration/common/shared/log"
Expand All @@ -40,6 +41,7 @@ type Service struct {
processes servicecommunication.DiscoveryResult
redisProcesses []servicecommunication.ProcessWrapper
dwActivated bool
WLMClient workloadmanager.WLMWriter
}

type runDiscoveryArgs struct {
Expand All @@ -63,7 +65,7 @@ func (s *Service) Start(ctx context.Context, a any) {
s.checkServiceCommunication(ctx)
}
})()
ticker := time.NewTicker(5 * time.Minute)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
enabled := s.Config.GetRedisConfiguration().GetEnabled()
EnableCheck:
Expand Down Expand Up @@ -149,7 +151,7 @@ func runMetricCollection(ctx context.Context, a any) {
r := &redismetrics.RedisMetrics{
Config: args.s.Config,
}
err = r.InitDB(ctx, gceService)
err = r.InitDB(ctx, gceService, args.s.WLMClient)
if err != nil {
log.CtxLogger(ctx).Errorf("failed to initialize Redis DB client", "error", err)
return
Expand Down
34 changes: 27 additions & 7 deletions internal/redismetrics/redismetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/redis/go-redis/v9"
"github.com/GoogleCloudPlatform/workloadagent/internal/secret"
"github.com/GoogleCloudPlatform/workloadagent/internal/workloadmanager"
configpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration"
"github.com/GoogleCloudPlatform/workloadagentplatform/integration/common/shared/log"
)
Expand All @@ -42,6 +43,8 @@ const (
yes = "yes"
up = "up"
defaultPort = 6379
persistenceKey = "persistence_enabled"
replicationKey = "replication_enabled"
)

type gceInterface interface {
Expand All @@ -56,8 +59,9 @@ type dbInterface interface {

// RedisMetrics contains variables and methods to collect metrics for Redis databases running on the current host.
type RedisMetrics struct {
Config *configpb.Configuration
db dbInterface
Config *configpb.Configuration
db dbInterface
WLMClient workloadmanager.WLMWriter
}

// password gets the password for the Redis database.
Expand All @@ -81,7 +85,7 @@ func (r *RedisMetrics) password(ctx context.Context, gceService gceInterface) (s
}

// InitDB initializes the Redis database client.
func (r *RedisMetrics) InitDB(ctx context.Context, gceService gceInterface) error {
func (r *RedisMetrics) InitDB(ctx context.Context, gceService gceInterface, wlmClient workloadmanager.WLMWriter) error {
pw, err := r.password(ctx, gceService)
if err != nil {
return fmt.Errorf("failed to get password: %v", err)
Expand All @@ -94,6 +98,7 @@ func (r *RedisMetrics) InitDB(ctx context.Context, gceService gceInterface) erro
Addr: fmt.Sprintf("localhost:%d", port),
Password: pw.SecretValue(),
})
r.WLMClient = wlmClient
return nil
}

Expand Down Expand Up @@ -166,10 +171,25 @@ func (r *RedisMetrics) persistenceEnabled(ctx context.Context) bool {
}

// CollectMetricsOnce collects metrics for Redis databases running on the host.
func (r *RedisMetrics) CollectMetricsOnce(ctx context.Context) {
func (r *RedisMetrics) CollectMetricsOnce(ctx context.Context) (*workloadmanager.WorkloadMetrics, error) {
replicationOn := r.replicationModeActive(ctx)
persistenceOn := r.persistenceEnabled(ctx)

// TODO: send these metrics to Data Warehouse.
log.CtxLogger(ctx).Debugw("Finished collecting metrics once.", "replicationOn", replicationOn, "persistenceOn", persistenceOn)
log.CtxLogger(ctx).Debugw("Finished collecting metrics once. Next step is to send to WLM (DW).", "replicationOn", replicationOn, "persistenceOn", persistenceOn)
metrics := workloadmanager.WorkloadMetrics{
WorkloadType: workloadmanager.REDIS,
Metrics: map[string]string{
replicationKey: strconv.FormatBool(replicationOn),
persistenceKey: strconv.FormatBool(persistenceOn),
},
}
res, err := workloadmanager.SendDataInsight(ctx, workloadmanager.SendDataInsightParams{
WLMetrics: metrics,
CloudProps: r.Config.GetCloudProperties(),
WLMService: r.WLMClient,
})
if err != nil {
return nil, err
}
log.CtxLogger(ctx).Debugw("WriteInsight response", "StatusCode", res.HTTPStatusCode)
return &metrics, nil
}
104 changes: 102 additions & 2 deletions internal/redismetrics/redismetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/redis/go-redis/v9"
"google.golang.org/api/googleapi"
"google.golang.org/protobuf/testing/protocmp"
"github.com/GoogleCloudPlatform/workloadagent/internal/workloadmanager"
configpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration"
gcefake "github.com/GoogleCloudPlatform/workloadagentplatform/integration/common/shared/gce/fake"
"github.com/GoogleCloudPlatform/workloadagentplatform/integration/common/shared/gce/wlm"
)

type testGCE struct {
Expand Down Expand Up @@ -177,6 +182,101 @@ func TestInitPassword(t *testing.T) {
}
}

func TestCollectMetricsOnce(t *testing.T) {
tests := []struct {
name string
r RedisMetrics
stringCmdValue string
saveMapContent map[string]string
appendMapContent map[string]string
wlmClient workloadmanager.WLMWriter
want *workloadmanager.WorkloadMetrics
wantErr bool
}{
{
name: "HappyPath",
r: RedisMetrics{
Config: &configpb.Configuration{
CloudProperties: &configpb.CloudProperties{
ProjectId: "fake-project-id",
},
},
WLMClient: &gcefake.TestWLM{
WriteInsightErrs: []error{nil},
WriteInsightResponses: []*wlm.WriteInsightResponse{
&wlm.WriteInsightResponse{ServerResponse: googleapi.ServerResponse{HTTPStatusCode: 201}},
},
},
},
stringCmdValue: "role:master\nconnected_slaves:1\n",
saveMapContent: map[string]string{
"save": "3600 1 300 100 60 10000",
},
appendMapContent: map[string]string{
"appendonly": "no",
},
want: &workloadmanager.WorkloadMetrics{
WorkloadType: workloadmanager.REDIS,
Metrics: map[string]string{
replicationKey: "true",
persistenceKey: "true",
},
},
wantErr: false,
},
{
name: "WLMError",
r: RedisMetrics{
Config: &configpb.Configuration{
CloudProperties: &configpb.CloudProperties{
ProjectId: "fake-project-id",
},
},
WLMClient: &gcefake.TestWLM{
WriteInsightErrs: []error{errors.New("fake-error")},
WriteInsightResponses: []*wlm.WriteInsightResponse{
&wlm.WriteInsightResponse{ServerResponse: googleapi.ServerResponse{HTTPStatusCode: 400}},
},
},
},
stringCmdValue: "role:master\nconnected_slaves:1\n",
saveMapContent: map[string]string{
"save": "3600 1 300 100 60 10000",
},
appendMapContent: map[string]string{
"appendonly": "no",
},
want: nil,
wantErr: true,
},
}

ctx := context.Background()

for _, tc := range tests {
testDB := &testDB{
info: &redis.StringCmd{},
saveConfig: &redis.MapStringStringCmd{},
appendConfig: &redis.MapStringStringCmd{},
}
testDB.info.SetVal(tc.stringCmdValue)
testDB.saveConfig.SetVal(tc.saveMapContent)
testDB.appendConfig.SetVal(tc.appendMapContent)
tc.r.db = testDB

got, err := tc.r.CollectMetricsOnce(ctx)
if tc.wantErr {
if err == nil {
t.Errorf("CollectMetricsOnce(%v) returned no error, want error", tc.name)
}
continue
}
if diff := cmp.Diff(tc.want, got, protocmp.Transform()); diff != "" {
t.Errorf("CollectMetricsOnce(%v) = %v, want %v", tc.name, got, tc.want)
}
}
}

func TestReplicationModeActive(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -374,12 +474,12 @@ func TestInitDB(t *testing.T) {
ctx := context.Background()

for _, tc := range tests {
err := tc.r.InitDB(ctx, tc.gceService)
err := tc.r.InitDB(ctx, tc.gceService, &gcefake.TestWLM{})
gotErr := err != nil
if gotErr != tc.wantErr {
t.Errorf("InitDB(%v) = %v, wantErr %v", tc.name, err, tc.wantErr)
}
if !gotErr && tc.r.db.String() != tc.want {
if !gotErr && (tc.r.db.String() != tc.want || tc.r.WLMClient == nil) {
t.Errorf("InitDB(%v) = %v, want %v", tc.name, tc.r.db.String(), tc.want)
}
}
Expand Down
Loading

0 comments on commit a05edcf

Please sign in to comment.