Skip to content

Commit

Permalink
feat: pre-signed chunks (ethersphere#4719)
Browse files Browse the repository at this point in the history
  • Loading branch information
nugaon authored Jul 9, 2024
1 parent 98eb3e0 commit b406289
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 23 deletions.
10 changes: 7 additions & 3 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ paths:
- Chunk
parameters:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmTagParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
requestBody:
description: Chunk binary data that has to have at least 8 bytes.
content:
Expand Down Expand Up @@ -689,8 +693,8 @@ paths:
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
name: swarm-postage-batch-id
required: true
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
requestBody:
required: true
description: The SOC binary data is composed of the span (8 bytes) and the at most 4KB payload.
Expand Down
14 changes: 14 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,20 @@ components:
schema:
$ref: "#/components/schemas/SwarmAddress"

SwarmPostageStamp:
in: header
name: swarm-postage-stamp
description: |
Postage stamp for the corresponding chunk in the request. \
It is required if Swarm-Postage-Batch-Id header is missing \
It consists of: \
- batch ID - 0:32 bytes \
- postage index (bucket and bucket index) - 32:40 bytes \
- timestamp - 40:48 bytes \
- signature - 48:113 bytes
schema:
$ref: "#/components/schemas/HexString"

SwarmDeferredUpload:
in: header
name: swarm-deferred-upload
Expand Down
34 changes: 33 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const (
SwarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
SwarmCollectionHeader = "Swarm-Collection"
SwarmPostageBatchIdHeader = "Swarm-Postage-Batch-Id"
SwarmPostageStampHeader = "Swarm-Postage-Stamp"
SwarmDeferredUploadHeader = "Swarm-Deferred-Upload"
SwarmRedundancyLevelHeader = "Swarm-Redundancy-Level"
SwarmRedundancyStrategyHeader = "Swarm-Redundancy-Strategy"
Expand Down Expand Up @@ -115,6 +116,8 @@ var (
errBatchUnusable = errors.New("batch not usable")
errUnsupportedDevNodeOperation = errors.New("operation not supported in dev mode")
errOperationSupportedOnlyInFullMode = errors.New("operation is supported only in full mode")

batchIdOrStampSig = fmt.Sprintf("Either '%s' or '%s' header must be set in the request", SwarmPostageStampHeader, SwarmPostageBatchIdHeader)
)

// Storer interface provides the functionality required from the local storage
Expand Down Expand Up @@ -506,7 +509,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
allowedHeaders := []string{
"User-Agent", "Accept", "X-Requested-With", "Access-Control-Request-Headers", "Access-Control-Request-Method", "Accept-Ranges", "Content-Encoding",
AuthorizationHeader, AcceptEncodingHeader, ContentTypeHeader, ContentDispositionHeader, RangeHeader, OriginHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, GasPriceHeader, GasLimitHeader, ImmutableHeader,
SwarmTagHeader, SwarmPinHeader, SwarmEncryptHeader, SwarmIndexDocumentHeader, SwarmErrorDocumentHeader, SwarmCollectionHeader, SwarmPostageBatchIdHeader, SwarmPostageStampHeader, SwarmDeferredUploadHeader, SwarmRedundancyLevelHeader, SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, GasPriceHeader, GasLimitHeader, ImmutableHeader,
}
allowedHeadersStr := strings.Join(allowedHeaders, ", ")

Expand Down Expand Up @@ -725,6 +728,35 @@ func (s *Service) newStamperPutter(ctx context.Context, opts putterOptions) (sto
}, nil
}

func (s *Service) newStampedPutter(ctx context.Context, opts putterOptions, stamp *postage.Stamp) (storer.PutterSession, error) {
if !opts.Deferred && s.beeMode == DevMode {
return nil, errUnsupportedDevNodeOperation
}

storedBatch, err := s.batchStore.Get(stamp.BatchID())
if err != nil {
return nil, errInvalidPostageBatch
}

var session storer.PutterSession
if opts.Deferred || opts.Pin {
session, err = s.storer.Upload(ctx, opts.Pin, opts.TagID)
if err != nil {
return nil, fmt.Errorf("failed creating session: %w", err)
}
} else {
session = s.storer.DirectUpload()
}

stamper := postage.NewPresignedStamper(stamp, storedBatch.Owner)

return &putterSessionWrapper{
PutterSession: session,
stamper: stamper,
save: func() error { return nil },
}, nil
}

type pipelineFunc func(context.Context, io.Reader) (swarm.Address, error)

func requestPipelineFn(s storage.Putter, encrypt bool, rLevel redundancy.Level) pipelineFunc {
Expand Down
47 changes: 38 additions & 9 deletions pkg/api/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storer"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/postage"
Expand All @@ -30,7 +31,8 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("post_chunk").Build()

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
BatchID []byte `map:"Swarm-Postage-Batch-Id"`
StampSig []byte `map:"Swarm-Postage-Stamp"`
SwarmTag uint64 `map:"Swarm-Tag"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
Expand All @@ -57,20 +59,45 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
}
}

if len(headers.BatchID) == 0 && len(headers.StampSig) == 0 {
logger.Error(nil, batchIdOrStampSig)
jsonhttp.BadRequest(w, batchIdOrStampSig)
return
}

// Currently the localstore supports session based uploads. We don't want to
// create new session for single chunk uploads. So if the chunk upload is not
// part of a session already, then we directly push the chunk. This way we dont
// need to go through the UploadStore.
deferred := tag != 0

putter, err := s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Deferred: deferred,
})
var putter storer.PutterSession
if len(headers.StampSig) != 0 {
stamp := postage.Stamp{}
if err := stamp.UnmarshalBinary(headers.StampSig); err != nil {
errorMsg := "Stamp deserialization failure"
logger.Debug(errorMsg, "error", err)
logger.Error(nil, errorMsg)
jsonhttp.BadRequest(w, errorMsg)
return
}

putter, err = s.newStampedPutter(r.Context(), putterOptions{
BatchID: stamp.BatchID(),
TagID: tag,
Deferred: deferred,
}, &stamp)
} else {
putter, err = s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Deferred: deferred,
})
}
if err != nil {
logger.Debug("get putter failed", "error", err)
logger.Error(nil, "get putter failed")
errorMsg := "get putter failed"
logger.Debug(errorMsg, "error", err)
logger.Error(nil, errorMsg)
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist")
Expand All @@ -81,7 +108,7 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
default:
jsonhttp.BadRequest(w, nil)
jsonhttp.BadRequest(w, errorMsg)
}
return
}
Expand Down Expand Up @@ -146,6 +173,8 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
switch {
case errors.Is(err, postage.ErrBucketFull):
jsonhttp.PaymentRequired(ow, "batch is overissued")
case errors.Is(err, postage.ErrInvalidBatchSignature):
jsonhttp.BadRequest(ow, "stamp signature is invalid")
default:
jsonhttp.InternalServerError(ow, "chunk write error")
}
Expand Down
39 changes: 39 additions & 0 deletions pkg/api/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ package api_test
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"net/http"
"testing"
"time"

"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/postage"
mockbatchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock"
"github.com/ethersphere/bee/v2/pkg/spinlock"
Expand All @@ -22,6 +25,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/api"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
testingpostage "github.com/ethersphere/bee/v2/pkg/postage/testing"
testingc "github.com/ethersphere/bee/v2/pkg/storage/testing"
"github.com/ethersphere/bee/v2/pkg/swarm"
)
Expand Down Expand Up @@ -269,3 +273,38 @@ func TestChunkDirectUpload(t *testing.T) {
}),
)
}

// // TestPreSignedUpload tests that chunk can be uploaded with pre-signed postage stamp
func TestPreSignedUpload(t *testing.T) {
t.Parallel()

var (
chunksEndpoint = "/chunks"
chunk = testingc.GenerateTestRandomChunk()
storerMock = mockstorer.New()
batchStore = mockbatchstore.New()
client, _, _, _ = newTestServer(t, testServerOptions{
Storer: storerMock,
BatchStore: batchStore,
})
)

// generate random postage batch and stamp
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)
owner, _ := signer.EthereumAddress()
stamp := testingpostage.MustNewValidStamp(signer, chunk.Address())
_ = batchStore.Save(&postage.Batch{
ID: stamp.BatchID(),
Owner: owner.Bytes(),
})
stampBytes, _ := stamp.MarshalBinary()

// read off inserted chunk
go func() { <-storerMock.PusherFeed() }()

jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusCreated,
jsonhttptest.WithRequestHeader(api.SwarmPostageStampHeader, hex.EncodeToString(stampBytes)),
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
)
}
46 changes: 38 additions & 8 deletions pkg/api/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/soc"
storage "github.com/ethersphere/bee/v2/pkg/storage"
storer "github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
)
Expand Down Expand Up @@ -43,17 +44,25 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
}

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
Pin bool `map:"Swarm-Pin"`
BatchID []byte `map:"Swarm-Postage-Batch-Id"`
StampSig []byte `map:"Swarm-Postage-Stamp"`
Pin bool `map:"Swarm-Pin"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

if len(headers.BatchID) == 0 && len(headers.StampSig) == 0 {
logger.Error(nil, batchIdOrStampSig)
jsonhttp.BadRequest(w, batchIdOrStampSig)
return
}

// if pinning header is set we do a deferred upload, else we do a direct upload
var (
tag uint64
err error
)
if headers.Pin {
session, err := s.storer.NewSession()
Expand All @@ -71,12 +80,33 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
tag = session.TagID
}

putter, err := s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Pin: headers.Pin,
Deferred: headers.Pin,
})
deferred := tag != 0

var putter storer.PutterSession
if len(headers.StampSig) != 0 {
stamp := postage.Stamp{}
if err := stamp.UnmarshalBinary(headers.StampSig); err != nil {
errorMsg := "Stamp deserialization failure"
logger.Debug(errorMsg, "error", err)
logger.Error(nil, errorMsg)
jsonhttp.BadRequest(w, errorMsg)
return
}

putter, err = s.newStampedPutter(r.Context(), putterOptions{
BatchID: stamp.BatchID(),
TagID: tag,
Pin: headers.Pin,
Deferred: deferred,
}, &stamp)
} else {
putter, err = s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Pin: headers.Pin,
Deferred: deferred,
})
}
if err != nil {
logger.Debug("get putter failed", "error", err)
logger.Error(nil, "get putter failed")
Expand Down
39 changes: 39 additions & 0 deletions pkg/api/soc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ import (
"time"

"github.com/ethersphere/bee/v2/pkg/api"
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/v2/pkg/postage"
mockbatchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
testingpostage "github.com/ethersphere/bee/v2/pkg/postage/testing"
testingsoc "github.com/ethersphere/bee/v2/pkg/soc/testing"
"github.com/ethersphere/bee/v2/pkg/spinlock"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
Expand Down Expand Up @@ -140,6 +144,41 @@ func TestSOC(t *testing.T) {
t.Fatal(err)
}
})

// TestPreSignedUpload tests that chunk can be uploaded with pre-signed postage stamp
t.Run("pre-signed upload", func(t *testing.T) {
t.Parallel()

var (
s = testingsoc.GenerateMockSOC(t, testData)
storerMock = mockstorer.New()
batchStore = mockbatchstore.New()
client, _, _, _ = newTestServer(t, testServerOptions{
Storer: storerMock,
BatchStore: batchStore,
})
)

// generate random postage batch and stamp
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)
owner, _ := signer.EthereumAddress()
stamp := testingpostage.MustNewValidStamp(signer, s.Address())
_ = batchStore.Save(&postage.Batch{
ID: stamp.BatchID(),
Owner: owner.Bytes(),
})
stampBytes, _ := stamp.MarshalBinary()

// read off inserted chunk
go func() { <-storerMock.PusherFeed() }()

jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated,
jsonhttptest.WithRequestHeader(api.SwarmPostageStampHeader, hex.EncodeToString(stampBytes)),
jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())),
)
})

t.Run("err - batch empty", func(t *testing.T) {
s := testingsoc.GenerateMockSOC(t, testData)
hexbatch := hex.EncodeToString(batchEmpty)
Expand Down
Loading

0 comments on commit b406289

Please sign in to comment.