Skip to content

Commit

Permalink
Act history v3 (#34)
Browse files Browse the repository at this point in the history
* fix(mantaray): adjust walker to take into account the order & remove unused Walk fn

* test(mantaray): adjust to take into account the order & remove unnecessary test

* chore: export mantaray manifest

* feat: add history based on mantaray

* test: history

* chore: requested changes

---------

Co-authored-by: Levente Kiss <[email protected]>
  • Loading branch information
LevilkTheReal and Levente Kiss authored Apr 9, 2024
1 parent 082c228 commit 3f3fa48
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 372 deletions.
147 changes: 133 additions & 14 deletions pkg/dynamicaccess/history.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,155 @@
package dynamicaccess

import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/v2/pkg/kvs"
"context"
"errors"
"fmt"
"math"
"strconv"
"time"

"github.com/ethersphere/bee/v2/pkg/file"
"github.com/ethersphere/bee/v2/pkg/manifest"
"github.com/ethersphere/bee/v2/pkg/manifest/mantaray"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

type History interface {
Add(timestamp int64, kvs kvs.KeyValueStore) error
Get(timestamp int64) (kvs.KeyValueStore, error)
Lookup(at int64) (kvs.KeyValueStore, error)
Add(ctx context.Context, ref swarm.Address, timestamp *int64) error
Lookup(ctx context.Context, timestamp int64) (swarm.Address, error)
Store(ctx context.Context) (swarm.Address, error)
}

var _ History = (*history)(nil)

var ErrEndIteration = errors.New("end iteration")

type history struct {
history map[int64]*kvs.KeyValueStore
manifest *manifest.MantarayManifest
ls file.LoadSaver
}

func NewHistory(topic []byte, owner common.Address) *history {
return &history{history: make(map[int64]*kvs.KeyValueStore)}
func NewHistory(ls file.LoadSaver, ref *swarm.Address) (*history, error) {
var err error
var m manifest.Interface

if ref != nil {
m, err = manifest.NewDefaultManifestReference(*ref, ls)
} else {
m, err = manifest.NewDefaultManifest(ls, false)
}
if err != nil {
return nil, err
}

mm, ok := m.(*manifest.MantarayManifest)
if !ok {
return nil, fmt.Errorf("expected MantarayManifest, got %T", m)
}

return &history{manifest: mm, ls: ls}, nil
}

func (h *history) Add(timestamp int64, kvs kvs.KeyValueStore) error {
func (h *history) Add(ctx context.Context, ref swarm.Address, timestamp *int64) error {
// Do we need any extra meta/act?
meta := map[string]string{}
// add timestamps transformed so that the latests timestamp becomes the smallest key
var unixTime int64
if timestamp != nil {
unixTime = *timestamp
} else {
unixTime = time.Now().Unix()
}

return nil
key := strconv.FormatInt(math.MaxInt64-unixTime, 10)
return h.manifest.Add(ctx, key, manifest.NewEntry(ref, meta))
}

func (h *history) Lookup(at int64) (kvs.KeyValueStore, error) {
return nil, nil
// Lookup finds the entry for a path or returns error if not found
func (h *history) Lookup(ctx context.Context, timestamp int64) (swarm.Address, error) {
if timestamp <= 0 {
return swarm.ZeroAddress, errors.New("invalid timestamp")
}

reversedTimestamp := math.MaxInt64 - timestamp
node, err := h.LookupNode(ctx, reversedTimestamp)
if err != nil {
return swarm.ZeroAddress, err
}

if node != nil {
return swarm.NewAddress(node.Entry()), nil
}

return swarm.ZeroAddress, nil
}

func (h *history) Get(timestamp int64) (kvs.KeyValueStore, error) {
// get the feed
func (h *history) LookupNode(ctx context.Context, searchedTimestamp int64) (*mantaray.Node, error) {
// before node's timestamp is the closest one that is less than or equal to the searched timestamp
// for instance: 2030, 2020, 1994 -> search for 2021 -> before is 2020
var beforeNode *mantaray.Node
// after node's timestamp is after the latest
// for instance: 2030, 2020, 1994 -> search for 1980 -> after is 1994
var afterNode *mantaray.Node

walker := func(pathTimestamp []byte, currNode *mantaray.Node, err error) error {
if err != nil {
return err
}

if currNode.IsValueType() && len(currNode.Entry()) > 0 {
afterNode = currNode

match, err := isBeforeMatch(pathTimestamp, searchedTimestamp)
if match {
beforeNode = currNode
// return error to stop the walk, this is how WalkNode works...
return ErrEndIteration
}

return err
}

return nil
}

rootNode := h.manifest.Root()
err := rootNode.WalkNode(ctx, []byte{}, h.ls, walker)

if err != nil && !errors.Is(err, ErrEndIteration) {
return nil, fmt.Errorf("history lookup node error: %w", err)
}

if beforeNode != nil {
return beforeNode, nil
}
if afterNode != nil {
return afterNode, nil

}
return nil, nil
}

func (h *history) Store(ctx context.Context) (swarm.Address, error) {
return h.manifest.Store(ctx)
}

func bytesToInt64(b []byte) (int64, error) {
num, err := strconv.ParseInt(string(b), 10, 64)
if err != nil {
return -1, err
}

return num, nil
}

func isBeforeMatch(pathTimestamp []byte, searchedTimestamp int64) (bool, error) {
targetTimestamp, err := bytesToInt64(pathTimestamp)
if err != nil {
return false, err
}
if targetTimestamp == 0 {
return false, nil
}
return searchedTimestamp <= targetTimestamp, nil
}
153 changes: 108 additions & 45 deletions pkg/dynamicaccess/history_test.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,122 @@
package dynamicaccess_test

import (
"encoding/hex"
"context"
"testing"
"time"

"github.com/ethersphere/bee/v2/pkg/dynamicaccess"
"github.com/ethersphere/bee/v2/pkg/dynamicaccess/mock"
kvsmock "github.com/ethersphere/bee/v2/pkg/kvs/mock"
"github.com/ethersphere/bee/v2/pkg/file/loadsave"
"github.com/ethersphere/bee/v2/pkg/file/pipeline"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/v2/pkg/storage"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/stretchr/testify/assert"
)

func TestHistoryLookup(t *testing.T) {
h := prepareTestHistory()
now := time.Now()

tests := []struct {
input int64
expected string
}{
{input: 0, expected: "value3"},
{input: now.Unix(), expected: "value3"},
{input: now.AddDate(0, -5, 0).Unix(), expected: "value3"},
{input: now.AddDate(0, -6, 0).Unix(), expected: "value3"},
{input: now.AddDate(-1, 0, 0).Unix(), expected: "value3"},
{input: now.AddDate(-1, -6, 0).Unix(), expected: "value2"},
{input: now.AddDate(-2, -0, 0).Unix(), expected: "value2"},
{input: now.AddDate(-2, -6, 0).Unix(), expected: "value1"},
{input: now.AddDate(-3, -0, 0).Unix(), expected: "value1"},
}
func TestHistoryAdd(t *testing.T) {
h, err := dynamicaccess.NewHistory(nil, nil)
assert.NoError(t, err)

for _, tt := range tests {
t.Run("", func(t *testing.T) {
sAt, _ := h.Lookup(tt.input)
output, _ := sAt.Get([]byte("key1"))
assert.Equal(t, output, hex.EncodeToString([]byte(tt.expected)))
})
}
addr := swarm.NewAddress([]byte("addr"))

ctx := context.Background()

err = h.Add(ctx, addr, nil)
assert.NoError(t, err)
}

func TestSingleNodeHistoryLookup(t *testing.T) {
storer := mockstorer.New()
ctx := context.Background()
ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false))

h, err := dynamicaccess.NewHistory(ls, nil)
assert.NoError(t, err)

testActRef := swarm.RandAddress(t)
err = h.Add(ctx, testActRef, nil)
assert.NoError(t, err)

_, err = h.Store(ctx)
assert.NoError(t, err)

searchedTime := time.Now().Unix()
actRef, err := h.Lookup(ctx, searchedTime)
assert.NoError(t, err)
assert.True(t, actRef.Equal(testActRef))
}

func TestMultiNodeHistoryLookup(t *testing.T) {
storer := mockstorer.New()
ctx := context.Background()
ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false))

h, _ := dynamicaccess.NewHistory(ls, nil)

testActRef1 := swarm.NewAddress([]byte("39a5ea87b141fe44aa609c3327ecd891"))
firstTime := time.Date(1994, time.April, 1, 0, 0, 0, 0, time.UTC).Unix()
h.Add(ctx, testActRef1, &firstTime)

testActRef2 := swarm.NewAddress([]byte("39a5ea87b141fe44aa609c3327ecd892"))
secondTime := time.Date(2000, time.April, 1, 0, 0, 0, 0, time.UTC).Unix()
h.Add(ctx, testActRef2, &secondTime)

testActRef3 := swarm.NewAddress([]byte("39a5ea87b141fe44aa609c3327ecd893"))
thirdTime := time.Date(2015, time.April, 1, 0, 0, 0, 0, time.UTC).Unix()
h.Add(ctx, testActRef3, &thirdTime)

testActRef4 := swarm.NewAddress([]byte("39a5ea87b141fe44aa609c3327ecd894"))
fourthTime := time.Date(2020, time.April, 1, 0, 0, 0, 0, time.UTC).Unix()
h.Add(ctx, testActRef4, &fourthTime)

testActRef5 := swarm.NewAddress([]byte("39a5ea87b141fe44aa609c3327ecd895"))
fifthTime := time.Date(2030, time.April, 1, 0, 0, 0, 0, time.UTC).Unix()
h.Add(ctx, testActRef5, &fifthTime)

// latest
searchedTime := time.Date(1980, time.April, 1, 0, 0, 0, 0, time.UTC).Unix()
actRef, err := h.Lookup(ctx, searchedTime)
assert.NoError(t, err)
assert.True(t, actRef.Equal(testActRef1))

// before first time
searchedTime = time.Date(2021, time.April, 1, 0, 0, 0, 0, time.UTC).Unix()
actRef, err = h.Lookup(ctx, searchedTime)
assert.NoError(t, err)
assert.True(t, actRef.Equal(testActRef4))

// same time
searchedTime = time.Date(2000, time.April, 1, 0, 0, 0, 0, time.UTC).Unix()
actRef, err = h.Lookup(ctx, searchedTime)
assert.NoError(t, err)
assert.True(t, actRef.Equal(testActRef2))

// after time
searchedTime = time.Date(2045, time.April, 1, 0, 0, 0, 0, time.UTC).Unix()
actRef, err = h.Lookup(ctx, searchedTime)
assert.NoError(t, err)
assert.True(t, actRef.Equal(testActRef5))
}

func prepareTestHistory() dynamicaccess.History {
var (
h = mock.NewHistory()
now = time.Now()
s1 = kvsmock.New()
s2 = kvsmock.New()
s3 = kvsmock.New()
)
s1.Put([]byte("key1"), []byte("value1"))
s2.Put([]byte("key1"), []byte("value2"))
s3.Put([]byte("key1"), []byte("value3"))

h.Insert(now.AddDate(-3, 0, 0).Unix(), s1)
h.Insert(now.AddDate(-2, 0, 0).Unix(), s2)
h.Insert(now.AddDate(-1, 0, 0).Unix(), s3)

return h
func TestHistoryStore(t *testing.T) {
storer := mockstorer.New()
ctx := context.Background()
ls := loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false))

h, _ := dynamicaccess.NewHistory(ls, nil)

testActRef1 := swarm.NewAddress([]byte("39a5ea87b141fe44aa609c3327ecd891"))
firstTime := time.Date(1994, time.April, 1, 0, 0, 0, 0, time.UTC).Unix()
h.Add(ctx, testActRef1, &firstTime)

_, err := h.Store(ctx)
assert.NoError(t, err)
}

func pipelineFactory(s storage.Putter, encrypt bool) func() pipeline.Interface {
return func() pipeline.Interface {
return builder.NewPipelineBuilder(context.Background(), s, encrypt, 0)
}
}
Loading

0 comments on commit 3f3fa48

Please sign in to comment.