Skip to content

Commit

Permalink
move metering middleware definitions to metering package
Browse files Browse the repository at this point in the history
  • Loading branch information
colindickson committed Oct 10, 2024
1 parent b2a3525 commit 599fda8
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 50 deletions.
37 changes: 37 additions & 0 deletions metering/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"
"time"

"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"

"github.com/streamingfast/substreams/metrics"

"github.com/streamingfast/dmetering"
Expand Down Expand Up @@ -79,6 +82,40 @@ func GetTotalBytesWritten(meter dmetering.Meter) uint64 {
return total
}

func LiveSourceMiddlewareHandlerFactory(ctx context.Context) func(handler bstream.Handler) bstream.Handler {
return func(next bstream.Handler) bstream.Handler {
return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error {
stepable, ok := obj.(bstream.Stepable)
if ok {
step := stepable.Step()
if step.Matches(bstream.StepNew) {
dmetering.GetBytesMeter(ctx).CountInc(MeterLiveUncompressedReadBytes, len(blk.GetPayload().GetValue()))
} else {
dmetering.GetBytesMeter(ctx).CountInc(MeterLiveUncompressedReadForkedBytes, len(blk.GetPayload().GetValue()))
}
}
return next.ProcessBlock(blk, obj)
})
}
}

func FileSourceMiddlewareHandlerFactory(ctx context.Context) func(handler bstream.Handler) bstream.Handler {
return func(next bstream.Handler) bstream.Handler {
return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error {
stepable, ok := obj.(bstream.Stepable)
if ok {
step := stepable.Step()
if step.Matches(bstream.StepNew) {
dmetering.GetBytesMeter(ctx).CountInc(MeterFileUncompressedReadBytes, len(blk.GetPayload().GetValue()))
} else {
dmetering.GetBytesMeter(ctx).CountInc(MeterFileUncompressedReadForkedBytes, len(blk.GetPayload().GetValue()))
}
}
return next.ProcessBlock(blk, obj)
})
}
}

func Send(ctx context.Context, userID, apiKeyID, ip, userMeta, endpoint string, resp proto.Message) {
if reqctx.IsBackfillerRequest(ctx) {
endpoint = fmt.Sprintf("%s%s", endpoint, "Backfill")
Expand Down
34 changes: 2 additions & 32 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,36 +576,6 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
streamHandler = pipe
}

liveSourceMiddlewareHandler := func(next bstream.Handler) bstream.Handler {
return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error {
stepable, ok := obj.(bstream.Stepable)
if ok {
step := stepable.Step()
if step.Matches(bstream.StepNew) {
dmetering.GetBytesMeter(ctx).CountInc(metering.MeterLiveUncompressedReadBytes, len(blk.GetPayload().GetValue()))
} else {
dmetering.GetBytesMeter(ctx).CountInc(metering.MeterLiveUncompressedReadForkedBytes, len(blk.GetPayload().GetValue()))
}
}
return next.ProcessBlock(blk, obj)
})
}

fileSourceMiddlewareHandler := func(next bstream.Handler) bstream.Handler {
return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error {
stepable, ok := obj.(bstream.Stepable)
if ok {
step := stepable.Step()
if step.Matches(bstream.StepNew) {
dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadBytes, len(blk.GetPayload().GetValue()))
} else {
dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadForkedBytes, len(blk.GetPayload().GetValue()))
}
}
return next.ProcessBlock(blk, obj)
})
}

blockStream, err := s.streamFactoryFunc(
ctx,
streamHandler,
Expand All @@ -615,8 +585,8 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
request.FinalBlocksOnly,
cursorIsTarget,
logger.Named("stream"),
bsstream.WithLiveSourceHandlerMiddleware(liveSourceMiddlewareHandler),
bsstream.WithFileSourceHandlerMiddleware(fileSourceMiddlewareHandler),
bsstream.WithLiveSourceHandlerMiddleware(metering.LiveSourceMiddlewareHandlerFactory(ctx)),
bsstream.WithFileSourceHandlerMiddleware(metering.FileSourceMiddlewareHandlerFactory(ctx)),
)
if err != nil {
return fmt.Errorf("error getting stream: %w", err)
Expand Down
19 changes: 1 addition & 18 deletions service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (

"connectrpc.com/connect"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/bstream/stream"
bsstream "github.com/streamingfast/bstream/stream"
"github.com/streamingfast/dauth"
Expand Down Expand Up @@ -417,21 +415,6 @@ excludable:
streamFactoryFunc = s.streamFactoryFuncOverride
}

fileSourceMiddlewareHandler := func(next bstream.Handler) bstream.Handler {
return bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error {
stepable, ok := obj.(bstream.Stepable)
if ok {
step := stepable.Step()
if step.Matches(bstream.StepNew) {
dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadBytes, len(blk.GetPayload().GetValue()))
} else {
dmetering.GetBytesMeter(ctx).CountInc(metering.MeterFileUncompressedReadForkedBytes, len(blk.GetPayload().GetValue()))
}
}
return next.ProcessBlock(blk, obj)
})
}

blockStream, err := streamFactoryFunc(
ctx,
pipe,
Expand All @@ -441,7 +424,7 @@ excludable:
true,
false,
logger.Named("stream"),
bsstream.WithFileSourceHandlerMiddleware(fileSourceMiddlewareHandler),
bsstream.WithFileSourceHandlerMiddleware(metering.FileSourceMiddlewareHandlerFactory(ctx)),
)
if err != nil {
return fmt.Errorf("error getting stream: %w", err)
Expand Down

0 comments on commit 599fda8

Please sign in to comment.