Skip to content

Commit

Permalink
Expose a configurable readiness check endpoint via http
Browse files Browse the repository at this point in the history
  • Loading branch information
ShuranZhang committed Feb 8, 2025
1 parent ef01234 commit af0cdcb
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 13 deletions.
15 changes: 15 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ cassandra_to_spanner_configs:

# [Optional] client key path for mTLS configuration
clientKey : PATH/TO/CLIENT_KEY

# [Optional] Endpoint (host:port) to bind the HTTP server for the readiness check endpoint.
# This endpoint is used for monitoring purposes to verify if all configured listeners
# are successfully started and running.
#
# To check the status, you need to query the `/ready` path of this endpoint
#
# It will return an HTTP 200 OK status if all listeners are up and running.
# If not all listeners are ready, it returns HTTP 503 Service Unavailable.
#
# If this option is *not* set, the readiness check HTTP endpoint
# will *not* be registered and no readiness checks will be available via HTTP.
#
# Default: (Empty - readiness check endpoint disabled)
readinessCheckEndpoint: YOUR_READINESS_CHECK_ENDPOINT

listeners:
- name: YOUR_CLUSTER_NAME
Expand Down
18 changes: 15 additions & 3 deletions docs/config_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ cassandra_to_spanner_configs:
# [Optional] client key path for mTLS configuration
clientKey : PATH/TO/CLIENT_KEY
# [Optional] Endpoint (host:port) to bind the HTTP server for the readiness check endpoint.
# This endpoint is used for monitoring purposes to verify if all configured listeners
# are successfully started and running.
#
# To check the status, you need to query the `/ready` path of this endpoint
#
# It will return an HTTP 200 OK status if all listeners are up and running.
# If not all listeners are ready, it returns HTTP 503 Service Unavailable.
#
# If this option is *not* set, the readiness check HTTP endpoint
# will *not* be registered and no readiness checks will be available via HTTP.
#
# Default: (Empty - readiness check endpoint disabled)
readinessCheckEndpoint: YOUR_READINESS_CHECK_ENDPOINT
listeners:
- name: YOUR_CLUSTER_NAME_1
Expand Down Expand Up @@ -96,9 +111,6 @@ otel:
# Name of the collector service to be setup as a sidecar
serviceName: YOUR_OTEL_COLLECTOR_SERVICE_NAME
# Whether or not to disable generating random service instance id key, Default 'False'.
disableRandomServiceInstanceIDKey: False
healthcheck:
# Enable the health check in this proxy application config only if the
# "health_check" extension is added to the OTEL collector service configuration.
Expand Down
53 changes: 43 additions & 10 deletions third_party/datastax/proxy/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"io/ioutil"
"log"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/alecthomas/kong"
Expand All @@ -44,6 +46,8 @@ var (
proxyReleaseVersion = "v1.0.4"
)
var readFile = os.ReadFile
var atomicStartedListenersCounter atomic.Int32
var expectedListeners int // to be set in Run function

const defaultConfigFile = "config.yaml"

Expand All @@ -57,16 +61,17 @@ type UserConfig struct {

// CassandraToSpannerConfigs contains configurations for Cassandra to Spanner
type CassandraToSpannerConfigs struct {
KeyspaceFlatter bool `yaml:"keyspaceFlatter"`
ProjectID string `yaml:"projectId"`
ConfigTableName string `yaml:"configTableName"`
UseRowTTL bool `yaml:"useRowTTL"`
UseRowTimestamp bool `yaml:"useRowTimestamp"`
Endpoint string `yaml:"endpoint"`
CaCertificate string `yaml:"caCertificate"`
ClientCertificate string `yaml:"clientCertificate"`
ClientKey string `yaml:"clientKey"`
UsePlainText bool `yaml:"usePlainText"`
KeyspaceFlatter bool `yaml:"keyspaceFlatter"`
ProjectID string `yaml:"projectId"`
ConfigTableName string `yaml:"configTableName"`
UseRowTTL bool `yaml:"useRowTTL"`
UseRowTimestamp bool `yaml:"useRowTimestamp"`
Endpoint string `yaml:"endpoint"`
CaCertificate string `yaml:"caCertificate"`
ClientCertificate string `yaml:"clientCertificate"`
ClientKey string `yaml:"clientKey"`
UsePlainText bool `yaml:"usePlainText"`
ReadinessCheckEndpoint string `yaml:"readinessCheckEndpoint"`
}

// OtelConfig defines the structure of the YAML configuration
Expand Down Expand Up @@ -149,6 +154,18 @@ type runConfig struct {
LogLevel string `yaml:"log-level" help:"Log level configuration." default:"info" env:"LOG_LEVEL"`
}

func readinessCheckHandler(w http.ResponseWriter, r *http.Request) {
started := int(atomicStartedListenersCounter.Load()) == expectedListeners

if started {
w.WriteHeader(http.StatusOK)
w.Write([]byte("All listeners are up and running"))
} else {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("Some listeners are still not ready"))
}
}

// Run starts the proxy command. 'args' shouldn't include the executable (i.e. os.Args[1:]). It returns the exit code
// for the proxy.
func Run(ctx context.Context, args []string) int {
Expand All @@ -165,6 +182,21 @@ func Run(ctx context.Context, args []string) int {
log.Fatalf("could not read configuration file %s: %v", configFile, err)
}

atomicStartedListenersCounter.Store(0)
expectedListeners = len(UserConfig.Listeners)

// Start HTTP server for readiness check if user has specified an endpoint for it.
if UserConfig.CassandraToSpannerConfigs.ReadinessCheckEndpoint != "" {
http.HandleFunc("/ready", readinessCheckHandler)
httpServer := &http.Server{Addr: UserConfig.CassandraToSpannerConfigs.ReadinessCheckEndpoint}
go func() {
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Printf("Error when registering http health check endpoint:%v", err)
}
}()
defer httpServer.Shutdown(ctx)
}

parser, err := kong.New(&cfg)
if err != nil {
panic(err)
Expand Down Expand Up @@ -427,6 +459,7 @@ func (c *runConfig) listenAndServe(p *Proxy, ctx context.Context, logger *zap.Lo
}

logger.Info("proxy is listening", zap.Stringer("address", proxyListener.Addr()))
atomicStartedListenersCounter.Add(1)

wg.Add(numServers)

Expand Down
2 changes: 2 additions & 0 deletions third_party/datastax/proxy/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,8 @@ func TestLoadConfig(t *testing.T) {
appFs := afero.NewMemMapFs()
configFile := "/config.yaml"
configData := `
cassandra_to_spanner_configs:
readinessCheckEndpoint: localhost:8080
listeners:
- name: Listener1
port: 8080
Expand Down

0 comments on commit af0cdcb

Please sign in to comment.