Skip to content

Commit

Permalink
Add methods to register listeners for ledger and payment updates (#5)
Browse files Browse the repository at this point in the history
* Implement wait for payment and ledger channel status meethods

* Add methods to register ledger and payment updates
  • Loading branch information
neerajvijay1997 authored Apr 23, 2024
1 parent 88c0191 commit 5d1a12b
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 3 deletions.
20 changes: 20 additions & 0 deletions packages/nitro-node/src/node/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,24 @@ export class Node {
sentVouchers(): ReadChannel<Voucher> {
return this.engine.sentVouchers;
}

// LedgerUpdates returns a chan that receives ledger channel info whenever that ledger channel is updated. Not suitable for multiple subscribers.
ledgerUpdates() {
return this.channelNotifier!.registerForAllLedgerUpdates();
}

// PaymentUpdates returns a chan that receives payment channel info whenever that payment channel is updated. Not suitable fo multiple subscribers.
paymentUpdates() {
return this.channelNotifier!.registerForAllPaymentUpdates();
}

// LedgerUpdatedChan returns a chan that receives a ledger channel info whenever the ledger with given id is updated
ledgerUpdatedChan(ledgerId: Destination) {
return this.channelNotifier!.registerForLedgerUpdates(ledgerId);
}

// PaymentChannelUpdatedChan returns a chan that receives a payment channel info whenever the payment channel with given id is updated
paymentChannelUpdatedChan(ledgerId: Destination) {
return this.channelNotifier!.registerForPaymentChannelUpdates(ledgerId);
}
}
25 changes: 25 additions & 0 deletions packages/nitro-node/src/node/notifier/channel-notifier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Store } from '../engine/store/store';
import { PaymentChannelListeners, LedgerChannelListeners } from './listeners';
import { SafeSyncMap } from '../../internal/safesync/safesync';
import { LedgerChannelInfo, PaymentChannelInfo } from '../query/types';
import { Destination } from '../../types/destination';

const ALL_NOTIFICATIONS = 'all';

Expand Down Expand Up @@ -37,6 +38,30 @@ export class ChannelNotifier {
});
}

// RegisterForAllLedgerUpdates returns a buffered channel that will receive updates for all ledger channels.
registerForAllLedgerUpdates() {
const [li] = this.ledgerListeners!.loadOrStore(ALL_NOTIFICATIONS, LedgerChannelListeners.newLedgerChannelListeners());
return li.getOrCreateListener();
}

// RegisterForLedgerUpdates returns a buffered channel that will receive updates or a specific ledger channel.
registerForLedgerUpdates(cId: Destination) {
const [li] = this.ledgerListeners!.loadOrStore(cId.string(), LedgerChannelListeners.newLedgerChannelListeners());
return li.createNewListener();
}

// RegisterForAllPaymentUpdates returns a buffered channel that will receive updates for all payment channels.
registerForAllPaymentUpdates() {
const [li] = this.paymentListeners!.loadOrStore(ALL_NOTIFICATIONS, PaymentChannelListeners.newPaymentChannelListeners());
return li.getOrCreateListener();
}

// RegisterForPaymentChannelUpdates returns a buffered channel that will receive updates or a specific payment channel.
registerForPaymentChannelUpdates(cId: Destination) {
const [li] = this.paymentListeners!.loadOrStore(cId.string(), PaymentChannelListeners.newPaymentChannelListeners());
return li.createNewListener();
}

// NotifyLedgerUpdated notifies all listeners of a ledger channel update.
// It should be called whenever a ledger channel is updated.
notifyLedgerUpdated(info: LedgerChannelInfo): void {
Expand Down
56 changes: 55 additions & 1 deletion packages/nitro-node/src/node/notifier/listeners.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ReadWriteChannel } from '@cerc-io/ts-channel';
import Channel, { ReadWriteChannel } from '@cerc-io/ts-channel';
import { Mutex } from 'async-mutex';

import { LedgerChannelInfo, PaymentChannelInfo } from '../query/types';
Expand Down Expand Up @@ -45,6 +45,33 @@ export class PaymentChannelListeners {
}
}

// createNewListener creates a new listener and adds it to the list of listeners.
async createNewListener() {
const release = await this.listenersLock.acquire();
let listener;
try {
// Use a buffered channel to avoid blocking the notifier.
listener = Channel<PaymentChannelInfo>(1000);
this.listeners.push(listener);
} finally {
release();
}
return listener;
}

// getOrCreateListener returns the first listener, creating one if none exist.
async getOrCreateListener() {
const release = await this.listenersLock.acquire();
if (this.listeners.length !== 0) {
const l = this.listeners[0];
release();
return l;
}

release();
return this.createNewListener();
}

async close(): Promise<void> {
const release = await this.listenersLock.acquire();

Expand Down Expand Up @@ -101,6 +128,33 @@ export class LedgerChannelListeners {
}
}

// createNewListener creates a new listener and adds it to the list of listeners.
async createNewListener() {
const release = await this.listenersLock.acquire();
let listener;
try {
// Use a buffered channel to avoid blocking the notifier.
listener = Channel<LedgerChannelInfo>(1000);
this.listeners.push(listener);
} finally {
release();
}
return listener;
}

// getOrCreateListener returns the first listener, creating one if none exist.
async getOrCreateListener() {
const release = await this.listenersLock.acquire();
if (this.listeners.length !== 0) {
const l = this.listeners[0];
release();
return l;
}

release();
return this.createNewListener();
}

async close(): Promise<void> {
const release = await this.listenersLock.acquire();

Expand Down
96 changes: 94 additions & 2 deletions packages/nitro-node/src/utils/nitro.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ import { providers } from 'ethers';

// @ts-expect-error
import type { Peer } from '@cerc-io/peer';
import { NitroSigner, DEFAULT_ASSET } from '@cerc-io/nitro-util';
import { NitroSigner, DEFAULT_ASSET, Context } from '@cerc-io/nitro-util';
import Channel from '@cerc-io/ts-channel';

import { Node } from '../node/node';
import { P2PMessageService } from '../node/engine/messageservice/p2p-message-service/service';
import { Store } from '../node/engine/store/store';
import { Destination } from '../types/destination';
import { LedgerChannelInfo, PaymentChannelInfo } from '../node/query/types';
import { ChannelStatus, LedgerChannelInfo, PaymentChannelInfo } from '../node/query/types';

import { createOutcome } from './helpers';
import { ChainService } from '../node/engine/chainservice/chainservice';
Expand Down Expand Up @@ -227,6 +228,97 @@ export class Nitro {
return this.node.getPaymentChannelsByLedger(ledgerChannelId);
}

async waitForPaymentChannelStatus(
channelId: string,
status: ChannelStatus,
ctx: Context,
) {
const paymentUpdatesChannel = await this.node.paymentUpdates();

while (true) {
/* eslint-disable default-case */
/* eslint-disable no-await-in-loop */
switch (await Channel.select([
paymentUpdatesChannel.shift(),
ctx.done.shift(),
])) {
case paymentUpdatesChannel: {
const paymentInfo = paymentUpdatesChannel.value();
if (paymentInfo.iD.string() === channelId && paymentInfo.status === status) {
return;
}
break;
}

case ctx.done: {
return;
}
}
}
}

async waitForLedgerChannelStatus(
channelId: string,
status: ChannelStatus,
ctx: Context,
) {
const ledgerUpdatesChannel = await this.node.ledgerUpdates();

while (true) {
/* eslint-disable default-case */
/* eslint-disable no-await-in-loop */
switch (await Channel.select([
ledgerUpdatesChannel.shift(),
ctx.done.shift(),
])) {
case ledgerUpdatesChannel: {
const ledgerInfo = ledgerUpdatesChannel.value();
if (ledgerInfo.iD.string() === channelId && ledgerInfo.status === status) {
return;
}
break;
}

case ctx.done: {
return;
}
}
}
}

async onPaymentChannelUpdated(
channelId: string,
callback: (info: PaymentChannelInfo) => void,
ctx: Context,
) {
const wrapperFn = (info: PaymentChannelInfo) => {
if (info.iD.string().toLowerCase() === channelId.toLowerCase()) {
callback(info);
}
};

const paymentUpdatesChannel = await this.node.paymentUpdates();

while (true) {
/* eslint-disable default-case */
/* eslint-disable no-await-in-loop */
switch (await Channel.select([
paymentUpdatesChannel.shift(),
ctx.done.shift(),
])) {
case paymentUpdatesChannel: {
const paymentInfo = paymentUpdatesChannel.value();
wrapperFn(paymentInfo);
break;
}

case ctx.done: {
return;
}
}
}
}

async close() {
await this.store.close();
await this.msgService.close();
Expand Down

0 comments on commit 5d1a12b

Please sign in to comment.