Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sdk): Support multiple contracts in ChainEventPoller #2928

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions packages/sdk/src/StreamrClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { Stream } from './Stream'
import { StreamIDBuilder } from './StreamIDBuilder'
import { StreamMetadata, getPartitionCount } from './StreamMetadata'
import { StreamrClientError } from './StreamrClientError'
import { ChainEventPoller } from './contracts/ChainEventPoller'
import { ContractFactory } from './contracts/ContractFactory'
import { Operator } from './contracts/Operator'
import { OperatorRegistry } from './contracts/OperatorRegistry'
Expand All @@ -48,11 +49,11 @@ import { Subscription, SubscriptionEvents } from './subscribe/Subscription'
import { initResendSubscription } from './subscribe/resendSubscription'
import { waitForStorage } from './subscribe/waitForStorage'
import { StreamDefinition } from './types'
import { map } from './utils/GeneratorUtils'
import { LoggerFactory } from './utils/LoggerFactory'
import { addStreamToStorageNode } from './utils/addStreamToStorageNode'
import { pOnce } from './utils/promises'
import { convertPeerDescriptorToNetworkPeerDescriptor, createTheGraphClient } from './utils/utils'
import { addStreamToStorageNode } from './utils/addStreamToStorageNode'
import { map } from './utils/GeneratorUtils'

// TODO: this type only exists to enable tsdoc to generate proper documentation
export type SubscribeOptions = StreamDefinition & ExtraSubscribeOptions
Expand Down Expand Up @@ -96,6 +97,7 @@ export class StreamrClient {
private readonly operatorRegistry: OperatorRegistry
private readonly contractFactory: ContractFactory
private readonly localGroupKeyStore: LocalGroupKeyStore
private readonly chainEventPoller: ChainEventPoller
private readonly theGraphClient: TheGraphClient
private readonly streamIdBuilder: StreamIDBuilder
private readonly config: StrictStreamrClientConfig
Expand Down Expand Up @@ -132,6 +134,7 @@ export class StreamrClient {
this.operatorRegistry = container.resolve<OperatorRegistry>(OperatorRegistry)
this.contractFactory = container.resolve<ContractFactory>(ContractFactory)
this.localGroupKeyStore = container.resolve<LocalGroupKeyStore>(LocalGroupKeyStore)
this.chainEventPoller = container.resolve<ChainEventPoller>(ChainEventPoller)
this.streamIdBuilder = container.resolve<StreamIDBuilder>(StreamIDBuilder)
this.eventEmitter = container.resolve<StreamrClientEventEmitter>(StreamrClientEventEmitter)
this.destroySignal = container.resolve<DestroySignal>(DestroySignal)
Expand Down Expand Up @@ -770,12 +773,12 @@ export class StreamrClient {
operatorContractAddress,
this.contractFactory,
this.rpcProviderSource,
this.chainEventPoller,
this.theGraphClient,
this.authentication,
this.destroySignal,
this.loggerFactory,
() => this.getEthersOverrides(),
this.config.contracts.pollInterval
() => this.getEthersOverrides()
)
}

Expand Down
99 changes: 65 additions & 34 deletions packages/sdk/src/contracts/ChainEventPoller.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,51 @@
import { Logger, Multimap, randomString, scheduleAtInterval, wait } from '@streamr/utils'
import { Contract, EventLog, Provider } from 'ethers'
import { sample } from 'lodash'

type EventName = string
type Listener = (...args: any[]) => void
import { EthereumAddress, Logger, randomString, scheduleAtInterval, toEthereumAddress, wait } from '@streamr/utils'
import { AbstractProvider, EventFragment, Interface } from 'ethers'
import { remove, sample, uniq } from 'lodash'
import { inject, Lifecycle, scoped } from 'tsyringe'
import { ConfigInjectionToken, StrictStreamrClientConfig } from '../Config'
import { RpcProviderSource } from '../RpcProviderSource'

export interface EventListenerDefinition {
onEvent: (...args: any[]) => void
contractInterfaceFragment: EventFragment
contractAddress: EthereumAddress
}

const BLOCK_NUMBER_QUERY_RETRY_DELAY = 1000
export const POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD = 30

@scoped(Lifecycle.ContainerScoped)
export class ChainEventPoller {

private listeners: Multimap<EventName, Listener> = new Multimap()
private abortController?: AbortController
private contracts: Contract[]
private listeners: EventListenerDefinition[] = []
private providers: AbstractProvider[]
private pollInterval: number
private abortController?: AbortController

// all these contracts are actually the same chain contract (i.e. StreamRegistry), but have different providers
// connected to them
constructor(contracts: Contract[], pollInterval: number) {
this.contracts = contracts
this.pollInterval = pollInterval
constructor(
rpcProviderSource: RpcProviderSource,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'contracts'>
) {
this.providers = rpcProviderSource.getSubProviders()
this.pollInterval = config.contracts.pollInterval
}

on(eventName: string, listener: Listener): void {
const started = !this.listeners.isEmpty()
this.listeners.add(eventName, listener)
on(definition: EventListenerDefinition): void {
const started = this.listeners.length > 0
this.listeners.push(definition)
if (!started) {
this.start()
}
}

off(eventName: string, listener: Listener): void {
const started = !this.listeners.isEmpty()
this.listeners.remove(eventName, listener)
if (started && this.listeners.isEmpty()) {
off(definition: EventListenerDefinition): void {
const started = this.listeners.length > 0
remove(this.listeners, (l) => {
return (l.contractAddress === definition.contractAddress)
&& (l.contractInterfaceFragment.topicHash === definition.contractInterfaceFragment.topicHash)
&& (l.onEvent == definition.onEvent)
})
if (started && this.listeners.length === 0) {
this.abortController!.abort()
}
}
Expand All @@ -48,7 +60,7 @@ export class ChainEventPoller {
let fromBlock: number | undefined = undefined
do {
try {
fromBlock = await sample(this.getProviders())!.getBlockNumber()
fromBlock = await sample(this.providers)!.getBlockNumber()
} catch (err) {
logger.debug('Failed to query block number', { err })
await wait(BLOCK_NUMBER_QUERY_RETRY_DELAY) // TODO: pass signal?
Expand All @@ -57,25 +69,46 @@ export class ChainEventPoller {

let pollsSinceFromBlockUpdate = 0
await scheduleAtInterval(async () => {
const contract = sample(this.contracts)!
const eventNames = [...this.listeners.keys()]
const provider = sample(this.providers)!
const eventNames = this.listeners.map((l) => l.contractInterfaceFragment.name)
let newFromBlock = 0
let events: EventLog[] | undefined = undefined
let events: { contractAddress: EthereumAddress, name: string, args: any[], blockNumber: number }[] | undefined = undefined

try {
// If we haven't updated `fromBlock` for a while, fetch the latest block number explicitly. If
// `fromBlock` falls too much behind the current block number, the RPCs may start rejecting our
// eth_getLogs requests (presumably for performance reasons).
if (pollsSinceFromBlockUpdate >= POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD) {
newFromBlock = await contract.runner!.provider!.getBlockNumber() + 1
newFromBlock = await provider.getBlockNumber() + 1
logger.debug('Fetch next block number explicitly', { newFromBlock } )
if (abortController.signal.aborted) {
return
}
}

logger.debug('Polling', { fromBlock, eventNames })
events = await contract.queryFilter([eventNames], fromBlock) as EventLog[]
const filter = {
address: uniq(this.listeners.map((l) => l.contractAddress)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth commenting how the filtering logic here works, e.g.

address == (X OR Y OR Z) AND topic === (A OR B OR C)

topics: [uniq(this.listeners.map((l) => l.contractInterfaceFragment.topicHash))],
fromBlock
Comment on lines +90 to +91
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to check that this definitely works as we think it does

}
const logItems = await provider.getLogs(filter)
events = []
for (const logItem of logItems) {
const definition = this.listeners.find((l) => {
return (l.contractAddress === toEthereumAddress(logItem.address))
&& (l.contractInterfaceFragment.topicHash === logItem.topics[0])
})
if (definition !== undefined) {
const contractInterface = new Interface([definition.contractInterfaceFragment.format('minimal')])
const args = contractInterface.decodeEventLog(definition.contractInterfaceFragment.name, logItem.data, logItem.topics)
events.push({
contractAddress: definition.contractAddress,
name: definition.contractInterfaceFragment.name,
args,
blockNumber: logItem.blockNumber
})
}
}
logger.debug('Polled', { fromBlock, events: events.length })
} catch (err) {
logger.debug('Failed to poll', { reason: err?.reason, eventNames, fromBlock })
Expand All @@ -87,9 +120,11 @@ export class ChainEventPoller {

if (events !== undefined && events.length > 0) {
for (const event of events) {
const listeners = this.listeners.get(event.fragment.name)
const listeners = this.listeners.filter(
(l) => (l.contractAddress === event.contractAddress) && (l.contractInterfaceFragment.name === event.name)
)
for (const listener of listeners) {
listener(...event.args, event.blockNumber)
listener.onEvent(...event.args, event.blockNumber)
}
}
newFromBlock = Math.max(...events.map((e) => e.blockNumber)) + 1
Expand All @@ -108,8 +143,4 @@ export class ChainEventPoller {
}, this.pollInterval, true, abortController.signal)
})
}

private getProviders(): Provider[] {
return this.contracts.map((c) => c.runner!.provider!)
}
}
29 changes: 18 additions & 11 deletions packages/sdk/src/contracts/Operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
ObservableEventEmitter, StreamID, TheGraphClient,
collect, ensureValidStreamPartitionIndex, toEthereumAddress, toStreamID
} from '@streamr/utils'
import { Overrides } from 'ethers'
import { Interface, Overrides } from 'ethers'
import { z } from 'zod'
import { Authentication } from '../Authentication'
import { DestroySignal } from '../DestroySignal'
Expand Down Expand Up @@ -150,12 +150,12 @@ export class Operator {
contractAddress: EthereumAddress,
contractFactory: ContractFactory,
rpcProviderSource: RpcProviderSource,
chainEventPoller: ChainEventPoller,
theGraphClient: TheGraphClient,
authentication: Authentication,
destroySignal: DestroySignal,
loggerFactory: LoggerFactory,
getEthersOverrides: () => Promise<Overrides>,
eventPollInterval: number
) {
this.contractAddress = contractAddress
this.contractFactory = contractFactory
Expand All @@ -169,33 +169,37 @@ export class Operator {
this.theGraphClient = theGraphClient
this.authentication = authentication
this.getEthersOverrides = getEthersOverrides
this.initEventGateways(contractAddress, loggerFactory, eventPollInterval)
this.initEventGateways(contractAddress, chainEventPoller, loggerFactory)
destroySignal.onDestroy.listen(() => {
this.eventEmitter.removeAllListeners()
})
}

private initEventGateways(
contractAddress: EthereumAddress,
loggerFactory: LoggerFactory,
eventPollInterval: number
chainEventPoller: ChainEventPoller,
loggerFactory: LoggerFactory
): void {
const chainEventPoller = new ChainEventPoller(this.rpcProviderSource.getSubProviders().map((p) => {
return this.contractFactory.createEventContract(contractAddress, OperatorArtifact, p)
}), eventPollInterval)
const contractInterface = new Interface(OperatorArtifact)
const stakeEventTransformation = (sponsorship: string) => ({
sponsorship: toEthereumAddress(sponsorship)
})
initContractEventGateway({
sourceName: 'Staked',
sourceDefinition: {
contractInterfaceFragment: contractInterface.getEvent('Staked')!,
contractAddress
},
sourceEmitter: chainEventPoller,
targetName: 'staked',
targetEmitter: this.eventEmitter,
transformation: stakeEventTransformation,
loggerFactory
})
initContractEventGateway({
sourceName: 'Unstaked',
sourceDefinition: {
contractInterfaceFragment: contractInterface.getEvent('Unstaked')!,
contractAddress
},
sourceEmitter: chainEventPoller,
targetName: 'unstaked',
targetEmitter: this.eventEmitter,
Expand All @@ -219,7 +223,10 @@ export class Operator {
}
}
initContractEventGateway({
sourceName: 'ReviewRequest',
sourceDefinition: {
contractInterfaceFragment: contractInterface.getEvent('ReviewRequest')!,
contractAddress
},
sourceEmitter: chainEventPoller,
targetName: 'reviewRequested',
targetEmitter: this.eventEmitter,
Expand Down
11 changes: 6 additions & 5 deletions packages/sdk/src/contracts/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
toUserId,
until
} from '@streamr/utils'
import { ContractTransactionResponse } from 'ethers'
import { ContractTransactionResponse, Interface } from 'ethers'
import { intersection } from 'lodash'
import { Lifecycle, inject, scoped } from 'tsyringe'
import { Authentication, AuthenticationInjectionToken } from '../Authentication'
Expand Down Expand Up @@ -134,6 +134,7 @@ export class StreamRegistry {
constructor(
contractFactory: ContractFactory,
rpcProviderSource: RpcProviderSource,
chainEventPoller: ChainEventPoller,
theGraphClient: TheGraphClient,
streamIdBuilder: StreamIDBuilder,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'contracts' | 'cache' | '_timeouts'>,
Expand All @@ -154,11 +155,11 @@ export class StreamRegistry {
this.rpcProviderSource.getProvider(),
'streamRegistry'
)
const chainEventPoller = new ChainEventPoller(this.rpcProviderSource.getSubProviders().map((p) => {
return contractFactory.createEventContract(toEthereumAddress(this.config.contracts.streamRegistryChainAddress), StreamRegistryArtifact, p)
}), config.contracts.pollInterval)
initContractEventGateway({
sourceName: 'StreamCreated',
sourceDefinition: {
contractInterfaceFragment: new Interface(StreamRegistryArtifact).getEvent('StreamCreated')!,
contractAddress: toEthereumAddress(this.config.contracts.streamRegistryChainAddress)
},
sourceEmitter: chainEventPoller,
targetName: 'streamCreated',
targetEmitter: eventEmitter,
Expand Down
24 changes: 13 additions & 11 deletions packages/sdk/src/contracts/StreamStorageRegistry.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { EthereumAddress, Logger, StreamID, TheGraphClient, collect, toEthereumAddress, toStreamID } from '@streamr/utils'
import { Interface } from 'ethers'
import min from 'lodash/min'
import { Lifecycle, inject, scoped } from 'tsyringe'
import { Authentication, AuthenticationInjectionToken } from '../Authentication'
Expand All @@ -11,10 +12,10 @@ import StreamStorageRegistryArtifact from '../ethereumArtifacts/StreamStorageReg
import { getEthersOverrides } from '../ethereumUtils'
import { StreamrClientEventEmitter } from '../events'
import { LoggerFactory } from '../utils/LoggerFactory'
import { Mapping, createCacheMap } from '../utils/Mapping'
import { ChainEventPoller } from './ChainEventPoller'
import { ContractFactory } from './ContractFactory'
import { initContractEventGateway, waitForTx } from './contract'
import { createCacheMap, Mapping } from '../utils/Mapping'

export interface StorageNodeAssignmentEvent {
readonly streamId: StreamID
Expand Down Expand Up @@ -51,6 +52,7 @@ export class StreamStorageRegistry {
streamIdBuilder: StreamIDBuilder,
contractFactory: ContractFactory,
rpcProviderSource: RpcProviderSource,
chainEventPoller: ChainEventPoller,
theGraphClient: TheGraphClient,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'contracts' | 'cache' | '_timeouts'>,
@inject(AuthenticationInjectionToken) authentication: Authentication,
Expand All @@ -70,13 +72,6 @@ export class StreamStorageRegistry {
rpcProviderSource.getProvider(),
'streamStorageRegistry'
) as StreamStorageRegistryContract
const chainEventPoller = new ChainEventPoller(this.rpcProviderSource.getSubProviders().map((p) => {
return contractFactory.createEventContract(
toEthereumAddress(this.config.contracts.streamStorageRegistryChainAddress),
StreamStorageRegistryArtifact,
p
)
}), config.contracts.pollInterval)
this.initStreamAssignmentEventListeners(eventEmitter, chainEventPoller, loggerFactory)
this.storageNodesCache = createCacheMap({
valueFactory: (query) => {
Expand All @@ -86,7 +81,6 @@ export class StreamStorageRegistry {
})
}

// eslint-disable-next-line class-methods-use-this
private initStreamAssignmentEventListeners(
eventEmitter: StreamrClientEventEmitter,
chainEventPoller: ChainEventPoller,
Expand All @@ -97,16 +91,24 @@ export class StreamStorageRegistry {
nodeAddress: toEthereumAddress(nodeAddress),
blockNumber
})
const contractAddress = toEthereumAddress(this.config.contracts.streamStorageRegistryChainAddress)
const contractInterface = new Interface(StreamStorageRegistryArtifact)
initContractEventGateway({
sourceName: 'Added',
sourceDefinition: {
contractInterfaceFragment: contractInterface.getEvent('Added')!,
contractAddress
},
sourceEmitter: chainEventPoller,
targetName: 'streamAddedToStorageNode',
targetEmitter: eventEmitter,
transformation,
loggerFactory
})
initContractEventGateway({
sourceName: 'Removed',
sourceDefinition: {
contractInterfaceFragment: contractInterface.getEvent('Removed')!,
contractAddress
},
sourceEmitter: chainEventPoller,
targetName: 'streamRemovedFromFromStorageNode',
targetEmitter: eventEmitter,
Expand Down
Loading
Loading