Skip to content

Commit

Permalink
Bundle Resubmission (#124)
Browse files Browse the repository at this point in the history
* chore: implemented resubmit bundle logic

* chore: added unit tests for bundle resubmissions
  • Loading branch information
troykessler authored Mar 22, 2024
1 parent 5a0bab3 commit a1784d7
Show file tree
Hide file tree
Showing 3 changed files with 365 additions and 125 deletions.
7 changes: 7 additions & 0 deletions common/protocol/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ export class Validator {
protected metricsPort!: number;
protected home!: string;

// tmp variables
protected lastUploadedBundle: {
storageId: string;
dataSize: number;
dataHash: string;
} | null = null;

// setups
protected setupLogger = setupLogger;
protected setupCacheProvider = setupCacheProvider;
Expand Down
272 changes: 147 additions & 125 deletions common/protocol/src/methods/upload/createBundleProposal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,143 +141,165 @@ export async function createBundleProposal(this: Validator): Promise<void> {
`Successfully compressed bundle with Compression:${compression.name}`
);

// create tags for bundle to make it easier to find KYVE data
// on the storage provider itself
const tags: BundleTag[] = [
{
name: "Content-Type",
value: compression.mimeType,
},
{
name: "Application",
value: "KYVE",
},
{
name: "ChainId",
value: this.chainId,
},
{
name: "@kyvejs/protocol",
value: this.protocolVersion,
},
{
name: this.runtime.name,
value: this.runtime.version,
},
{
name: "Pool",
value: this.poolId.toString(),
},
{
name: "Uploader",
value: this.client[0].account.address,
},
{
name: "FromIndex",
value: toIndex.toString(),
},
{
name: "ToIndex",
value: (toIndex + bundleProposal.length).toString(),
},
{
name: "BundleSize",
value: bundleProposal.length.toString(),
},
{
name: "FromKey",
value: fromKey,
},
{
name: "ToKey",
value: toKey,
},
{
name: "BundleSummary",
value: bundleSummary,
},
];

// try to upload the bundle proposal to the storage provider
// if the upload fails the node should immediately skip the
// uploader role to prevent upload slashes
try {
// get current storage provider defined on pool
this.logger.debug(`this.storageProviderFactory()`);
const storageProvider = this.storageProviderFactory();

// if balance is less than the upload cost we skip the uploader
// role with a warning
const balance = await storageProvider.getBalance();
const cost = await storageProvider.getPrice(storageProviderData.length);

if (new BigNumber(balance).lt(cost)) {
this.logger.warn(
`Not enough balance on StorageProvider:${storageProvider.name}; balance = ${balance} required = ${cost}`
);
await this.skipUploaderRole(fromIndex);
return;
}
// hash the raw data which gets uploaded to the storage provider
// with sha256
const dataSize = storageProviderData.byteLength;

// hash the raw data which gets uploaded to the storage provider
// with sha256
const dataHash = sha256(storageProviderData);

let storageId = "";

// check if the same bundle was already uploaded the last round.
// if we see that the exact same bundle was already uploaded (hash
// comparison) we don't upload the data again and instead reuse the
// storage id
if (
this.lastUploadedBundle &&
this.lastUploadedBundle.dataHash === dataHash
) {
storageId = this.lastUploadedBundle.storageId;

// upload the bundle proposal to the storage provider
// and get a storage id. With that other participants in the
// network can retrieve the data again and validate it
this.logger.debug(
`this.storageProvider.saveBundle($STORAGE_PROVIDER_DATA,$TAGS)`
);

const { storageId, storageData } = await storageProvider.saveBundle(
storageProviderData,
tags
this.logger.info(
`Uploaded same bundle with data hash ${dataHash} already with storage id ${storageId}. Resubmitting ...`
);
} else {
// try to upload the bundle proposal to the storage provider
// if the upload fails the node should immediately skip the
// uploader role to prevent upload slashes
try {
// get current storage provider defined on pool
this.logger.debug(`this.storageProviderFactory()`);
const storageProvider = this.storageProviderFactory();

// if balance is less than the upload cost we skip the uploader
// role with a warning
const balance = await storageProvider.getBalance();
const cost = await storageProvider.getPrice(storageProviderData.length);

if (new BigNumber(balance).lt(cost)) {
this.logger.warn(
`Not enough balance on StorageProvider:${storageProvider.name}; balance = ${balance} required = ${cost}`
);
await this.skipUploaderRole(fromIndex);
return;
}

// throw error if storage provider returns an empty storage id
if (!storageId) {
throw new Error("Storage Provider returned empty storageId");
}
// upload the bundle proposal to the storage provider
// and get a storage id. With that other participants in the
// network can retrieve the data again and validate it
this.logger.debug(
`this.storageProvider.saveBundle($STORAGE_PROVIDER_DATA,$TAGS)`
);

// hash the raw data which gets uploaded to the storage provider
// with sha256
const dataSize = storageData.byteLength;
const uploadBundle = await storageProvider.saveBundle(
storageProviderData,
[
{
name: "Content-Type",
value: compression.mimeType,
},
{
name: "Application",
value: "KYVE",
},
{
name: "ChainId",
value: this.chainId,
},
{
name: "@kyvejs/protocol",
value: this.protocolVersion,
},
{
name: this.runtime.name,
value: this.runtime.version,
},
{
name: "Pool",
value: this.poolId.toString(),
},
{
name: "Uploader",
value: this.client[0].account.address,
},
{
name: "FromIndex",
value: toIndex.toString(),
},
{
name: "ToIndex",
value: (toIndex + bundleProposal.length).toString(),
},
{
name: "BundleSize",
value: bundleProposal.length.toString(),
},
{
name: "FromKey",
value: fromKey,
},
{
name: "ToKey",
value: toKey,
},
{
name: "BundleSummary",
value: bundleSummary,
},
]
);

// hash the raw data which gets uploaded to the storage provider
// with sha256
const dataHash = sha256(storageData);
this.m.storage_provider_save_successful.inc();

this.m.storage_provider_save_successful.inc();
this.logger.info(
`Successfully saved bundle on StorageProvider:${storageProvider.name}`
);

this.logger.info(
`Successfully saved bundle on StorageProvider:${storageProvider.name}`
);
storageId = uploadBundle.storageId;

// save uploaded bundle details for next round
this.lastUploadedBundle = {
storageId,
dataSize,
dataHash,
};
} catch (err) {
this.logger.info(
`Saving bundle proposal on StorageProvider was unsuccessful`
);
this.logger.debug(standardizeError(err));

// if the bundle was successfully uploaded to the storage provider
// the node can finally submit the actual bundle proposal to
// the network
const success = await this.submitBundleProposal(
storageId,
dataSize,
dataHash,
fromIndex,
bundleProposal.length,
fromKey,
toKey,
bundleSummary
);
this.m.storage_provider_save_failed.inc();

if (success) {
this.logger.info(`Successfully submitted BundleProposal:${storageId}`);
// if the bundle fails to the uploaded to the storage provider
// let the node skip the uploader role and continue
await this.skipUploaderRole(fromIndex);
}
} catch (err) {
this.logger.info(
`Saving bundle proposal on StorageProvider was unsuccessful`
);
this.logger.debug(standardizeError(err));
}

// throw error if storage provider returns an empty storage id
if (!storageId) {
throw new Error("Storage Provider returned empty storageId");
}

this.m.storage_provider_save_failed.inc();
// if the bundle was successfully uploaded to the storage provider
// the node can finally submit the actual bundle proposal to
// the network
const success = await this.submitBundleProposal(
storageId,
dataSize,
dataHash,
fromIndex,
bundleProposal.length,
fromKey,
toKey,
bundleSummary
);

// if the bundle fails to the uploaded to the storage provider
// let the node skip the uploader role and continue
await this.skipUploaderRole(fromIndex);
if (success) {
this.logger.info(`Successfully submitted BundleProposal:${storageId}`);
}
} catch (err) {
this.logger.error(
Expand Down
Loading

0 comments on commit a1784d7

Please sign in to comment.