Skip to content

Commit

Permalink
Merge pull request #196 from grepplabs/feature/listener-cert-rotation
Browse files Browse the repository at this point in the history
cert rotation + refactoring
  • Loading branch information
everesio authored Feb 15, 2025
2 parents 2318bd6 + a7baef9 commit 2fbf93a
Show file tree
Hide file tree
Showing 998 changed files with 117,033 additions and 26,969 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Setup go
uses: actions/setup-go@v4
with:
go-version: '1.22'
go-version: '1.23'
check-latest: true
- run: go version
- name: Run build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Setup go
uses: actions/setup-go@v4
with:
go-version: '1.22'
go-version: '1.23'
check-latest: true
- run: go version
- name: Run build and test
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.21 AS builder
FROM --platform=$BUILDPLATFORM golang:1.23-alpine3.21 AS builder
RUN apk add alpine-sdk ca-certificates

ARG TARGETOS
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.all
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.21 AS builder
FROM --platform=$BUILDPLATFORM golang:1.23-alpine3.21 AS builder
RUN apk add alpine-sdk ca-certificates

ARG TARGETOS
Expand Down
15 changes: 12 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ VERSION ?= $(shell git describe --tags --always --dirty)
GOPKGS = $(shell go list ./... | grep -v /vendor/)
BUILD_FLAGS ?=
LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s
TAG ?= "v0.3.12"
TAG ?= "v0.4.0"
GOOS ?= $(if $(TARGETOS),$(TARGETOS),linux)
GOARCH ?= $(if $(TARGETARCH),$(TARGETARCH),amd64)
GOARM ?= $(TARGETVARIANT)
Expand All @@ -22,6 +22,8 @@ PROTOC_VERSION ?= 22.2
PROTOC_BIN_DIR := .tools
PROTOC := $(PROTOC_BIN_DIR)/protoc

GOLANGCI_LINT = go run github.com/golangci/golangci-lint/cmd/[email protected]

default: build

test.race:
Expand All @@ -33,10 +35,17 @@ test:
fmt:
go fmt $(GOPKGS)

check:
golint $(GOPKGS)
check: lint-code
go vet $(GOPKGS)

.PHONY: lint-code
lint-code:
$(GOLANGCI_LINT) run --timeout 5m

.PHONY: lint-fix
lint-fix:
$(GOLANGCI_LINT) run --fix

.PHONY: build
build: build/$(BINARY)

Expand Down
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ As not every Kafka release adds new messages/versions which are relevant to the

Linux

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.12/kafka-proxy-v0.3.12-linux-amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.0/kafka-proxy-v0.4.0-linux-amd64.tar.gz | tar xz

macOS

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.12/kafka-proxy-v0.3.12-darwin-amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.0/kafka-proxy-v0.4.0-darwin-amd64.tar.gz | tar xz

2. Move the binary in to your PATH.

Expand All @@ -70,7 +70,7 @@ Docker images are available on [Docker Hub](https://hub.docker.com/r/grepplabs/k
You can launch a kafka-proxy container for trying it out with
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.12 \
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.0 \
server \
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
Expand All @@ -89,7 +89,7 @@ Docker images with precompiled plugins located in `/opt/kafka-proxy/bin/` are ta
You can launch a kafka-proxy container with auth-ldap plugin for trying it out with
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.12-all \
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.0-all \
server \
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
Expand Down Expand Up @@ -140,6 +140,7 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
--debug-enable Enable Debug endpoint
--debug-listen-address string Debug listen address (default "0.0.0.0:6060")
--default-listener-ip string Default listener IP (default "0.0.0.0")
--deterministic-listeners Enable deterministic listeners (listener port = min port + broker id).
--dial-address-mapping stringArray Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment
--dynamic-advertised-listener string Advertised address for dynamic listeners. If empty, default-listener-ip is used
--dynamic-listeners-disable Disable dynamic listeners.
Expand Down Expand Up @@ -170,26 +171,30 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
--kafka-read-timeout duration How long to wait for a response (default 30s)
--kafka-write-timeout duration How long to wait for a transmit (default 30s)
--log-format string Log format text or json (default "text")
--log-level string Log level debug, info, warning, error, fatal or panic (default "info")
--log-level string Log level trace, debug, info, warning, error, fatal or panic (default "info")
--log-level-fieldname string Log level fieldname for json format (default "@level")
--log-msg-fieldname string Message fieldname for json format (default "@message")
--log-time-fieldname string Time fieldname for json format (default "@timestamp")
--producer-acks-0-disabled Assume fire-and-forget is never sent by the producer. Enabling this parameter will increase performance
--proxy-listener-ca-chain-cert-file string PEM encoded CA's certificate file. If provided, client certificate is required and verified
--proxy-listener-cert-file string PEM encoded file with server certificate
--proxy-listener-cipher-suites strings List of supported cipher suites
--proxy-listener-crl-file string PEM encoded X509 CRLs file
--proxy-listener-curve-preferences strings List of curve preferences
--proxy-listener-keep-alive duration Keep alive period for an active network connection. If zero, keep-alives are disabled (default 1m0s)
--proxy-listener-key-file string PEM encoded file with private key for the server certificate
--proxy-listener-key-password string Password to decrypt rsa private key
--proxy-listener-read-buffer-size int Size of the operating system's receive buffer associated with the connection. If zero, system default is used
--proxy-listener-tls-enable Whether or not to use TLS listener
--proxy-listener-tls-refresh duration Interval for refreshing server TLS certificates. If set to zero, the refresh watch is disabled
--proxy-listener-tls-required-client-subject strings Required client certificate subject common name; example; s:/CN=[value]/C=[state]/C=[DE,PL] or r:/CN=[^val.{2}$]/C=[state]/C=[DE,PL]; check manual for more details
--proxy-listener-write-buffer-size int Sets the size of the operating system's transmit buffer associated with the connection. If zero, system default is used
--proxy-request-buffer-size int Request buffer size pro tcp connection (default 4096)
--proxy-response-buffer-size int Response buffer size pro tcp connection (default 4096)
--sasl-aws-identity-lookup Verify AWS authentication identity
--sasl-aws-profile string AWS profile
--sasl-aws-region string Region for AWS IAM Auth
--sasl-aws-role-arn string AWS Role ARN to assume
--sasl-enable Connect using SASL
--sasl-jaas-config-file string Location of JAAS config file with SASL username and password
--sasl-method string SASL method to use (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, AWS_MSK_IAM (default "PLAIN")
Expand All @@ -207,7 +212,9 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
--tls-client-key-password string Password to decrypt rsa private key
--tls-enable Whether or not to use TLS when connecting to the broker
--tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name
--tls-refresh duration Interval for refreshing client TLS certificates. If set to zero, the refresh watch is disabled
--tls-same-client-cert-enable Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file)
--tls-system-cert-pool Use system pool for root CAs
### Usage example
Expand Down
35 changes: 31 additions & 4 deletions cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package server

import (
"fmt"
"log/slog"

"github.com/grepplabs/kafka-proxy/config"
"github.com/grepplabs/kafka-proxy/proxy"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
sloglogrus "github.com/samber/slog-logrus/v2"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"

Expand Down Expand Up @@ -101,10 +103,12 @@ func initFlags() {
Server.Flags().DurationVar(&c.Proxy.ListenerKeepAlive, "proxy-listener-keep-alive", 60*time.Second, "Keep alive period for an active network connection. If zero, keep-alives are disabled")

Server.Flags().BoolVar(&c.Proxy.TLS.Enable, "proxy-listener-tls-enable", false, "Whether or not to use TLS listener")
Server.Flags().DurationVar(&c.Proxy.TLS.Refresh, "proxy-listener-tls-refresh", 0*time.Second, "Interval for refreshing server TLS certificates. If set to zero, the refresh watch is disabled")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCertFile, "proxy-listener-cert-file", "", "PEM encoded file with server certificate")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyFile, "proxy-listener-key-file", "", "PEM encoded file with private key for the server certificate")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyPassword, "proxy-listener-key-password", os.Getenv("PROXY_LISTENER_KEY_PASSWORD"), "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Proxy.TLS.CAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file. If provided, client certificate is required and verified")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file. If provided, client certificate is required and verified")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCRLFile, "proxy-listener-crl-file", "", "PEM encoded X509 CRLs file")
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCipherSuites, "proxy-listener-cipher-suites", []string{}, "List of supported cipher suites")
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCurvePreferences, "proxy-listener-curve-preferences", []string{}, "List of curve preferences")

Expand Down Expand Up @@ -151,11 +155,13 @@ func initFlags() {

// TLS
Server.Flags().BoolVar(&c.Kafka.TLS.Enable, "tls-enable", false, "Whether or not to use TLS when connecting to the broker")
Server.Flags().DurationVar(&c.Kafka.TLS.Refresh, "tls-refresh", 0*time.Second, "Interval for refreshing client TLS certificates. If set to zero, the refresh watch is disabled")
Server.Flags().BoolVar(&c.Kafka.TLS.InsecureSkipVerify, "tls-insecure-skip-verify", false, "It controls whether a client verifies the server's certificate chain and host name")
Server.Flags().StringVar(&c.Kafka.TLS.ClientCertFile, "tls-client-cert-file", "", "PEM encoded file with client certificate")
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyFile, "tls-client-key-file", "", "PEM encoded file with private key for the client certificate")
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", os.Getenv("TLS_CLIENT_KEY_PASSWORD"), "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file")
Server.Flags().BoolVar(&c.Kafka.TLS.SystemCertPool, "tls-system-cert-pool", false, "Use system pool for root CAs")

//Same TLS client cert tls-same-client-cert-enable
Server.Flags().BoolVar(&c.Kafka.TLS.SameClientCertEnable, "tls-same-client-cert-enable", false, "Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file)")
Expand All @@ -181,6 +187,8 @@ func initFlags() {
// SASL AWS_MSK_IAM
Server.Flags().StringVar(&c.Kafka.SASL.AWSConfig.Region, "sasl-aws-region", "", "Region for AWS IAM Auth")
Server.Flags().StringVar(&c.Kafka.SASL.AWSConfig.Profile, "sasl-aws-profile", "", "AWS profile")
Server.Flags().StringVar(&c.Kafka.SASL.AWSConfig.RoleArn, "sasl-aws-role-arn", "", "AWS Role ARN to assume")
Server.Flags().BoolVar(&c.Kafka.SASL.AWSConfig.IdentityLookup, "sasl-aws-identity-lookup", false, "Verify AWS authentication identity")

// SASL by Proxy plugin
Server.Flags().BoolVar(&c.Kafka.SASL.Plugin.Enable, "sasl-plugin-enable", false, "Use plugin for SASL authentication")
Expand All @@ -202,7 +210,7 @@ func initFlags() {

// Logging
Server.Flags().StringVar(&c.Log.Format, "log-format", "text", "Log format text or json")
Server.Flags().StringVar(&c.Log.Level, "log-level", "info", "Log level debug, info, warning, error, fatal or panic")
Server.Flags().StringVar(&c.Log.Level, "log-level", "info", "Log level trace, debug, info, warning, error, fatal or panic")
Server.Flags().StringVar(&c.Log.LevelFieldName, "log-level-fieldname", "@level", "Log level fieldname for json format")
Server.Flags().StringVar(&c.Log.TimeFiledName, "log-time-fieldname", "@timestamp", "Time fieldname for json format")
Server.Flags().StringVar(&c.Log.MsgFiledName, "log-msg-fieldname", "@message", "Message fieldname for json format")
Expand Down Expand Up @@ -444,7 +452,7 @@ func Run(_ *cobra.Command, _ []string) {
func NewHTTPHandler() http.Handler {
m := http.NewServeMux()
m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(
_, _ = w.Write([]byte(
`<html>
<head>
<title>kafka-proxy service</title>
Expand All @@ -456,7 +464,7 @@ func NewHTTPHandler() http.Handler {
</html>`))
})
m.HandleFunc(c.Http.HealthPath, func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`OK`))
_, _ = w.Write([]byte(`OK`))
})
m.Handle(c.Http.MetricsPath, promhttp.Handler())

Expand All @@ -483,6 +491,25 @@ func SetLogger() {
level = logrus.InfoLevel
}
logrus.SetLevel(level)

slog.SetDefault(slog.New(sloglogrus.Option{Level: toSlogLevel(level), Logger: logrus.StandardLogger()}.NewLogrusHandler()))
}

func toSlogLevel(level logrus.Level) slog.Level {
switch level {
case logrus.TraceLevel:
return slog.LevelDebug
case logrus.DebugLevel:
return slog.LevelDebug
case logrus.InfoLevel:
return slog.LevelInfo
case logrus.WarnLevel:
return slog.LevelWarn
case logrus.ErrorLevel, logrus.PanicLevel:
return slog.LevelError
default:
return slog.LevelInfo
}
}

func NewPluginClient(handshakeConfig plugin.HandshakeConfig, plugins map[string]plugin.Plugin, logLevel string, command string, params []string) *plugin.Client {
Expand Down
5 changes: 4 additions & 1 deletion cmd/plugin-auth-ldap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ func main() {

pluginMeta := &pluginMeta{}
flags := pluginMeta.flagSet()
flags.Parse(os.Args[1:])
if err := flags.Parse(os.Args[1:]); err != nil {
logrus.Errorf("error parsing flags: %v", err)
os.Exit(1)
}

urls, err := pluginMeta.getUrls()
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion cmd/plugin-auth-user/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ func (f *PasswordAuthenticator) flagSet() *flag.FlagSet {
func main() {
passwordAuthenticator := &PasswordAuthenticator{}
flags := passwordAuthenticator.flagSet()
flags.Parse(os.Args[1:])
if err := flags.Parse(os.Args[1:]); err != nil {
logrus.Errorf("error parsing flags: %v", err)
os.Exit(1)
}

if passwordAuthenticator.Password == "" {
passwordAuthenticator.Password = os.Getenv(EnvSaslPassword)
Expand Down
Loading

0 comments on commit 2fbf93a

Please sign in to comment.