Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Default power balance snapshot #1732

Open
wants to merge 1 commit into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ import { convertTimeStampToSeconds } from '../../utils/utils';
export class GivPowerBalanceAggregatorAdapterMock
implements IGivPowerBalanceAggregator
{
private excludedAddresses: Set<string> = new Set();

addExcludedAddresses(addresses: string[]): void {
addresses.forEach(address => {
this.excludedAddresses.add(address);
});
}

clearExcludedAddresses(): void {
this.excludedAddresses.clear();
}

async getAddressesBalance(
params: BalancesAtTimestampInputParams,
): Promise<BalanceResponse[]> {
Expand All @@ -21,14 +33,16 @@ export class GivPowerBalanceAggregatorAdapterMock
'addresses length can not be greater than NUMBER_OF_BALANCE_AGGREGATOR_BATCH that is defined in .env',
);
}
return _.uniq(params.addresses).map(address => {
return {
address,
balance: 13, // Just an example balance
updatedAt: new Date('2023-08-10T16:18:02.655Z'),
networks: [100],
};
});
return _.uniq(params.addresses)
.filter(address => !this.excludedAddresses.has(address))
.map(address => {
return {
address,
balance: 13, // Just an example balance
updatedAt: new Date('2023-08-10T16:18:02.655Z'),
networks: [100],
};
});
}

async getLatestBalances(
Expand Down
101 changes: 99 additions & 2 deletions src/services/cronJobs/fillSnapshotBalances.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { assert } from 'chai';
import sinon from 'sinon';
import { In } from 'typeorm';
import {
addFillPowerSnapshotBalanceJobsToQueue,
processFillPowerSnapshotJobs,
Expand All @@ -16,7 +17,10 @@ import { PowerSnapshot } from '../../entities/powerSnapshot';
import { PowerBoostingSnapshot } from '../../entities/powerBoostingSnapshot';
import { AppDataSource } from '../../orm';
import { PowerBalanceSnapshot } from '../../entities/powerBalanceSnapshot';
import { getPowerBalanceAggregatorAdapter } from '../../adapters/adaptersFactory';
import {
getPowerBalanceAggregatorAdapter,
mockPowerBalanceAggregator,
} from '../../adapters/adaptersFactory';
import { convertTimeStampToSeconds } from '../../utils/utils';

describe(
Expand All @@ -33,6 +37,7 @@ async function processFillPowerSnapshotJobsTestCases() {
);
await PowerBalanceSnapshot.clear();
await PowerBoostingSnapshot.clear();
mockPowerBalanceAggregator.clearExcludedAddresses();
});

afterEach(() => {
Expand Down Expand Up @@ -295,6 +300,98 @@ async function processFillPowerSnapshotJobsTestCases() {
await sleep(4_000);

assert.equal((await getPowerBoostingSnapshotWithoutBalance()).length, 0);
// assert.isEmpty(await getPowerBoostingSnapshotWithoutBalance());
});

it('should fill zero snapshot balance for users with no balance on balance aggreagator', async () => {
const powerSnapshotTime = new Date().getTime() - 10 * 3600 * 1000; // 10 hour earlier
const excludedAddresses: string[] = [];
const excludedUserIds: number[] = [];

const ITERATIONS = 10;
for (let i = 0; i < ITERATIONS; i++) {
const user = await saveUserDirectlyToDb(generateRandomEtheriumAddress());
const user2 = await saveUserDirectlyToDb(generateRandomEtheriumAddress());

excludedAddresses.push(user2.walletAddress as string);
excludedUserIds.push(user2.id);

const project = await saveProjectDirectlyToDb(createProjectData());
const powerSnapshots = PowerSnapshot.create([
{
time: new Date(powerSnapshotTime + (i + 1) * 1000),
},
{
time: new Date(powerSnapshotTime + 500 + (i + 1) * 1000),
},
]);
await PowerSnapshot.save(powerSnapshots);

const powerBoostingSnapshots = PowerBoostingSnapshot.create([
{
userId: user.id,
projectId: project.id,
percentage: 10,
powerSnapshot: powerSnapshots[0],
},
{
userId: user2.id,
projectId: project.id,
percentage: 20,
powerSnapshot: powerSnapshots[0],
},
{
userId: user.id,
projectId: project.id,
percentage: 30,
powerSnapshot: powerSnapshots[1],
},
{
userId: user2.id,
projectId: project.id,
percentage: 40,
powerSnapshot: powerSnapshots[1],
},
]);
await PowerBoostingSnapshot.save(powerBoostingSnapshots);

const powerBalances = PowerBalanceSnapshot.create([
{
userId: user.id,
powerSnapshot: powerSnapshots[0],
},
{
userId: user2.id,
powerSnapshot: powerSnapshots[0],
},
{
userId: user.id,
powerSnapshot: powerSnapshots[1],
},
{
userId: user2.id,
powerSnapshot: powerSnapshots[1],
},
]);
await PowerBalanceSnapshot.save(powerBalances);
}

mockPowerBalanceAggregator.addExcludedAddresses(excludedAddresses);

assert.isNotEmpty(await getPowerBoostingSnapshotWithoutBalance());

await addFillPowerSnapshotBalanceJobsToQueue();

await sleep(4_000);

assert.equal((await getPowerBoostingSnapshotWithoutBalance()).length, 0);

const excludedUsersPowerBalances = await PowerBalanceSnapshot.find({
where: {
userId: In(excludedUserIds),
},
});

assert.lengthOf(excludedUsersPowerBalances, ITERATIONS * 2);
assert.isTrue(excludedUsersPowerBalances.every(pb => pb.balance === 0));
});
}
37 changes: 27 additions & 10 deletions src/services/cronJobs/fillSnapshotBalances.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,27 +120,44 @@ export function processFillPowerSnapshotJobs() {
// Process in batches
for (let i = 0; i < Math.ceil(data.length / batchNumber); i++) {
const batch = data.slice(i * batchNumber, (i + 1) * batchNumber);
const addresses = batch.map(item => item.walletAddress);
const addressesToFetch = new Set<string>(
batch.map(item => item.walletAddress),
);
const balances =
await getPowerBalanceAggregatorAdapter().getAddressesBalance({
timestamp,
addresses,
addresses: Array.from(addressesToFetch),
});

const groupByWalletAddress = _.groupBy(batch, item =>
item.walletAddress.toLowerCase(),
);

const snapshotBalances = balances
.map(balance =>
groupByWalletAddress[balance.address.toLowerCase()].map(item => ({
balance: balance.balance,
const snapshotBalances = balances.map(balance => {
const address = balance.address.toLowerCase();

// Remove the address from the set
addressesToFetch.delete(address);

return groupByWalletAddress[address].map(item => ({
balance: balance.balance,
powerSnapshotId: item.powerSnapshotId,
userId: item!.userId,
}));
});

// Fill zero for the missing balances
for (const missedAddress of addressesToFetch) {
snapshotBalances.push(
groupByWalletAddress[missedAddress].map(item => ({
balance: 0,
powerSnapshotId: item.powerSnapshotId,
userId: item!.userId,
userId: item.userId,
})),
)
.flat();
await addOrUpdatePowerSnapshotBalances(snapshotBalances);
);
}

await addOrUpdatePowerSnapshotBalances(snapshotBalances.flat());
}
} catch (e) {
logger.error('processFillPowerSnapshotJobs >> error', e);
Expand Down
Loading