Skip to content

Commit

Permalink
Merge pull request #95 from gfanton/fix/ps-payload-handler
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton authored Mar 28, 2022
2 parents 748476e + ac252ea commit a15cec2
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 390 deletions.
18 changes: 18 additions & 0 deletions baseorbitdb/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package baseorbitdb

import (
"berty.tech/go-orbit-db/iface"
"github.com/libp2p/go-libp2p-core/peer"
)

type EventExchangeHeads struct {
Peer peer.ID
Message *iface.MessageExchangeHeads
}

func NewEventExchangeHeads(p peer.ID, msg *iface.MessageExchangeHeads) EventExchangeHeads {
return EventExchangeHeads{
Peer: p,
Message: msg,
}
}
65 changes: 24 additions & 41 deletions baseorbitdb/events_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,23 @@ package baseorbitdb

import (
"context"
"encoding/json"
"fmt"

ipfslog "berty.tech/go-ipfs-log"
"berty.tech/go-ipfs-log/enc"
"berty.tech/go-ipfs-log/entry"
"berty.tech/go-orbit-db/iface"
"berty.tech/go-orbit-db/stores"
"go.uber.org/zap"
)

func (o *orbitDB) handleEventPubSubPayload(ctx context.Context, e *iface.EventPubSubPayload, sharedKey enc.SharedKey) error {
heads := &exchangedHeads{}
payload := e.Payload

if sharedKey != nil {
var err error

payload, err = sharedKey.Open(payload)
if err != nil {
return fmt.Errorf("unable to decrypt payload: %w", err)
}
}

err := json.Unmarshal(payload, &heads)
if err != nil {
o.logger.Error("unable to unmarshal heads", zap.Error(err))
}

o.logger.Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(heads.Heads), heads.Address))
store, ok := o.getStore(heads.Address)

if !ok {
return fmt.Errorf("heads from unknown store, skipping")
func (o *orbitDB) handleEventExchangeHeads(ctx context.Context, e *iface.MessageExchangeHeads, store iface.Store) error {
untypedHeads := make([]ipfslog.Entry, len(e.Heads))
for i, h := range e.Heads {
untypedHeads[i] = h
}

if len(heads.Heads) > 0 {
untypedHeads := make([]ipfslog.Entry, len(heads.Heads))
for i := range heads.Heads {
untypedHeads[i] = heads.Heads[i]
}
o.logger.Debug(fmt.Sprintf("%s: Received %d heads for '%s':", o.PeerID().String(), len(untypedHeads), e.Address))

if len(untypedHeads) > 0 {
if err := store.Sync(ctx, untypedHeads); err != nil {
return fmt.Errorf("unable to sync heads: %w", err)
}
Expand All @@ -51,7 +27,7 @@ func (o *orbitDB) handleEventPubSubPayload(ctx context.Context, e *iface.EventPu
return nil
}

func (o *orbitDB) handleEventWrite(ctx context.Context, e *stores.EventWrite, store Store, topic iface.PubSubTopic) error {
func (o *orbitDB) handleEventWrite(ctx context.Context, e *stores.EventWrite, topic iface.PubSubTopic, store Store) error {
o.logger.Debug("received stores.write event")
if len(e.Heads) == 0 {
return fmt.Errorf("'heads' are not defined")
Expand All @@ -64,19 +40,26 @@ func (o *orbitDB) handleEventWrite(ctx context.Context, e *stores.EventWrite, st
}

if len(peer) > 0 {
headsBytes, err := json.Marshal(e.Heads)
if err != nil {
return fmt.Errorf("unable to serialize heads %w", err)
entries := make([]*entry.Entry, len(e.Heads))
for i, head := range e.Heads {
if entry, ok := head.(*entry.Entry); ok {
entries[i] = entry
} else {
return fmt.Errorf("unable to unwrap entry")
}
}

if key := store.SharedKey(); key != nil {
headsBytes, err = key.Seal(headsBytes)
if err != nil {
return fmt.Errorf("unable to encrypt heads %w", err)
}
msg := &iface.MessageExchangeHeads{
Address: store.Address().String(),
Heads: entries,
}

payload, err := o.messageMarshaler.Marshal(msg)
if err != nil {
return fmt.Errorf("unable to serialize heads %w", err)
}

err = topic.Publish(ctx, headsBytes)
err = topic.Publish(ctx, payload)
if err != nil {
return fmt.Errorf("unable to publish message on pubsub %w", err)
}
Expand Down
Loading

0 comments on commit a15cec2

Please sign in to comment.