Skip to content

Commit

Permalink
feat: shared secret auth to influxdb in OSS (#2576)
Browse files Browse the repository at this point in the history
* feat: shared secret auth to influxdb in OSS

* test: fix default values in update test
  • Loading branch information
lesam authored Jun 11, 2021
1 parent 53cf295 commit e5cd456
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- [#2555](https://github.com/influxdata/kapacitor/pull/2555): run flux tasks with built-in flux engine
- [#2559](https://github.com/influxdata/kapacitor/pull/2559): kapacitor cli supports flux tasks
- [#2560](https://github.com/influxdata/kapacitor/pull/2560): enable new-style slack apps
- [#2576](https://github.com/influxdata/kapacitor/pull/2576): shared secret auth to influxdb in OSS

### Bugfixes
- [#2564](https://github.com/influxdata/kapacitor/pull/2564): Fix a panic in the scraper handler when debug mode is enabled
Expand Down
7 changes: 4 additions & 3 deletions influxdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,14 @@ type Credentials struct {
Method AuthenticationMethod

// UserAuthentication fields

Username string
Password string

// BearerAuthentication fields

// TokenAuthentication fields
Token string

// BearerAuthentication fields
HttpSharedSecret bool
}

// HTTPClient is safe for concurrent use.
Expand Down
186 changes: 186 additions & 0 deletions influxdb/token_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package influxdb

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/dgrijalva/jwt-go"
"github.com/influxdata/flux"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/pkg/errors"
)

type Diagnostic interface {
Error(msg string, err error, ctx ...keyvalue.T)
}

type TokenClientCreator struct {
httpSharedSecret []byte
tokenDuration time.Duration
diag Diagnostic
}

func NewTokenClientCreator(httpSharedSecret []byte, tokenDuration time.Duration, d Diagnostic) *TokenClientCreator {
return &TokenClientCreator{
httpSharedSecret: httpSharedSecret,
tokenDuration: tokenDuration,
diag: d,
}
}

func (cc *TokenClientCreator) Create(config Config) (ClientUpdater, error) {
if config.Credentials.Method != BearerAuthentication {
// Config doesn't need token auth, use normal client
return NewHTTPClient(config)
}
if !config.Credentials.HttpSharedSecret {
// This should not happen since we only set BearerAuthentication in services/influxdb.httpConfig
// if http-shared-secret was false. Eventually we could add an additional config value to set a shared secret
// that is not the one from KAPACITOR_HTTP_SHARED_SECRET
return nil, errors.New("invalid config: bearer auth configured but http-shared-secret was false")
}
// Generate the first token
token, err := generateToken(config.Credentials.Username, cc.httpSharedSecret, cc.tokenDuration)
if err != nil {
return nil, errors.Wrap(err, "generating first token")
}
// Update credentials to use token
config.Credentials.Method = BearerAuthentication
config.Credentials.Token = token

cli, err := NewHTTPClient(config)
if err != nil {
return nil, err
}
tcli := &tokenClient{
client: cli,
username: config.Credentials.Username,
sharedSecret: cc.httpSharedSecret,
tokenDuration: cc.tokenDuration,
diag: cc.diag,
closing: make(chan struct{}),
closed: true,
}
tcli.configValue.Store(config)

if err := tcli.Open(); err != nil {
return nil, errors.Wrap(err, "failed to open client")
}
return tcli, nil
}

type tokenClient struct {
wg sync.WaitGroup
configValue atomic.Value // influxdb.Config
client *HTTPClient
username string
sharedSecret []byte
tokenDuration time.Duration

mu sync.Mutex
closing chan struct{}
closed bool
diag Diagnostic
}

// Update updates the running configuration.
func (tc *tokenClient) Update(c Config) error {
tc.configValue.Store(c)
return tc.client.Update(c)
}

// Ping checks that status of cluster
func (tc *tokenClient) Ping(c context.Context) (time.Duration, string, error) {
return tc.client.Ping(c)
}

// Write takes a BatchPoints object and writes all Points to InfluxDB.
func (tc *tokenClient) Write(bp BatchPoints) error {
return tc.client.Write(bp)
}

// Query makes an InfluxDB Query on the database.
func (tc *tokenClient) Query(q Query) (*Response, error) {
return tc.client.Query(q)
}

// WriteV2 writes to InfluxDB using the v2 write protocol
func (tc *tokenClient) WriteV2(w FluxWrite) error {
return tc.client.WriteV2(w)
}

// QueryFlux makes a flux query to InfluxDB
func (tc *tokenClient) QueryFlux(q FluxQuery) (flux.ResultIterator, error) {
return tc.client.QueryFlux(q)
}

// QueryFluxResponse makes a flux query to InfluxDB and translates it to a Response
func (tc *tokenClient) QueryFluxResponse(q FluxQuery) (*Response, error) {
return tc.client.QueryFluxResponse(q)
}

func (tc *tokenClient) Open() error {
tc.mu.Lock()
defer tc.mu.Unlock()
if !tc.closed {
return nil
}
tc.closed = false
// Start background routine to preemptively update the token before it expires.
tc.wg.Add(1)
go func() {
defer tc.wg.Done()
tc.manageToken()
}()
return nil
}

// Close the client.
func (tc *tokenClient) Close() error {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.closed {
return nil
}
close(tc.closing)
tc.closed = true
tc.wg.Wait()
return tc.client.Close()
}

// Preemptively update the token before it can expire.
func (tc *tokenClient) manageToken() {
ticker := time.NewTicker(tc.tokenDuration / 2)
defer ticker.Stop()
for {
select {
case <-ticker.C:
token, err := generateToken(tc.username, tc.sharedSecret, tc.tokenDuration)
if err != nil {
tc.diag.Error("failed to generate new token", err)
continue
}
c := tc.configValue.Load().(Config)
c.Credentials.Token = token
tc.Update(c)
case <-tc.closing:
return
}
}
}

// Generate a new signed token for the user. The token will expire after tokenDuration has elapsed.
func generateToken(username string, secret []byte, tokenDuration time.Duration) (string, error) {
// Create a new token object, specifying signing method and the claims
// you would like it to contain.
token := jwt.NewWithClaims(jwt.SigningMethodHS512, jwt.MapClaims{
"username": username,
"exp": time.Now().Add(tokenDuration).Unix(),
})

// Sign and get the complete encoded token as a string using the secret
tokenString, err := token.SignedString(secret)
return tokenString, errors.Wrap(err, "signing authentication token")
}
8 changes: 7 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"
"runtime/pprof"
"sync"
"time"

"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/collectd"
Expand Down Expand Up @@ -473,6 +474,11 @@ func (s *Server) appendLoadService() error {
return nil
}

const (
// Tokens for the InfluxDB clusters are generate with an expiration this far into the future.
tokenExpirationDuration = 10 * time.Minute
)

func (s *Server) appendInfluxDBService() error {
c := s.config.InfluxDB
d := s.DiagService.NewInfluxDBHandler()
Expand All @@ -493,7 +499,7 @@ func (s *Server) appendInfluxDBService() error {
srv.HTTPDService = s.HTTPDService
srv.PointsWriter = s.TaskMaster
srv.AuthService = s.AuthService
srv.ClientCreator = iclient.ClientCreator{}
srv.ClientCreator = iclient.NewTokenClientCreator([]byte(s.config.HTTP.SharedSecret), tokenExpirationDuration, s.DiagService.NewInfluxDBHandler())

s.InfluxDBService = srv
s.TaskMaster.InfluxDBService = srv
Expand Down
1 change: 1 addition & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6497,6 +6497,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"http-port": float64(0),
"insecure-skip-verify": false,
"kapacitor-hostname": "",
"http-shared-secret": false,
"name": "default",
"password": true,
"ssl-ca": "",
Expand Down
15 changes: 8 additions & 7 deletions services/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ const (
)

type Config struct {
Enabled bool `toml:"enabled" override:"enabled"`
Name string `toml:"name" override:"name"`
Default bool `toml:"default" override:"default"`
URLs []string `toml:"urls" override:"urls"`
Username string `toml:"username" override:"username"`
Password string `toml:"password" override:"password,redact"`
Token string `toml:"token" override:"token,redact"`
Enabled bool `toml:"enabled" override:"enabled"`
Name string `toml:"name" override:"name"`
Default bool `toml:"default" override:"default"`
URLs []string `toml:"urls" override:"urls"`
Username string `toml:"username" override:"username"`
Password string `toml:"password" override:"password,redact"`
Token string `toml:"token" override:"token,redact"`
HttpSharedSecret bool `toml:"http-shared-secret" override:"http-shared-secret"`

// Path to CA file
SSLCA string `toml:"ssl-ca" override:"ssl-ca"`
Expand Down
6 changes: 6 additions & 0 deletions services/influxdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,12 @@ func httpConfig(c Config) (influxdb.Config, error) {
Method: influxdb.TokenAuthentication,
Token: c.Token,
}
} else if c.HttpSharedSecret {
credentials = influxdb.Credentials{
Method: influxdb.BearerAuthentication,
Username: c.Username,
HttpSharedSecret: true,
}
} else if c.Username != "" {
credentials = influxdb.Credentials{
Method: influxdb.UserAuthentication,
Expand Down

0 comments on commit e5cd456

Please sign in to comment.