Skip to content

Commit

Permalink
support to replicate import msg
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Jan 17, 2025
1 parent 9209a70 commit 9191fb5
Show file tree
Hide file tree
Showing 60 changed files with 3,653 additions and 1,785 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ replace (
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
github.com/greatroar/blobloom => github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93
github.com/ianlancetaylor/cgosymbolizer => github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119
github.com/milvus-io/milvus-proto/go-api/v2 => github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250109072021-b66909fa356b
github.com/milvus-io/milvus/pkg => ./pkg
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8 h1:boN3QhAWQU9O8EYQWxN7AEYav39PuD29QzZwTiI8Ca0=
github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250109072021-b66909fa356b h1:O4zGq8JOuCY9k9srUd4voUsdJItMy/gtVFkAosfyQgk=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250109072021-b66909fa356b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA=
github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc=
Expand Down Expand Up @@ -630,8 +632,6 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f h1:So6RKU5wqP/8EaKogicJP8gZ2SrzzS/JprusBaE3RKc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
4 changes: 3 additions & 1 deletion internal/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ packages:
github.com/milvus-io/milvus/internal/streamingnode/server/flusher:
interfaces:
Flusher:
FlushMsgHandler:
github.com/milvus-io/milvus/internal/streamingnode/server/wal:
interfaces:
OpenerBuilder:
Expand Down Expand Up @@ -82,6 +81,9 @@ packages:
github.com/milvus-io/milvus/internal/util/searchutil/optimizers:
interfaces:
QueryHook:
# github.com/milvus-io/milvus/internal/flushcommon/util:
# interfaces:
# MsgHandler:
google.golang.org/grpc/resolver:
interfaces:
ClientConn:
Expand Down
44 changes: 42 additions & 2 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type compactionPlanContext interface {
getCompactionTasksNumBySignalID(signalID int64) int
getCompactionInfo(ctx context.Context, signalID int64) *compactionInfo
removeTasksByChannel(channel string)
getCompactionTasksNum(filters ...compactionTaskFilter) int
}

var (
Expand Down Expand Up @@ -144,7 +145,7 @@ func (sna *SlotBasedNodeAssigner) pickAnyNode(task CompactionTask) (nodeID int64
}

type compactionPlanHandler struct {
queueTasks CompactionQueue
queueTasks *CompactionQueue

executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task
Expand Down Expand Up @@ -254,7 +255,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
// TODO[GOOSE]: Higher capacity makes tasks waiting longer, which need to be get rid of.
capacity := paramtable.Get().DataCoordCfg.CompactionTaskQueueCapacity.GetAsInt()
return &compactionPlanHandler{
queueTasks: *NewCompactionQueue(capacity, getPrioritizer()),
queueTasks: NewCompactionQueue(capacity, getPrioritizer()),
meta: meta,
sessions: sessions,
allocator: allocator,
Expand Down Expand Up @@ -777,6 +778,45 @@ func (c *compactionPlanHandler) checkDelay(t CompactionTask) {
}
}

func (c *compactionPlanHandler) getCompactionTasksNum(filters ...compactionTaskFilter) int {
cnt := 0
isMatch := func(task CompactionTask) bool {
for _, f := range filters {
if !f(task) {
return false
}
}
return true
}
c.queueTasks.ForEach(func(task CompactionTask) {
if isMatch(task) {
cnt += 1
}
})
c.executingGuard.RLock()
for _, t := range c.executingTasks {
if isMatch(t) {
cnt += 1
}
}
c.executingGuard.RUnlock()
return cnt
}

type compactionTaskFilter func(task CompactionTask) bool

func CollectionIDCompactionTaskFilter(collectionID int64) compactionTaskFilter {
return func(task CompactionTask) bool {
return task.GetTaskProto().GetCollectionID() == collectionID
}
}

func L0CompactionCompactionTaskFilter() compactionTaskFilter {
return func(task CompactionTask) bool {
return task.GetTaskProto().GetType() == datapb.CompactionType_Level0DeleteCompaction
}
}

var (
ioPool *conc.Pool[any]
ioPoolInitOnce sync.Once
Expand Down
42 changes: 39 additions & 3 deletions internal/datacoord/compaction_policy_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,52 @@ type l0CompactionPolicy struct {
meta *meta

activeCollections *activeCollections

// key: collectionID, value: reference count
skipCompactionCollections map[int64]int
skipLocker sync.RWMutex
}

func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy {
return &l0CompactionPolicy{
meta: meta,
activeCollections: newActiveCollections(),
meta: meta,
activeCollections: newActiveCollections(),
skipCompactionCollections: make(map[int64]int),
}
}

func (policy *l0CompactionPolicy) Enable() bool {
return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
}

func (policy *l0CompactionPolicy) AddSkipCollection(collectionID UniqueID) {
policy.skipLocker.Lock()
defer policy.skipLocker.Unlock()

if _, ok := policy.skipCompactionCollections[collectionID]; !ok {
policy.skipCompactionCollections[collectionID] = 1
} else {
policy.skipCompactionCollections[collectionID]++
}
}

func (policy *l0CompactionPolicy) RemoveSkipCollection(collectionID UniqueID) {
policy.skipLocker.Lock()
defer policy.skipLocker.Unlock()
refCount := policy.skipCompactionCollections[collectionID]
if refCount > 1 {
policy.skipCompactionCollections[collectionID]--
} else {
delete(policy.skipCompactionCollections, collectionID)
}
}

func (policy *l0CompactionPolicy) isSkipCollection(collectionID UniqueID) bool {
policy.skipLocker.RLock()
defer policy.skipLocker.RUnlock()
return policy.skipCompactionCollections[collectionID] > 0
}

// Notify policy to record the active updated(when adding a new L0 segment) collections.
func (policy *l0CompactionPolicy) OnCollectionUpdate(collectionID int64) {
policy.activeCollections.Record(collectionID)
Expand All @@ -50,8 +83,11 @@ func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][]
idleCollsSet := typeutil.NewUniqueSet(idleColls...)
activeL0Views, idleL0Views := []CompactionView{}, []CompactionView{}
for collID, segments := range latestCollSegs {
policy.activeCollections.Read(collID)
if policy.isSkipCollection(collID) {
continue
}

policy.activeCollections.Read(collID)
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
return info.GetLevel() == datapb.SegmentLevel_L0
})
Expand Down
24 changes: 24 additions & 0 deletions internal/datacoord/compaction_policy_l0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,30 @@ func (s *L0CompactionPolicySuite) TestTriggerIdle() {
for _, view := range cView.GetSegmentsView() {
s.Equal(datapb.SegmentLevel_L0, view.Level)
}

// test for skip collection
s.l0_policy.AddSkipCollection(1)
s.l0_policy.AddSkipCollection(1)
// Test for skip collection
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Empty(events)

// Test for skip collection with ref count
s.l0_policy.RemoveSkipCollection(1)
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Empty(events)

s.l0_policy.RemoveSkipCollection(1)
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Equal(1, len(events))
gotViews, ok = events[TriggerTypeLevelZeroViewIDLE]
s.True(ok)
s.NotNil(gotViews)
s.Equal(1, len(gotViews))

log.Info("cView", zap.String("string", cView.String()))
}

Expand Down
58 changes: 58 additions & 0 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/magiconair/properties/assert"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -1122,3 +1123,60 @@ func TestCheckDelay(t *testing.T) {
}, nil, nil, nil, nil, nil)
handler.checkDelay(t3)
}

func TestGetCompactionTasksNum(t *testing.T) {
queueTasks := NewCompactionQueue(10, DefaultPrioritizer)
queueTasks.Enqueue(
newMixCompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 1,
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil),
)
queueTasks.Enqueue(
newL0CompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 1,
Type: datapb.CompactionType_Level0DeleteCompaction,
}, nil, nil, nil),
)
queueTasks.Enqueue(
newClusteringCompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 10,
Type: datapb.CompactionType_ClusteringCompaction,
}, nil, nil, nil, nil, nil),
)
executingTasks := make(map[int64]CompactionTask, 0)
executingTasks[1] = newMixCompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 1,
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil)
executingTasks[2] = newL0CompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
}, nil, nil, nil)

handler := &compactionPlanHandler{
queueTasks: queueTasks,
executingTasks: executingTasks,
}
t.Run("no filter", func(t *testing.T) {
i := handler.getCompactionTasksNum()
assert.Equal(t, 5, i)
})
t.Run("collection id filter", func(t *testing.T) {
i := handler.getCompactionTasksNum(CollectionIDCompactionTaskFilter(1))
assert.Equal(t, 3, i)
})
t.Run("l0 compaction filter", func(t *testing.T) {
i := handler.getCompactionTasksNum(L0CompactionCompactionTaskFilter())
assert.Equal(t, 2, i)
})
t.Run("collection id and l0 compaction filter", func(t *testing.T) {
i := handler.getCompactionTasksNum(CollectionIDCompactionTaskFilter(1), L0CompactionCompactionTaskFilter())
assert.Equal(t, 1, i)
})
}
5 changes: 5 additions & 0 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type spyCompactionHandler struct {
meta *meta
}

// getCompactionTasksNum implements compactionPlanContext.
func (h *spyCompactionHandler) getCompactionTasksNum(filters ...compactionTaskFilter) int {
return 0
}

func (h *spyCompactionHandler) getCompactionTasksNumBySignalID(signalID int64) int {
return 0
}
Expand Down
Loading

0 comments on commit 9191fb5

Please sign in to comment.