Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement client #5

Merged
merged 4 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
{
"name": "tsn-sdk-js",
"version": "1.0.0",
"version": "0.1.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"type": "module",
"keywords": [],
"author": "",
"license": "Apache-2.0",
Expand All @@ -18,8 +15,12 @@
},
"devDependencies": {
"typescript": "^5.6.3",
"disposablestack": "1.1.6",
"vitest": "^2.1.3",
"vite": "5.4.9",
"tsx": "^4.19.1",
"esbuild": "0.24.0"
"esbuild": "0.24.0",
"ethers": "6.13.4",
"vite-plugin-plain-text": "1.4.2"
}
}
75 changes: 75 additions & 0 deletions src/client/client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { describe, expect, it } from "vitest";
import { NodeTSNClient } from "./client";
import { ethers } from "ethers";
import { StreamId } from "../util/StreamId";

describe('Client', {timeout: 15000}, () => {
// Skip in CI, because it needs a local node
it.skipIf(process.env.CI);

const wallet = new ethers.Wallet("0x0000000000000000000000000000000000000000000000000000000000000001");
const walletProvider = {
getAddress: () => wallet.address,
getSigner: () => wallet,
};
it('should create a client', async () => {
const client = new NodeTSNClient({endpoint: "http://localhost:8484", walletProvider, chainId: "1234567890"});
const kwilClient = client.getKwilClient();
const chainInfo = await kwilClient.chainInfo();
expect(chainInfo.data?.chain_id).toBeDefined();
});

it("should deploy a stream", async () => {
const chainId = await NodeTSNClient.getDefaultChainId("http://localhost:8484");
if (!chainId) {
throw new Error("Chain id not found");
}
const client = new NodeTSNClient({
chainId,
endpoint: "http://localhost:8484",
walletProvider,
});
const streamId = await StreamId.generate("test");
await using cleanup = new AsyncDisposableStack();
cleanup.defer(async () => {
await client.destroyStream(streamId, true);
});
const receipt = await client.deployStream(streamId, "primitive", true);
expect(receipt.status).toBe(200);
});

it("should wait for a transaction", async () => {
const chainId = await NodeTSNClient.getDefaultChainId("http://localhost:8484");
if (!chainId) {
throw new Error("Chain id not found");
}
const client = new NodeTSNClient({endpoint: "http://localhost:8484", walletProvider, chainId});
await using cleanup = new AsyncDisposableStack();
cleanup.defer(async () => {
await client.destroyStream(streamId, true).catch(() => {});
});
const streamId = await StreamId.generate("test");
const receipt = await client.deployStream(streamId, "primitive", false);
if (!receipt.data?.tx_hash) {
throw new Error("Tx hash not found");
}
const receipt2 = await client.waitForTx(receipt.data.tx_hash);
expect(receipt2.height).toBeGreaterThan(0);
});

it("list my streams", async () => {
const chainId = await NodeTSNClient.getDefaultChainId("http://localhost:8484");
if (!chainId) {
throw new Error("Chain id not found");
}
const client = new NodeTSNClient({endpoint: "http://localhost:8484", walletProvider, chainId});
await using cleanup = new AsyncDisposableStack();
cleanup.defer(async () => {
await client.destroyStream(streamId, true).catch(() => {});
});
const streamId = await StreamId.generate("test");
await client.deployStream(streamId, "primitive", true);
const streams = await client.getAllStreams(client.address());
expect(streams.length).toBeGreaterThan(0);
});
});
162 changes: 162 additions & 0 deletions src/client/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import { Client, KwilSigner, NodeKwil, WebKwil } from "@kwilteam/kwil-js";
import { KwilConfig } from "@kwilteam/kwil-js/dist/api_client/config";
import { Kwil } from "@kwilteam/kwil-js/dist/client/kwil";
import { CustomSigner, EthSigner } from "@kwilteam/kwil-js/dist/core/builders";
import { EnvironmentType } from "@kwilteam/kwil-js/dist/core/enums";
import { GenericResponse } from "@kwilteam/kwil-js/dist/core/resreq";
import { TxReceipt } from "@kwilteam/kwil-js/dist/core/tx";
import { TxInfoReceipt } from "@kwilteam/kwil-js/dist/core/txQuery";
import { Client as IClient } from "../types/client";
import { StreamType } from "../types/contractValues";
import { IStream, StreamLocator } from "../types/stream";
import { EthereumAddress } from "../util/EthereumAddress";
import { StreamId } from "../util/StreamId";
import { IPrimitiveStream } from "../types/primitiveStream";
import { IComposedStream } from "../types/composedStream";
import { deployStream } from "../contracts-api/deployStream";
import { destroyStream } from "../contracts-api/destroyStream";
import { listAllStreams } from "./listAllStreams";

export interface EthProvider {
getAddress(): string;
getSigner(): EthSigner;
}

type TSNClientOptions = {
endpoint: string;
walletProvider: EthProvider;
} & Omit<KwilConfig, "kwilProvider">;

export abstract class TSNClient<T extends EnvironmentType> implements IClient {
protected kwilClient: Kwil<T> | undefined;
protected ethProvider: EthProvider;
protected constructor(options: TSNClientOptions) {
this.ethProvider = options.walletProvider;
}

async waitForTx(txHash: string, timeout = 12000): Promise<TxInfoReceipt> {
return new Promise<TxInfoReceipt>(async (resolve, reject) => {
const interval = setInterval(async () => {
const receipt = await this.getKwilClient()["txInfoClient"](txHash).catch(() => ({data: undefined, status: undefined}));
switch (receipt.status) {
case 200:
if (receipt.data?.tx_result.log === 'success') {
resolve(receipt.data);
} else {
reject(new Error(`Transaction failed: status ${receipt.status} : log message ${receipt.data?.tx_result.log}`));
}
break;
case undefined:
break;
default:
reject(new Error(`Transaction failed: status ${receipt.status} : log message ${receipt.data?.tx_result.log}`));
}
}, 1000);
setTimeout(() => {
clearInterval(interval);
reject(new Error("Transaction failed: Timeout"));
}, timeout);
});
}

getKwilSigner(): KwilSigner {
return new KwilSigner(this.ethProvider.getSigner(), this.address().getAddress());
}

getKwilClient(): Kwil<EnvironmentType> {
if (!this.kwilClient) {
throw new Error("Kwil client not initialized");
}
return this.kwilClient;
}

async deployStream(
streamId: StreamId,
streamType: StreamType,
synchronous?: boolean
): Promise<GenericResponse<TxReceipt>> {
return await deployStream({
streamId,
streamType,
synchronous,
kwilClient: this.getKwilClient(),
kwilSigner: this.getKwilSigner(),
});
}

async destroyStream(streamId: StreamId, synchronous?: boolean): Promise<GenericResponse<TxReceipt>> {
return await destroyStream({
streamId,
synchronous,
kwilClient: this.getKwilClient(),
kwilSigner: this.getKwilSigner(),
});
}

async loadStream(stream: StreamLocator): Promise<IStream> {
throw new Error("Method not implemented.");
}

async loadPrimitiveStream(stream: StreamLocator): Promise<IPrimitiveStream> {
throw new Error("Method not implemented.");
}

async loadComposedStream(stream: StreamLocator): Promise<IComposedStream> {
throw new Error("Method not implemented.");
}

ownStreamLocator(streamId: StreamId): StreamLocator {
return {
streamId,
dataProvider: this.address(),
};
}

address(): EthereumAddress {
return new EthereumAddress(this.ethProvider.getAddress());
}

/**
* Get all streams from the TSN network.
* @param owner - The owner of the streams. If not provided, all streams will be returned.
* @returns A list of stream locators.
*/
async getAllStreams(owner?: EthereumAddress): Promise<StreamLocator[]> {
return listAllStreams(this.getKwilClient(), owner);
}

/**
* Get the default chain id for a provider. Use with caution, as this decreases the security of the TSN.
* @param provider - The provider url.
* @returns The chain id.
*/
public static async getDefaultChainId(provider: string) {
const kwilClient = new Client({
kwilProvider: provider,
});
const chainInfo = await kwilClient["chainInfoClient"]();
return chainInfo.data?.chain_id;
}
}

export class BrowserTSNClient extends TSNClient<EnvironmentType.BROWSER> {
constructor(options: TSNClientOptions) {
super(options);
this.kwilClient = new WebKwil({
...options,
kwilProvider: options.endpoint,
});
}
}

export class NodeTSNClient extends TSNClient<EnvironmentType.NODE> {
constructor(options: TSNClientOptions) {
super(options);
this.kwilClient = new NodeKwil({
...options,
kwilProvider: options.endpoint,
});
}
}


33 changes: 33 additions & 0 deletions src/client/listAllStreams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Kwil } from "@kwilteam/kwil-js/dist/client/kwil";
import { EnvironmentType } from "@kwilteam/kwil-js/dist/core/enums";
import { StreamLocator } from "../types/stream";
import { EthereumAddress } from "../util/EthereumAddress";
import { StreamId } from "../util/StreamId";
import { Database } from "@kwilteam/kwil-js/dist/core/database";

/**
* List all streams from the TSN network.
* @param kwilClient - The Kwil client.
* @param owner - The owner of the streams. If not provided, all streams will be returned.
* @returns A list of stream locators.
*/
export async function listAllStreams(kwilClient: Kwil<EnvironmentType>, owner?: EthereumAddress): Promise<StreamLocator[]> {
const databases = await kwilClient.listDatabases(owner?.getAddress());
const schemas = await Promise.all(databases.data?.map(async (database) => {
const schema = await kwilClient.getSchema(database.dbid);
if (schema.status === 200 && schema.data && isStream(schema.data)) {
return schema.data;
}
return undefined;
}) ?? []);
return schemas.filter(schema => schema !== undefined).map(schema => ({
streamId: new StreamId(schema.name),
dataProvider: EthereumAddress.fromBytes(schema.owner).throw(),
}));
}

export const isStream = (schema: Database) => {
const requiredProcedures = ['get_index', 'get_record', 'get_metadata'];
return requiredProcedures.every(procedure => schema.procedures.some(p => p.name === procedure));
};

59 changes: 59 additions & 0 deletions src/contracts-api/deployStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { StreamType } from "../types/contractValues.js";
import { TxReceipt } from "@kwilteam/kwil-js/dist/core/tx.js";
import { Kwil } from "@kwilteam/kwil-js/dist/client/kwil.js";
import { DeployBody } from "@kwilteam/kwil-js/dist/core/database.js";
import { CompiledKuneiform } from "@kwilteam/kwil-js/dist/core/payload.js";
import { composedStreamTemplate, primitiveStreamTemplate } from "../contracts/contractsContent.js";
import { GenericResponse } from "@kwilteam/kwil-js/dist/core/resreq.js";
import { KwilSigner } from "@kwilteam/kwil-js";
import { StreamId } from "../util/StreamId.js";

export interface DeployStreamInput {
streamId: StreamId;
streamType: StreamType;
kwilClient: Kwil<any>;
kwilSigner: KwilSigner;
synchronous?: boolean;
}

export interface DeployStreamOutput {
receipt: TxReceipt
}

/**
* Deploys a stream to TSN.
* @param input - The input parameters for deploying the stream.
* @returns The transaction hash of the deployment.
*/
export async function deployStream(input: DeployStreamInput): Promise<GenericResponse<TxReceipt>> {
try {
const schema = await getContract(input.streamType);

schema.name = input.streamId.getId();

const txHash = await input.kwilClient.deploy({
schema,
}, input.kwilSigner, input.synchronous);

return txHash;
} catch (error) {
throw new Error(`Failed to deploy stream: ${error}`);
}
}

/**
* Returns the contract content based on the stream type.
* @param streamType - The type of the stream.
* @returns The contract content as a Uint8Array.
*/
async function getContract(streamType: StreamType): Promise<CompiledKuneiform> {
switch (streamType) {
case StreamType.Composed:
return composedStreamTemplate;
case StreamType.Primitive:
return primitiveStreamTemplate;
default:
throw new Error(`Unknown stream type: ${streamType}`);
}
}

Loading
Loading