Skip to content

Commit

Permalink
Merge branch 'main' into update-pk
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 16, 2025
2 parents ebd4f04 + 7605955 commit 8cf9840
Show file tree
Hide file tree
Showing 19 changed files with 1,345 additions and 719 deletions.
4 changes: 2 additions & 2 deletions pkg/datasync/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func TestConsumeEntries(t *testing.T) {
err := c.consumeEntries(ctx, []logservice.LogRecord{
{
Lsn: 10,
Data: make([]byte, 100),
Data: dataWithValidVersion(make([]byte, 100)),
},
}, false)
assert.NoError(t, err)
Expand All @@ -573,7 +573,7 @@ func TestConsumeEntries(t *testing.T) {
err := c.consumeEntries(ctx, []logservice.LogRecord{
{
Lsn: 10,
Data: make([]byte, 100),
Data: dataWithValidVersion(make([]byte, 100)),
},
}, true)
assert.Error(t, err)
Expand Down
27 changes: 7 additions & 20 deletions pkg/datasync/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package datasync

import (
"bytes"
"strings"

"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -51,31 +50,19 @@ func getLocations(rec logservice.LogRecord, tag string) []string {
logutil.Errorf("invalid data size %d", len(data))
return nil
}
buffer := bytes.NewBuffer(data[dataHeaderSize:])
m := &logservicedriver.Meta{}
_, err := m.ReadFrom(buffer)
if err != nil {
logutil.Errorf("failed to read data from buffer: %v", err)
return nil
}
if m.GetType() != logservicedriver.TNormal {
return nil
}
var locations []string
for range m.GetAddr() {
e := entry.NewEmptyEntry()
_, err := e.ReadFrom(buffer)
if err != nil {
logutil.Errorf("failed to read data from buffer: %v", err)
return nil
}
ei := e.Entry.GetInfo().(*entry2.Info)
payload := e.Entry.GetPayload()
_, err := logservicedriver.DecodeLogEntry(data[headerSize+replicaIDSize:], func(en *entry.Entry) {
ei := en.Entry.GetInfo().(*entry2.Info)
payload := en.Entry.GetPayload()
if ei.Group == wal.GroupPrepare {
locations = append(locations, parseCommonFiles(payload, tag)...)
} else if ei.Group == store.GroupFiles {
locations = append(locations, parseMetaFiles(payload, tag)...)
}
})
if err != nil {
logutil.Errorf("decode logentry error %s", err.Error())
return nil
}
return locations
}
Expand Down
47 changes: 29 additions & 18 deletions pkg/datasync/entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,24 @@ import (
)

type mockEntry struct {
entryType uint16
version uint16
meta logservicedriver.Meta
entries []*driverEntry.Entry
payloadSize uint64
logservicedriver.V1Meta
entryType uint16
version uint16
entries []*driverEntry.Entry
}

func newMockEntry() *mockEntry {
return &mockEntry{
entryType: objectio.IOET_ObjMeta,
version: logservicedriver.IOET_WALRecord_V1,
}
}

func (m *mockEntry) appendEntry(e *driverEntry.Entry) {
m.entries = append(m.entries, e)
m.meta.AddAddr(e.DSN, m.payloadSize)
m.payloadSize += uint64(e.GetSize())
sz := m.V1Meta.GetPayloadSize()
m.V1Meta.AddAddr(e.DSN, sz)
m.V1Meta.SetPayloadSize(sz + uint64(e.GetSize()))
}

func (m *mockEntry) WriteTo(w io.Writer) (int64, error) {
Expand All @@ -69,12 +70,12 @@ func (m *mockEntry) WriteTo(w io.Writer) (int64, error) {
return 0, err
}
n += 2
nn, err := m.meta.WriteTo(w)
nn, err := m.V1Meta.WriteTo(w)
if err != nil {
return 0, err
}
n += nn
if m.meta.GetType() == logservicedriver.TNormal {
if m.V1Meta.GetType() == logservicedriver.Cmd_Normal {
for _, e := range m.entries {
nn, err := e.WriteTo(w)
if err != nil {
Expand All @@ -101,13 +102,13 @@ func runWithBaseEnv(fn func(cat *catalog.Catalog, txn txnif.AsyncTxn) error) err
}

type parameter struct {
metaType logservicedriver.MetaType
cmdType logservicedriver.CmdType
groupType uint32
}

func newParameter() *parameter {
return &parameter{
metaType: logservicedriver.TNormal,
cmdType: logservicedriver.Cmd_Normal,
groupType: wal.GroupPrepare,
}
}
Expand All @@ -117,8 +118,8 @@ func (p *parameter) withGroupType(t uint32) *parameter {
return p
}

func (p *parameter) withMetaType(t logservicedriver.MetaType) *parameter {
p.metaType = t
func (p *parameter) withCmdType(t logservicedriver.CmdType) *parameter {
p.cmdType = t
return p
}

Expand Down Expand Up @@ -182,8 +183,10 @@ func generateCmdPayload(param parameter, loc objectio.Location) ([]byte, error)
drEntry.Entry.PrepareWrite()

me := newMockEntry()
me.meta.SetType(param.metaType)
me.appendEntry(drEntry)
me.V1Meta.SetType(param.cmdType)
if param.cmdType == logservicedriver.Cmd_Normal {
me.appendEntry(drEntry)
}
var buf bytes.Buffer
_, err = me.WriteTo(&buf)
if err != nil {
Expand Down Expand Up @@ -214,7 +217,7 @@ func generateCkpPayload(data []byte) ([]byte, error) {
drEntry.Entry.PrepareWrite()

me := newMockEntry()
me.meta.SetType(logservicedriver.TNormal)
me.V1Meta.SetType(logservicedriver.Cmd_Normal)
me.appendEntry(drEntry)
var buf bytes.Buffer
_, err = me.WriteTo(&buf)
Expand Down Expand Up @@ -244,6 +247,14 @@ func genRecord(payload []byte, upstreamLsn uint64) logservice.LogRecord {
return rec
}

func dataWithValidVersion(p []byte) []byte {
if len(p) >= 16 {
p[12] = 1
p[14] = 1
}
return p
}

func TestEntry_ParseLocation(t *testing.T) {
t.Run("invalid record type", func(t *testing.T) {
rec := logservice.LogRecord{
Expand All @@ -261,14 +272,14 @@ func TestEntry_ParseLocation(t *testing.T) {

t.Run("read buffer error", func(t *testing.T) {
rec := logservice.LogRecord{
Data: make([]byte, 22),
Data: dataWithValidVersion(make([]byte, 22)),
}
assert.Equal(t, []string(nil), getLocations(rec, ""))
})

t.Run("invalid meta type", func(t *testing.T) {
p, err := generateCmdPayload(
*newParameter().withMetaType(logservicedriver.TReplay),
*newParameter().withCmdType(logservicedriver.Cmd_SkipDSN),
genLocation(uuid.New(), 0, 0, 0, 0, 0),
)
assert.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/datasync/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func TestProducer_Start(t *testing.T) {
withProducerStarted(t, rt, c, true, true, func(ctx context.Context, p Producer) {
var wg sync.WaitGroup
rec := c.GetLogRecord(16)
dataWithValidVersion(rec.Data)
w := newWrappedData(rec.Data, 0, &wg)
wg.Add(1)
p.Enqueue(ctx, w)
Expand Down Expand Up @@ -202,6 +203,7 @@ func TestProducer_Start(t *testing.T) {
withProducerStarted(t, rt, c, true, false, func(ctx context.Context, p Producer) {
var wg sync.WaitGroup
rec := c.GetLogRecord(16)
dataWithValidVersion(rec.Data)
w := newWrappedData(rec.Data, 2, &wg)
wg.Add(1)
p.Enqueue(ctx, w)
Expand Down
163 changes: 0 additions & 163 deletions pkg/vm/engine/tae/logstore/driver/logservicedriver/appender.go

This file was deleted.

Loading

0 comments on commit 8cf9840

Please sign in to comment.