diff --git a/cmd/substreams/build.go b/cmd/substreams/build.go index 124989ca..6fdcc805 100644 --- a/cmd/substreams/build.go +++ b/cmd/substreams/build.go @@ -39,11 +39,16 @@ func init() { func runBuildE(cmd *cobra.Command, args []string) error { ctx := cmd.Context() + var manifestPath string + if len(args) > 1 && args[1] != "" { + manifestPath = args[1] + } else { + manifestPath = sflags.MustGetString(cmd, "manifest") + } // Parse substreams.yaml - manifestPath := sflags.MustGetString(cmd, "manifest") if manifestPath == "" { var err error - manifestPath, err = findManifest() + manifestPath, err = findManifest(manifestPath) if err != nil { return fmt.Errorf("error finding manifest: %w", err) } @@ -316,17 +321,13 @@ func (s *SPKGPacker) Build(ctx context.Context) error { // findManifest searches for the substreams.yaml file starting from the current directory // and moving up to the parent directories until it finds the file or reaches the user's $HOME directory. -func findManifest() (string, error) { +func findManifest(originalPath string) (string, error) { homeDir, err := os.UserHomeDir() if err != nil { return "", fmt.Errorf("error getting user home directory: %w", err) } - originalDir, err := os.Getwd() - if err != nil { - return "", fmt.Errorf("error getting current directory: %w", err) - } - + originalDir := filepath.Dir(originalPath) currentDir := originalDir for { manifestPath := filepath.Join(currentDir, "substreams.yaml") diff --git a/pipeline/process_block.go b/pipeline/process_block.go index 1a6bdfb1..f0b76c63 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -124,8 +124,6 @@ func (p *Pipeline) processBlock( } case bstream.StepNew: p.blockStepMap[bstream.StepNew]++ - - dmetering.GetBytesMeter(ctx).AddBytesRead(execOutput.Len()) err = p.handleStepNew(ctx, clock, cursor, execOutput) if err != nil && err != io.EOF { return fmt.Errorf("step new: handler step new: %w", err) diff --git a/service/metering.go b/service/metering.go index abf39e8c..f4900c48 100644 --- a/service/metering.go +++ b/service/metering.go @@ -17,6 +17,12 @@ func sendMetering(ctx context.Context, meter dmetering.Meter, userID, apiKeyID, inputBytes := meter.GetCount("wasm_input_bytes") meter.ResetCount("wasm_input_bytes") + uncompressedBytesRead := meter.GetCount("uncompressed_read_bytes") + meter.ResetCount("uncompressed_read_bytes") + + uncompressedLiveBytesRead := meter.GetCount("uncompressed_live_read_bytes") + meter.ResetCount("uncompressed_live_read_bytes") + event := dmetering.Event{ UserID: userID, ApiKeyID: apiKeyID, @@ -25,11 +31,13 @@ func sendMetering(ctx context.Context, meter dmetering.Meter, userID, apiKeyID, Endpoint: endpoint, Metrics: map[string]float64{ - "egress_bytes": float64(egressBytes), - "written_bytes": float64(bytesWritten), - "read_bytes": float64(bytesRead), - "wasm_input_bytes": float64(inputBytes), - "message_count": 1, + "egress_bytes": float64(egressBytes), + "written_bytes": float64(bytesWritten), + "read_bytes": float64(bytesRead), + "wasm_input_bytes": float64(inputBytes), + "uncompressed_read_bytes": float64(uncompressedBytesRead), + "uncompressed_live_read_bytes": float64(uncompressedLiveBytesRead), + "message_count": 1, }, Timestamp: time.Now(), } diff --git a/service/tier1.go b/service/tier1.go index 33647604..d791636b 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -202,6 +202,8 @@ func (s *Tier1Service) Blocks( ctx = reqctx.WithTracer(ctx, s.tracer) ctx = dmetering.WithBytesMeter(ctx) ctx = dmetering.WithCounter(ctx, "wasm_input_bytes") + ctx = dmetering.WithCounter(ctx, "live_bytes_uncompressed") + ctx = dmetering.WithCounter(ctx, "stored_bytes_uncompressed") ctx = reqctx.WithTier2RequestParameters(ctx, s.tier2RequestParameters) ctx, span := reqctx.WithSpan(ctx, "substreams/tier1/request") @@ -568,9 +570,26 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ streamHandler = pipe } + // We need to meter the bytes read from the live stream, as they are not metered by the dstore meters + blockMeteredHandlerFunc := bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { + step := obj.(bstream.Stepable).Step() + switch step { + case bstream.StepNewIrreversible: + // already metered by the store in compressed bytes. + // we will meter them here for potential future use + dmetering.GetBytesMeter(ctx).CountInc("uncompressed_read_bytes", len(blk.Payload.GetValue())) + default: + // all other cases are live blocks to be metered + dmetering.GetBytesMeter(ctx).AddBytesRead(len(blk.Payload.GetValue())) + dmetering.GetBytesMeter(ctx).CountInc("uncompressed_live_read_bytes", len(blk.Payload.GetValue())) + } + + return streamHandler.ProcessBlock(blk, obj) + }) + blockStream, err := s.streamFactoryFunc( ctx, - streamHandler, + blockMeteredHandlerFunc, int64(requestDetails.LinearHandoffBlockNum), request.StopBlockNum, cursor,