Skip to content

Commit

Permalink
modified put and get to be synchronus and blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
afkbyte committed Jun 10, 2024
1 parent acbfdec commit 98231af
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 20 deletions.
5 changes: 3 additions & 2 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}

var useEigenDA bool
if config.PostEigenDA && b.eigenDAWriter != nil {
if b.eigenDAWriter != nil {
useEigenDA = true
}

Expand Down Expand Up @@ -1276,7 +1276,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}

var blobInfo *eigenda.EigenDABlobInfo
if b.daWriter == nil && b.eigenDAWriter != nil && config.PostEigenDA {
if b.daWriter == nil && b.eigenDAWriter != nil {
log.Info("Start to write data to eigenda: ", "data", hex.EncodeToString(sequencerMsg))
blobInfo, err = b.eigenDAWriter.Store(ctx, sequencerMsg)
if err != nil {
Expand Down Expand Up @@ -1336,6 +1336,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}
log.Info(
"BatchPoster: batch sent",
"eigenDA", b.building.useEigenDA,
"sequenceNumber", batchPosition.NextSeqNum,
"from", batchPosition.MessageCount,
"to", b.building.msgCount,
Expand Down
3 changes: 3 additions & 0 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,9 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
if t.blobReader != nil {
daProviders = append(daProviders, arbstate.NewDAProviderBlobReader(t.blobReader))
}
if t.eigenDA != nil {
daProviders = append(daProviders, arbstate.NewDAProviderEigenDA(t.eigenDA))
}
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, arbstate.KeysetValidate)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)

Expand Down
2 changes: 2 additions & 0 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,8 @@ func createNodeImpl(
eigenDAWriter = eigenDAService
}

log.Info("EigenDA reader", "reader", eigenDAReader)

inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader, eigenDAReader)
if err != nil {
return nil, err
Expand Down
7 changes: 5 additions & 2 deletions eigenda/eigenda.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewEigenDA(proxyServerRpc string) (*EigenDA, error) {

// TODO: There should probably be two types of query blob as the
func (e *EigenDA) QueryBlob(ctx context.Context, cert *EigenDABlobInfo, domainFilter string) ([]byte, error) {
data, err := e.client.Get(cert, domainFilter)
data, err := e.client.Get(ctx, cert, domainFilter)
if err != nil {
return nil, err
}
Expand All @@ -132,12 +132,15 @@ func (e *EigenDA) QueryBlob(ctx context.Context, cert *EigenDABlobInfo, domainFi
}

func (e *EigenDA) Store(ctx context.Context, data []byte) (*EigenDABlobInfo, error) {
log.Info("Storing blob")
var blobInfo *EigenDABlobInfo
commitment, err := e.client.Put(data)
commitment, err := e.client.Put(ctx, data)
if err != nil {
return nil, err
}

log.Info("Stored blob", "commitment", hex.EncodeToString(commitment.GetBlobHeader().GetCommitment().GetX()), "y", hex.EncodeToString(commitment.GetBlobHeader().GetCommitment().GetY()))

blobInfo.loadBlobInfo(commitment)

return blobInfo, nil
Expand Down
52 changes: 36 additions & 16 deletions eigenda/eigenda_proxy_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package eigenda

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io/ioutil"
"net/http"
"net/url"

"github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)

Expand All @@ -20,13 +23,27 @@ func NewEigenDAProxyClient(RPCUrl string) *EigenDAProxyClient {
}

// TODO: proper error types
func (c *EigenDAProxyClient) Put(data []byte) (*disperser.BlobInfo, error) {
var blobInfo *disperser.BlobInfo
func (c *EigenDAProxyClient) Put(ctx context.Context, data []byte) (*disperser.BlobInfo, error) {
log.Info("Putting blob EIGENDAPROXYCLIENT", "data", hex.EncodeToString(data))

url := fmt.Sprintf("%s/put", c.RPCUrl)
resp, err := http.Post(url, "text/plain", bytes.NewBuffer([]byte(data)))
body := bytes.NewReader(data)

log.Info("Creating HTTP POST request", "body", body)

url := fmt.Sprintf("%s/put/", c.RPCUrl)
log.Info("Creating HTTP POST request", "url", url)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/octet-stream")

log.Info("Sending HTTP POST request", "url", url)
log.Info("Sending HTTP POST request", "body", body)
log.Info("Sending HTTP POST request", "req", req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to store data: %w", err)
return nil, err
}
defer resp.Body.Close()

Expand All @@ -39,34 +56,37 @@ func (c *EigenDAProxyClient) Put(data []byte) (*disperser.BlobInfo, error) {
return nil, fmt.Errorf("failed to read response: %w", err)
}

var blobInfo disperser.BlobInfo
cert := commitment[3:]
if err != nil {
return nil, fmt.Errorf("failed to decode commitment: %w", err)
}

err = rlp.DecodeBytes(cert, blobInfo)
err = rlp.DecodeBytes(cert, &blobInfo)
if err != nil {
return nil, fmt.Errorf("failed to decode blob info: %w", err)
}

return blobInfo, nil
return &blobInfo, nil
}

func (c *EigenDAProxyClient) Get(blobInfo *EigenDABlobInfo, domainFilter string) ([]byte, error) {
func (c *EigenDAProxyClient) Get(ctx context.Context, blobInfo *EigenDABlobInfo, domainFilter string) ([]byte, error) {
commitment, err := rlp.EncodeToBytes(blobInfo)
if err != nil {
return nil, fmt.Errorf("failed to encode blob info: %w", err)
}

rpcurl := fmt.Sprintf("%s/get/%s", c.RPCUrl, commitment)

// if not nil put in the domain filter as a part of the query url
if domainFilter != "" {
// if not nil or binary (default) put in the domain filter as a part of the query url
if domainFilter != "" && domainFilter != "binary" {
rpcurl = fmt.Sprintf("%s?domain=%s", rpcurl, url.QueryEscape(domainFilter))
}
resp, err := http.Get(rpcurl)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, rpcurl, nil)
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to retrieve data: %w", err)
return nil, err
}
defer resp.Body.Close()

Expand Down

0 comments on commit 98231af

Please sign in to comment.