Skip to content

Commit

Permalink
feat: remove integer batch IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
anomit committed Oct 10, 2024
1 parent d70932c commit 5ebae4a
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 320 deletions.
4 changes: 1 addition & 3 deletions pkgs/helpers/clients/txClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type SubmissionBatchSizeRequest struct {
type SubmitSubmissionBatchRequest struct {
DataMarketAddress string `json:"dataMarket"`
BatchCid string `json:"batchCid"`
BatchId string `json:"batchId"`
EpochId *big.Int `json:"epochId"`
ProjectIds []string `json:"projectIds"`
SnapshotCids []string `json:"snapshotCids"`
Expand Down Expand Up @@ -76,11 +75,10 @@ func SendSubmissionBatchSize(epochId *big.Int, size int) error {
return nil
}

func SubmitSubmissionBatch(dataMarketAddress string, batchCid string, batchId string, epochId *big.Int, projectIds []string, snapshotCids []string, finalizedCidsRootHash string) error {
func SubmitSubmissionBatch(dataMarketAddress string, batchCid string, epochId *big.Int, projectIds []string, snapshotCids []string, finalizedCidsRootHash string) error {
request := SubmitSubmissionBatchRequest{
DataMarketAddress: dataMarketAddress,
BatchCid: batchCid,
BatchId: batchId,
EpochId: epochId,
ProjectIds: projectIds,
SnapshotCids: snapshotCids,
Expand Down
1 change: 0 additions & 1 deletion pkgs/helpers/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ var IPFSCon *shell.Shell

// Batch represents your data structure
type Batch struct {
ID *big.Int `json:"id"`
SubmissionIds []string `json:"submissionIds"`
Submissions []string `json:"submissions"`
RootHash string `json:"roothash"`
Expand Down
42 changes: 16 additions & 26 deletions pkgs/helpers/merkle/merkle.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"fmt"
"math/big"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -22,8 +21,6 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)

var BatchId int

func UpdateMerkleTree(sortedData []string, tree *imt.IncrementalMerkleTree) (*imt.IncrementalMerkleTree, error) {
for _, value := range sortedData {
err := tree.AddLeaf([]byte(value))
Expand Down Expand Up @@ -101,7 +98,7 @@ func finalizeBatches(batchedKeys [][]string, epochId *big.Int, tree *imt.Increme
localProjectValueFrequencies := make(map[string]map[string]int)

for _, key := range batch {
val, err := redis.Get(context.Background(), key) // submission data is uuid.submission_json
val, err := redis.Get(context.Background(), key) // submission data is uuid.submission_json

if err != nil {
clients.SendFailureNotification("finalizeBatches", fmt.Sprintf("Error fetching data from redis: %s", err.Error()), time.Now().String(), "High")
Expand All @@ -111,11 +108,11 @@ func finalizeBatches(batchedKeys [][]string, epochId *big.Int, tree *imt.Increme

log.Debugln(fmt.Sprintf("Processing key %s and value %s", key, val))

if len(val) == 0 {
clients.SendFailureNotification("finalizeBatches", fmt.Sprintf("Value has expired for key, not being counted in batch: %s", key), time.Now().String(), "High")
log.Errorln("Value has expired for key: ", key)
continue
}
if len(val) == 0 {
clients.SendFailureNotification("finalizeBatches", fmt.Sprintf("Value has expired for key, not being counted in batch: %s", key), time.Now().String(), "High")
log.Errorln("Value has expired for key: ", key)
continue
}

parts := strings.Split(key, ".")
if len(parts) != 3 {
Expand Down Expand Up @@ -173,7 +170,7 @@ func finalizeBatches(batchedKeys [][]string, epochId *big.Int, tree *imt.Increme

log.Debugln("PIDs and CIDs for epoch: ", epochId, pids, cids)

batchSubmission, err := BuildBatch(allIds, allData, BatchId, epochId, tree, pids, cids)
batchSubmission, err := BuildBatch(allIds, allData, epochId, tree, pids, cids)
if err != nil {
clients.SendFailureNotification("finalizeBatches", fmt.Sprintf("Batch building error: %s", err.Error()), time.Now().String(), "High")
log.Errorln("Error storing the batch: ", err.Error())
Expand All @@ -182,24 +179,18 @@ func finalizeBatches(batchedKeys [][]string, epochId *big.Int, tree *imt.Increme

mu.Lock()
batchSubmissions = append(batchSubmissions, batchSubmission)
BatchId++
mu.Unlock()

log.Debugf("CID: %s Batch: %d", batchSubmission.Cid, BatchId-1)
log.Debugf("CID: %s Epoch: %s", batchSubmission.Cid, epochId.String())
}(batch)
}

wg.Wait()
ids := []string{}
for _, bs := range batchSubmissions {
ids = append(ids, bs.Batch.ID.String())
}

// Set finalized batches in redis for epochId
logEntry := map[string]interface{}{
"epoch_id": epochId.String(),
"finalized_batches_count": len(batchSubmissions),
"finalized_batch_ids": ids,
"timestamp": time.Now().Unix(),
}

Expand Down Expand Up @@ -252,40 +243,39 @@ func arrangeKeysInBatches(keys []string) [][]string {
return batches
}

func BuildBatch(dataIds, data []string, id int, epochId *big.Int, tree *imt.IncrementalMerkleTree, pids, cids []string) (*ipfs.BatchSubmission, error) {
func BuildBatch(dataIds, data []string, epochId *big.Int, tree *imt.IncrementalMerkleTree, pids, cids []string) (*ipfs.BatchSubmission, error) {
log.Debugln("Building batch for epoch: ", epochId.String())
var err error
_, err = UpdateMerkleTree(dataIds, tree)
if err != nil {
return nil, err
}
roothash := GetRootHash(tree)
log.Debugln("RootHash for batch ", id, roothash)
batch := &ipfs.Batch{ID: big.NewInt(int64(id)), SubmissionIds: dataIds, Submissions: data, RootHash: roothash, Pids: pids, Cids: cids}
log.Debugln("RootHash for batch in epoch", epochId.String(), roothash)
batch := &ipfs.Batch{SubmissionIds: dataIds, Submissions: data, RootHash: roothash, Pids: pids, Cids: cids}
if cid, err := ipfs.StoreOnIPFS(ipfs.IPFSCon, batch); err != nil {
clients.SendFailureNotification("Build Batch", fmt.Sprintf("Error storing batch %d on IPFS: %s", id, err.Error()), time.Now().String(), "High")
log.Errorf("Error storing batch on IPFS: %d", id)
clients.SendFailureNotification("Build Batch", fmt.Sprintf("Error storing batch on IPFS: %s", err.Error()), time.Now().String(), "High")
log.Errorf("Error storing batch on IPFS: %s", err.Error())
return nil, err
} else {
log.Debugln("Stored cid for batch ", id, cid)
log.Debugln("Stored cid for batch ", cid)
// Set batch building success for epochId
logEntry := map[string]interface{}{
"epoch_id": epochId.String(),
"batch_id": id,
"batch_cid": cid,
"submissions_count": len(data),
"submissions": data,
"timestamp": time.Now().Unix(),
}

if err = redis.SetProcessLog(context.Background(), redis.TriggeredProcessLog(pkgs.BuildBatch, strconv.Itoa(id)), logEntry, 4*time.Hour); err != nil {
if err = redis.SetProcessLog(context.Background(), redis.TriggeredProcessLog(pkgs.BuildBatch, roothash), logEntry, 4*time.Hour); err != nil {
clients.SendFailureNotification("BuildBatch", err.Error(), time.Now().String(), "High")
log.Errorln("BuildBatch process log error: ", err.Error())
}

cidTree, _ := imt.New()
if _, err := UpdateMerkleTree(batch.Cids, cidTree); err != nil {
clients.SendFailureNotification("Build Batch", fmt.Sprintf("Error updating merkle tree for batch %d: %s", id, err.Error()), time.Now().String(), "High")
clients.SendFailureNotification("Build Batch", fmt.Sprintf("Error updating merkle tree for batch with roothash %s: %s", roothash, err.Error()), time.Now().String(), "High")
log.Errorln("Unable to get finalized root hash: ", err.Error())
return nil, err
}
Expand Down
7 changes: 0 additions & 7 deletions pkgs/helpers/prost/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"collector/pkgs"
"collector/pkgs/contract"
"collector/pkgs/helpers/clients"
"collector/pkgs/helpers/merkle"
"collector/pkgs/helpers/redis"
"context"
"fmt"
Expand Down Expand Up @@ -73,12 +72,6 @@ func PopulateStateVars() {
CurrentEpochID.Set(big.NewInt(0))
}

if output, err := MustQuery[*big.Int](context.Background(), func() (*big.Int, error) {
return Instance.CurrentBatchId(&bind.CallOpts{}, config.SettingsObj.DataMarketContractAddress)
}); err == nil {
merkle.BatchId = int(output.Int64())
}

if output, err := MustQuery[*big.Int](context.Background(), func() (*big.Int, error) {
return Instance.EpochsInADay(&bind.CallOpts{}, config.SettingsObj.DataMarketContractAddress)
}); err == nil {
Expand Down
18 changes: 2 additions & 16 deletions pkgs/helpers/prost/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,10 @@ func triggerCollectionFlow(epochID *big.Int, headers []string, day *big.Int) {
}
// now send the actual batches by looping through them
for _, batch := range batchSubmissions {
log.Debugln("Submitting batch with CID against batch ID and epoch ID", batch.Cid, batch.Batch.ID.String(), epochID.String())
log.Debugln("Submitting batch with CID against epoch ID", batch.Cid, epochID.String())
clients.SubmitSubmissionBatch(
config.SettingsObj.DataMarketAddress,
batch.Cid,
batch.Batch.ID.String(),
epochID,
batch.Batch.Pids,
batch.Batch.Cids,
Expand All @@ -229,20 +228,7 @@ func triggerCollectionFlow(epochID *big.Int, headers []string, day *big.Int) {
time.Sleep(time.Duration(config.SettingsObj.BlockTime*500) * time.Millisecond)
}
redis.ResetCollectorDBSubmissions(context.Background(), epochID, headers)
// ensure all transactions were included after waiting for new block
// log.Debugln("Verifying all batch submissions")
// txManager.EnsureBatchSubmissionSuccess(epochID)
// if count, err := redis.Get(context.Background(), redis.TransactionReceiptCountByEvent(epochID.String())); count != "" {
// log.Debugf("Transaction receipt fetches for epoch %s: %s", epochID.String(), count)
// n, _ := strconv.Atoi(count)
// if n > len(batchSubmissions)*3 { // giving upto 3 retries per txn
// clients.SendFailureNotification("EnsureBatchSubmissionSuccess", fmt.Sprintf("Too many transaction receipts fetched for epoch %s: %s", epochID.String(), count), time.Now().String(), "Medium")
// log.Errorf("Too many transaction receipts fetched for epoch %s: %s", epochID.String(), count)
// }
// } else if err != nil {
// clients.SendFailureNotification("Redis error", err.Error(), time.Now().String(), "High")
// log.Errorln("Redis error: ", err.Error())
// }

redis.Delete(context.Background(), redis.TransactionReceiptCountByEvent(epochID.String()))

// txManager.EndBatchSubmissionsForEpoch(epochID)
Expand Down
Loading

0 comments on commit 5ebae4a

Please sign in to comment.