Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small refactor for delaying calls to expand ranges #20769

Merged
merged 18 commits into from
Jan 7, 2025
790 changes: 421 additions & 369 deletions pkg/pb/pipeline/pipeline.pb.go

Large diffs are not rendered by default.

21 changes: 16 additions & 5 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util"
"github.com/matrixorigin/matrixone/pkg/vm/message"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)
Expand Down Expand Up @@ -3986,7 +3987,7 @@ func (c *Compile) expandRanges(
blockFilterList []*plan.Expr, policy engine.DataCollectPolicy, rsp *engine.RangesShuffleParam) (engine.RelData, error) {

preAllocBlocks := 2
if policy&engine.Policy_CollectCommittedData != 0 {
if policy&engine.Policy_CollectCommittedPersistedData != 0 {
if !c.IsTpQuery() {
if len(blockFilterList) > 0 {
preAllocBlocks = 64
Expand Down Expand Up @@ -4015,7 +4016,8 @@ func (c *Compile) expandRanges(

if n.TableDef.Partition != nil {
begin := 0
if policy&engine.Policy_CollectUncommittedData != 0 {
if policy&engine.Policy_CollectCommittedInmemData != 0 ||
policy&engine.Policy_CollectUncommittedInmemData != 0 {
begin = 1 //skip empty block info
}
rangesParam.PreAllocBlocks = 2
Expand Down Expand Up @@ -4151,7 +4153,7 @@ func (c *Compile) handleDbRelContext(node *plan.Node, onRemoteCN bool) (engine.R
}

func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, error) {
_, _, _, err := c.handleDbRelContext(n, false)
rel, _, _, err := c.handleDbRelContext(n, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -4184,13 +4186,22 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, error) {

// scan on multi CN
for i := range c.cnList {
nodes = append(nodes, engine.Node{
node := engine.Node{
Id: c.cnList[i].Id,
Addr: c.cnList[i].Addr,
Mcpu: c.cnList[i].Mcpu,
CNCNT: int32(len(c.cnList)),
CNIDX: int32(i),
})
}
if node.Addr != c.addr {
uncommittedTombs, err := collectTombstones(c, n, rel, engine.Policy_CollectAllTombstones)
if err != nil {
return nil, err
}
node.Data = engine_util.BuildEmptyRelData()
node.Data.AttachTombstones(uncommittedTombs)
}
nodes = append(nodes, node)
}
return nodes, nil
}
Expand Down
31 changes: 25 additions & 6 deletions pkg/sql/compile/remoterun.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"unsafe"

"github.com/google/uuid"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
Expand Down Expand Up @@ -83,6 +84,7 @@ import (
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

Expand Down Expand Up @@ -179,13 +181,19 @@ func generatePipeline(s *Scope, ctx *scopeContext, ctxId int32) (*pipeline.Pipel
// only encode the first one.
p.Qry = s.Plan
}

var data []byte
if s.NodeInfo.Data != nil {
if data, err = s.NodeInfo.Data.MarshalBinary(); err != nil {
return nil, -1, err
}
}
p.Node = &pipeline.NodeInfo{
Id: s.NodeInfo.Id,
Addr: s.NodeInfo.Addr,
Mcpu: int32(s.NodeInfo.Mcpu),
CnCnt: s.NodeInfo.CNCNT,
CnIdx: s.NodeInfo.CNIDX,
Id: s.NodeInfo.Id,
Addr: s.NodeInfo.Addr,
Mcpu: int32(s.NodeInfo.Mcpu),
Payload: string(data),
CnCnt: s.NodeInfo.CNCNT,
CnIdx: s.NodeInfo.CNIDX,
}
ctx.pipe = p
ctx.scope = s
Expand Down Expand Up @@ -332,6 +340,17 @@ func generateScope(proc *process.Process, p *pipeline.Pipeline, ctx *scopeContex
s.NodeInfo.Mcpu = int(p.Node.Mcpu)
s.NodeInfo.CNCNT = p.Node.CnCnt
s.NodeInfo.CNIDX = p.Node.CnIdx

bs := []byte(p.Node.Payload)
var relData engine.RelData
if len(bs) > 0 {
rd, err := engine_util.UnmarshalRelationData(bs)
if err != nil {
return nil, err
}
relData = rd
}
s.NodeInfo.Data = relData
}
s.Proc = proc.NewNoContextChildProcWithChannel(int(p.ChildrenCount), p.ChannelBufferSize, p.NilBatchCnt)
{
Expand Down
9 changes: 3 additions & 6 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (s *Scope) Run(c *Compile) (err error) {
_, err = p.Run(s.Proc)
} else {
if s.DataSource.R == nil {
s.NodeInfo.Data = engine.BuildEmptyRelData()
s.NodeInfo.Data = engine_util.BuildEmptyRelData()
stats := statistic.StatsInfoFromContext(c.proc.GetTopContext())

buildStart := time.Now()
Expand Down Expand Up @@ -640,7 +640,7 @@ func (s *Scope) handleBlockList(c *Compile, runtimeInExprList []*plan.Expr) erro
rsp.IsLocalCN = true
}

commited, err = c.expandRanges(s.DataSource.node, rel, db, ctx, newExprList, engine.Policy_CollectCommittedData, rsp)
commited, err = c.expandRanges(s.DataSource.node, rel, db, ctx, newExprList, engine.Policy_CollectCommittedPersistedData, rsp)
if err != nil {
return err
}
Expand All @@ -658,10 +658,7 @@ func (s *Scope) handleBlockList(c *Compile, runtimeInExprList []*plan.Expr) erro
}
s.NodeInfo.Data.AppendBlockInfoSlice(commited.GetBlockInfoSlice())
} else {
tombstones, err := collectTombstones(c, s.DataSource.node, rel, engine.Policy_CollectCommittedTombstones)
if err != nil {
return err
}
tombstones := s.NodeInfo.Data.GetTombstones()
commited.AttachTombstones(tombstones)
s.NodeInfo.Data = commited
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,8 @@ func (tbl *txnTable) doRanges(ctx context.Context, rangesParam engine.RangesPara
var part *logtailreplay.PartitionState
var uncommittedObjects []objectio.ObjectStats
blocks := objectio.PreAllocBlockInfoSlice(rangesParam.PreAllocBlocks)
if rangesParam.Policy&engine.Policy_CollectUncommittedData != 0 {
if rangesParam.Policy&engine.Policy_CollectCommittedInmemData != 0 ||
rangesParam.Policy&engine.Policy_CollectUncommittedInmemData != 0 {
blocks.AppendBlockInfo(&objectio.EmptyBlockInfo)
}

Expand Down Expand Up @@ -670,12 +671,12 @@ func (tbl *txnTable) doRanges(ctx context.Context, rangesParam engine.RangesPara
}
}()

if rangesParam.Policy&engine.Policy_CollectUncommittedData != 0 {
if rangesParam.Policy&engine.Policy_CollectUncommittedPersistedData != 0 {
uncommittedObjects, _ = tbl.collectUnCommittedDataObjs(rangesParam.TxnOffset)
}

// get the table's snapshot
if rangesParam.Policy&engine.Policy_CollectCommittedData != 0 {
if rangesParam.Policy&engine.Policy_CollectCommittedPersistedData != 0 {
if part, err = tbl.getPartitionState(ctx); err != nil {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/disttae/txn_table_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (tbl *txnTableDelegate) Ranges(ctx context.Context, rangesParam engine.Rang

var blocks objectio.BlockInfoSlice
var uncommitted []objectio.ObjectStats
if rangesParam.Policy != engine.Policy_CheckCommittedOnly {
if rangesParam.Policy&engine.Policy_CollectUncommittedPersistedData != 0 {
uncommitted, _ = tbl.origin.collectUnCommittedDataObjs(rangesParam.TxnOffset)
}
err = tbl.origin.rangesOnePart(
Expand All @@ -303,7 +303,7 @@ func (tbl *txnTableDelegate) Ranges(ctx context.Context, rangesParam engine.Rang
func(param *shard.ReadParam) {
param.RangesParam.Exprs = rangesParam.BlockFilters
param.RangesParam.PreAllocSize = 2
param.RangesParam.DataCollectPolicy = engine.Policy_CollectCommittedData
param.RangesParam.DataCollectPolicy = engine.Policy_CollectCommittedPersistedData
param.RangesParam.TxnOffset = 0

},
Expand Down
149 changes: 148 additions & 1 deletion pkg/vm/engine/engine_util/relation_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,158 @@ func UnmarshalRelationData(data []byte) (engine.RelData, error) {
return nil, err
}
return relData, nil
case engine.RelDataEmpty:
relData := BuildEmptyRelData()
if err := relData.UnmarshalBinary(data); err != nil {
return nil, err
}
return relData, nil
default:
return nil, moerr.NewInternalErrorNoCtx("unsupported relation data type")
}
}

type EmptyRelationData struct {
tombs engine.Tombstoner
}

func BuildEmptyRelData() engine.RelData {
return &EmptyRelationData{}
}

func (rd *EmptyRelationData) String() string {
return fmt.Sprintf("RelData[%d]", engine.RelDataEmpty)
}

func (rd *EmptyRelationData) GetShardIDList() []uint64 {
panic("not supported")
}

func (rd *EmptyRelationData) GetShardID(i int) uint64 {
panic("not supported")
}

func (rd *EmptyRelationData) SetShardID(i int, id uint64) {
panic("not supported")
}

func (rd *EmptyRelationData) AppendShardID(id uint64) {
panic("not supported")
}

func (rd *EmptyRelationData) GetBlockInfoSlice() objectio.BlockInfoSlice {
panic("not supported")
}

func (rd *EmptyRelationData) GetBlockInfo(i int) objectio.BlockInfo {
panic("not supported")
}

func (rd *EmptyRelationData) SetBlockInfo(i int, blk *objectio.BlockInfo) {
panic("not supported")
}

func (rd *EmptyRelationData) AppendBlockInfo(blk *objectio.BlockInfo) {
panic("not supported")
}

func (rd *EmptyRelationData) AppendBlockInfoSlice(objectio.BlockInfoSlice) {
panic("not supported")
}

func (rd *EmptyRelationData) GetType() engine.RelDataType {
return engine.RelDataEmpty
}

func (rd *EmptyRelationData) MarshalBinaryWithBuffer(w *bytes.Buffer) (err error) {
typ := uint8(rd.GetType())
if _, err = w.Write(types.EncodeUint8(&typ)); err != nil {
return
}

// marshal tombstones
offset := w.Len()
tombstoneLen := uint32(0)
if _, err = w.Write(types.EncodeUint32(&tombstoneLen)); err != nil {
return
}
if rd.tombs != nil {
if err = rd.tombs.MarshalBinaryWithBuffer(w); err != nil {
return
}
tombstoneLen = uint32(w.Len() - offset - 4)
buf := w.Bytes()
copy(buf[offset:], types.EncodeUint32(&tombstoneLen))
}
return nil
}
func (rd *EmptyRelationData) MarshalBinary() ([]byte, error) {
var w bytes.Buffer
if err := rd.MarshalBinaryWithBuffer(&w); err != nil {
return nil, err
}
buf := w.Bytes()
return buf, nil
}

func (rd *EmptyRelationData) UnmarshalBinary(data []byte) (err error) {
typ := engine.RelDataType(types.DecodeUint8(data))
if typ != engine.RelDataEmpty {
return moerr.NewInternalErrorNoCtxf("UnmarshalBinary empty rel data with type:%v", typ)
}
data = data[1:]

tombstoneLen := types.DecodeUint32(data)
data = data[4:]

if tombstoneLen == 0 {
return
}
rd.tombs, err = UnmarshalTombstoneData(data[:tombstoneLen])
return
}

func (rd *EmptyRelationData) AttachTombstones(tombstones engine.Tombstoner) error {
rd.tombs = tombstones
return nil
}

func (rd *EmptyRelationData) GetTombstones() engine.Tombstoner {
return rd.tombs
}

func (rd *EmptyRelationData) ForeachDataBlk(begin, end int, f func(blk any) error) error {
panic("Not Supported")
}

func (rd *EmptyRelationData) GetDataBlk(i int) any {
panic("Not Supported")
}

func (rd *EmptyRelationData) SetDataBlk(i int, blk any) {
panic("Not Supported")
}

func (rd *EmptyRelationData) DataSlice(begin, end int) engine.RelData {
panic("Not Supported")
}

func (rd *EmptyRelationData) GroupByPartitionNum() map[int16]engine.RelData {
panic("Not Supported")
}

func (rd *EmptyRelationData) AppendDataBlk(blk any) {
panic("Not Supported")
}

func (rd *EmptyRelationData) BuildEmptyRelData(i int) engine.RelData {
return &EmptyRelationData{}
}

func (rd *EmptyRelationData) DataCnt() int {
return 0
}

// emptyCnt is the number of empty blocks preserved
func NewBlockListRelationData(emptyCnt int) *BlockListRelData {
return &BlockListRelData{
Expand Down Expand Up @@ -156,12 +303,12 @@ func (relData *BlockListRelData) MarshalBinaryWithBuffer(w *bytes.Buffer) (err e
return
}

// marshal blk list
sizeofblks := uint32(relData.blklist.Size())
if _, err = w.Write(types.EncodeUint32(&sizeofblks)); err != nil {
return
}

// marshal blk list
if _, err = w.Write(relData.blklist); err != nil {
return
}
Expand Down
Loading
Loading