Skip to content

Commit

Permalink
add RPC call Metadata requesting sector ids
Browse files Browse the repository at this point in the history
Currently it only supports requesting entire list of sector ids.
In future it is planned to request a slice of sector ids by
specifying [begin, end) indices. The slice is accompanied with
a proof, that can be verified against the contract merkle root.

The implementation of proofs for slices is in review:
NebulousLabs/merkletree#17

That is why currently we support only the case begin=0 end=size.
In this case proof is empty. That is why the protocol in forward
compatible with the future plan.
  • Loading branch information
Boris Nagaev committed Sep 15, 2017
1 parent bb39873 commit a5083ff
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 1 deletion.
2 changes: 1 addition & 1 deletion doc/File Contract Negotiation.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ untrusted environment. Managing data on Sia happens through several protocols:
+ (planned for later) Storage Proof Request - the renter requests that the host
perform an out-of-band storage proof.

+ (planned for later) Metadata Request - the renter requests some metadata
+ Metadata Request - the renter requests some metadata
about the file contract from the host, namely the list of hashes that compose
the file. This list of hashes is provided along with a cryptographic proof
that the hashes are valid. The proof is only needed if only a subset of
Expand Down
1 change: 1 addition & 0 deletions modules/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type (
RenewCalls uint64 `json:"renewcalls"`
ReviseCalls uint64 `json:"revisecalls"`
SettingsCalls uint64 `json:"settingscalls"`
MetadataCalls uint64 `json:"metadatacalls"`
UnrecognizedCalls uint64 `json:"unrecognizedcalls"`
}

Expand Down
1 change: 1 addition & 0 deletions modules/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type Host struct {
atomicRenewCalls uint64
atomicReviseCalls uint64
atomicSettingsCalls uint64
atomicMetadataCalls uint64
atomicUnrecognizedCalls uint64

// Error management. There are a few different types of errors returned by
Expand Down
51 changes: 51 additions & 0 deletions modules/host/negotiatemetadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package host

import (
"errors"
"net"

"github.com/NebulousLabs/Sia/encoding"
"github.com/NebulousLabs/Sia/modules"
)

// managedRPCMetadata accepts a request to get list of sector ids.
func (h *Host) managedRPCMetadata(conn net.Conn) error {
// Perform the file contract revision exchange, giving the renter the most
// recent file contract revision and getting the storage obligation that
// will be used to get sector ids.
_, so, err := h.managedRPCRecentRevision(conn)
if err != nil {
return extendErr("RPCRecentRevision failed: ", err)
}
// The storage obligation is received with a lock on it. Defer a call to
// unlock the storage obligation.
defer func() {
h.managedUnlockStorageObligation(so.id())
}()
// Receive boundaries of so.SectorRoots to return.
var begin, end uint64
err = encoding.ReadObject(conn, &begin, 8)
if err != nil {
return extendErr("unable to read 'begin': ", ErrorConnection(err.Error()))
}
err = encoding.ReadObject(conn, &end, 8)
if err != nil {
return extendErr("unable to read 'end': ", ErrorConnection(err.Error()))
}
if begin != 0 || end != uint64(len(so.SectorRoots)) {
// TODO: support slices with proofs. Blocked by
// https://github.com/NebulousLabs/merkletree/pull/17/
err = errors.New("Requesting a slice is not supported")
modules.WriteNegotiationRejection(conn, err)
return err
}
if err = modules.WriteNegotiationAcceptance(conn); err != nil {
return extendErr("failed to write [begin,end) acceptance: ", ErrorConnection(err.Error()))
}
// Write roots of all sectors.
err = encoding.WriteObject(conn, so.SectorRoots)
if err != nil {
return extendErr("cound not write sectors: ", ErrorConnection(err.Error()))
}
return nil
}
3 changes: 3 additions & 0 deletions modules/host/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ func (h *Host) threadedHandleConn(conn net.Conn) {
case modules.RPCDownload:
atomic.AddUint64(&h.atomicDownloadCalls, 1)
err = extendErr("incoming RPCDownload failed: ", h.managedRPCDownload(conn))
case modules.RPCMetadata:
atomic.AddUint64(&h.atomicMetadataCalls, 1)
err = extendErr("incoming RPCMetadata failed: ", h.managedRPCMetadata(conn))
case modules.RPCRenewContract:
atomic.AddUint64(&h.atomicRenewCalls, 1)
err = extendErr("incoming RPCRenewContract failed: ", h.managedRPCRenewContract(conn))
Expand Down
10 changes: 10 additions & 0 deletions modules/negotiate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ const (
// the negotiation.
NegotiateDownloadTime = 600 * time.Second

// NegotiateMetadataTime establishes the minimum amount of time that
// the connection deadline is expected to be set to when a metadata
// is being requested from the host. The deadline is long
// enough that the connection should be successful even if both parties are
// running Tor.
NegotiateMetadataTime = 120 * time.Second

// NegotiateFileContractTime defines the amount of time that the renter and
// host have to negotiate a file contract. The time is set high enough that
// a node behind Tor has a reasonable chance at making the multiple
Expand Down Expand Up @@ -146,6 +153,9 @@ var (
// RPCDownload is the specifier for downloading a file from a host.
RPCDownload = types.Specifier{'D', 'o', 'w', 'n', 'l', 'o', 'a', 'd', 2}

// RPCMetadata is the specifier for getting the list of sector roots.
RPCMetadata = types.Specifier{'M', 'e', 't', 'a', 'd', 'a', 't', 'a'}

// RPCFormContract is the specifier for forming a contract with a host.
RPCFormContract = types.Specifier{'F', 'o', 'r', 'm', 'C', 'o', 'n', 't', 'r', 'a', 'c', 't', 2}

Expand Down
59 changes: 59 additions & 0 deletions modules/renter/contractor/host_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"os"
"path/filepath"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -1024,3 +1025,61 @@ func TestIntegrationCachedRenew(t *testing.T) {
t.Fatal(err)
}
}

// TestIntegrationMetadata tests the Metadata RPC.
func TestIntegrationMetadata(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
// create testing trio
h, c, _, err := newTestingTrio(t.Name())
if err != nil {
t.Fatal(err)
}
defer h.Close()
defer c.Close()

// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
if !ok {
t.Fatal("no entry for host in db")
}

// form a contract with the host
contract, err := c.managedNewContract(hostEntry, types.SiacoinPrecision.Mul64(10), c.blockHeight+100)
if err != nil {
t.Fatal(err)
}
c.mu.Lock()
c.contracts[contract.ID] = contract
c.mu.Unlock()

// revise the contract
editor, err := c.Editor(contract.ID, nil)
if err != nil {
t.Fatal(err)
}
var want []crypto.Hash
for i := 0; i < 10; i++ {
data := fastrand.Bytes(int(modules.SectorSize))
root, err := editor.Upload(data)
if err != nil {
t.Fatal(err)
}
want = append(want, root)
}
err = editor.Close()
if err != nil {
t.Fatal(err)
}

// Get sector roots from the host.
got, err := proto.GetMetadata(hostEntry, contract, nil)
if err != nil {
t.Fatalf("RPCMetadata returned error: %v", err)
}
if !reflect.DeepEqual(got, want) {
t.Errorf("RPCMetadata returned wrong data")
}
}
63 changes: 63 additions & 0 deletions modules/renter/proto/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package proto

import (
"errors"
"net"
"time"

"github.com/NebulousLabs/Sia/crypto"
"github.com/NebulousLabs/Sia/encoding"
"github.com/NebulousLabs/Sia/modules"
)

// GetMetadata downloads sector ids from the host.
func GetMetadata(host modules.HostDBEntry, contract modules.RenterContract, cancel <-chan struct{}) ([]crypto.Hash, error) {
conn, err := (&net.Dialer{
Cancel: cancel,
Timeout: 15 * time.Second,
}).Dial("tcp", string(host.NetAddress))
if err != nil {
return nil, err
}
defer conn.Close()
// allot 2 minutes for RPC request + revision exchange
extendDeadline(conn, modules.NegotiateMetadataTime)
if err := encoding.WriteObject(conn, modules.RPCMetadata); err != nil {
return nil, errors.New("couldn't initiate RPC: " + err.Error())
}
lastRevision, err := getRecentRevision(conn, contract, host.Version)
if err != nil {
return nil, err
}
numSectors := lastRevision.NewFileSize / modules.SectorSize
begin, end := uint64(0), uint64(numSectors)
if err := encoding.WriteObject(conn, begin); err != nil {
return nil, errors.New("unable to write 'begin': " + err.Error())
}
if err := encoding.WriteObject(conn, end); err != nil {
return nil, errors.New("unable to write 'end': " + err.Error())
}
// read acceptance
if err := modules.ReadNegotiationAcceptance(conn); err != nil {
return nil, errors.New("host did not accept [begin,end): " + err.Error())
}
var ids []crypto.Hash
if err := encoding.ReadObject(conn, &ids, numSectors*crypto.HashSize+8); err != nil {
return nil, errors.New("unable to read 'ids': " + err.Error())
}
// Calculate Merkle root from the ids, compare with the real root.
log2SectorSize := uint64(0)
for 1<<log2SectorSize < (modules.SectorSize / crypto.SegmentSize) {
log2SectorSize++
}
tree := crypto.NewCachedTree(log2SectorSize)
for _, sectorRoot := range ids {
tree.Push(sectorRoot)
}
got := tree.Root()
want := lastRevision.NewFileMerkleRoot
if got != want {
return nil, errors.New("sector ids do not match Merkle root")
}
return ids, nil
}

0 comments on commit a5083ff

Please sign in to comment.