From 90758978ac9c50076b1eef1e8a9015441cae024b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Wed, 5 Jul 2023 13:10:54 -0400 Subject: [PATCH] write partial spkg to cache, allow read partial spkg --- docs/release-notes/change-log.md | 2 ++ manifest/reader.go | 14 +++++++++-- service/tier1.go | 40 +++++++++++++++++++++++++++++--- 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 7eeb0ba7f..c08c74bfa 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -12,6 +12,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Max-subrequests can now be overriden by auth header `X-Sf-Substreams-Parallel-Jobs` (note: if your auth plugin is 'trust', make sure that you filter out this header from public access * Request Stats logging. When enable it will log metrics associated to a Tier1 and Tier2 request +* On request, save "substreams.partial.spkg" file to the state cache for debugging purposes. +* Manifest reader can now read 'partial' spkg files (without protobuf and metadata) with an option. 
#### Fixed
 
diff --git a/manifest/reader.go b/manifest/reader.go
index ea72dc094..004b55e5e 100644
--- a/manifest/reader.go
+++ b/manifest/reader.go
@@ -46,6 +46,13 @@ func SkipModuleOutputTypeValidationReader() Options {
 	}
 }
 
+func SkipPackageValidationReader() Options {
+	return func(r *Reader) *Reader {
+		r.skipPackageValidation = true // makes validate() bypass validatePackage
+		return r
+	}
+}
+
 func WithCollectProtoDefinitions(f func(protoDefinitions []*desc.FileDescriptor)) Options {
 	return func(r *Reader) *Reader {
 		r.collectProtoDefinitionsFunc = f
@@ -65,6 +72,7 @@ type Reader struct {
 	//options
 	skipSourceCodeImportValidation bool
 	skipModuleOutputTypeValidation bool
+	skipPackageValidation          bool // set by SkipPackageValidationReader
 
 	constructorErr error
 }
@@ -329,8 +337,10 @@ func (r *Reader) fromContents(contents []byte) (pkg *pbsubstreams.Package, err e
 }
 
 func (r *Reader) validate(pkg *pbsubstreams.Package) error {
-	if err := r.validatePackage(pkg); err != nil {
-		return fmt.Errorf("package validation failed: %w", err)
+	if !r.skipPackageValidation {
+		if err := r.validatePackage(pkg); err != nil {
+			return fmt.Errorf("package validation failed: %w", err)
+		}
 	}
 
 	if err := ValidateModules(pkg.Modules); err != nil {
diff --git a/service/tier1.go b/service/tier1.go
index 60211412f..748445a4e 100644
--- a/service/tier1.go
+++ b/service/tier1.go
@@ -1,9 +1,14 @@
 package service
 
 import (
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+
 	"github.com/streamingfast/bstream/hub"
 	"github.com/streamingfast/bstream/stream"
 	bsstream "github.com/streamingfast/bstream/stream"
@@ -14,9 +19,6 @@ import (
 	"github.com/streamingfast/logging"
 	tracing "github.com/streamingfast/sf-tracing"
 	"github.com/streamingfast/shutter"
-	"strconv"
-	"strings"
-	"sync"
 
 	"github.com/bufbuild/connect-go"
 	"github.com/streamingfast/substreams"
@@ -25,6 +27,7 @@ import (
 	"github.com/streamingfast/substreams/orchestrator/work"
 	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
 	ssconnect "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2/pbsubstreamsrpcconnect"
+	pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
 	"github.com/streamingfast/substreams/pipeline"
 	"github.com/streamingfast/substreams/pipeline/cache"
 	"github.com/streamingfast/substreams/pipeline/exec"
@@ -39,6 +42,7 @@ import (
 	"go.uber.org/zap"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
 )
 
 type Tier1Service struct {
@@ -208,6 +212,9 @@ func (s *Tier1Service) Blocks(
 		return err
 	}
 
+	if err := s.writePackage(ctx, request, outputGraph); err != nil {
+		logger.Warn("cannot write package", zap.Error(err))
+	}
 	// On app shutdown, we cancel the running '.blocks()' command,
 	// we catch this situation via IsTerminating() to return a special error.
 	runningContext, cancelRunning := context.WithCancel(ctx)
@@ -244,6 +251,33 @@ func (s *Tier1Service) Blocks(
 	return nil
 }
 
+// writePackage stores request.Modules as "substreams.partial.spkg" in the substore
+// named by the output module's hash, unless it already exists (debugging aid).
+func (s *Tier1Service) writePackage(ctx context.Context, request *pbsubstreamsrpc.Request, outputGraph *outputmodules.Graph) error {
+	asPackage := &pbsubstreams.Package{
+		Modules:    request.Modules,
+		ModuleMeta: []*pbsubstreams.ModuleMetadata{},
+	}
+	cnt, err := proto.Marshal(asPackage)
+	if err != nil {
+		return fmt.Errorf("marshalling package: %w", err)
+	}
+	moduleStore, err := s.runtimeConfig.BaseObjectStore.SubStore(outputGraph.ModuleHashes().Get(request.OutputModule))
+	if err != nil {
+		return fmt.Errorf("getting substore: %w", err)
+	}
+	exists, err := moduleStore.FileExists(ctx, "substreams.partial.spkg")
+	if err != nil {
+		return fmt.Errorf("error checking fileExists: %w", err)
+	}
+	if !exists {
+		if err := moduleStore.WriteObject(ctx, "substreams.partial.spkg", bytes.NewReader(cnt)); err != nil {
+			return fmt.Errorf("writing substreams.partial object: %w", err)
+		}
+	}
+	return nil
+}
+
 func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Request, outputGraph *outputmodules.Graph, respFunc substreams.ResponseFunc)
error { logger := reqctx.Logger(ctx)