diff --git a/go.mod b/go.mod index e9b0717..3328906 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21 toolchain go1.21.6 require ( - github.com/aws/aws-sdk-go v1.53.15 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 github.com/pkg/errors v0.9.1 @@ -18,9 +17,27 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2/credentials v1.17.13 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.6 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.20.6 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.28.7 // indirect + github.com/aws/smithy-go v1.20.2 // indirect +) + +require ( + github.com/aws/aws-sdk-go-v2 v1.26.2 + github.com/aws/aws-sdk-go-v2/config v1.27.13 + github.com/aws/aws-sdk-go-v2/service/timestreamquery v1.23.1 + github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.25.6 github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/procfs v0.12.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index b82b2b4..3984841 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,39 @@ -github.com/aws/aws-sdk-go v1.53.15 h1:FtZmkg7xM8RfP2oY6p7xdKBYrRgkITk9yve2QV7N938= -github.com/aws/aws-sdk-go v1.53.15/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go-v2 v1.26.2 h1:OTRAL8EPdNoOdiq5SUhCaHhVPBU2wxAUe5uwasoJGRM= +github.com/aws/aws-sdk-go-v2 v1.26.2/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/config v1.27.13 h1:WbKW8hOzrWoOA/+35S5okqO/2Ap8hkkFUzoW8Hzq24A= +github.com/aws/aws-sdk-go-v2/config v1.27.13/go.mod h1:XLiyiTMnguytjRER7u5RIkhIqS8Nyz41SwAWb4xEjxs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.13 h1:XDCJDzk/u5cN7Aple7D/MiAhx1Rjo/0nueJ0La8mRuE= +github.com/aws/aws-sdk-go-v2/credentials v1.17.13/go.mod h1:FMNcjQrmuBYvOTZDtOLCIu0esmxjF7RuA/89iSXWzQI= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 h1:FVJ0r5XTHSmIHJV6KuDmdYhEpvlHpiSd38RQWhut5J4= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1/go.mod h1:zusuAeqezXzAB24LGuzuekqMAEgWkVYukBec3kr3jUg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.6 h1:6tayEze2Y+hiL3kdnEUxSPsP+pJsUfwLSFspFl1ru9Q= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.6/go.mod h1:qVNb/9IOVsLCZh0x2lnagrBwQ9fxajUpXS7OZfIsKn0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 h1:ogRAwT1/gxJBcSWDMZlgyFUM962F51A5CRhDLbxLdmo= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7/go.mod h1:YCsIZhXfRPLFFCl5xxY+1T9RKzOKjCut+28JSX2DnAk= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.6 h1:o5cTaeunSpfXiLTIBx5xo2enQmiChtu1IBbzXnfU9Hs= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.6/go.mod h1:qGzynb/msuZIE8I75DVRCUXw3o3ZyBmUvMwQ2t/BrGM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.0 h1:Qe0r0lVURDDeBQJ4yP+BOrJkvkiCo/3FH/t+wY11dmw= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.0/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.7 h1:et3Ta53gotFR4ERLXXHIHl/Uuk1qYpP5uU7cvNql8ns= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.7/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= +github.com/aws/aws-sdk-go-v2/service/timestreamquery v1.23.1 h1:zioYJE/HJP3gbEcA8lBm3behwF5rxxEwX3PJWNjZZIw= +github.com/aws/aws-sdk-go-v2/service/timestreamquery v1.23.1/go.mod h1:y0QPauOtWLFrhzxEjjKP5cBi3ved6SGcozYbf1fleMY= +github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.25.6 h1:7nQoWdsGHF9K6tFbcE0ACjGe1dpXZ/3EYTByJ1IPjbE= +github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.25.6/go.mod h1:9R1IlrgiivwTCZdbKgMPkseFS+moUM+DLh0TEjO6pvE= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -13,15 +42,10 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= -github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= -github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= -github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= @@ -36,7 +60,6 @@ github.com/prometheus/prometheus v0.52.1 h1:BrQ29YG+mzdGh8DgHPirHbeMGNqtL+INe0rq github.com/prometheus/prometheus v0.52.1/go.mod h1:3z74cVsmVH0iXOR5QBjB7Pa6A0KJeEAK5A6UsmAFb1g= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -80,9 +103,5 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.34.0 h1:Qo/qEd2RZPCf2nKuorzksSknv0d3ERwp1vFG38gSmH4= google.golang.org/protobuf v1.34.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/handler.go b/handler.go index 0449e95..d727910 100644 --- a/handler.go +++ b/handler.go @@ -19,17 +19,19 @@ package main import ( + "context" + "io" + "net/http" + "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/prometheus/prompb" "go.uber.org/zap" - "io/ioutil" - "net/http" ) -func writeHandler(logger *zap.SugaredLogger, ad PrometheusRemoteStorageAdapter) http.HandlerFunc { +func writeHandler(ctx context.Context, logger *zap.SugaredLogger, ad PrometheusRemoteStorageAdapter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - compressed, err := ioutil.ReadAll(r.Body) + compressed, err := io.ReadAll(r.Body) if err != nil { logger.Errorw("Read error", "err", err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) @@ -50,7 +52,7 @@ func writeHandler(logger *zap.SugaredLogger, ad PrometheusRemoteStorageAdapter) return } - err = ad.Write(&req) + err = ad.Write(ctx, &req) if err != nil { w.WriteHeader(http.StatusInternalServerError) @@ -59,9 +61,9 @@ func writeHandler(logger *zap.SugaredLogger, ad PrometheusRemoteStorageAdapter) } } -func readHandler(logger *zap.SugaredLogger, ad PrometheusRemoteStorageAdapter) http.HandlerFunc { +func readHandler(ctx context.Context, logger *zap.SugaredLogger, ad PrometheusRemoteStorageAdapter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - compressed, err := ioutil.ReadAll(r.Body) + compressed, err := io.ReadAll(r.Body) if err != nil { logger.Errorw("Read error", "err", err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) @@ -83,7 +85,7 @@ func readHandler(logger *zap.SugaredLogger, ad PrometheusRemoteStorageAdapter) h } var resp *prompb.ReadResponse - resp, err = ad.Read(&req) + resp, err = ad.Read(ctx, &req) if err != nil { logger.Errorw("Error executing query", "query", req, "err", err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/main.go b/main.go index 80d1129..5e710bf 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ package main import ( + "context" "log" "net/http" "os" @@ -31,7 +32,7 @@ import ( flag "github.com/spf13/pflag" ) -type config struct { +type adapterCfg struct { help bool awsRegion string databaseName string @@ -45,7 +46,7 @@ type config struct { } var ( - cfg = new(config) + cfg = new(adapterCfg) receivedSamples = prometheus.NewCounter( prometheus.CounterOpts{ @@ -103,6 +104,8 @@ func main() { return } + ctx := context.Background() + zapConfig := zap.NewProductionConfig() zapConfig.DisableStacktrace = true @@ -130,23 +133,23 @@ func main() { sugar.Debugf("Sending logs to timeseries database %s in AWS Region %s", cfg.databaseName, cfg.awsRegion) - timeStreamAdapter := newTimeStreamAdapter(sugar, cfg, nil, nil) - if err := serve(sugar, cfg.listenAddr, timeStreamAdapter); err != nil { + timeStreamAdapter := newTimeStreamAdapter(ctx, sugar, cfg, newTimestreamQueryPaginator, nil, nil) + if err := serve(ctx, sugar, cfg.listenAddr, timeStreamAdapter); err != nil { sugar.Errorw("Failed to listen", "addr", cfg.listenAddr, "err", err) os.Exit(1) } } type PrometheusRemoteStorageAdapter interface { - Write(records *prompb.WriteRequest) error - Read(request *prompb.ReadRequest) (*prompb.ReadResponse, error) + Write(ctx context.Context, records *prompb.WriteRequest) error + Read(ctx context.Context, request *prompb.ReadRequest) (*prompb.ReadResponse, error) Name() string } -func serve(logger *zap.SugaredLogger, addr string, storageAdapter PrometheusRemoteStorageAdapter) error { +func serve(ctx context.Context, logger *zap.SugaredLogger, addr string, storageAdapter PrometheusRemoteStorageAdapter) error { http.Handle(cfg.telemetryPath, promhttp.Handler()) - http.Handle("/write", writeHandler(logger, storageAdapter)) - http.Handle("/read", readHandler(logger, storageAdapter)) + http.Handle("/write", writeHandler(ctx, logger, storageAdapter)) + http.Handle("/read", readHandler(ctx, logger, storageAdapter)) http.Handle("/health", healthHandler(logger)) if cfg.tls { diff --git a/timestream.go b/timestream.go index c368806..9ee5634 100644 --- a/timestream.go +++ b/timestream.go @@ -19,38 +19,52 @@ package main import ( + "context" "fmt" "math" - "net" "net/http" "strconv" "strings" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/timestreamquery" - "github.com/aws/aws-sdk-go/service/timestreamquery/timestreamqueryiface" - "github.com/aws/aws-sdk-go/service/timestreamwrite" - "github.com/aws/aws-sdk-go/service/timestreamwrite/timestreamwriteiface" - + "github.com/aws/aws-sdk-go-v2/aws" + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/timestreamquery" + "github.com/aws/aws-sdk-go-v2/service/timestreamwrite" + writetypes "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" - - "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/net/http2" ) const TimestreamMaxRecordsPerRequest = 100 +type TimestreamQueryApi interface { + Query(ctx context.Context, params *timestreamquery.QueryInput, optFns ...func(*timestreamquery.Options)) (*timestreamquery.QueryOutput, error) +} + +type TimestreamWriteApi interface { + WriteRecords(ctx context.Context, params *timestreamwrite.WriteRecordsInput, optFns ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error) +} + +type NewQueryPaginator func(client TimestreamQueryApi, params *timestreamquery.QueryInput, optFns ...func(*timestreamquery.QueryPaginatorOptions)) PaginatorApi + +type PaginatorApi interface { + HasMorePages() bool + NextPage(ctx context.Context, optFns ...func(*timestreamquery.Options)) (*timestreamquery.QueryOutput, error) +} + type TimeStreamAdapter struct { databaseName string logger *zap.SugaredLogger tableName string - timestreamqueryiface.TimestreamQueryAPI - timestreamwriteiface.TimestreamWriteAPI + TimestreamQueryApi + TimestreamWriteApi + NewQueryPaginator } type queryTask struct { @@ -60,50 +74,43 @@ type queryTask struct { type writeTask struct { measureName string - dimensions []*timestreamwrite.Dimension + dimensions []writetypes.Dimension } -func newTimeStreamAdapter(logger *zap.SugaredLogger, cfg *config, writeSvc timestreamwriteiface.TimestreamWriteAPI, readSvc timestreamqueryiface.TimestreamQueryAPI) TimeStreamAdapter { - tr := &http.Transport{ - ResponseHeaderTimeout: 20 * time.Second, - // Using DefaultTransport values for other parameters: https://golang.org/pkg/net/http/#RoundTripper - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - KeepAlive: 30 * time.Second, - Timeout: 30 * time.Second, - }).DialContext, - MaxIdleConns: 100, - IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - } +func newTimestreamQueryPaginator(client TimestreamQueryApi, params *timestreamquery.QueryInput, optFns ...func(*timestreamquery.QueryPaginatorOptions)) PaginatorApi { + return timestreamquery.NewQueryPaginator(client, params, optFns...) +} + +func newTimeStreamAdapter(ctx context.Context, logger *zap.SugaredLogger, cfg *adapterCfg, newQueryPaginator NewQueryPaginator, writeSvc *timestreamwrite.Client, querySvc *timestreamquery.Client) TimeStreamAdapter { + if writeSvc == nil || querySvc == nil { + client := awshttp.NewBuildableClient().WithTransportOptions(func(tr *http.Transport) { + tr.ResponseHeaderTimeout = 20 * time.Second - // So client makes HTTP/2 requests - _ = http2.ConfigureTransport(tr) + // Enable HTTP/2 + err := http2.ConfigureTransport(tr) + if err != nil { + logger.Fatalw("Unable to configure HTTP/2 transport", "error", err) + } + }) - if writeSvc == nil || readSvc == nil { - sess := session.Must(session.NewSession( - &aws.Config{ - Region: aws.String(cfg.awsRegion), - MaxRetries: aws.Int(10), - HTTPClient: &http.Client{ - Transport: tr, - }, - }, - )) + awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(cfg.awsRegion), config.WithHTTPClient(client), config.WithRetryMaxAttempts(10)) + if err != nil { + logger.Fatalw("Unable to load AWS SDK config", "error", err) + } if writeSvc == nil { - writeSvc = timestreamwrite.New(sess) + writeSvc = timestreamwrite.NewFromConfig(awsCfg) } - if readSvc == nil { - readSvc = timestreamquery.New(sess) + if querySvc == nil { + querySvc = timestreamquery.NewFromConfig(awsCfg) } } return TimeStreamAdapter{ - TimestreamQueryAPI: readSvc, - TimestreamWriteAPI: writeSvc, + TimestreamQueryApi: querySvc, + TimestreamWriteApi: writeSvc, + NewQueryPaginator: newQueryPaginator, databaseName: cfg.databaseName, logger: logger, tableName: cfg.tableName, @@ -116,7 +123,7 @@ func (t TimeStreamAdapter) Name() string { return "prometheus-timestream-adapter" } -func (t TimeStreamAdapter) Write(req *prompb.WriteRequest) (err error) { +func (t TimeStreamAdapter) Write(ctx context.Context, req *prompb.WriteRequest) (err error) { timer := prometheus.NewTimer(sentBatchDuration.WithLabelValues(t.Name())) defer timer.ObserveDuration() @@ -124,7 +131,7 @@ func (t TimeStreamAdapter) Write(req *prompb.WriteRequest) (err error) { receivedSamples.Add(float64(len(records))) for _, chunk := range t.splitRecords(records) { - _, err = t.WriteRecords(×treamwrite.WriteRecordsInput{ + _, err = t.WriteRecords(ctx, ×treamwrite.WriteRecordsInput{ DatabaseName: aws.String(t.databaseName), TableName: aws.String(t.tableName), Records: chunk, @@ -153,7 +160,7 @@ func (t TimeStreamAdapter) allCharactersValid(str string) bool { return true } -func (t TimeStreamAdapter) toRecords(writeRequest *prompb.WriteRequest) (records []*timestreamwrite.Record) { +func (t TimeStreamAdapter) toRecords(writeRequest *prompb.WriteRequest) (records []writetypes.Record) { for _, ts := range writeRequest.Timeseries { task := t.readLabels(ts.Labels) for _, s := range ts.Samples { @@ -168,13 +175,13 @@ func (t TimeStreamAdapter) toRecords(writeRequest *prompb.WriteRequest) (records continue } - records = append(records, ×treamwrite.Record{ + records = append(records, writetypes.Record{ Dimensions: task.dimensions, MeasureName: aws.String(task.measureName), - MeasureValueType: aws.String("DOUBLE"), + MeasureValueType: writetypes.MeasureValueTypeDouble, MeasureValue: aws.String(fmt.Sprint(s.Value)), Time: aws.String(fmt.Sprint(s.Timestamp)), - TimeUnit: aws.String("MILLISECONDS"), + TimeUnit: writetypes.TimeUnitMilliseconds, }) } } @@ -182,8 +189,8 @@ func (t TimeStreamAdapter) toRecords(writeRequest *prompb.WriteRequest) (records return } -func (t TimeStreamAdapter) splitRecords(records []*timestreamwrite.Record) [][]*timestreamwrite.Record { - var chunked [][]*timestreamwrite.Record +func (t TimeStreamAdapter) splitRecords(records []writetypes.Record) [][]writetypes.Record { + var chunked [][]writetypes.Record for i := 0; i < len(records); i += TimestreamMaxRecordsPerRequest { end := i + TimestreamMaxRecordsPerRequest @@ -199,12 +206,12 @@ func (t TimeStreamAdapter) splitRecords(records []*timestreamwrite.Record) [][]* // Read implementation -func (t TimeStreamAdapter) Read(request *prompb.ReadRequest) (response *prompb.ReadResponse, err error) { +func (t TimeStreamAdapter) Read(ctx context.Context, request *prompb.ReadRequest) (response *prompb.ReadResponse, err error) { var queryResult prompb.QueryResult var queryResults []*prompb.QueryResult for _, q := range request.Queries { - queryResult, err = t.runReadRequestQuery(q) + queryResult, err = t.runReadRequestQuery(ctx, q) if err != nil { return @@ -220,26 +227,25 @@ func (t TimeStreamAdapter) Read(request *prompb.ReadRequest) (response *prompb.R return } -func (t TimeStreamAdapter) runReadRequestQuery(q *prompb.Query) (result prompb.QueryResult, err error) { - task, err := t.buildTimeStreamQueryString(q) +func (t TimeStreamAdapter) runReadRequestQuery(ctx context.Context, q *prompb.Query) (result prompb.QueryResult, err error) { + task, err := t.buildTimeStreamQueryString(ctx, q) if err != nil { return } var timeSeries []*prompb.TimeSeries - err = t.QueryPages( - ×treamquery.QueryInput{ - QueryString: &task.query, - }, - func(output *timestreamquery.QueryOutput, lastPage bool) bool { - timeSeries = t.handleQueryResult(output, timeSeries, task.measureName) - return !lastPage - }, - ) + paginator := t.NewQueryPaginator(t, ×treamquery.QueryInput{ + QueryString: aws.String(task.query), + }) - if err != nil { - return + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return result, err + } + + timeSeries = t.handleQueryResult(page, timeSeries, task.measureName) } result = prompb.QueryResult{ @@ -250,7 +256,7 @@ func (t TimeStreamAdapter) runReadRequestQuery(q *prompb.Query) (result prompb.Q } // BuildCommand generates the proper SQL for the runReadRequestQuery -func (t TimeStreamAdapter) buildTimeStreamQueryString(q *prompb.Query) (task queryTask, err error) { +func (t TimeStreamAdapter) buildTimeStreamQueryString(ctx context.Context, q *prompb.Query) (task queryTask, err error) { matchers := make([]string, 0, len(q.Matchers)) for _, m := range q.Matchers { // Metric Names @@ -285,7 +291,7 @@ func (t TimeStreamAdapter) buildTimeStreamQueryString(q *prompb.Query) (task que matchers = append(matchers, fmt.Sprintf("time BETWEEN from_milliseconds(%d) AND from_milliseconds(%d)", q.StartTimestampMs, q.EndTimestampMs)) - dimensions, err := t.readDimension(task.measureName) + dimensions, err := t.readDimension(ctx, task.measureName) if err != nil { return @@ -299,10 +305,10 @@ func (t TimeStreamAdapter) buildTimeStreamQueryString(q *prompb.Query) (task que return } -func (t TimeStreamAdapter) readDimension(measureName string) (dimensions []string, err error) { +func (t TimeStreamAdapter) readDimension(ctx context.Context, measureName string) (dimensions []string, err error) { query := fmt.Sprintf("SHOW MEASURES FROM \"%s\".\"%s\" LIKE '%s'", cfg.databaseName, cfg.tableName, measureName) - queryOutput, err := t.Query(×treamquery.QueryInput{QueryString: &query}) + queryOutput, err := t.Query(ctx, ×treamquery.QueryInput{QueryString: &query}) if err != nil { return } @@ -332,7 +338,7 @@ func (t TimeStreamAdapter) readLabels(labels []prompb.Label) (task writeTask) { task.measureName = s.Value continue } - task.dimensions = append(task.dimensions, ×treamwrite.Dimension{ + task.dimensions = append(task.dimensions, writetypes.Dimension{ Name: aws.String(s.Name), Value: aws.String(s.Value), }) diff --git a/timestream_test.go b/timestream_test.go index e9e2d75..80a1393 100644 --- a/timestream_test.go +++ b/timestream_test.go @@ -19,54 +19,55 @@ package main import ( + "context" "errors" "math" "reflect" "testing" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/timestreamquery" - "github.com/aws/aws-sdk-go/service/timestreamquery/timestreamqueryiface" - "github.com/aws/aws-sdk-go/service/timestreamwrite" - "github.com/aws/aws-sdk-go/service/timestreamwrite/timestreamwriteiface" - + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/timestreamquery" + querytypes "github.com/aws/aws-sdk-go-v2/service/timestreamquery/types" + "github.com/aws/aws-sdk-go-v2/service/timestreamwrite" + writetypes "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types" "github.com/prometheus/prometheus/prompb" "go.uber.org/zap" ) var ( timeStreamAdapter = &TimeStreamAdapter{ - TimestreamQueryAPI: TimeStreamQueryMock{}, - TimestreamWriteAPI: TimeStreamWriterMock{}, + TimestreamQueryApi: &TimeStreamQueryMock{}, + TimestreamWriteApi: &TimeStreamWriterMock{}, + NewQueryPaginator: NewPaginatorMock, databaseName: "mockDatabase", logger: zap.NewNop().Sugar(), tableName: "mockTable", } measureOutput = ×treamquery.QueryOutput{ - ColumnInfo: []*timestreamquery.ColumnInfo{ + ColumnInfo: []querytypes.ColumnInfo{ { Name: aws.String("measure_name"), - Type: ×treamquery.Type{ScalarType: aws.String("VARCHAR")}, + Type: &querytypes.Type{ScalarType: querytypes.ScalarTypeVarchar}, }, { Name: aws.String("data_type"), - Type: ×treamquery.Type{ScalarType: aws.String("VARCHAR")}, + Type: &querytypes.Type{ScalarType: querytypes.ScalarTypeVarchar}, }, { Name: aws.String("dimensions"), - Type: ×treamquery.Type{ - ArrayColumnInfo: ×treamquery.ColumnInfo{ - Type: ×treamquery.Type{ - RowColumnInfo: []*timestreamquery.ColumnInfo{ + Type: &querytypes.Type{ + ArrayColumnInfo: &querytypes.ColumnInfo{ + Type: &querytypes.Type{ + RowColumnInfo: []querytypes.ColumnInfo{ { Name: aws.String("dimension_name"), - Type: ×treamquery.Type{ScalarType: aws.String("VARCHAR")}, + Type: &querytypes.Type{ScalarType: querytypes.ScalarTypeVarchar}, }, { Name: aws.String("data_type"), - Type: ×treamquery.Type{ScalarType: aws.String("VARCHAR")}, + Type: &querytypes.Type{ScalarType: querytypes.ScalarTypeVarchar}, }, }, }, @@ -75,24 +76,24 @@ var ( }, }, QueryId: aws.String("MOCK"), - Rows: []*timestreamquery.Row{ + Rows: []querytypes.Row{ { - Data: []*timestreamquery.Datum{ + Data: []querytypes.Datum{ {ScalarValue: aws.String("mock")}, {ScalarValue: aws.String("double")}, { - ArrayValue: []*timestreamquery.Datum{ + ArrayValue: []querytypes.Datum{ { - RowValue: ×treamquery.Row{ - Data: []*timestreamquery.Datum{ + RowValue: &querytypes.Row{ + Data: []querytypes.Datum{ {ScalarValue: aws.String("instance")}, {ScalarValue: aws.String("varchar")}, }, }, }, { - RowValue: ×treamquery.Row{ - Data: []*timestreamquery.Datum{ + RowValue: &querytypes.Row{ + Data: []querytypes.Datum{ {ScalarValue: aws.String("job")}, {ScalarValue: aws.String("varchar")}, }, @@ -105,20 +106,20 @@ var ( }, } - queryOutputColumns = []*timestreamquery.ColumnInfo{ + queryOutputColumns = []querytypes.ColumnInfo{ { Name: aws.String("instance"), - Type: ×treamquery.Type{ScalarType: aws.String("VARCHAR")}, + Type: &querytypes.Type{ScalarType: querytypes.ScalarTypeVarchar}, }, { Name: aws.String("job"), - Type: ×treamquery.Type{ScalarType: aws.String("VARCHAR")}, + Type: &querytypes.Type{ScalarType: querytypes.ScalarTypeVarchar}, }, { Name: aws.String("mock"), - Type: ×treamquery.Type{ - TimeSeriesMeasureValueColumnInfo: ×treamquery.ColumnInfo{ - Type: ×treamquery.Type{ScalarType: aws.String("DOUBLE")}, + Type: &querytypes.Type{ + TimeSeriesMeasureValueColumnInfo: &querytypes.ColumnInfo{ + Type: &querytypes.Type{ScalarType: querytypes.ScalarTypeDouble}, }, }, }, @@ -132,16 +133,16 @@ var ( queryOutput1 = ×treamquery.QueryOutput{ ColumnInfo: queryOutputColumns, QueryId: aws.String("MOCK"), - Rows: []*timestreamquery.Row{ + Rows: []querytypes.Row{ { - Data: []*timestreamquery.Datum{ + Data: []querytypes.Datum{ {ScalarValue: aws.String("host:9100")}, {ScalarValue: aws.String("mock-exporter")}, { - TimeSeriesValue: []*timestreamquery.TimeSeriesDataPoint{ + TimeSeriesValue: []querytypes.TimeSeriesDataPoint{ { Time: aws.String("2020-01-01 00:00:00.000000000"), - Value: ×treamquery.Datum{ScalarValue: aws.String("1.0")}, + Value: &querytypes.Datum{ScalarValue: aws.String("1.0")}, }, }, }, @@ -153,16 +154,16 @@ var ( queryOutput2 = ×treamquery.QueryOutput{ ColumnInfo: queryOutputColumns, QueryId: aws.String("MOCK"), - Rows: []*timestreamquery.Row{ + Rows: []querytypes.Row{ { - Data: []*timestreamquery.Datum{ + Data: []querytypes.Datum{ {ScalarValue: aws.String("host:9100")}, {ScalarValue: aws.String("mock-exporter")}, { - TimeSeriesValue: []*timestreamquery.TimeSeriesDataPoint{ + TimeSeriesValue: []querytypes.TimeSeriesDataPoint{ { Time: aws.String("2020-01-01 00:00:01.000000000"), - Value: ×treamquery.Datum{ScalarValue: aws.String("2.0")}, + Value: &querytypes.Datum{ScalarValue: aws.String("2.0")}, }, }, }, @@ -172,15 +173,36 @@ var ( } ) -type TimeStreamWriterMock struct { - timestreamwriteiface.TimestreamWriteAPI +type TimeStreamWriterMock struct{} + +type TimeStreamQueryMock struct{} + +type PaginatorMock struct { + callCount int } -type TimeStreamQueryMock struct { - timestreamqueryiface.TimestreamQueryAPI +func NewPaginatorMock(client TimestreamQueryApi, params *timestreamquery.QueryInput, optFns ...func(*timestreamquery.QueryPaginatorOptions)) PaginatorApi { + return &PaginatorMock{} } -func (t TimeStreamWriterMock) WriteRecords(input *timestreamwrite.WriteRecordsInput) (*timestreamwrite.WriteRecordsOutput, error) { +func (p *PaginatorMock) HasMorePages() bool { + p.callCount++ + // Return true for the first two calls, then false. + return p.callCount <= 2 +} + +func (p *PaginatorMock) NextPage(ctx context.Context, optFns ...func(*timestreamquery.Options)) (*timestreamquery.QueryOutput, error) { + switch p.callCount { + case 1: + return queryOutput1, nil + case 2: + return queryOutput2, nil + default: + return nil, errors.New("no more pages") + } +} + +func (t *TimeStreamWriterMock) WriteRecords(ctx context.Context, input *timestreamwrite.WriteRecordsInput, optFns ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error) { for _, i := range input.Records { if *i.MeasureName == "sample_name_error" { return nil, errors.New("error writing to mock timestream database") @@ -189,22 +211,14 @@ func (t TimeStreamWriterMock) WriteRecords(input *timestreamwrite.WriteRecordsIn return ×treamwrite.WriteRecordsOutput{}, nil } -func (t TimeStreamQueryMock) Query(input *timestreamquery.QueryInput) (output *timestreamquery.QueryOutput, err error) { +func (t *TimeStreamQueryMock) Query(ctx context.Context, input *timestreamquery.QueryInput, optFns ...func(*timestreamquery.Options)) (*timestreamquery.QueryOutput, error) { switch *input.QueryString { case "SHOW MEASURES FROM \"prometheus-database\".\"prometheus-table\" LIKE 'mock'": - output = measureOutput + return measureOutput, nil case "SELECT instance, job, CREATE_TIME_SERIES(time, measure_value::double) AS mock FROM \"prometheus-database\".\"prometheus-table\" WHERE measure_name = 'mock' AND time BETWEEN from_milliseconds(1577836800000) AND from_milliseconds(1577836800000) GROUP BY instance, job": - output = queryOutput1 + return queryOutput1, nil } - - return -} - -func (t TimeStreamQueryMock) QueryPages(input *timestreamquery.QueryInput, handler func(*timestreamquery.QueryOutput, bool) bool) (err error) { - handler(queryOutput0, false) - handler(queryOutput1, false) - handler(queryOutput2, true) - return + return nil, nil } func TestTimeSteamAdapter_readLabels(t *testing.T) { @@ -232,7 +246,7 @@ func TestTimeSteamAdapter_readLabels(t *testing.T) { }, wantTask: writeTask{ measureName: "sample_metric", - dimensions: []*timestreamwrite.Dimension{ + dimensions: []writetypes.Dimension{ { Name: aws.String("job"), Value: aws.String("testing"), @@ -257,7 +271,7 @@ func TestTimeSteamAdapter_toRecords(t *testing.T) { tests := []struct { name string args args - wantRecords []*timestreamwrite.Record + wantRecords []writetypes.Record }{ { name: "Prom data request", @@ -285,9 +299,9 @@ func TestTimeSteamAdapter_toRecords(t *testing.T) { }, }, }, - wantRecords: []*timestreamwrite.Record{ + wantRecords: []writetypes.Record{ { - Dimensions: []*timestreamwrite.Dimension{ + Dimensions: []writetypes.Dimension{ { Name: aws.String("job"), Value: aws.String("testing"), @@ -295,9 +309,9 @@ func TestTimeSteamAdapter_toRecords(t *testing.T) { }, MeasureName: aws.String("sample_metric"), MeasureValue: aws.String("12345"), - MeasureValueType: aws.String("DOUBLE"), + MeasureValueType: writetypes.MeasureValueTypeDouble, Time: aws.String("1577836800000"), - TimeUnit: aws.String("MILLISECONDS"), + TimeUnit: writetypes.TimeUnitMilliseconds, }, }, }, @@ -483,7 +497,7 @@ func TestTimeSteamAdapter_Write(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := timeStreamAdapter.Write(tt.args.req); (err != nil) != tt.wantErr { + if err := timeStreamAdapter.Write(context.TODO(), tt.args.req); (err != nil) != tt.wantErr { t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -579,7 +593,7 @@ func TestTimeSteamAdapter_Read(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotResponse, err := timeStreamAdapter.Read(tt.args.request) + gotResponse, err := timeStreamAdapter.Read(context.TODO(), tt.args.request) if (err != nil) != tt.wantErr { t.Errorf("Read() error = %v, wantErr %v", err, tt.wantErr) return