diff --git a/packages/nitro-node/src/node/node.ts b/packages/nitro-node/src/node/node.ts index f7f8c886..da3a5007 100644 --- a/packages/nitro-node/src/node/node.ts +++ b/packages/nitro-node/src/node/node.ts @@ -339,4 +339,24 @@ export class Node { sentVouchers(): ReadChannel { return this.engine.sentVouchers; } + + // LedgerUpdates returns a chan that receives ledger channel info whenever that ledger channel is updated. Not suitable for multiple subscribers. + ledgerUpdates() { + return this.channelNotifier!.registerForAllLedgerUpdates(); + } + + // PaymentUpdates returns a chan that receives payment channel info whenever that payment channel is updated. Not suitable fo multiple subscribers. + paymentUpdates() { + return this.channelNotifier!.registerForAllPaymentUpdates(); + } + + // LedgerUpdatedChan returns a chan that receives a ledger channel info whenever the ledger with given id is updated + ledgerUpdatedChan(ledgerId: Destination) { + return this.channelNotifier!.registerForLedgerUpdates(ledgerId); + } + + // PaymentChannelUpdatedChan returns a chan that receives a payment channel info whenever the payment channel with given id is updated + paymentChannelUpdatedChan(ledgerId: Destination) { + return this.channelNotifier!.registerForPaymentChannelUpdates(ledgerId); + } } diff --git a/packages/nitro-node/src/node/notifier/channel-notifier.ts b/packages/nitro-node/src/node/notifier/channel-notifier.ts index e2911e91..4a22684d 100644 --- a/packages/nitro-node/src/node/notifier/channel-notifier.ts +++ b/packages/nitro-node/src/node/notifier/channel-notifier.ts @@ -5,6 +5,7 @@ import { Store } from '../engine/store/store'; import { PaymentChannelListeners, LedgerChannelListeners } from './listeners'; import { SafeSyncMap } from '../../internal/safesync/safesync'; import { LedgerChannelInfo, PaymentChannelInfo } from '../query/types'; +import { Destination } from '../../types/destination'; const ALL_NOTIFICATIONS = 'all'; @@ -37,6 +38,30 @@ export class ChannelNotifier { }); } + // RegisterForAllLedgerUpdates returns a buffered channel that will receive updates for all ledger channels. + registerForAllLedgerUpdates() { + const [li] = this.ledgerListeners!.loadOrStore(ALL_NOTIFICATIONS, LedgerChannelListeners.newLedgerChannelListeners()); + return li.getOrCreateListener(); + } + + // RegisterForLedgerUpdates returns a buffered channel that will receive updates or a specific ledger channel. + registerForLedgerUpdates(cId: Destination) { + const [li] = this.ledgerListeners!.loadOrStore(cId.string(), LedgerChannelListeners.newLedgerChannelListeners()); + return li.createNewListener(); + } + + // RegisterForAllPaymentUpdates returns a buffered channel that will receive updates for all payment channels. + registerForAllPaymentUpdates() { + const [li] = this.paymentListeners!.loadOrStore(ALL_NOTIFICATIONS, PaymentChannelListeners.newPaymentChannelListeners()); + return li.getOrCreateListener(); + } + + // RegisterForPaymentChannelUpdates returns a buffered channel that will receive updates or a specific payment channel. + registerForPaymentChannelUpdates(cId: Destination) { + const [li] = this.paymentListeners!.loadOrStore(cId.string(), PaymentChannelListeners.newPaymentChannelListeners()); + return li.createNewListener(); + } + // NotifyLedgerUpdated notifies all listeners of a ledger channel update. // It should be called whenever a ledger channel is updated. notifyLedgerUpdated(info: LedgerChannelInfo): void { diff --git a/packages/nitro-node/src/node/notifier/listeners.ts b/packages/nitro-node/src/node/notifier/listeners.ts index 5c7cf75c..36b09b31 100644 --- a/packages/nitro-node/src/node/notifier/listeners.ts +++ b/packages/nitro-node/src/node/notifier/listeners.ts @@ -1,4 +1,4 @@ -import { ReadWriteChannel } from '@cerc-io/ts-channel'; +import Channel, { ReadWriteChannel } from '@cerc-io/ts-channel'; import { Mutex } from 'async-mutex'; import { LedgerChannelInfo, PaymentChannelInfo } from '../query/types'; @@ -45,6 +45,33 @@ export class PaymentChannelListeners { } } + // createNewListener creates a new listener and adds it to the list of listeners. + async createNewListener() { + const release = await this.listenersLock.acquire(); + let listener; + try { + // Use a buffered channel to avoid blocking the notifier. + listener = Channel(1000); + this.listeners.push(listener); + } finally { + release(); + } + return listener; + } + + // getOrCreateListener returns the first listener, creating one if none exist. + async getOrCreateListener() { + const release = await this.listenersLock.acquire(); + if (this.listeners.length !== 0) { + const l = this.listeners[0]; + release(); + return l; + } + + release(); + return this.createNewListener(); + } + async close(): Promise { const release = await this.listenersLock.acquire(); @@ -101,6 +128,33 @@ export class LedgerChannelListeners { } } + // createNewListener creates a new listener and adds it to the list of listeners. + async createNewListener() { + const release = await this.listenersLock.acquire(); + let listener; + try { + // Use a buffered channel to avoid blocking the notifier. + listener = Channel(1000); + this.listeners.push(listener); + } finally { + release(); + } + return listener; + } + + // getOrCreateListener returns the first listener, creating one if none exist. + async getOrCreateListener() { + const release = await this.listenersLock.acquire(); + if (this.listeners.length !== 0) { + const l = this.listeners[0]; + release(); + return l; + } + + release(); + return this.createNewListener(); + } + async close(): Promise { const release = await this.listenersLock.acquire(); diff --git a/packages/nitro-node/src/utils/nitro.ts b/packages/nitro-node/src/utils/nitro.ts index e74d8cc2..6aff6379 100644 --- a/packages/nitro-node/src/utils/nitro.ts +++ b/packages/nitro-node/src/utils/nitro.ts @@ -3,13 +3,14 @@ import { providers } from 'ethers'; // @ts-expect-error import type { Peer } from '@cerc-io/peer'; -import { NitroSigner, DEFAULT_ASSET } from '@cerc-io/nitro-util'; +import { NitroSigner, DEFAULT_ASSET, Context } from '@cerc-io/nitro-util'; +import Channel from '@cerc-io/ts-channel'; import { Node } from '../node/node'; import { P2PMessageService } from '../node/engine/messageservice/p2p-message-service/service'; import { Store } from '../node/engine/store/store'; import { Destination } from '../types/destination'; -import { LedgerChannelInfo, PaymentChannelInfo } from '../node/query/types'; +import { ChannelStatus, LedgerChannelInfo, PaymentChannelInfo } from '../node/query/types'; import { createOutcome } from './helpers'; import { ChainService } from '../node/engine/chainservice/chainservice'; @@ -227,6 +228,97 @@ export class Nitro { return this.node.getPaymentChannelsByLedger(ledgerChannelId); } + async waitForPaymentChannelStatus( + channelId: string, + status: ChannelStatus, + ctx: Context, + ) { + const paymentUpdatesChannel = await this.node.paymentUpdates(); + + while (true) { + /* eslint-disable default-case */ + /* eslint-disable no-await-in-loop */ + switch (await Channel.select([ + paymentUpdatesChannel.shift(), + ctx.done.shift(), + ])) { + case paymentUpdatesChannel: { + const paymentInfo = paymentUpdatesChannel.value(); + if (paymentInfo.iD.string() === channelId && paymentInfo.status === status) { + return; + } + break; + } + + case ctx.done: { + return; + } + } + } + } + + async waitForLedgerChannelStatus( + channelId: string, + status: ChannelStatus, + ctx: Context, + ) { + const ledgerUpdatesChannel = await this.node.ledgerUpdates(); + + while (true) { + /* eslint-disable default-case */ + /* eslint-disable no-await-in-loop */ + switch (await Channel.select([ + ledgerUpdatesChannel.shift(), + ctx.done.shift(), + ])) { + case ledgerUpdatesChannel: { + const ledgerInfo = ledgerUpdatesChannel.value(); + if (ledgerInfo.iD.string() === channelId && ledgerInfo.status === status) { + return; + } + break; + } + + case ctx.done: { + return; + } + } + } + } + + async onPaymentChannelUpdated( + channelId: string, + callback: (info: PaymentChannelInfo) => void, + ctx: Context, + ) { + const wrapperFn = (info: PaymentChannelInfo) => { + if (info.iD.string().toLowerCase() === channelId.toLowerCase()) { + callback(info); + } + }; + + const paymentUpdatesChannel = await this.node.paymentUpdates(); + + while (true) { + /* eslint-disable default-case */ + /* eslint-disable no-await-in-loop */ + switch (await Channel.select([ + paymentUpdatesChannel.shift(), + ctx.done.shift(), + ])) { + case paymentUpdatesChannel: { + const paymentInfo = paymentUpdatesChannel.value(); + wrapperFn(paymentInfo); + break; + } + + case ctx.done: { + return; + } + } + } + } + async close() { await this.store.close(); await this.msgService.close();