Skip to content

Commit

Permalink
websocket overhaul
Browse files Browse the repository at this point in the history
karnthis committed Apr 29, 2024
1 parent 1e27e75 commit 4600144
Showing 13 changed files with 69 additions and 178 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@jackallabs/banshee",
"version": "0.1.1-rc.1",
"version": "0.2.0",
"description": "Modern problems require modern solutions",
"keywords": [],
"exports": {
14 changes: 0 additions & 14 deletions src/classes/ibcQueryClient.ts
Original file line number Diff line number Diff line change
@@ -5,8 +5,6 @@ import { processExtensions } from '@/utils/extensions'
import { WebsocketCore } from '@/classes'
import {
IExtendedStargateClientOptions,
IIbcDeafenBundle,
IIbcDisengageBundle,
IIbcEngageBundle,
IIbcQueryClient,
IWebsocketCore
@@ -52,16 +50,4 @@ export class IbcQueryClient<TQ extends TQueryLibrary>
): Promise<void> {
await this.wsCore.monitor(connections)
}

disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void {
this.wsCore.disengage(connections)
}

deafen(connection: IIbcDeafenBundle): void {
this.wsCore.deafen(connection)
}

debug(): void {
this.wsCore.debug()
}
}
14 changes: 0 additions & 14 deletions src/classes/ibcSigningClient.ts
Original file line number Diff line number Diff line change
@@ -15,8 +15,6 @@ import type {
} from '@/types'
import {
IExtendedSigningStargateClientOptions,
IIbcDeafenBundle,
IIbcDisengageBundle,
IIbcEngageBundle,
IIbcSigningClient,
IWebsocketCore
@@ -81,18 +79,6 @@ export class IbcSigningClient<TQ extends TQueryLibrary, TT extends TTxLibrary>
await this.wsCore.monitor(connections)
}

disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void {
this.wsCore.disengage(connections)
}

deafen(connection: IIbcDeafenBundle): void {
this.wsCore.deafen(connection)
}

debug(): void {
this.wsCore.debug()
}

async selfSignAndBroadcast(
msgs: DEncodeObject[],
options: ISignAndBroadcastOptions = {},
86 changes: 38 additions & 48 deletions src/classes/websocketCore.ts
Original file line number Diff line number Diff line change
@@ -1,80 +1,70 @@
import type { CometClient } from '@cosmjs/tendermint-rpc'
import { connectComet } from '@cosmjs/tendermint-rpc'
import type { Stream } from 'xstream'
import type { TPossibleTxEvents } from '@/types'
import {
IIbcDeafenBundle,
IIbcDisengageBundle,
IIbcEngageBundle,
IWebsocketCore,
} from '@/interfaces'

import { Responses } from '@cosmjs/tendermint-rpc/build/tendermint34/adaptor'
import { tidyString } from '@/utils/misc'

export class WebsocketCore implements IWebsocketCore {
protected readonly wsConnections: Record<string, CometClient>
protected readonly activeStreams: Record<string, Stream<TPossibleTxEvents>>
protected readonly wsConnections: Record<string, WebSocket>

constructor() {
this.wsConnections = {}
this.activeStreams = {}
}

async monitor<T extends TPossibleTxEvents>(
monitor<T extends TPossibleTxEvents>(
connections: IIbcEngageBundle<T> | IIbcEngageBundle<T>[],
): Promise<void> {
): void {
try {
if (connections instanceof Array) {
for (let conn of connections) {
await this.setupMonitoring<T>(conn)
this.setupMonitoring<T>(conn)
}
} else {
await this.setupMonitoring<T>(connections)
this.setupMonitoring<T>(connections)
}
} catch (err) {
throw err
}
}

disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void {
if (connections instanceof Array) {
for (let conn of connections) {
delete this.activeStreams[`${conn.chainId}|${conn.query || 'all'}`]
}
} else {
delete this.activeStreams[
`${connections.chainId}|${connections.query || 'all'}`
]
}
}

deafen(connection: IIbcDeafenBundle): void {
this.activeStreams[
`${connection.chainId}|${connection.query || 'all'}`
].removeListener(connection.listener)
}

debug(): void {
console.log('wsConnections:', this.wsConnections)
console.log('activeStreams:', this.activeStreams)
}

protected async setupMonitoring<T extends TPossibleTxEvents>(
protected setupMonitoring<T extends TPossibleTxEvents>(
conn: IIbcEngageBundle<T>,
) {
): void {
if (!conn.endpoint.startsWith('ws://') && !conn.endpoint.startsWith('wss://')) {
throw new Error('invalid url')
}
const cleanEndpoint = tidyString(conn.endpoint, '/')
const finalEndpoint = cleanEndpoint.endsWith('websocket') ? cleanEndpoint : `${cleanEndpoint}/websocket`
if (!this.wsConnections[conn.chainId]) {
this.wsConnections[conn.chainId] = await connectComet(conn.endpoint)
this.wsConnections[conn.chainId] = new WebSocket(finalEndpoint)
}
const client = this.wsConnections[conn.chainId]
const streamId = `${conn.chainId}|${conn.query || 'all'}`
if (!this.activeStreams[streamId]) {
if (conn.query) {
this.activeStreams[streamId] = client.subscribeTx(
conn.query,
) as Stream<TPossibleTxEvents>
} else {
this.activeStreams[streamId] =
client.subscribeTx() as Stream<TPossibleTxEvents>

const wsQuery = {
jsonrpc: '2.0',
method: 'subscribe',
id: Date.now().toString(),
params: {
query: (conn.query) ? `tm.event = 'Tx' AND '${conn.query}'` : `tm.event = 'Tx'`,
},
}
client.onopen = () => {
client.send(JSON.stringify(wsQuery))
}
client.onmessage = (msg) => {
try {
const data = JSON.parse(msg.data)
if (!data.result.data) {
return
}
const postProcess = Responses.decodeTxEvent(data.result)
conn.feed.push(postProcess as T)
} catch (err) {
console.error(err)
}
}
this.activeStreams[streamId].addListener(conn.listener)
}
}
2 changes: 0 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
export * from '@/classes'
export * from '@/interfaces'
export * from '@/types'

export { makeListener } from '@/utils/misc'
35 changes: 0 additions & 35 deletions src/interfaces/IIbcBundle.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,15 @@
import type { TCurrentTxEvent, TPossibleTxEvents } from '@/types'
import type { IListener } from '@/interfaces/IListener'

/**
* @interface IIbcEngageBundle
* @property {string} chainId
* @property {string} endpoint
* @property {string} [query]
* @property {TCurrentTxEvent<T>[]} feed
* @property {IListener<TPossibleTxEvents>} listener
*/
export interface IIbcEngageBundle<T extends TPossibleTxEvents> {
chainId: string
endpoint: string
query?: string
feed: TCurrentTxEvent<T>[]
listener: IListener<TPossibleTxEvents>
}

/**
* @interface IIbcDisengageBundle
* @property {string} chainId
* @property {string} [query]
*/
export interface IIbcDisengageBundle {
chainId: string
query?: string
}

/**
* @interface IIbcDeafenBundle
* @property {string} chainId
* @property {string} [query]
* @property {IListener<TPossibleTxEvents>} listener
*/
export interface IIbcDeafenBundle {
chainId: string
query?: string
listener: IListener<TPossibleTxEvents>
}

/**
* @interface IIbcMakeListenerBundle
* @property {string} chainId
* @property {TCurrentTxEvent<T>[]} feed
*/
export interface IIbcMakeListenerBundle<T extends TPossibleTxEvents> {
chainId: string
feed: TCurrentTxEvent<T>[]
}
26 changes: 0 additions & 26 deletions src/interfaces/IListener.ts

This file was deleted.

19 changes: 2 additions & 17 deletions src/interfaces/classes/IWebsocketCore.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { IIbcDeafenBundle, IIbcDisengageBundle, IIbcEngageBundle } from '@/interfaces'
import { IIbcEngageBundle } from '@/interfaces'
import type { TPossibleTxEvents } from '@/types'

/**
* @interface IWebsocketCore
* @property {monitor} monitor
* @property {disengage} disengage
*/
export interface IWebsocketCore {
/**
@@ -14,19 +13,5 @@ export interface IWebsocketCore {
*/
monitor<T extends TPossibleTxEvents>(
connections: IIbcEngageBundle<T> | IIbcEngageBundle<T>[],
): Promise<void>

/**
* @function disengage
* @param {IIbcDisengageBundle | IIbcDisengageBundle[]} connections
*/
disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void

/**
* @function deafen
* @param {IIbcDeafenBundle} connection
*/
deafen(connection: IIbcDeafenBundle): void

debug(): void
): void
}
1 change: 0 additions & 1 deletion src/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -5,6 +5,5 @@ export * from '@/interfaces/compatibility'
export * from '@/interfaces/IExtendedSigningStargateClientOptions'
export * from '@/interfaces/IExtendedStargateClientOptions'
export * from '@/interfaces/IIbcBundle'
export * from '@/interfaces/IListener'
export * from '@/interfaces/ISignAndBroadcastOptions'
export * from '@/interfaces/ITxLibraryDefs'
1 change: 1 addition & 0 deletions src/types/TMisc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export type TTidyStringModes = 'start' | 'end' | 'both'
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from '@/types/stockCosmos'
export * from '@/types/TClientDefs'
export * from '@/types/TMisc'
42 changes: 24 additions & 18 deletions src/utils/misc.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { TPossibleTxEvents } from '@/types'
import { IIbcMakeListenerBundle, IListener } from '@/interfaces'
import type { TTidyStringModes } from '@/types'

const oneSecondMs = 1000

@@ -46,22 +45,29 @@ export function secondToMS(seconds: number): number {
}

/**
* Build Listener instance for attaching to websocket.
* @param {IIbcMakeListenerBundle<T>} bundle
* @returns {IListener<TPossibleTxEvents>}
*
* @param {string} source
* @param {string} toTidy
* @param {TTidyStringModes} mode
* @returns {string}
*/
export function makeListener<T extends TPossibleTxEvents>(
bundle: IIbcMakeListenerBundle<T>,
): IListener<TPossibleTxEvents> {
return {
next(value: T): void {
bundle.feed.push(value)
},
error(err: any): void {
console.error(`Stream ${bundle.chainId} gave me an error:`, err)
},
complete(): void {
console.log(`Stream ${bundle.chainId} told me it is done.`)
},
export function tidyString(
source: string,
toTidy: string,
mode: TTidyStringModes = 'both',
): string {
let startIndex = 0
let endIndex = source.length

if (mode === 'start' || mode === 'both') {
while (startIndex < endIndex && source[startIndex] === toTidy) {
startIndex++
}
}
if (mode === 'end' || mode === 'both') {
while (startIndex < endIndex && source[endIndex - 1] === toTidy) {
endIndex--
}
}
return source.slice(startIndex, endIndex)
}

0 comments on commit 4600144

Please sign in to comment.