Skip to content

Commit

Permalink
add request splitting to rewards
Browse files Browse the repository at this point in the history
  • Loading branch information
Ridel1e committed Oct 4, 2023
1 parent 8b3943e commit ce031f6
Showing 1 changed file with 89 additions and 7 deletions.
96 changes: 89 additions & 7 deletions src/network/cardano/api/rewards/rewards.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import { HexString } from '@spectrumlabs/cardano-dex-sdk';
import { RustModule } from '@spectrumlabs/cardano-dex-sdk/build/main/utils/rustLoader';
import axios from 'axios';
import groupBy from 'lodash/groupBy';
import last from 'lodash/last';
import {
catchError,
combineLatest,
filter,
from,
interval,
map,
mapTo,
merge,
Observable,
of,
publishReplay,
refCount,
Expand All @@ -17,9 +22,9 @@ import {
} from 'rxjs';

import { applicationConfig } from '../../../../applicationConfig';
import { Address } from '../../../../common/models/Address';
import { AssetInfo } from '../../../../common/models/AssetInfo';
import { Currency } from '../../../../common/models/Currency.ts';
import { Address } from '../../../../common/types';
import { Dictionary } from '../../../../common/utils/Dictionary';
import { getAddresses } from '../../../../gateway/api/addresses';

Expand Down Expand Up @@ -169,6 +174,87 @@ const buildRewardsData = (response: RawRewardResponse): RewardsData => {
);
};

const ADDRESSES_IN_REQUEST_LIMIT = 400;

const requestRewards = (
addresses: string[],
): Observable<RawRewardResponse | undefined> =>
from(
axios.post('https://rewards.spectrum.fi/v1/rewards/data', addresses),
).pipe(
map((res) => res.data),
catchError(() => of(undefined)),
);

const combineRequests = (
allAddresses: string[],
): Observable<RawRewardResponse | undefined> => {
const addressesBatch: string[][] = [[]];

for (const address of allAddresses) {
const lastItem = last(addressesBatch);
if (!lastItem) {
break;
}
if (lastItem.length >= ADDRESSES_IN_REQUEST_LIMIT) {
addressesBatch.push([address]);
} else {
lastItem.push(address);
}
}

const getStakeKeyHash = (address: string): HexString | undefined =>
RustModule.CardanoWasm.BaseAddress.from_address(
RustModule.CardanoWasm.Address.from_bech32(address),
)
?.stake_cred()
.to_keyhash()
?.to_hex();

return combineLatest(addressesBatch.map(requestRewards)).pipe(
map(
(responses: (RawRewardResponse | undefined)[]) =>
responses.filter(Boolean) as RawRewardResponse[],
),
map((responses: RawRewardResponse[]) => {
if (!responses.length) {
return undefined;
}
return responses.reduce<RawRewardResponse>(
(acc, item) => {
const filteredItems = item.rewards.filter((rewardItem) => {
const stakeKeyHash = getStakeKeyHash(rewardItem.address);

if (
rewardItem.rewardFor !== RewardSectionType.AIRDROP ||
!stakeKeyHash
) {
return true;
}
return acc.rewards.every(
(accItem) =>
getStakeKeyHash(accItem.address) !== stakeKeyHash ||
accItem.epoch !== rewardItem.epoch,
);
});
return {
rewards: acc.rewards.concat(filteredItems),
upcoming: {
sp0: item.upcoming.sp0
? (acc.upcoming.sp0 || 0) + item.upcoming.sp0
: acc.upcoming.sp0,
sp1: item.upcoming.sp1
? (acc.upcoming.sp1 || 0) + item.upcoming.sp1
: acc.upcoming.sp1,
},
};
},
{ rewards: [], upcoming: { sp0: 0, sp1: 0 } },
);
}),
);
};

export const rewards$ = getAddresses().pipe(
filter((addresses) => !!addresses?.length),
switchMap((addresses) =>
Expand All @@ -177,12 +263,8 @@ export const rewards$ = getAddresses().pipe(
mapTo(addresses),
),
),
switchMap((addresses) =>
from(
axios.post('https://rewards.spectrum.fi/v1/rewards/data', addresses),
).pipe(catchError(() => of(undefined))),
),
map((res) => (res ? buildRewardsData(res.data) : undefined)),
switchMap((addresses) => combineRequests(addresses as string[])),
map((data) => (data ? buildRewardsData(data) : undefined)),
publishReplay(1),
refCount(),
);

0 comments on commit ce031f6

Please sign in to comment.