diff --git a/orchestrator/parallelprocessor.go b/orchestrator/parallelprocessor.go
index 68349449b..4b00aba29 100644
--- a/orchestrator/parallelprocessor.go
+++ b/orchestrator/parallelprocessor.go
@@ -78,8 +78,8 @@ func BuildParallelProcessor(
     // FIXME: Are all the progress messages properly sent? When we skip some stores and mark them complete,
     // for whatever reason,
-    execOutSegmenter := reqPlan.WriteOutSegmenter()
-    if execOutSegmenter != nil {
+    if reqPlan.WriteExecOut != nil {
+        execOutSegmenter := reqPlan.WriteOutSegmenter()
         // note: since we are *NOT* in a sub-request and are setting up
         // output module is a map
         requestedModule := outputGraph.OutputModule()
         if requestedModule.GetKindStore() != nil {
diff --git a/orchestrator/stage/stage.go b/orchestrator/stage/stage.go
index 54f462b63..a5a64d27f 100644
--- a/orchestrator/stage/stage.go
+++ b/orchestrator/stage/stage.go
@@ -60,6 +60,6 @@ func stageKind(mods []*pbsubstreams.Module) Kind {
 type Kind int
 
 const (
-    KindMap = iota
+    KindMap = Kind(iota)
     KindStore
 )
diff --git a/orchestrator/stage/stages.go b/orchestrator/stage/stages.go
index 9e65f3205..f7f654e8d 100644
--- a/orchestrator/stage/stages.go
+++ b/orchestrator/stage/stages.go
@@ -57,18 +57,18 @@ func NewStages(
     logger := reqctx.Logger(ctx)
     stagedModules := outputGraph.StagedUsedModules()
-    lastIndex := len(stagedModules) - 1
 
     out = &Stages{
         ctx:           ctx,
         traceID:       traceID,
         segmenter:     segmenter,
         segmentOffset: segmenter.IndexForStartBlock(outputGraph.LowestInitBlock()),
     }
-    for idx, mods := range stagedModules {
-        isLastStage := idx == lastIndex
-        kind := stageKind(mods)
-        if kind == KindMap && !isLastStage {
-            continue
+    for idx, stageLayer := range stagedModules {
+        mods := stageLayer.LastLayer()
+
+        kind := KindMap
+        if mods.IsStoreLayer() {
+            kind = KindStore
         }
 
         var moduleStates []*ModuleState
@@ -96,7 +96,7 @@ func (s *Stages) AllStagesFinished() bool {
     }
 
     for idx, stage := range s.stages {
-        if stage.kind == KindMap {
+        if stage.kind != KindStore {
             continue
         }
         if s.getState(Unit{Segment: lastSegment, Stage: idx}) != UnitCompleted {
@@ -129,7 +129,10 @@ func (s *Stages) InitialProgressMessages() map[string]block.Ranges {
 
 func (s *Stages) CmdStartMerge() loop.Cmd {
     var cmds []loop.Cmd
-    for idx := range s.stages {
+    for idx, stage := range s.stages {
+        if stage.kind != KindStore {
+            continue
+        }
         cmds = append(cmds, s.CmdTryMerge(idx))
     }
     return loop.Batch(cmds...)
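Reviewer's aside on the Kind(iota) change in orchestrator/stage/stage.go above: with KindMap = iota, the constants are untyped and default to plain int, so nothing ties them to the Kind type when they are printed or inspected. Writing KindMap = Kind(iota) makes both constants typed (KindStore repeats the expression with iota = 1). A minimal standalone sketch of the difference; the String() method here is illustrative only and not part of this PR:

package main

import "fmt"

type Kind int

func (k Kind) String() string { return [...]string{"KindMap", "KindStore"}[k] }

const KindMapUntyped = iota // untyped constant, default type int

const (
    KindMap = Kind(iota) // typed; KindStore below repeats the expression with iota = 1
    KindStore
)

func main() {
    fmt.Printf("%T %v\n", KindMapUntyped, KindMapUntyped) // int 0
    fmt.Printf("%T %v\n", KindMap, KindMap)               // main.Kind KindMap
    fmt.Printf("%T %v\n", KindStore, KindStore)           // main.Kind KindStore
}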
diff --git a/pipeline/launch.go b/pipeline/launch.go
index e7f112275..af3f41b85 100644
--- a/pipeline/launch.go
+++ b/pipeline/launch.go
@@ -73,17 +73,17 @@ func (p *Pipeline) OnStreamTerminated(ctx context.Context, err error) error {
         return fmt.Errorf("sending bytes meter %w", err)
     }
 
-    if p.stores.partialsWritten != nil {
-        p.respFunc(&pbssinternal.ProcessRangeResponse{
-            ModuleName: reqDetails.OutputModule,
-            Type: &pbssinternal.ProcessRangeResponse_Completed{
-                Completed: &pbssinternal.Completed{
-                    AllProcessedRanges: toPBInternalBlockRanges(p.stores.partialsWritten),
-                    TraceId:            p.traceID,
-                },
-            },
-        })
-    }
+    //if p.stores.partialsWritten != nil {
+    //    p.respFunc(&pbssinternal.ProcessRangeResponse{
+    //        ModuleName: reqDetails.OutputModule,
+    //        Type: &pbssinternal.ProcessRangeResponse_Completed{
+    //            Completed: &pbssinternal.Completed{
+    //                AllProcessedRanges: toPBInternalBlockRanges(p.stores.partialsWritten),
+    //                TraceId:            p.traceID,
+    //            },
+    //        },
+    //    })
+    //}
 
     return nil
 }
diff --git a/pipeline/outputmodules/graph.go b/pipeline/outputmodules/graph.go
index 53b2411de..c2cfd3242 100644
--- a/pipeline/outputmodules/graph.go
+++ b/pipeline/outputmodules/graph.go
@@ -9,8 +9,8 @@ import (
 
 type Graph struct {
     requestModules *pbsubstreams.Modules
-    usedModules       []*pbsubstreams.Module   // all modules that need to be processed (requested directly or a required module ancestor)
-    stagedUsedModules [][]*pbsubstreams.Module // all modules that need to be processed (requested directly or a required module ancestor)
+    usedModules       []*pbsubstreams.Module // all modules that need to be processed (requested directly or a required module ancestor)
+    stagedUsedModules ExecutionStages        // all modules that need to be processed (requested directly or a required module ancestor)
     moduleHashes      *manifest.ModuleHashes
     stores            []*pbsubstreams.Module // subset of allModules: only the stores
     lowestInitBlock   uint64
@@ -21,13 +21,13 @@
     schedulableAncestorsMap map[string][]string // modules that are ancestors (therefore dependencies) of a given module
 }
 
-func (g *Graph) OutputModule() *pbsubstreams.Module          { return g.outputModule }
-func (g *Graph) Stores() []*pbsubstreams.Module              { return g.stores }
-func (g *Graph) UsedModules() []*pbsubstreams.Module         { return g.usedModules }
-func (g *Graph) StagedUsedModules() [][]*pbsubstreams.Module { return g.stagedUsedModules }
-func (g *Graph) IsOutputModule(name string) bool             { return g.outputModule.Name == name }
-func (g *Graph) ModuleHashes() *manifest.ModuleHashes        { return g.moduleHashes }
-func (g *Graph) LowestInitBlock() uint64                     { return g.lowestInitBlock }
+func (g *Graph) OutputModule() *pbsubstreams.Module   { return g.outputModule }
+func (g *Graph) Stores() []*pbsubstreams.Module       { return g.stores }
+func (g *Graph) UsedModules() []*pbsubstreams.Module  { return g.usedModules }
+func (g *Graph) StagedUsedModules() ExecutionStages   { return g.stagedUsedModules }
+func (g *Graph) IsOutputModule(name string) bool      { return g.outputModule.Name == name }
+func (g *Graph) ModuleHashes() *manifest.ModuleHashes { return g.moduleHashes }
+func (g *Graph) LowestInitBlock() uint64              { return g.lowestInitBlock }
 
 func NewOutputModuleGraph(outputModule string, productionMode bool, modules *pbsubstreams.Modules) (out *Graph, err error) {
     out = &Graph{
@@ -88,14 +88,40 @@ func computeLowestInitBlock(modules []*pbsubstreams.Module) (out uint64) {
     return lowest
 }
 
-func computeStages(mods []*pbsubstreams.Module) (stages [][]*pbsubstreams.Module) {
+// A list of units that we can schedule, that might include some mappers and a store,
+// or the last stage could be an execution layer with only a map.
+type ExecutionStages []stageLayers
+
+// For a given execution stage, the layers of execution, for example:
+// a layer of mappers, followed by a layer of stores.
+type stageLayers []layerModules
+
+func (l stageLayers) isStoreStage() bool {
+    return l.LastLayer().IsStoreLayer()
+}
+
+func (l stageLayers) LastLayer() layerModules {
+    return l[len(l)-1]
+}
+
+// The list of modules in a given layer of either maps or stores. A given layer
+// will always be comprised of only the same kind of modules.
+type layerModules []*pbsubstreams.Module
+
+func (l layerModules) IsStoreLayer() bool {
+    return l[0].GetKindStore() != nil
+}
+
+func computeStages(mods []*pbsubstreams.Module) (stages ExecutionStages) {
     seen := map[string]bool{}
 
+    var layers stageLayers
+
     for i := 0; ; i++ {
         if len(seen) == len(mods) {
             break
         }
-        var stage []*pbsubstreams.Module
+        var layer layerModules
     modLoop:
         for _, mod := range mods {
             switch mod.Kind.(type) {
@@ -132,15 +158,29 @@
             }
         }
 
-            stage = append(stage, mod)
+            layer = append(layer, mod)
         }
-        if len(stage) != 0 {
-            stages = append(stages, stage)
-            for _, mod := range stage {
+        if len(layer) != 0 {
+            layers = append(layers, layer)
+            for _, mod := range layer {
                 seen[mod.Name] = true
             }
         }
     }
+
+    lastLayerIndex := len(layers) - 1
+    var newStage stageLayers
+    for idx, layer := range layers {
+        isLastStage := idx == lastLayerIndex
+        isStoreLayer := layer.IsStoreLayer()
+
+        newStage = append(newStage, layer)
+        if isStoreLayer || isLastStage {
+            stages = append(stages, newStage)
+            newStage = nil
+        }
+    }
+
     return stages
 }
diff --git a/pipeline/outputmodules/graph_test.go b/pipeline/outputmodules/graph_test.go
index 99dc2dca0..642c203d9 100644
--- a/pipeline/outputmodules/graph_test.go
+++ b/pipeline/outputmodules/graph_test.go
@@ -1,13 +1,117 @@
 package outputmodules
 
 import (
+    "fmt"
+    "strings"
     "testing"
 
+    "github.com/stretchr/testify/assert"
+    pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
     pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
-    "github.com/stretchr/testify/assert"
 )
 
+func TestGraph_computeStages(t *testing.T) {
+    tests := []struct {
+        name   string
+        input  string
+        expect string
+    }{
+        {
+            name:   "some graph",
+            input:  "Sa Mb Mc Sd:Sa,Mb Me:Sd",
+            expect: "[[Sa]] [[Mb Mc] [Sd]] [[Me]]",
+        },
+        {
+            name:   "other graph",
+            input:  "Ma Mb:Ma Sc:Mb",
+            expect: "[[Ma] [Mb] [Sc]]",
+        },
+        {
+            name:   "third graph",
+            input:  "Ma Mb:Ma Sc:Mb Md:Sc Se:Md",
+            expect: "[[Ma] [Mb] [Sc]] [[Md] [Se]]",
+        },
+        {
+            name:   "fourth graph",
+            input:  "Ma Mb:Ma Sc:Mb Md:Sc Se:Md,Sg Mf:Ma Sg:Mf",
+            expect: "[[Ma] [Mb Mf] [Sc Sg]] [[Md] [Se]]",
+        },
+        {
+            name:   "fifth graph",
+            input:  "Ma Mb:Ma Sc:Mb Md:Sc Se:Md,Sg Mf:Ma Sg:Mf Mh:Se,Ma",
+            expect: "[[Ma] [Mb Mf] [Sc Sg]] [[Md] [Se]] [[Mh]]",
+        },
+    }
+
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            out := computeStages(computeStagesInput(test.input))
+            assert.Equal(t, test.expect, computeStagesOutput(out))
+        })
+    }
+}
+
+func computeStagesInput(in string) (out []*pbsubstreams.Module) {
+    for _, mod := range strings.Split(in, " ") {
+        if mod == "" {
+            continue
+        }
+        params := strings.Split(mod, ":")
+        modName := params[0]
+        newMod := &pbsubstreams.Module{}
+        switch modName[0] {
+        case 'S':
+            newMod.Kind = &pbsubstreams.Module_KindStore_{KindStore: &pbsubstreams.Module_KindStore{}}
+            newMod.Name = modName[1:]
+        case 'M':
+            newMod.Kind = &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}}
+            newMod.Name = modName[1:]
+        default:
+            panic("invalid prefix in word: " + modName)
+        }
+        if len(params) > 1 {
+            for _, input := range strings.Split(params[1], ",") {
+                inputName := input[1:]
+                switch input[0] {
+                case 'S':
+                    newMod.Inputs = append(newMod.Inputs, &pbsubstreams.Module_Input{Input: &pbsubstreams.Module_Input_Store_{Store: &pbsubstreams.Module_Input_Store{ModuleName: inputName}}})
+                case 'M':
+                    newMod.Inputs = append(newMod.Inputs, &pbsubstreams.Module_Input{Input: &pbsubstreams.Module_Input_Map_{Map: &pbsubstreams.Module_Input_Map{ModuleName: inputName}}})
+                case 'P':
+                    newMod.Inputs = append(newMod.Inputs, &pbsubstreams.Module_Input{Input: &pbsubstreams.Module_Input_Params_{}})
+                case 'R':
+                    newMod.Inputs = append(newMod.Inputs, &pbsubstreams.Module_Input{Input: &pbsubstreams.Module_Input_Source_{}})
+                default:
+                    panic("invalid input prefix: " + input)
+                }
+            }
+        }
+        out = append(out, newMod)
+    }
+    return
+}
+
+func computeStagesOutput(in ExecutionStages) string {
+    var level1 []string
+    for _, l1 := range in {
+        var level2 []string
+        for _, l2 := range l1 {
+            var level3 []string
+            for _, l3 := range l2 {
+                modKind := "S"
+                if l3.GetKindMap() != nil {
+                    modKind = "M"
+                }
+                level3 = append(level3, modKind+l3.Name)
+            }
+            level2 = append(level2, fmt.Sprintf("%v", level3))
+        }
+        level1 = append(level1, fmt.Sprintf("%v", level2))
+    }
+    return strings.Join(level1, " ")
+}
+
 func TestGraph_computeSchedulableModules(t *testing.T) {
     tests := []struct {
         name string
@@ -52,5 +156,4 @@ func TestGraph_computeSchedulableModules(t *testing.T) {
             assert.Equal(t, test.expect, out)
         })
     }
-
 }
diff --git a/pipeline/outputmodules/testing.go b/pipeline/outputmodules/testing.go
index 606eedd2e..1929b16fe 100644
--- a/pipeline/outputmodules/testing.go
+++ b/pipeline/outputmodules/testing.go
@@ -21,31 +21,40 @@ func TestGraphStagedModules(initialBlock1, ib2, ib3, ib4, ib5 uint64) *Graph {
     lowest = utils.MinOf(lowest, ib5)
     return &Graph{
         lowestInitBlock: lowest,
-        stagedUsedModules: [][]*pbsubstreams.Module{
+        stagedUsedModules: ExecutionStages{
             {
-                &pbsubstreams.Module{
-                    Kind:         &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
-                    InitialBlock: initialBlock1,
+                {
+                    &pbsubstreams.Module{
+                        Kind:         &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
+                        InitialBlock: initialBlock1,
+                    },
+                }, {
+                    &pbsubstreams.Module{
+                        Kind:         &pbsubstreams.Module_KindStore_{KindStore: &pbsubstreams.Module_KindStore{}},
+                        InitialBlock: ib2,
+                    },
                 },
-            }, {
-                &pbsubstreams.Module{
-                    Kind:         &pbsubstreams.Module_KindStore_{KindStore: &pbsubstreams.Module_KindStore{}},
-                    InitialBlock: ib2,
-                },
-            }, {
-                &pbsubstreams.Module{
-                    Kind:         &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
-                    InitialBlock: ib3,
-                },
-            }, {
-                &pbsubstreams.Module{
-                    Kind:         &pbsubstreams.Module_KindStore_{KindStore: &pbsubstreams.Module_KindStore{}},
-                    InitialBlock: ib4,
+            },
+            {
+
+                {
+                    &pbsubstreams.Module{
+                        Kind:         &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
+                        InitialBlock: ib3,
+                    },
+                }, {
+                    &pbsubstreams.Module{
+                        Kind:         &pbsubstreams.Module_KindStore_{KindStore: &pbsubstreams.Module_KindStore{}},
+                        InitialBlock: ib4,
+                    },
                 },
-            }, {
-                &pbsubstreams.Module{
-                    Kind:         &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
-                    InitialBlock: ib5,
+            },
+            {
+                {
+                    &pbsubstreams.Module{
+                        Kind:         &pbsubstreams.Module_KindMap_{KindMap: &pbsubstreams.Module_KindMap{}},
+                        InitialBlock: ib5,
+                    },
                 },
             },
         },
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index c5b4ff476..5d6297773 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -156,7 +156,7 @@ func (p *Pipeline) InitWASM(ctx context.Context) (err error) {
 
     stagedModules := p.outputGraph.StagedUsedModules()
 
-    // truncate stages to highest
+    // truncate stages to highest scheduled stage
     if highest := p.highestStage; highest != nil {
         if len(stagedModules) < *highest+1 {
             return fmt.Errorf("invalid stage %d, there aren't that many", highest)
@@ -407,80 +407,85 @@ func (p *Pipeline) returnInternalModuleProgressOutputs(clock *pbsubstreams.Clock
 // them over there.
 //    moduleExecutorsInitialized bool
 //    moduleExecutors            []exec.ModuleExecutor
-func (p *Pipeline) buildWASM(ctx context.Context, stages [][]*pbsubstreams.Module) error {
+func (p *Pipeline) buildWASM(ctx context.Context, stages outputmodules.ExecutionStages) error {
     reqModules := reqctx.Details(ctx).Modules
     tracer := otel.GetTracerProvider().Tracer("executor")
 
     loadedModules := make(map[uint32]wasm.Module)
     for _, stage := range stages {
-        for _, module := range stage {
-            if _, exists := loadedModules[module.BinaryIndex]; exists {
-                continue
-            }
-            code := reqModules.Binaries[module.BinaryIndex]
-            m, err := p.wasmRuntime.NewModule(ctx, code.Content)
-            if err != nil {
-                return fmt.Errorf("new wasm module: %w", err)
+        for _, layer := range stage {
+            for _, module := range layer {
+                if _, exists := loadedModules[module.BinaryIndex]; exists {
+                    continue
+                }
+                code := reqModules.Binaries[module.BinaryIndex]
+                m, err := p.wasmRuntime.NewModule(ctx, code.Content)
+                if err != nil {
+                    return fmt.Errorf("new wasm module: %w", err)
+                }
+                loadedModules[module.BinaryIndex] = m
             }
-            loadedModules[module.BinaryIndex] = m
         }
     }
+
     p.loadedModules = loadedModules
 
     var stagedModuleExecutors [][]exec.ModuleExecutor
     for _, stage := range stages {
-        var moduleExecutors []exec.ModuleExecutor
-        for _, module := range stage {
-            inputs, err := p.renderWasmInputs(module)
-            if err != nil {
-                return fmt.Errorf("module %q: get wasm inputs: %w", module.Name, err)
-            }
+        for _, layer := range stage {
+            var moduleExecutors []exec.ModuleExecutor
+            for _, module := range layer {
+                inputs, err := p.renderWasmInputs(module)
+                if err != nil {
+                    return fmt.Errorf("module %q: get wasm inputs: %w", module.Name, err)
+                }
 
-            entrypoint := module.BinaryEntrypoint
-            mod := loadedModules[module.BinaryIndex]
-
-            switch kind := module.Kind.(type) {
-            case *pbsubstreams.Module_KindMap_:
-                outType := strings.TrimPrefix(module.Output.Type, "proto:")
-                baseExecutor := exec.NewBaseExecutor(
-                    ctx,
-                    module.Name,
-                    mod,
-                    p.wasmRuntime.InstanceCacheEnabled(),
-                    inputs,
-                    entrypoint,
-                    tracer,
-                )
-                executor := exec.NewMapperModuleExecutor(baseExecutor, outType)
-                moduleExecutors = append(moduleExecutors, executor)
-
-            case *pbsubstreams.Module_KindStore_:
-                updatePolicy := kind.KindStore.UpdatePolicy
-                valueType := kind.KindStore.ValueType
-
-                outputStore, found := p.stores.StoreMap.Get(module.Name)
-                if !found {
-                    return fmt.Errorf("store %q not found", module.Name)
+                entrypoint := module.BinaryEntrypoint
+                mod := loadedModules[module.BinaryIndex]
+
+                switch kind := module.Kind.(type) {
+                case *pbsubstreams.Module_KindMap_:
+                    outType := strings.TrimPrefix(module.Output.Type, "proto:")
+                    baseExecutor := exec.NewBaseExecutor(
+                        ctx,
+                        module.Name,
+                        mod,
+                        p.wasmRuntime.InstanceCacheEnabled(),
+                        inputs,
+                        entrypoint,
+                        tracer,
+                    )
+                    executor := exec.NewMapperModuleExecutor(baseExecutor, outType)
+                    moduleExecutors = append(moduleExecutors, executor)
+
+                case *pbsubstreams.Module_KindStore_:
+                    updatePolicy := kind.KindStore.UpdatePolicy
+                    valueType := kind.KindStore.ValueType
+
+                    outputStore, found := p.stores.StoreMap.Get(module.Name)
+                    if !found {
+                        return fmt.Errorf("store %q not found", module.Name)
+                    }
+                    inputs = append(inputs, wasm.NewStoreWriterOutput(module.Name, outputStore, updatePolicy, valueType))
+
+                    baseExecutor := exec.NewBaseExecutor(
+                        ctx,
+                        module.Name,
+                        mod,
+                        p.wasmRuntime.InstanceCacheEnabled(),
+                        inputs,
+                        entrypoint,
+                        tracer,
+                    )
+                    executor := exec.NewStoreModuleExecutor(baseExecutor, outputStore)
+                    moduleExecutors = append(moduleExecutors, executor)
+
+                default:
+                    panic(fmt.Errorf("invalid kind %q input module %q", module.Kind, module.Name))
                 }
-                inputs = append(inputs, wasm.NewStoreWriterOutput(module.Name, outputStore, updatePolicy, valueType))
-
-                baseExecutor := exec.NewBaseExecutor(
-                    ctx,
-                    module.Name,
-                    mod,
-                    p.wasmRuntime.InstanceCacheEnabled(),
-                    inputs,
-                    entrypoint,
-                    tracer,
-                )
-                executor := exec.NewStoreModuleExecutor(baseExecutor, outputStore)
-                moduleExecutors = append(moduleExecutors, executor)
-
-            default:
-                panic(fmt.Errorf("invalid kind %q input module %q", module.Kind, module.Name))
             }
+            stagedModuleExecutors = append(stagedModuleExecutors, moduleExecutors)
         }
-        stagedModuleExecutors = append(stagedModuleExecutors, moduleExecutors)
     }
     p.moduleExecutors = stagedModuleExecutors
diff --git a/pipeline/process_block.go b/pipeline/process_block.go
index e5ec058e8..c0fcbb369 100644
--- a/pipeline/process_block.go
+++ b/pipeline/process_block.go
@@ -185,13 +185,20 @@ func (p *Pipeline) handleStepNew(ctx context.Context, block *bstream.Block, cloc
         return io.EOF
     }
 
+    // FIXME: when handling the real-time segment, it's dangerous
+    // to save the stores, as they might have components that get
+    // reverted, and we won't go change the stores then.
+    // So we _shouldn't_ save the stores unless we're in irreversible-only
+    // mode. Basically, tier1 shouldn't save unless it's a StepNewIrreversible
+    // (we're in a historical segment)
+    // When we're in the real-time segment, we shouldn't save anything.
     if err := p.stores.flushStores(ctx, clock.Number); err != nil {
         return fmt.Errorf("step new irr: stores end of stream: %w", err)
     }
 
     // note: if we start on a forked cursor, the undo signal will appear BEFORE we send the snapshot
-    if p.gate.shouldSendSnapshot() {
-        if err := p.sendSnapshots(ctx, p.stores.StoreMap); err != nil {
+    if p.gate.shouldSendSnapshot() && !reqDetails.IsTier2Request {
+        if err := p.sendSnapshots(p.stores.StoreMap, reqDetails.DebugInitialStoreSnapshotForModules); err != nil {
             return fmt.Errorf("send initial snapshots: %w", err)
         }
     }
diff --git a/pipeline/resolve.go b/pipeline/resolve.go
index 437af006c..a012e3ba4 100644
--- a/pipeline/resolve.go
+++ b/pipeline/resolve.go
@@ -57,6 +57,7 @@ func BuildRequestDetailsFromSubrequest(request *pbssinternal.ProcessRangeRequest
         OutputModule:          request.OutputModule,
         ProductionMode:        true,
         IsTier2Request:        true,
+        Tier2Stage:            int(request.Stage),
         StopBlockNum:          request.StopBlockNum,
         LinearHandoffBlockNum: request.StopBlockNum,
         ResolvedStartBlockNum: request.StartBlockNum,
diff --git a/pipeline/snapshot.go b/pipeline/snapshot.go
index 419dd1752..9953acd17 100644
--- a/pipeline/snapshot.go
+++ b/pipeline/snapshot.go
@@ -1,21 +1,15 @@
 package pipeline
 
 import (
-    "context"
     "fmt"
 
     "github.com/streamingfast/substreams/storage/store"
 
     "github.com/streamingfast/substreams"
     pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
-    "github.com/streamingfast/substreams/reqctx"
 )
 
-func (p *Pipeline) sendSnapshots(ctx context.Context, storeMap store.Map) error {
-    if reqctx.Details(ctx).IsTier2Request {
-        return nil
-    }
-    snapshotModules := reqctx.Details(ctx).DebugInitialStoreSnapshotForModules
+func (p *Pipeline) sendSnapshots(storeMap store.Map, snapshotModules []string) error {
     if len(snapshotModules) == 0 {
         return nil
     }
diff --git a/pipeline/stores.go b/pipeline/stores.go
index 1647a9a43..54887bf01 100644
--- a/pipeline/stores.go
+++ b/pipeline/stores.go
@@ -57,7 +57,7 @@ func (s *Stores) resetStores() {
 func (s *Stores) flushStores(ctx context.Context, blockNum uint64) (err error) {
     logger := reqctx.Logger(ctx)
     reqStats := reqctx.ReqStats(ctx)
-    // FIXME: use Segmenters here, pleaseeee no bounder intervals gna gna gna.
+
     boundaryIntervals := s.bounder.GetStoreFlushRanges(s.isTier2Request, s.bounder.requestStopBlock, blockNum)
     if len(boundaryIntervals) > 0 {
         logger.Info("flushing boundaries", zap.Uint64s("boundaries", boundaryIntervals))
@@ -68,7 +68,6 @@ func (s *Stores) flushStores(ctx context.Context, blockNum uint64) (err error) {
         if err := s.saveStoresSnapshots(ctx, boundaryBlock); err != nil {
             return fmt.Errorf("saving stores snapshot at bound %d: %w", boundaryBlock, err)
         }
-        reqStats.RecordFlush(time.Since(t0))
     }
 
     return nil
@@ -111,7 +110,7 @@ func (s *Stores) saveStoreSnapshot(ctx context.Context, saveStore store.Store, b
     }
 
     if reqctx.Details(ctx).ShouldReturnWrittenPartials(saveStore.Name()) {
-        s.partialsWritten = append(s.partialsWritten, file.Range)
+        //s.partialsWritten = append(s.partialsWritten, file.Range)
         reqctx.Logger(ctx).Debug("adding partials written",
             zap.Stringer("range", file.Range),
             zap.Stringer("ranges", s.partialsWritten),
diff --git a/reqctx/request.go b/reqctx/request.go
index 3791ec9cf..992ee7f86 100644
--- a/reqctx/request.go
+++ b/reqctx/request.go
@@ -24,6 +24,7 @@ type RequestDetails struct {
 
     ProductionMode bool
     IsTier2Request bool
+    Tier2Stage     int
 }
 
 func (d *RequestDetails) UniqueIDString() string {
@@ -39,7 +40,10 @@ func (d *RequestDetails) IsOutputModule(modName string) bool {
 // leaf stores we've been asked to produce. We know the scheduler will have
 // created jobs to produce those stores we're skipping here.
 func (d *RequestDetails) SkipSnapshotSave(modName string) bool {
-    return d.IsTier2Request && !d.IsOutputModule(modName)
+    // TODO: we need to save
+    // func ShouldSaveStoreSnapshot()
+    return !d.IsTier2Request || d.IsOutputModule(modName)
+    //return d.IsTier2Request && !d.IsOutputModule(modName)
 }
 
 func (d *RequestDetails) ShouldReturnWrittenPartials(modName string) bool {
diff --git a/service/tier1.go b/service/tier1.go
index dce1c2345..920b2376f 100644
--- a/service/tier1.go
+++ b/service/tier1.go
@@ -310,7 +310,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
         requestStats.Start(10 * time.Second)
         defer requestStats.Shutdown()
     }
-    logger.Info("initializing pipeline",
+    logger.Info("initializing tier1 pipeline",
         zap.Int64("request_start_block", request.StartBlockNum),
         zap.Uint64("resolved_start_block", requestDetails.ResolvedStartBlockNum),
         zap.Uint64("request_stop_block", request.StopBlockNum),
diff --git a/service/tier2.go b/service/tier2.go
index af54338df..ad35d998a 100644
--- a/service/tier2.go
+++ b/service/tier2.go
@@ -223,8 +223,8 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
         execOutputCacheEngine,
         s.runtimeConfig,
         respFunc,
-        // This must always be the parent/global trace id, the one that comes from tier1
         "tier2",
+        // This must always be the parent/global trace id, the one that comes from tier1
         parentTraceID,
         opts...,
     )
@@ -233,10 +233,11 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
         requestStats.Start(10 * time.Second)
         defer requestStats.Shutdown()
     }
-    logger.Info("initializing pipeline",
+    logger.Info("initializing tier2 pipeline",
         zap.Uint64("request_start_block", requestDetails.ResolvedStartBlockNum),
         zap.Uint64("request_stop_block", request.StopBlockNum),
         zap.String("output_module", request.OutputModule),
+        zap.Uint32("stage", request.Stage),
     )
     if err := pipe.InitTier2Stores(ctx); err != nil {
         return fmt.Errorf("error building pipeline: %w", err)
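To summarize the staging model this patch introduces in pipeline/outputmodules/graph.go: computeStages still discovers dependency layers one at a time, but now folds consecutive layers into ExecutionStages, closing a stage at every store layer and at the final layer (which may be map-only). That is why the "fourth graph" test above expects "[[Ma] [Mb Mf] [Sc Sg]] [[Md] [Se]]". A minimal standalone sketch of that fold, using strings in place of *pbsubstreams.Module (the names here are hypothetical):

package main

import "fmt"

// layer is one batch of modules runnable in parallel; "M..." = mapper, "S..." = store.
type layer []string

func isStoreLayer(l layer) bool { return l[0][0] == 'S' }

// groupIntoStages accumulates layers into a stage and cuts the stage
// after every store layer, or at the final layer if it is map-only,
// mirroring the fold added at the end of computeStages.
func groupIntoStages(layers []layer) (stages [][]layer) {
    last := len(layers) - 1
    var stage []layer
    for idx, l := range layers {
        stage = append(stage, l)
        if isStoreLayer(l) || idx == last {
            stages = append(stages, stage)
            stage = nil
        }
    }
    return stages
}

func main() {
    // The layers produced for the "fourth graph" test case.
    layers := []layer{{"Ma"}, {"Mb", "Mf"}, {"Sc", "Sg"}, {"Md"}, {"Se"}}
    fmt.Println(groupIntoStages(layers))
    // Output: [[[Ma] [Mb Mf] [Sc Sg]] [[Md] [Se]]]
}

The cut-at-store rule appears to keep each schedulable unit self-contained: a stage's mappers feed its stores, and the following stage can resume from those stores' snapshots.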