Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove CID lists #217

Merged
merged 2 commits into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions channelmonitor/channelmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,7 @@ func (m *mockChannelState) Stages() *datatransfer.ChannelStages {
func (m *mockChannelState) ReceivedCids() []cid.Cid {
panic("implement me")
}

func (m *mockChannelState) ReceivedCidsLen() int {
panic("implement me")
}
17 changes: 13 additions & 4 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type channelState struct {
voucherResults []internal.EncodedVoucherResult
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc
channelCIDsReader ChannelCIDsReader
receivedCids ReceivedCidsReader

// stages tracks the timeline of events related to a data transfer, for
// traceability purposes.
Expand Down Expand Up @@ -100,13 +100,22 @@ func (c channelState) Voucher() datatransfer.Voucher {

// ReceivedCids returns the cids received so far on this channel
func (c channelState) ReceivedCids() []cid.Cid {
receivedCids, err := c.channelCIDsReader(c.ChannelID())
receivedCids, err := c.receivedCids.ToArray(c.ChannelID())
if err != nil {
log.Error(err)
}
return receivedCids
}

// ReceivedCids returns the number of cids received so far on this channel
func (c channelState) ReceivedCidsLen() int {
len, err := c.receivedCids.Len(c.ChannelID())
if err != nil {
log.Error(err)
}
return len
}

// Sender returns the peer id for the node that is sending data
func (c channelState) Sender() peer.ID { return c.sender }

Expand Down Expand Up @@ -190,7 +199,7 @@ func (c channelState) Stages() *datatransfer.ChannelStages {
return c.stages
}

func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc, channelCIDsReader ChannelCIDsReader) datatransfer.ChannelState {
func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc, receivedCidsReader ReceivedCidsReader) datatransfer.ChannelState {
return channelState{
selfPeer: c.SelfPeer,
isPull: c.Initiator == c.Recipient,
Expand All @@ -209,7 +218,7 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
voucherResults: c.VoucherResults,
voucherResultDecoder: voucherResultDecoder,
voucherDecoder: voucherDecoder,
channelCIDsReader: channelCIDsReader,
receivedCids: receivedCidsReader,
stages: c.Stages,
}
}
Expand Down
69 changes: 49 additions & 20 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (

type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)

type ChannelCIDsReader func(chid datatransfer.ChannelID) ([]cid.Cid, error)
type ReceivedCidsReader interface {
ToArray(chid datatransfer.ChannelID) ([]cid.Cid, error)
Len(chid datatransfer.ChannelID) (int, error)
}

type Notifier func(datatransfer.Event, datatransfer.ChannelState)

Expand All @@ -55,7 +58,6 @@ type Channels struct {
voucherResultDecoder DecoderByTypeFunc
stateMachines fsm.Group
migrateStateMachines func(context.Context) error
cidLists cidlists.CIDLists
seenCIDs *cidsets.CIDSetManager
}

Expand All @@ -78,7 +80,6 @@ func New(ds datastore.Batching,

seenCIDsDS := namespace.Wrap(ds, datastore.NewKey("seencids"))
c := &Channels{
cidLists: cidLists,
seenCIDs: cidsets.NewCIDSetManager(seenCIDsDS),
notifier: notifier,
voucherDecoder: voucherDecoder,
Expand Down Expand Up @@ -123,7 +124,7 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
Timestamp: time.Now(),
}

c.notifier(evt, fromInternalChannelState(realChannel, c.voucherDecoder, c.voucherResultDecoder, c.cidLists.ReadList))
c.notifier(evt, c.fromInternalChannelState(realChannel))

// When the channel has been cleaned up, remove the caches of seen cids
if evt.Code == datatransfer.CleanupComplete {
Expand Down Expand Up @@ -180,10 +181,6 @@ func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, base
if err != nil {
return datatransfer.ChannelID{}, err
}
err = c.cidLists.CreateList(chid, nil)
if err != nil {
return datatransfer.ChannelID{}, err
}
return chid, c.stateMachines.Send(chid, datatransfer.Open)
}

Expand All @@ -197,7 +194,7 @@ func (c *Channels) InProgress() (map[datatransfer.ChannelID]datatransfer.Channel
channels := make(map[datatransfer.ChannelID]datatransfer.ChannelState, len(internalChannels))
for _, internalChannel := range internalChannels {
channels[datatransfer.ChannelID{ID: internalChannel.TransferID, Responder: internalChannel.Responder, Initiator: internalChannel.Initiator}] =
fromInternalChannelState(internalChannel, c.voucherDecoder, c.voucherResultDecoder, c.cidLists.ReadList)
c.fromInternalChannelState(internalChannel)
}
return channels, nil
}
Expand All @@ -210,7 +207,7 @@ func (c *Channels) GetByID(ctx context.Context, chid datatransfer.ChannelID) (da
if err != nil {
return nil, NewErrNotFound(chid)
}
return fromInternalChannelState(internalChannel, c.voucherDecoder, c.voucherResultDecoder, c.cidLists.ReadList), nil
return c.fromInternalChannelState(internalChannel), nil
}

// Accept marks a data transfer as accepted
Expand Down Expand Up @@ -239,11 +236,6 @@ func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint

// Returns true if this is the first time the block has been received
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64) (bool, error) {
err := c.cidLists.AppendList(chid, k)
if err != nil {
return false, err
}

return c.fireProgressEvent(chid, datatransfer.DataReceived, datatransfer.DataReceivedProgress, k, delta)
}

Expand Down Expand Up @@ -361,12 +353,12 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
// blocks that have already been queued / sent / received
func (c *Channels) removeSeenCIDCaches(chid datatransfer.ChannelID) error {
progressStates := []datatransfer.EventCode{
datatransfer.DataQueuedProgress,
datatransfer.DataSentProgress,
datatransfer.DataReceivedProgress,
datatransfer.DataQueued,
datatransfer.DataSent,
datatransfer.DataReceived,
}
for _, evt := range progressStates {
sid := cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
sid := seenCidsSetID(chid, evt)
err := c.seenCIDs.DeleteSet(sid)
if err != nil {
return err
Expand All @@ -388,7 +380,7 @@ func (c *Channels) fireProgressEvent(chid datatransfer.ChannelID, evt datatransf
}

// Check if the block has already been seen
sid := cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
sid := seenCidsSetID(chid, evt)
seen, err := c.seenCIDs.InsertSetCID(sid, k)
if err != nil {
return false, err
Expand Down Expand Up @@ -424,3 +416,40 @@ func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatran
}
return nil
}

// Get the ID of the CID set for the given channel ID and event code.
// The CID set stores a unique list of queued / sent / received CIDs.
func seenCidsSetID(chid datatransfer.ChannelID, evt datatransfer.EventCode) cidsets.SetID {
return cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
}

// Convert from the internally used channel state format to the externally exposed ChannelState
func (c *Channels) fromInternalChannelState(ch internal.ChannelState) datatransfer.ChannelState {
rcr := &receivedCidsReader{
seenCIDs: c.seenCIDs,
}
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder, rcr)
}

// Implements the ReceivedCidsReader interface so that the internal channel
// state has access to the received CIDs.
// The interface is used (instead of passing these values directly)
// so the values can be loaded lazily. Reading all CIDs from the datastore
// is an expensive operation so we want to avoid doing it unless necessary.
// Note that the received CIDs get cleaned up when the channel completes, so
// these methods will return an empty array after that point.
type receivedCidsReader struct {
seenCIDs *cidsets.CIDSetManager
}

func (r *receivedCidsReader) ToArray(chid datatransfer.ChannelID) ([]cid.Cid, error) {
sid := seenCidsSetID(chid, datatransfer.DataReceived)
return r.seenCIDs.SetToArray(sid)
}

func (r *receivedCidsReader) Len(chid datatransfer.ChannelID) (int, error) {
sid := seenCidsSetID(chid, datatransfer.DataReceived)
return r.seenCIDs.SetLen(sid)
}

var _ ReceivedCidsReader = (*receivedCidsReader)(nil)
11 changes: 7 additions & 4 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,23 +179,23 @@ func TestChannels(t *testing.T) {
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())
require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())
require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())

isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 25)
require.NoError(t, err)
require.False(t, isNew)
state = checkEvent(ctx, t, received, datatransfer.DataSent)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())
require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())
require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())

isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
require.NoError(t, err)
require.False(t, isNew)
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())
require.Equal(t, []cid.Cid{cids[0], cids[1], cids[0]}, state.ReceivedCids())
require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())
})

t.Run("pause/resume", func(t *testing.T) {
Expand Down Expand Up @@ -613,7 +613,10 @@ func TestMigrationsV1(t *testing.T) {
require.Equal(t, messages[i], channel.Message())
require.Equal(t, vouchers[i], channel.LastVoucher())
require.Equal(t, voucherResults[i], channel.LastVoucherResult())
require.Equal(t, receivedCids[i], channel.ReceivedCids())
// No longer relying on this migration to migrate CID lists as they
// have been deprecated since we moved to CID sets:
// https://github.com/filecoin-project/go-data-transfer/pull/217
//require.Equal(t, receivedCids[i], channel.ReceivedCids())
}
}

Expand Down
1 change: 1 addition & 0 deletions cidlists/cidlists.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
)

// Deprecated: CIDLists have now been replaced by CID sets (see cidsets directory).
// CIDLists maintains files that contain a list of CIDs received for different data transfers
type CIDLists interface {
CreateList(chid datatransfer.ChannelID, initalCids []cid.Cid) error
Expand Down
Loading