From 1b9fb057114ce9a91c4aa198a34f0e80ef4333ff Mon Sep 17 00:00:00 2001
From: Alexandre Bourget
Date: Thu, 29 Jun 2023 18:13:44 -0400
Subject: [PATCH] Rework of tier1 and tier2 split with the RequestPlan. Renamed IsSubRequest to IsTier2Request, and made a few other simplifications for the two different flows. Build the Tier1 RequestPlan way up in the first call path.

---
 orchestrator/execout/execout_walker.go | 37 ++++++++++----------
 orchestrator/parallelprocessor.go      | 13 ++-----
 orchestrator/plan/requestplan.go       |  3 ++
 orchestrator/plan/requestplan_test.go  | 12 +++++++
 orchestrator/scheduler/scheduler.go    |  5 +++
 pipeline/gate.go                       |  2 +-
 pipeline/pipeline.go                   | 47 +++++++++++++++++++-------
 pipeline/process_block.go              |  2 +-
 pipeline/resolve.go                    |  9 ++---
 pipeline/snapshot.go                   |  2 +-
 pipeline/stores.go                     | 32 +++++++++++-------
 reqctx/request.go                      |  8 ++---
 reqctx/request_test.go                 |  8 ++---
 service/testing.go                     |  3 +-
 service/tier1.go                       | 14 ++++++--
 service/tier2.go                       |  4 +--
 test/integration_test.go               | 16 ++++-----
 test/runnable_test.go                  | 24 ++++++-------
 test/worker_test.go                    |  7 ++--
 19 files changed, 148 insertions(+), 100 deletions(-)

diff --git a/orchestrator/execout/execout_walker.go b/orchestrator/execout/execout_walker.go
index 8a2f021bb..fd9dd3ccd 100644
--- a/orchestrator/execout/execout_walker.go
+++ b/orchestrator/execout/execout_walker.go
@@ -11,6 +11,7 @@ import (
 	"go.uber.org/zap"
 	"google.golang.org/protobuf/types/known/anypb"
 
+	"github.com/streamingfast/substreams/block"
 	"github.com/streamingfast/substreams/orchestrator/loop"
 	"github.com/streamingfast/substreams/orchestrator/response"
 	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
@@ -21,33 +22,29 @@ import (
 )
 
 type Walker struct {
-	ctx               context.Context
-	requestStartBlock uint64
-	exclusiveEndBlock uint64
-	fileWalker        *execout.FileWalker
-	streamOut         *response.Stream
-	module            *pbsubstreams.Module
-	logger            *zap.Logger
+	ctx context.Context
+	*block.Range
+	fileWalker *execout.FileWalker
+	streamOut  *response.Stream
+	module     *pbsubstreams.Module
+	logger     *zap.Logger
 }
 
 func NewWalker(
 	ctx context.Context,
 	module *pbsubstreams.Module,
 	fileWalker *execout.FileWalker,
-	startBlock uint64,
-	exclusiveEndBlock uint64,
+	walkRange *block.Range,
 	stream *response.Stream,
-
 ) *Walker {
 	logger := reqctx.Logger(ctx)
 	return &Walker{
-		ctx:               ctx,
-		module:            module,
-		fileWalker:        fileWalker,
-		requestStartBlock: startBlock,
-		exclusiveEndBlock: exclusiveEndBlock,
-		streamOut:         stream,
-		logger:            logger,
+		ctx:        ctx,
+		module:     module,
+		fileWalker: fileWalker,
+		Range:      walkRange,
+		streamOut:  stream,
+		logger:     logger,
 	}
 }
 
@@ -82,7 +79,7 @@ func (r *Walker) sendItems(sortedItems []*pboutput.Item) error {
 		if item == nil {
 			continue // why would that happen?!
 		}
-		if item.BlockNum < r.requestStartBlock {
+		if item.BlockNum < r.StartBlock {
 			continue
 		}
 
@@ -95,9 +92,9 @@ func (r *Walker) sendItems(sortedItems []*pboutput.Item) error {
 			return fmt.Errorf("calling response func: %w", err)
 		}
 
-		if blockScopedData.Clock.Number >= r.exclusiveEndBlock {
+		if blockScopedData.Clock.Number >= r.ExclusiveEndBlock {
 			r.logger.Info("stop pulling block scoped data, end block reach",
-				zap.Uint64("exclusive_end_block_num", r.exclusiveEndBlock),
+				zap.Uint64("exclusive_end_block_num", r.ExclusiveEndBlock),
 				zap.Uint64("cache_item_block_num", blockScopedData.Clock.Number),
 			)
 			return nil
diff --git a/orchestrator/parallelprocessor.go b/orchestrator/parallelprocessor.go
index 24d57ab0e..68349449b 100644
--- a/orchestrator/parallelprocessor.go
+++ b/orchestrator/parallelprocessor.go
@@ -25,7 +25,7 @@ type ParallelProcessor struct {
 // BuildParallelProcessor is only called on tier1
 func BuildParallelProcessor(
 	ctx context.Context,
-	reqDetails *reqctx.RequestDetails,
+	reqPlan *plan.RequestPlan,
 	runtimeConfig config.RuntimeConfig,
 	outputGraph *outputmodules.Graph,
 	execoutStorage *execout.Configs,
@@ -33,14 +33,6 @@ func BuildParallelProcessor(
 	storeConfigs store.ConfigMap,
 	traceID string,
 ) (*ParallelProcessor, error) {
-	reqPlan := plan.BuildTier1RequestPlan(
-		reqDetails.ProductionMode,
-		runtimeConfig.SubrequestsSplitSize,
-		outputGraph.LowestInitBlock(),
-		reqDetails.ResolvedStartBlockNum,
-		reqDetails.LinearHandoffBlockNum,
-		reqDetails.StopBlockNum,
-	)
 	stream := response.New(respFunc)
 	sched := scheduler.New(ctx, stream)
 
@@ -100,8 +92,7 @@ func BuildParallelProcessor(
 			ctx,
 			requestedModule,
 			walker,
-			reqDetails.ResolvedStartBlockNum,
-			reqDetails.LinearHandoffBlockNum,
+			reqPlan.WriteExecOut,
 			stream,
 		)
 	}
diff --git a/orchestrator/plan/requestplan.go b/orchestrator/plan/requestplan.go
index de2626a6d..05060a936 100644
--- a/orchestrator/plan/requestplan.go
+++ b/orchestrator/plan/requestplan.go
@@ -53,6 +53,9 @@ func BuildTier1RequestPlan(productionMode bool, segmentInterval uint64, graphIni
 	if resolvedStartBlock < graphInitBlock {
 		panic(fmt.Errorf("start block cannot be prior to the lowest init block in the requested module graph (%d)", graphInitBlock))
 	}
+	if resolvedStartBlock == linearHandoffBlock && graphInitBlock == resolvedStartBlock {
+		return plan
+	}
 	if productionMode {
 		storesStopOnBound := plan.LinearPipeline == nil
 		endStoreBound := linearHandoffBlock
diff --git a/orchestrator/plan/requestplan_test.go b/orchestrator/plan/requestplan_test.go
index cc604e5ea..2da102388 100644
--- a/orchestrator/plan/requestplan_test.go
+++ b/orchestrator/plan/requestplan_test.go
@@ -23,6 +23,18 @@ func TestBuildConfig(t *testing.T) {
 		expectExecOutRange        string
 		expectLinearPipelineRange string
 	}{
+		{
+			"no parallel work to do prod mode",
+			100, 100,
+			false, 621, 621, 621, 742,
+			"nil", "nil", "621-742",
+		},
+		{
+			"no parallel work to do dev mode",
+			100, 100,
+			true, 621, 621, 621, 742,
+			"nil", "nil", "621-742",
+		},
 		{
 			"g1. dev mode with stop within same segment as start block",
 			100, 100,
diff --git a/orchestrator/scheduler/scheduler.go b/orchestrator/scheduler/scheduler.go
index 9d26c67d6..6fa4f1926 100644
--- a/orchestrator/scheduler/scheduler.go
+++ b/orchestrator/scheduler/scheduler.go
@@ -2,6 +2,7 @@ package scheduler
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"go.uber.org/zap"
@@ -61,6 +62,7 @@ func (s *Scheduler) Init() loop.Cmd {
 }
 
 func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
+	fmt.Printf("UPDATE: %T %v\n", msg, msg)
 	var cmds []loop.Cmd
 
 	switch msg := msg.(type) {
@@ -132,6 +134,9 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
 
 	}
 
+	if len(cmds) != 0 {
+		fmt.Printf("Schedule: %T %v\n", cmds, cmds)
+	}
 	return loop.Batch(cmds...)
 }
 
diff --git a/pipeline/gate.go b/pipeline/gate.go
index 746882289..37f99560c 100644
--- a/pipeline/gate.go
+++ b/pipeline/gate.go
@@ -18,7 +18,7 @@ type gate struct {
 func newGate(ctx context.Context) *gate {
 	reqDetails := reqctx.Details(ctx)
 	return &gate{
-		disabled:             reqDetails.IsSubRequest,
+		disabled:             reqDetails.IsTier2Request,
 		requestStartBlockNum: reqDetails.ResolvedStartBlockNum,
 	}
 }
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index 9ae1638e7..c5b4ff476 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -13,6 +13,7 @@ import (
 
 	"github.com/streamingfast/substreams"
 	"github.com/streamingfast/substreams/orchestrator"
+	"github.com/streamingfast/substreams/orchestrator/plan"
 	pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
 	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
 	pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
@@ -104,12 +105,12 @@ func New(
 	for _, opt := range opts {
 		opt(pipe)
 	}
+	pipe.registerHandlers(ctx)
 	return pipe
 }
 
-func (p *Pipeline) InitStoresAndBackprocess(ctx context.Context) (err error) {
+func (p *Pipeline) registerHandlers(ctx context.Context) (err error) {
 	reqDetails := reqctx.Details(ctx)
-	logger := reqctx.Logger(ctx)
 
 	p.forkHandler.registerUndoHandler(func(clock *pbsubstreams.Clock, moduleOutputs []*pbssinternal.ModuleOutput) {
 		for _, modOut := range moduleOutputs {
@@ -118,20 +119,32 @@ func (p *Pipeline) InitStoresAndBackprocess(ctx context.Context) (err error) {
 	})
 
 	p.setupProcessingModule(reqDetails)
 
+	return nil
+}
+
+func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error) {
+	logger := reqctx.Logger(ctx)
 	var storeMap store.Map
-	if reqDetails.IsSubRequest {
-		logger.Info("stores loaded", zap.Object("stores", p.stores.StoreMap))
-		if storeMap, err = p.setupSubrequestStores(ctx); err != nil {
-			return fmt.Errorf("subrequest stores setup failed: %w", err)
-		}
-	} else {
-		if storeMap, err = p.runParallelProcess(ctx); err != nil {
+	logger.Info("stores loaded", zap.Object("stores", p.stores.StoreMap))
+	if storeMap, err = p.setupSubrequestStores(ctx); err != nil {
+		return fmt.Errorf("subrequest stores setup failed: %w", err)
+	}
+	p.stores.SetStoreMap(storeMap)
+
+	return nil
+}
+
+func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan) (err error) {
+	var storeMap store.Map
+	if reqPlan.BuildStores != nil {
+		if storeMap, err = p.runParallelProcess(ctx, reqPlan); err != nil {
 			return fmt.Errorf("run_parallel_process failed: %w", err)
 		}
+	} else {
+		storeMap = p.setupEmptyStores(ctx)
 	}
 	p.stores.SetStoreMap(storeMap)
-
 	return nil
 }
 
@@ -207,8 +220,18 @@ func (p *Pipeline) setupSubrequestStores(ctx context.Context) (storeMap store.Ma
 	return storeMap, nil
 }
 
+func (p *Pipeline) setupEmptyStores(ctx context.Context) store.Map {
+	logger := reqctx.Logger(ctx)
+	storeMap := store.NewMap()
+	for _, storeConfig := range p.stores.configs {
+		fullStore := storeConfig.NewFullKV(logger)
+		storeMap.Set(fullStore)
+	}
+	return storeMap
+}
+
 // runParallelProcess
-func (p *Pipeline) runParallelProcess(ctx context.Context) (storeMap store.Map, err error) {
+func (p *Pipeline) runParallelProcess(ctx context.Context, reqPlan *plan.RequestPlan) (storeMap store.Map, err error) {
 	ctx, span := reqctx.WithSpan(p.ctx, fmt.Sprintf("substreams/%s/pipeline/parallel_process", p.tier))
 	defer span.EndWithErr(&err)
 	reqDetails := reqctx.Details(ctx)
@@ -222,7 +245,7 @@ func (p *Pipeline) runParallelProcess(ctx context.Context) (storeMap store.Map,
 	parallelProcessor, err := orchestrator.BuildParallelProcessor(
 		p.ctx,
-		reqDetails,
+		reqPlan,
 		p.runtimeConfig,
 		p.outputGraph,
 		p.execoutStorage,
diff --git a/pipeline/process_block.go b/pipeline/process_block.go
index 90f864433..e5ec058e8 100644
--- a/pipeline/process_block.go
+++ b/pipeline/process_block.go
@@ -216,7 +216,7 @@ func (p *Pipeline) handleStepNew(ctx context.Context, block *bstream.Block, cloc
 	//fmt.Println("accumulated time for all modules", exec.Timer, "avg", sumDuration/time.Duration(sumCount))
 
 	if reqDetails.ShouldReturnProgressMessages() {
-		if reqDetails.IsSubRequest {
+		if reqDetails.IsTier2Request {
 			forceSend := (clock.Number+1)%p.runtimeConfig.CacheSaveInterval == 0
 
 			if err = p.returnInternalModuleProgressOutputs(clock, forceSend); err != nil {
diff --git a/pipeline/resolve.go b/pipeline/resolve.go
index 6a260a8c1..437af006c 100644
--- a/pipeline/resolve.go
+++ b/pipeline/resolve.go
@@ -9,12 +9,13 @@ import (
 	"github.com/streamingfast/bstream"
 	"github.com/streamingfast/bstream/hub"
 	"github.com/streamingfast/dstore"
-	pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
-	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
-	"github.com/streamingfast/substreams/reqctx"
 	"go.uber.org/zap"
 	grpccodes "google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+
+	pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
+	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
+	"github.com/streamingfast/substreams/reqctx"
 )
 
 type getBlockFunc func() (uint64, error)
@@ -55,7 +56,7 @@ func BuildRequestDetailsFromSubrequest(request *pbssinternal.ProcessRangeRequest
 		Modules:               request.Modules,
 		OutputModule:          request.OutputModule,
 		ProductionMode:        true,
-		IsSubRequest:          true,
+		IsTier2Request:        true,
 		StopBlockNum:          request.StopBlockNum,
 		LinearHandoffBlockNum: request.StopBlockNum,
 		ResolvedStartBlockNum: request.StartBlockNum,
diff --git a/pipeline/snapshot.go b/pipeline/snapshot.go
index d59febf2d..419dd1752 100644
--- a/pipeline/snapshot.go
+++ b/pipeline/snapshot.go
@@ -12,7 +12,7 @@ import (
 )
 
 func (p *Pipeline) sendSnapshots(ctx context.Context, storeMap store.Map) error {
-	if reqctx.Details(ctx).IsSubRequest {
+	if reqctx.Details(ctx).IsTier2Request {
 		return nil
 	}
 	snapshotModules := reqctx.Details(ctx).DebugInitialStoreSnapshotForModules
diff --git a/pipeline/stores.go b/pipeline/stores.go
index aed384e00..1647a9a43 100644
--- a/pipeline/stores.go
+++ b/pipeline/stores.go
@@ -5,33 +5,40 @@ import (
 	"fmt"
 	"time"
 
+	"go.opentelemetry.io/otel/attribute"
+	"go.uber.org/zap"
+
 	"github.com/streamingfast/substreams/block"
 	pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
 	"github.com/streamingfast/substreams/reqctx"
 	"github.com/streamingfast/substreams/storage/store"
-	"go.opentelemetry.io/otel/attribute"
-	"go.uber.org/zap"
 )
 
 type Stores struct {
-	isSubRequest bool
-	bounder      *storeBoundary
-	configs      store.ConfigMap
-	StoreMap     store.Map
+	isTier2Request bool // means we're processing a tier2 request
+	bounder        *storeBoundary
+	configs        store.ConfigMap
+	StoreMap       store.Map
+	// DEPRECATED: we don't need to report back, these file names are now implicitly conveyed from
+	// tier1 to tier2.
 	partialsWritten block.Ranges // when backprocessing, to report back to orchestrator
 	tier            string
 }
 
-func NewStores(storeConfigs store.ConfigMap, storeSnapshotSaveInterval, requestStartBlockNum, stopBlockNum uint64, isSubRequest bool, tier string) *Stores {
+func NewStores(storeConfigs store.ConfigMap, storeSnapshotSaveInterval, requestStartBlockNum, stopBlockNum uint64, isTier2Request bool) *Stores {
 	// FIXME(abourget): a StoreBoundary should exist for EACH Store
 	// because the module's Initial Block could change the range of each
 	// store.
+	tier := "tier1"
+	if isTier2Request {
+		tier = "tier2"
+	}
 	bounder := NewStoreBoundary(storeSnapshotSaveInterval, requestStartBlockNum, stopBlockNum)
 	return &Stores{
-		configs:      storeConfigs,
-		isSubRequest: isSubRequest,
-		bounder:      bounder,
-		tier:         tier,
+		configs:        storeConfigs,
+		isTier2Request: isTier2Request,
+		bounder:        bounder,
+		tier:           tier,
 	}
 }
 
@@ -50,7 +57,8 @@ func (s *Stores) resetStores() {
 func (s *Stores) flushStores(ctx context.Context, blockNum uint64) (err error) {
 	logger := reqctx.Logger(ctx)
 	reqStats := reqctx.ReqStats(ctx)
-	boundaryIntervals := s.bounder.GetStoreFlushRanges(s.isSubRequest, s.bounder.requestStopBlock, blockNum)
+	// FIXME: use Segmenters here, pleaseeee no bounder intervals gna gna gna.
+	boundaryIntervals := s.bounder.GetStoreFlushRanges(s.isTier2Request, s.bounder.requestStopBlock, blockNum)
 	if len(boundaryIntervals) > 0 {
 		logger.Info("flushing boundaries", zap.Uint64s("boundaries", boundaryIntervals))
 	}
diff --git a/reqctx/request.go b/reqctx/request.go
index 1ac4c0be0..3791ec9cf 100644
--- a/reqctx/request.go
+++ b/reqctx/request.go
@@ -23,7 +23,7 @@ type RequestDetails struct {
 	UniqueID uint64
 
 	ProductionMode bool
-	IsSubRequest   bool
+	IsTier2Request bool
 }
 
 func (d *RequestDetails) UniqueIDString() string {
@@ -39,15 +39,15 @@ func (d *RequestDetails) IsOutputModule(modName string) bool {
 // leaf stores we've been asked to produce. We know the scheduler will have
 // created jobs to produce those stores we're skipping here.
 func (d *RequestDetails) SkipSnapshotSave(modName string) bool {
-	return d.IsSubRequest && !d.IsOutputModule(modName)
+	return d.IsTier2Request && !d.IsOutputModule(modName)
 }
 
 func (d *RequestDetails) ShouldReturnWrittenPartials(modName string) bool {
-	return d.IsSubRequest && d.IsOutputModule(modName)
+	return d.IsTier2Request && d.IsOutputModule(modName)
 }
 
 func (d *RequestDetails) ShouldReturnProgressMessages() bool {
-	return d.IsSubRequest
+	return d.IsTier2Request
 }
 
 func (d *RequestDetails) ShouldStreamCachedOutputs() bool {
diff --git a/reqctx/request_test.go b/reqctx/request_test.go
index bc3b7a292..cf11bacc5 100644
--- a/reqctx/request_test.go
+++ b/reqctx/request_test.go
@@ -7,8 +7,8 @@ import (
 )
 
 func TestRequestDetails_SkipSnapshotSave(t *testing.T) {
-	assert.True(t, (&RequestDetails{IsSubRequest: true, OutputModule: "A"}).SkipSnapshotSave("B"))
-	assert.False(t, (&RequestDetails{IsSubRequest: true, OutputModule: "A"}).SkipSnapshotSave("A"))
-	assert.False(t, (&RequestDetails{IsSubRequest: false, OutputModule: "A"}).SkipSnapshotSave("B"))
-	assert.False(t, (&RequestDetails{IsSubRequest: false, OutputModule: "A"}).SkipSnapshotSave("A"))
+	assert.True(t, (&RequestDetails{IsTier2Request: true, OutputModule: "A"}).SkipSnapshotSave("B"))
+	assert.False(t, (&RequestDetails{IsTier2Request: true, OutputModule: "A"}).SkipSnapshotSave("A"))
+	assert.False(t, (&RequestDetails{IsTier2Request: false, OutputModule: "A"}).SkipSnapshotSave("B"))
+	assert.False(t, (&RequestDetails{IsTier2Request: false, OutputModule: "A"}).SkipSnapshotSave("A"))
 }
diff --git a/service/testing.go b/service/testing.go
index 4e22b27d1..30a935011 100644
--- a/service/testing.go
+++ b/service/testing.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	"github.com/streamingfast/bstream/stream"
+
 	"github.com/streamingfast/substreams"
 	pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
 	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
@@ -57,7 +58,7 @@ func TestNewServiceTier2(runtimeConfig config.RuntimeConfig, streamFactoryFunc S
 	}
 }
 
-func (s *Tier2Service) TestBlocks(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc, traceID *string) error {
+func (s *Tier2Service) TestProcessRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc, traceID *string) error {
 	if traceID == nil {
 		traceID = &TestTraceID
 	}
diff --git a/service/tier1.go b/service/tier1.go
index 1913cedb9..dce1c2345 100644
--- a/service/tier1.go
+++ b/service/tier1.go
@@ -29,6 +29,7 @@ import (
 	"github.com/streamingfast/substreams"
 	"github.com/streamingfast/substreams/client"
 	"github.com/streamingfast/substreams/metrics"
+	"github.com/streamingfast/substreams/orchestrator/plan"
 	"github.com/streamingfast/substreams/orchestrator/work"
 	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
 	"github.com/streamingfast/substreams/pipeline"
@@ -271,7 +272,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
 		return fmt.Errorf("configuring stores: %w", err)
 	}
 
-	stores := pipeline.NewStores(storeConfigs, s.runtimeConfig.CacheSaveInterval, requestDetails.LinearHandoffBlockNum, request.StopBlockNum, false, "tier1")
+	stores := pipeline.NewStores(storeConfigs, s.runtimeConfig.CacheSaveInterval, requestDetails.LinearHandoffBlockNum, request.StopBlockNum, false)
 
 	execOutputCacheEngine, err := cache.NewEngine(ctx, s.runtimeConfig, nil, s.blockType)
 	if err != nil {
@@ -323,7 +324,16 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
 	// needs to be produced.
 	// But it seems a bit more involved in here.
 
-	if err := pipe.InitStoresAndBackprocess(ctx); err != nil {
+	reqPlan := plan.BuildTier1RequestPlan(
+		requestDetails.ProductionMode,
+		s.runtimeConfig.SubrequestsSplitSize,
+		outputGraph.LowestInitBlock(),
+		requestDetails.ResolvedStartBlockNum,
+		requestDetails.LinearHandoffBlockNum,
+		requestDetails.StopBlockNum,
+	)
+
+	if err := pipe.InitTier1StoresAndBackprocess(ctx, reqPlan); err != nil {
 		return fmt.Errorf("error during init_stores_and_backprocess: %w", err)
 	}
 	if requestDetails.LinearHandoffBlockNum == request.StopBlockNum {
diff --git a/service/tier2.go b/service/tier2.go
index ba8fbb8b2..af54338df 100644
--- a/service/tier2.go
+++ b/service/tier2.go
@@ -195,7 +195,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
 	if err != nil {
 		return fmt.Errorf("configuring stores: %w", err)
 	}
-	stores := pipeline.NewStores(storeConfigs, s.runtimeConfig.CacheSaveInterval, requestDetails.ResolvedStartBlockNum, request.StopBlockNum, true, "tier2")
+	stores := pipeline.NewStores(storeConfigs, s.runtimeConfig.CacheSaveInterval, requestDetails.ResolvedStartBlockNum, request.StopBlockNum, true)
 
 	outputModule := outputGraph.OutputModule()
 	execOutWriter := execout.NewWriter(
@@ -238,7 +238,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
 		zap.Uint64("request_stop_block", request.StopBlockNum),
 		zap.String("output_module", request.OutputModule),
 	)
-	if err := pipe.InitStoresAndBackprocess(ctx); err != nil {
+	if err := pipe.InitTier2Stores(ctx); err != nil {
 		return fmt.Errorf("error building pipeline: %w", err)
 	}
 
diff --git a/test/integration_test.go b/test/integration_test.go
index ed70187f0..c3ef11146 100644
--- a/test/integration_test.go
+++ b/test/integration_test.go
@@ -286,7 +286,7 @@ func TestOneStoreOneMap(t *testing.T) {
 			stopBlock:  29,
 			production: true,
 			preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) {
-				partialPreWork(t, 1, 10, "setup_test_store_add_i64", run, workerFactory, "11111111111111111111")
+				partialPreWork(t, 1, 10, 0, run, workerFactory, "11111111111111111111")
 			},
 			expectedResponseCount: 28,
 			expectFiles: []string{
@@ -307,8 +307,8 @@ func TestOneStoreOneMap(t *testing.T) {
 			stopBlock:  29,
 			production: true,
 			preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) {
-				partialPreWork(t, 1, 10, "setup_test_store_add_i64", run, workerFactory, "11111111111111111111")
-				partialPreWork(t, 1, 10, "setup_test_store_add_i64", run, workerFactory, "22222222222222222222")
+				partialPreWork(t, 1, 10, 0, run, workerFactory, "11111111111111111111")
+				partialPreWork(t, 1, 10, 0, run, workerFactory, "22222222222222222222")
 			},
 			expectedResponseCount: 28,
 			expectFiles: []string{
@@ -331,7 +331,7 @@ func TestOneStoreOneMap(t *testing.T) {
 			production: true,
 			preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) {
 				// Using an empty trace id brings up the old behavior where files are not suffixed with a trace id
-				partialPreWork(t, 1, 10, "setup_test_store_add_i64", run, workerFactory, "")
+				partialPreWork(t, 1, 10, 0, run, workerFactory, "")
 			},
 			expectedResponseCount: 28,
 			expectFiles: []string{
@@ -353,8 +353,8 @@ func TestOneStoreOneMap(t *testing.T) {
 			production: true,
 			preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) {
 				// Using an empty trace id brings up the old behavior where files are not suffixed with a trace id
-				partialPreWork(t, 1, 10, "setup_test_store_add_i64", run, workerFactory, "")
-				partialPreWork(t, 1, 10, "setup_test_store_add_i64", run, workerFactory, "11111111111111111111")
+				partialPreWork(t, 1, 10, 0, run, workerFactory, "")
+				partialPreWork(t, 1, 10, 0, run, workerFactory, "11111111111111111111")
 			},
 			expectedResponseCount: 28,
 			expectFiles: []string{
@@ -461,14 +461,14 @@ func assertFiles(t *testing.T, tempDir string, wantedFiles ...string) {
 	assert.ElementsMatch(t, wantedFiles, actualFiles)
 }
 
-func partialPreWork(t *testing.T, start, end uint64, module string, run *testRun, workerFactory work.WorkerFactory, traceID string) {
+func partialPreWork(t *testing.T, start, end uint64, stageIdx int, run *testRun, workerFactory work.WorkerFactory, traceID string) {
 	worker := workerFactory(zlog)
 	worker.(*TestWorker).traceID = &traceID
 
 	// FIXME: use the new `Work` interface here, and validate that the
 	// caller to `partialPreWork` doesn't need to be changed too much? :)
 	segmenter := block.NewSegmenter(10, 0, 0)
-	unit := stage.Unit{Segment: segmenter.IndexForStartBlock(start), Stage: 1}
+	unit := stage.Unit{Segment: segmenter.IndexForStartBlock(start), Stage: stageIdx}
 	cmd := worker.Work(run.Context, unit, block.NewRange(start, end), nil)
 	result := cmd()
 	msg, ok := result.(work.MsgJobSucceeded)
diff --git a/test/runnable_test.go b/test/runnable_test.go
index 0fe3427f5..5a18ca5d9 100644
--- a/test/runnable_test.go
+++ b/test/runnable_test.go
@@ -5,7 +5,6 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"go.opentelemetry.io/otel/attribute"
 	"io"
 	"os"
 	"path/filepath"
@@ -13,10 +12,17 @@ import (
 	"testing"
 	"time"
 
+	"go.opentelemetry.io/otel/attribute"
+
 	"github.com/streamingfast/bstream"
 	"github.com/streamingfast/dstore"
 	tracing "github.com/streamingfast/sf-tracing"
 	"github.com/streamingfast/shutter"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/otel"
+	"go.uber.org/zap"
+
 	"github.com/streamingfast/substreams/manifest"
 	"github.com/streamingfast/substreams/orchestrator/work"
 	pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
@@ -26,10 +32,6 @@ import (
 	"github.com/streamingfast/substreams/reqctx"
 	"github.com/streamingfast/substreams/service"
 	"github.com/streamingfast/substreams/service/config"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"go.opentelemetry.io/otel"
-	"go.uber.org/zap"
 )
 
 type testPreWork func(t *testing.T, run *testRun, workerFactory work.WorkerFactory)
@@ -236,12 +238,8 @@ func processInternalRequest(
 	workerFactory work.WorkerFactory,
 	newGenerator BlockGeneratorFactory,
 	responseCollector *responseCollector,
-	isSubRequest bool,
 	blockProcessedCallBack blockProcessedCallBack,
 	testTempDir string,
-	subrequestsSplitSize uint64,
-	parallelSubrequests uint64,
-	linearHandoffBlockNum uint64,
 	traceID *string,
 ) error {
 	t.Helper()
@@ -257,16 +255,16 @@ func processInternalRequest(
 	}
 	runtimeConfig := config.NewRuntimeConfig(
 		10,
-		subrequestsSplitSize,
-		parallelSubrequests,
-		10,
+		0,
+		0,
+		0,
 		0,
 		baseStoreStore,
 		workerFactory,
 	)
 
 	svc := service.TestNewServiceTier2(runtimeConfig, tr.StreamFactory)
-	return svc.TestBlocks(ctx, request, responseCollector.Collect, traceID)
+	return svc.TestProcessRange(ctx, request, responseCollector.Collect, traceID)
 }
 
 func processRequest(
diff --git a/test/worker_test.go b/test/worker_test.go
index 6d7978bb8..3b732c379 100644
--- a/test/worker_test.go
+++ b/test/worker_test.go
@@ -41,16 +41,15 @@ func (w *TestWorker) Work(ctx context.Context, unit stage.Unit, workRange *block
 	logger = logger.With(zap.Uint64("workerId", w.id))
 	ctx = reqctx.WithLogger(ctx, logger)
 
-	logger.Info("worker running job",
+	logger.Info("worker running test job",
 		zap.String("output_module", request.OutputModule),
 		zap.Uint64("start_block_num", request.StartBlockNum),
 		zap.Uint64("stop_block_num", request.StopBlockNum),
 	)
-	subrequestsSplitSize := uint64(10)
 
 	return func() loop.Msg {
-		if err := processInternalRequest(w.t, ctx, request, nil, w.newBlockGenerator, w.responseCollector, true, w.blockProcessedCallBack, w.testTempDir, subrequestsSplitSize, 1, 0, w.traceID); err != nil {
-			return work.MsgJobFailed{Unit: unit, Error: fmt.Errorf("processing sub request: %w", err)}
+		if err := processInternalRequest(w.t, ctx, request, nil, w.newBlockGenerator, w.responseCollector, w.blockProcessedCallBack, w.testTempDir, w.traceID); err != nil {
+			return work.MsgJobFailed{Unit: unit, Error: fmt.Errorf("processing test tier2 request: %w", err)}
 		}
 		logger.Info("worker done running job",
 			zap.String("output_module", request.OutputModule),