Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: feat: fedimint client rpc ng #95

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 17 additions & 20 deletions packages/core-web/src/FedimintWallet.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { WorkerClient } from './worker'
import { RpcClient } from './rpc'
import { WebWorkerTransportInit } from './worker/WorkerTransport'
import {
BalanceService,
MintService,
Expand All @@ -11,7 +12,7 @@ import { logger, type LogLevel } from './utils/logger'
const DEFAULT_CLIENT_NAME = 'fm-default' as const

export class FedimintWallet {
private _client: WorkerClient
private _client: RpcClient

public balance: BalanceService
public mint: MintService
Expand Down Expand Up @@ -56,7 +57,7 @@ export class FedimintWallet {
this._openPromise = new Promise((resolve) => {
this._resolveOpen = resolve
})
this._client = new WorkerClient()
this._client = new RpcClient(new WebWorkerTransportInit())
this.mint = new MintService(this._client)
this.lightning = new LightningService(this._client)
this.balance = new BalanceService(this._client)
Expand All @@ -71,9 +72,9 @@ export class FedimintWallet {
}

async initialize() {
logger.info('Initializing WorkerClient')
logger.info('Initializing RpcClient')
await this._client.initialize()
logger.info('WorkerClient initialized')
logger.info('RpcClient initialized')
}

async waitForOpen() {
Expand All @@ -85,14 +86,15 @@ export class FedimintWallet {
await this._client.initialize()
// TODO: Determine if this should be safe or throw
if (this._isOpen) throw new Error('The FedimintWallet is already open.')
const { success } = await this._client.sendSingleMessage<{
success: boolean
}>('open', { clientName })
if (success) {
this._isOpen = !!success
try {
await this._client.openClient(clientName)
this._isOpen = true
this._resolveOpen()
return true
} catch (e) {
logger.error('Error opening client', e)
return false
}
return success
}

async joinFederation(
Expand All @@ -106,15 +108,10 @@ export class FedimintWallet {
'The FedimintWallet is already open. You can only call `joinFederation` on closed clients.',
)
try {
const response = await this._client.sendSingleMessage<{
success: boolean
}>('join', { inviteCode, clientName })
if (response.success) {
this._isOpen = true
this._resolveOpen()
}

return response.success
await this._client.joinFederation(inviteCode, clientName)
this._isOpen = true
this._resolveOpen()
return true
} catch (e) {
logger.error('Error joining federation', e)
return false
Expand Down
207 changes: 207 additions & 0 deletions packages/core-web/src/rpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import type {
CancelFunction,
JSONValue,
ModuleKind,
RpcRequest,
RpcResponse,
RpcRequestFull,
RpcResponseFull,
} from './types'
import { logger } from './utils/logger'

export interface RpcTransport {
sendRequest(request: RpcRequestFull): void
destroy(): void
}

export interface RpcTransportInit {
init(
onRpcResponse: (response: RpcResponseFull) => void,
): Promise<RpcTransport>
}

// Handles communication with the wasm worker
// TODO: Move rpc stream management to a separate "SubscriptionManager" class
export class RpcClient {
private transport?: RpcTransport
private transportInit: RpcTransportInit
private requestCounter = 0
private requestCallbacks = new Map<number, (response: RpcResponse) => void>()
private initPromise?: Promise<void>
private clientName: string | undefined

constructor(transportInit: RpcTransportInit) {
this.transportInit = transportInit
}

private async initializeInner(): Promise<void> {
this.transport = await this.transportInit.init(
this.handleWorkerMessage.bind(this),
)
}

async initialize() {
if (this.initPromise) {
return this.initPromise
}

this.initPromise = this.initializeInner()
return this.initPromise
}

private handleWorkerMessage = (response: RpcResponseFull) => {
const callback = this.requestCallbacks.get(response.request_id)

if (callback) {
callback(response)
} else {
logger.warn(
'RpcClient - handleWorkerMessage - received message with no callback',
response.request_id,
response,
)
}
}

async joinFederation(inviteCode: string, clientName: string) {
await this.internalRpcSingle({
type: 'join_federation',
invite_code: inviteCode,
client_name: clientName,
})
}

async openClient(clientName: string) {
await this.internalRpcSingle({
type: 'open_client',
client_name: clientName,
})
this.clientName = clientName
}

async closeClient(clientName: string) {
await this.internalRpcSingle({
type: 'close_client',
client_name: clientName,
})
this.clientName = undefined
}

private internalRpcStream<Response extends JSONValue = JSONValue>(
request: RpcRequest,
onData: (data: Response) => void,
onError: (error: string) => void,
onEnd: () => void = () => {},
): CancelFunction {
const requestId = ++this.requestCounter
logger.debug('RpcClient - rpcStream', requestId, request)
let unsubscribe = () => {
const cancelRequest: RpcRequestFull = {
request_id: ++this.requestCounter,
type: 'cancel_rpc',
cancel_request_id: requestId,
}
this.transport?.sendRequest(cancelRequest)
}

const requestFull: RpcRequestFull = {
...request,
request_id: requestId,
}

this.requestCallbacks.set(requestId, (response: RpcResponse) => {
switch (response.type) {
case 'data':
onData(response.data)
break
case 'error':
onError(response.error)
break
case 'end':
this.requestCallbacks.delete(requestId)
onEnd()
break
case 'aborted':
this.requestCallbacks.delete(requestId)
onEnd()
break
}
})
this.transport?.sendRequest(requestFull)
return unsubscribe
}

private internalRpcSingle<T extends JSONValue = JSONValue>(
request: RpcRequest,
): Promise<T> {
return new Promise((resolve, reject) => {
const unsubscribe = this.internalRpcStream(
request,
(data) => resolve(data as T),
(error) => reject(new Error(error)),
() => {},
)
// No need to unsubscribe for single requests as they auto-complete
})
}

rpcStream<
Response extends JSONValue = JSONValue,
Body extends JSONValue = JSONValue,
>(
module: ModuleKind,
method: string,
body: Body,
onData: (data: Response) => void,
onError: (error: string) => void,
onEnd: () => void = () => {},
): CancelFunction {
if (this.clientName === undefined) {
throw new Error('Wallet is not open')
}
return this.internalRpcStream(
{
type: 'client_rpc',
client_name: this.clientName,
module,
method,
payload: body,
},
onData,
onError,
onEnd,
)
}

rpcSingle<T extends JSONValue = JSONValue, P extends JSONValue = JSONValue>(
module: string,
method: string,
payload: P,
): Promise<T> {
if (this.clientName === undefined) {
throw new Error('Wallet is not open')
}
return this.internalRpcSingle<T>({
type: 'client_rpc',
client_name: this.clientName,
module,
method,
payload,
})
}

async cleanup() {
this.transport?.destroy()
this.requestCounter = 0
this.initPromise = undefined
this.requestCallbacks.clear()
}

// For Testing
_getRequestCounter() {
return this.requestCounter
}
_getRequestCallbackMap() {
return this.requestCallbacks
}
}
4 changes: 2 additions & 2 deletions packages/core-web/src/services/BalanceService.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { WorkerClient } from '../worker'
import { RpcClient } from '../rpc'

/**
* Balance Service
*
* The Balance Service provides methods to interact with the balance of a Fedimint wallet.
*/
export class BalanceService {
constructor(private client: WorkerClient) {}
constructor(private client: RpcClient) {}

/**
* Get the balance of the current wallet in milli-satoshis (MSats)
Expand Down
4 changes: 2 additions & 2 deletions packages/core-web/src/services/FederationService.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { JSONValue } from '../types'
import { WorkerClient } from '../worker'
import { RpcClient } from '../rpc'

export class FederationService {
constructor(private client: WorkerClient) {}
constructor(private client: RpcClient) {}

async getConfig(): Promise<JSONValue> {
return await this.client.rpcSingle('', 'get_config', {})
Expand Down
4 changes: 2 additions & 2 deletions packages/core-web/src/services/LightningService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkerClient } from '../worker'
import { RpcClient } from '../rpc'
import type {
CreateBolt11Response,
GatewayInfo,
Expand All @@ -11,7 +11,7 @@ import type {
} from '../types'

export class LightningService {
constructor(private client: WorkerClient) {}
constructor(private client: RpcClient) {}

async createInvoice(
amountMsats: number,
Expand Down
8 changes: 3 additions & 5 deletions packages/core-web/src/services/MintService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkerClient } from '../worker'
import { RpcClient } from '../rpc'
import type {
Duration,
JSONObject,
Expand All @@ -9,7 +9,7 @@ import type {
} from '../types'

export class MintService {
constructor(private client: WorkerClient) {}
constructor(private client: RpcClient) {}

async redeemEcash(notes: string): Promise<void> {
await this.client.rpcSingle('mint', 'reissue_external_notes', {
Expand Down Expand Up @@ -58,7 +58,7 @@ export class MintService {
? { nanos: 0, secs: tryCancelAfter }
: tryCancelAfter

const res = await this.client.rpcSingle<Array<string>>(
const [operationId, notes] = await this.client.rpcSingle<[string, string]>(
'mint',
'spend_notes',
{
Expand All @@ -68,8 +68,6 @@ export class MintService {
extra_meta: extraMeta,
},
)
const notes = res[1]
const operationId = res[0]

return {
notes,
Expand Down
4 changes: 2 additions & 2 deletions packages/core-web/src/services/RecoveryService.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { JSONValue } from '../types'
import { WorkerClient } from '../worker'
import { RpcClient } from '../rpc'

export class RecoveryService {
constructor(private client: WorkerClient) {}
constructor(private client: RpcClient) {}

async hasPendingRecoveries(): Promise<boolean> {
return await this.client.rpcSingle('', 'has_pending_recoveries', {})
Expand Down
4 changes: 2 additions & 2 deletions packages/core-web/src/test/TestFedimintWallet.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { FedimintWallet } from '../FedimintWallet'
import { WorkerClient } from '../worker/WorkerClient'
import { RpcClient } from '../rpc'
import { TestingService } from './TestingService'

export class TestFedimintWallet extends FedimintWallet {
Expand All @@ -20,7 +20,7 @@ export class TestFedimintWallet extends FedimintWallet {
}

// Method to expose the WorkerClient
getWorkerClient(): WorkerClient {
getWorkerClient(): RpcClient {
return this['_client']
}
}
Loading
Loading