Rework of the tier1 and tier2 split with the RequestPlan.
Renamed IsSubRequest to IsTier2Request, and made a few other simplifications
to the two different flows.

Build the Tier1 RequestPlan up front, at the first call site.
Alexandre Bourget committed Jun 29, 2023
1 parent 4112332 commit 1b9fb05
Showing 19 changed files with 148 additions and 100 deletions.
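
The gist of the rework: the Tier1 RequestPlan is now built once, high up at the first call, and handed down to the orchestrator instead of being rebuilt inside BuildParallelProcessor. Below is a hedged sketch of where that construction now lives; the helper name and wrapper package are illustrative, the identifiers inside the call come from this diff, and the return type assumes BuildTier1RequestPlan yields a *plan.RequestPlan as its call sites here suggest.

```go
package sketch // hypothetical wrapper, not part of the repo

import (
	"context"

	"github.com/streamingfast/substreams/orchestrator/plan"
	"github.com/streamingfast/substreams/reqctx"
)

// buildTier1Plan mirrors the arguments that were removed from
// BuildParallelProcessor further down in this commit.
func buildTier1Plan(ctx context.Context, subrequestsSplitSize, lowestInitBlock uint64) *plan.RequestPlan {
	reqDetails := reqctx.Details(ctx)
	return plan.BuildTier1RequestPlan(
		reqDetails.ProductionMode,
		subrequestsSplitSize, // runtimeConfig.SubrequestsSplitSize
		lowestInitBlock,      // outputGraph.LowestInitBlock()
		reqDetails.ResolvedStartBlockNum,
		reqDetails.LinearHandoffBlockNum,
		reqDetails.StopBlockNum,
	)
}
```

The resulting *plan.RequestPlan then flows into InitTier1StoresAndBackprocess and BuildParallelProcessor, as the pipeline.go and parallelprocessor.go hunks below show.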
37 changes: 17 additions & 20 deletions orchestrator/execout/execout_walker.go
@@ -11,6 +11,7 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"

"github.com/streamingfast/substreams/block"
"github.com/streamingfast/substreams/orchestrator/loop"
"github.com/streamingfast/substreams/orchestrator/response"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
@@ -21,33 +22,29 @@ import (
)

type Walker struct {
ctx context.Context
requestStartBlock uint64
exclusiveEndBlock uint64
fileWalker *execout.FileWalker
streamOut *response.Stream
module *pbsubstreams.Module
logger *zap.Logger
ctx context.Context
*block.Range
fileWalker *execout.FileWalker
streamOut *response.Stream
module *pbsubstreams.Module
logger *zap.Logger
}

func NewWalker(
ctx context.Context,
module *pbsubstreams.Module,
fileWalker *execout.FileWalker,
startBlock uint64,
exclusiveEndBlock uint64,
walkRange *block.Range,
stream *response.Stream,

) *Walker {
logger := reqctx.Logger(ctx)
return &Walker{
ctx: ctx,
module: module,
fileWalker: fileWalker,
requestStartBlock: startBlock,
exclusiveEndBlock: exclusiveEndBlock,
streamOut: stream,
logger: logger,
ctx: ctx,
module: module,
fileWalker: fileWalker,
Range: walkRange,
streamOut: stream,
logger: logger,
}
}

@@ -82,7 +79,7 @@ func (r *Walker) sendItems(sortedItems []*pboutput.Item) error {
if item == nil {
continue // why would that happen?!
}
if item.BlockNum < r.requestStartBlock {
if item.BlockNum < r.StartBlock {
continue
}

@@ -95,9 +92,9 @@ func (r *Walker) sendItems(sortedItems []*pboutput.Item) error {
return fmt.Errorf("calling response func: %w", err)
}

if blockScopedData.Clock.Number >= r.exclusiveEndBlock {
if blockScopedData.Clock.Number >= r.ExclusiveEndBlock {
r.logger.Info("stop pulling block scoped data, end block reach",
zap.Uint64("exclusive_end_block_num", r.exclusiveEndBlock),
zap.Uint64("exclusive_end_block_num", r.ExclusiveEndBlock),
zap.Uint64("cache_item_block_num", blockScopedData.Clock.Number),
)
return nil
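
Worth noting in the Walker changes above: the dedicated requestStartBlock/exclusiveEndBlock fields are replaced by an embedded *block.Range, so the bounds are read through promoted fields, as the r.StartBlock and r.ExclusiveEndBlock accesses show. A minimal, self-contained sketch of that embedding pattern, using a stand-in Range type rather than the real block.Range:

```go
package main

import "fmt"

// Range stands in for block.Range, assumed to expose StartBlock and
// ExclusiveEndBlock, matching the accesses in the diff above.
type Range struct {
	StartBlock        uint64
	ExclusiveEndBlock uint64
}

// Walker embeds *Range: the fields are promoted, so callers read
// w.StartBlock / w.ExclusiveEndBlock exactly like the old dedicated fields.
type Walker struct {
	*Range
}

func main() {
	w := &Walker{Range: &Range{StartBlock: 621, ExclusiveEndBlock: 742}}
	fmt.Println(w.StartBlock, w.ExclusiveEndBlock) // 621 742
}
```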
13 changes: 2 additions & 11 deletions orchestrator/parallelprocessor.go
@@ -25,22 +25,14 @@ type ParallelProcessor struct {
// BuildParallelProcessor is only called on tier1
func BuildParallelProcessor(
ctx context.Context,
reqDetails *reqctx.RequestDetails,
reqPlan *plan.RequestPlan,
runtimeConfig config.RuntimeConfig,
outputGraph *outputmodules.Graph,
execoutStorage *execout.Configs,
respFunc func(resp substreams.ResponseFromAnyTier) error,
storeConfigs store.ConfigMap,
traceID string,
) (*ParallelProcessor, error) {
reqPlan := plan.BuildTier1RequestPlan(
reqDetails.ProductionMode,
runtimeConfig.SubrequestsSplitSize,
outputGraph.LowestInitBlock(),
reqDetails.ResolvedStartBlockNum,
reqDetails.LinearHandoffBlockNum,
reqDetails.StopBlockNum,
)

stream := response.New(respFunc)
sched := scheduler.New(ctx, stream)
@@ -100,8 +92,7 @@ func BuildParallelProcessor(
ctx,
requestedModule,
walker,
reqDetails.ResolvedStartBlockNum,
reqDetails.LinearHandoffBlockNum,
reqPlan.WriteExecOut,
stream,
)
}
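
With reqPlan passed in, BuildParallelProcessor no longer derives the exec-output walking bounds from reqDetails; the walker is bounded by the plan's WriteExecOut range. A hedged sketch of the updated construction (the import alias for the orchestrator's execout package is omitted, and variable names are illustrative; in the hunk above the FileWalker variable is simply called walker):

```go
// reqPlan.WriteExecOut is assumed to be a *block.Range; it replaces the old
// (reqDetails.ResolvedStartBlockNum, reqDetails.LinearHandoffBlockNum) pair.
execOutWalker := NewWalker(
	ctx,
	requestedModule,      // *pbsubstreams.Module whose cached outputs are streamed back
	fileWalker,           // *execout.FileWalker over that module's cache files
	reqPlan.WriteExecOut, // new walkRange parameter
	stream,               // *response.Stream wrapping respFunc
)
_ = execOutWalker
```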
3 changes: 3 additions & 0 deletions orchestrator/plan/requestplan.go
@@ -53,6 +53,9 @@ func BuildTier1RequestPlan(productionMode bool, segmentInterval uint64, graphIni
if resolvedStartBlock < graphInitBlock {
panic(fmt.Errorf("start block cannot be prior to the lowest init block in the requested module graph (%d)", graphInitBlock))
}
if resolvedStartBlock == linearHandoffBlock && graphInitBlock == resolvedStartBlock {
return plan
}
if productionMode {
storesStopOnBound := plan.LinearPipeline == nil
endStoreBound := linearHandoffBlock
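
The new guard above makes the degenerate case explicit: when the resolved start block already sits at both the linear handoff block and the graph's lowest init block, there is no tier2 work to plan. A hedged usage sketch matching the "no parallel work to do" test rows added below; the parameter order follows the call removed from BuildParallelProcessor, the result field names come from this diff, and the return type is assumed to be *plan.RequestPlan.

```go
package sketch

import "github.com/streamingfast/substreams/orchestrator/plan"

func noParallelWorkExample() *plan.RequestPlan {
	// start == handoff == graph init (621); stop at 742.
	reqPlan := plan.BuildTier1RequestPlan(
		true, // productionMode (dev mode yields the same expectations here)
		100,  // segmentInterval
		621,  // graphInitBlock
		621,  // resolvedStartBlock
		621,  // linearHandoffBlock
		742,  // stopBlockNum
	)
	// Per the test rows below: reqPlan.BuildStores == nil,
	// reqPlan.WriteExecOut == nil, reqPlan.LinearPipeline covers 621-742.
	return reqPlan
}
```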
12 changes: 12 additions & 0 deletions orchestrator/plan/requestplan_test.go
@@ -23,6 +23,18 @@ func TestBuildConfig(t *testing.T) {
expectExecOutRange string
expectLinearPipelineRange string
}{
{
"no parallel work to do prod mode",
100, 100,
false, 621, 621, 621, 742,
"nil", "nil", "621-742",
},
{
"no parallel work to do dev mode",
100, 100,
true, 621, 621, 621, 742,
"nil", "nil", "621-742",
},
{
"g1. dev mode with stop within same segment as start block",
100, 100,
Expand Down
5 changes: 5 additions & 0 deletions orchestrator/scheduler/scheduler.go
@@ -2,6 +2,7 @@ package scheduler

import (
"context"
"fmt"
"time"

"go.uber.org/zap"
@@ -61,6 +62,7 @@ func (s *Scheduler) Init() loop.Cmd {
}

func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
fmt.Printf("UPDATE: %T %v\n", msg, msg)
var cmds []loop.Cmd

switch msg := msg.(type) {
@@ -132,6 +134,9 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {

}

if len(cmds) != 0 {
fmt.Printf("Schedule: %T %v\n", cmds, cmds)
}
return loop.Batch(cmds...)
}

2 changes: 1 addition & 1 deletion pipeline/gate.go
@@ -18,7 +18,7 @@ type gate struct {
func newGate(ctx context.Context) *gate {
reqDetails := reqctx.Details(ctx)
return &gate{
disabled: reqDetails.IsSubRequest,
disabled: reqDetails.IsTier2Request,
requestStartBlockNum: reqDetails.ResolvedStartBlockNum,
}
}
47 changes: 35 additions & 12 deletions pipeline/pipeline.go
@@ -13,6 +13,7 @@ import (

"github.com/streamingfast/substreams"
"github.com/streamingfast/substreams/orchestrator"
"github.com/streamingfast/substreams/orchestrator/plan"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
@@ -104,12 +105,12 @@ func New(
for _, opt := range opts {
opt(pipe)
}
pipe.registerHandlers(ctx)
return pipe
}

func (p *Pipeline) InitStoresAndBackprocess(ctx context.Context) (err error) {
func (p *Pipeline) registerHandlers(ctx context.Context) (err error) {
reqDetails := reqctx.Details(ctx)
logger := reqctx.Logger(ctx)

p.forkHandler.registerUndoHandler(func(clock *pbsubstreams.Clock, moduleOutputs []*pbssinternal.ModuleOutput) {
for _, modOut := range moduleOutputs {
@@ -118,20 +119,32 @@ func (p *Pipeline) InitStoresAndBackprocess(ctx context.Context) (err error) {
})

p.setupProcessingModule(reqDetails)
return nil
}

func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error) {
logger := reqctx.Logger(ctx)

var storeMap store.Map
if reqDetails.IsSubRequest {
logger.Info("stores loaded", zap.Object("stores", p.stores.StoreMap))
if storeMap, err = p.setupSubrequestStores(ctx); err != nil {
return fmt.Errorf("subrequest stores setup failed: %w", err)
}
} else {
if storeMap, err = p.runParallelProcess(ctx); err != nil {
logger.Info("stores loaded", zap.Object("stores", p.stores.StoreMap))
if storeMap, err = p.setupSubrequestStores(ctx); err != nil {
return fmt.Errorf("subrequest stores setup failed: %w", err)
}
p.stores.SetStoreMap(storeMap)

return nil
}

func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan) (err error) {
var storeMap store.Map
if reqPlan.BuildStores != nil {
if storeMap, err = p.runParallelProcess(ctx, reqPlan); err != nil {
return fmt.Errorf("run_parallel_process failed: %w", err)
}
} else {
storeMap = p.setupEmptyStores(ctx)
}
p.stores.SetStoreMap(storeMap)

return nil
}

@@ -207,8 +220,18 @@ func (p *Pipeline) setupSubrequestStores(ctx context.Context) (storeMap store.Ma
return storeMap, nil
}

func (p *Pipeline) setupEmptyStores(ctx context.Context) store.Map {
logger := reqctx.Logger(ctx)
storeMap := store.NewMap()
for _, storeConfig := range p.stores.configs {
fullStore := storeConfig.NewFullKV(logger)
storeMap.Set(fullStore)
}
return storeMap
}

// runParallelProcess
func (p *Pipeline) runParallelProcess(ctx context.Context) (storeMap store.Map, err error) {
func (p *Pipeline) runParallelProcess(ctx context.Context, reqPlan *plan.RequestPlan) (storeMap store.Map, err error) {
ctx, span := reqctx.WithSpan(p.ctx, fmt.Sprintf("substreams/%s/pipeline/parallel_process", p.tier))
defer span.EndWithErr(&err)
reqDetails := reqctx.Details(ctx)
@@ -222,7 +245,7 @@ func (p *Pipeline) runParallelProcess(ctx context.Context) (storeMap store.Map,

parallelProcessor, err := orchestrator.BuildParallelProcessor(
p.ctx,
reqDetails,
reqPlan,
p.runtimeConfig,
p.outputGraph,
p.execoutStorage,
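
The net effect of the pipeline.go changes: handler registration moves into New(), and store initialization is split per tier, with tier1 driven by the RequestPlan and tier2 keeping the subrequest store setup. A hedged sketch of how the two entry points now call into the pipeline; the surrounding service code is not part of this diff, and in practice each tier makes only its own call.

```go
// tier1: the plan built at the top of the call decides whether parallel
// processing (BuildStores / WriteExecOut) runs before the linear pipeline.
if err := pipe.InitTier1StoresAndBackprocess(ctx, reqPlan); err != nil {
	return fmt.Errorf("init tier1 stores and backprocess: %w", err)
}

// tier2: no request plan; the subrequest loads its own store map.
if err := pipe.InitTier2Stores(ctx); err != nil {
	return fmt.Errorf("init tier2 stores: %w", err)
}
```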
2 changes: 1 addition & 1 deletion pipeline/process_block.go
@@ -216,7 +216,7 @@ func (p *Pipeline) handleStepNew(ctx context.Context, block *bstream.Block, cloc
//fmt.Println("accumulated time for all modules", exec.Timer, "avg", sumDuration/time.Duration(sumCount))

if reqDetails.ShouldReturnProgressMessages() {
if reqDetails.IsSubRequest {
if reqDetails.IsTier2Request {
forceSend := (clock.Number+1)%p.runtimeConfig.CacheSaveInterval == 0

if err = p.returnInternalModuleProgressOutputs(clock, forceSend); err != nil {
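
A quick worked example of the forceSend condition above, assuming a CacheSaveInterval of 100: a tier2 request force-flushes its progress on the last block of each cache segment (99, 199, 299, ...), right before a new output file starts.

```go
package main

import "fmt"

func main() {
	const cacheSaveInterval = 100
	for _, blockNum := range []uint64{98, 99, 100, 199, 200} {
		forceSend := (blockNum+1)%cacheSaveInterval == 0
		fmt.Printf("block %d -> forceSend=%v\n", blockNum, forceSend)
		// block 98 -> false, 99 -> true, 100 -> false, 199 -> true, 200 -> false
	}
}
```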
9 changes: 5 additions & 4 deletions pipeline/resolve.go
@@ -9,12 +9,13 @@ import (
"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/hub"
"github.com/streamingfast/dstore"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"github.com/streamingfast/substreams/reqctx"
"go.uber.org/zap"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"github.com/streamingfast/substreams/reqctx"
)

type getBlockFunc func() (uint64, error)
@@ -55,7 +56,7 @@ func BuildRequestDetailsFromSubrequest(request *pbssinternal.ProcessRangeRequest
Modules: request.Modules,
OutputModule: request.OutputModule,
ProductionMode: true,
IsSubRequest: true,
IsTier2Request: true,
StopBlockNum: request.StopBlockNum,
LinearHandoffBlockNum: request.StopBlockNum,
ResolvedStartBlockNum: request.StartBlockNum,
2 changes: 1 addition & 1 deletion pipeline/snapshot.go
@@ -12,7 +12,7 @@ import (
)

func (p *Pipeline) sendSnapshots(ctx context.Context, storeMap store.Map) error {
if reqctx.Details(ctx).IsSubRequest {
if reqctx.Details(ctx).IsTier2Request {
return nil
}
snapshotModules := reqctx.Details(ctx).DebugInitialStoreSnapshotForModules
32 changes: 20 additions & 12 deletions pipeline/stores.go
@@ -5,33 +5,40 @@ import (
"fmt"
"time"

"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"

"github.com/streamingfast/substreams/block"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
"github.com/streamingfast/substreams/reqctx"
"github.com/streamingfast/substreams/storage/store"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
)

type Stores struct {
isSubRequest bool
bounder *storeBoundary
configs store.ConfigMap
StoreMap store.Map
isTier2Request bool // means we're processing a tier2 request
bounder *storeBoundary
configs store.ConfigMap
StoreMap store.Map
// DEPRECATED: we don't need to report back, these file names are now implicitly conveyed from
// tier1 to tier2.
partialsWritten block.Ranges // when backprocessing, to report back to orchestrator
tier string
}

func NewStores(storeConfigs store.ConfigMap, storeSnapshotSaveInterval, requestStartBlockNum, stopBlockNum uint64, isSubRequest bool, tier string) *Stores {
func NewStores(storeConfigs store.ConfigMap, storeSnapshotSaveInterval, requestStartBlockNum, stopBlockNum uint64, isTier2Request bool) *Stores {
// FIXME(abourget): a StoreBoundary should exist for EACH Store
// because the module's Initial Block could change the range of each
// store.
tier := "tier1"
if isTier2Request {
tier = "tier2"
}
bounder := NewStoreBoundary(storeSnapshotSaveInterval, requestStartBlockNum, stopBlockNum)
return &Stores{
configs: storeConfigs,
isSubRequest: isSubRequest,
bounder: bounder,
tier: tier,
configs: storeConfigs,
isTier2Request: isTier2Request,
bounder: bounder,
tier: tier,
}
}

@@ -50,7 +57,8 @@ func (s *Stores) resetStores() {
func (s *Stores) flushStores(ctx context.Context, blockNum uint64) (err error) {
logger := reqctx.Logger(ctx)
reqStats := reqctx.ReqStats(ctx)
boundaryIntervals := s.bounder.GetStoreFlushRanges(s.isSubRequest, s.bounder.requestStopBlock, blockNum)
// FIXME: use Segmenters here, pleaseeee no bounder intervals gna gna gna.
boundaryIntervals := s.bounder.GetStoreFlushRanges(s.isTier2Request, s.bounder.requestStopBlock, blockNum)
if len(boundaryIntervals) > 0 {
logger.Info("flushing boundaries", zap.Uint64s("boundaries", boundaryIntervals))
}
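
Since NewStores now derives the tier label from isTier2Request, callers drop the trailing tier argument. A hedged sketch of the updated call; the package qualifier and variable names are illustrative, only the parameter list comes from this diff.

```go
stores := pipeline.NewStores(
	storeConfigs,              // store.ConfigMap
	cacheSaveInterval,         // storeSnapshotSaveInterval
	resolvedStartBlockNum,     // requestStartBlockNum
	stopBlockNum,              // stopBlockNum
	reqDetails.IsTier2Request, // selects the "tier1"/"tier2" label internally
)
_ = stores
```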