diff --git a/package.json b/package.json index 59a8675..82a8b2f 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,7 @@ "lodash": "4.17.21" }, "peerDependencies": { - "@kwilteam/kwil-js": "0.7.1" + "@kwilteam/kwil-js": "0.7.2" }, "devDependencies": { "@typescript-eslint/eslint-plugin": "8.11.0", diff --git a/src/client/client.test.ts b/src/client/client.test.ts index 52b2b0f..4847c51 100644 --- a/src/client/client.test.ts +++ b/src/client/client.test.ts @@ -1,6 +1,5 @@ import { describe, expect, it } from "vitest"; import { ethers } from "ethers"; -import { StreamId } from "../util/StreamId"; import { NodeTSNClient } from "./nodeClient"; describe.sequential("Client", { timeout: 30000 }, () => { @@ -30,202 +29,4 @@ describe.sequential("Client", { timeout: 30000 }, () => { const chainInfo = await kwilClient.chainInfo(); expect(chainInfo.data?.chain_id).toBeDefined(); }); - - it("should deploy a stream", async () => { - const chainId = await NodeTSNClient.getDefaultChainId( - "http://localhost:8484", - ); - if (!chainId) { - throw new Error("Chain id not found"); - } - const client = new NodeTSNClient({ - chainId, - endpoint: "http://localhost:8484", - walletProvider, - }); - const streamId = await StreamId.generate("test"); - await using cleanup = new AsyncDisposableStack(); - cleanup.defer(async () => { - await client.destroyStream(streamId, true); - }); - const receipt = await client.deployStream(streamId, "primitive", true); - expect(receipt.status).toBe(200); - }); - - it("should wait for a transaction", async () => { - const chainId = await NodeTSNClient.getDefaultChainId( - "http://localhost:8484", - ); - if (!chainId) { - throw new Error("Chain id not found"); - } - const client = new NodeTSNClient({ - endpoint: "http://localhost:8484", - walletProvider, - chainId, - }); - await using cleanup = new AsyncDisposableStack(); - cleanup.defer(async () => { - await client.destroyStream(streamId, true).catch(() => {}); - }); - const streamId = await StreamId.generate("test"); - const receipt = await client.deployStream(streamId, "primitive", false); - if (!receipt.data?.tx_hash) { - throw new Error("Tx hash not found"); - } - const receipt2 = await client.waitForTx(receipt.data.tx_hash); - expect(receipt2.height).toBeGreaterThan(0); - }); - - it("list my streams", async () => { - const chainId = await NodeTSNClient.getDefaultChainId( - "http://localhost:8484", - ); - if (!chainId) { - throw new Error("Chain id not found"); - } - const client = new NodeTSNClient({ - endpoint: "http://localhost:8484", - walletProvider, - chainId, - }); - await using cleanup = new AsyncDisposableStack(); - cleanup.defer(async () => { - await client.destroyStream(streamId, true).catch(() => {}); - }); - const streamId = await StreamId.generate("test"); - await client.deployStream(streamId, "primitive", true); - const streams = await client.getAllStreams(client.address()); - expect(streams.length).toBeGreaterThan(0); - }); - - it("try query a stream", async () => { - // TODO: this test is temporary just for development, will get replaced by one that also deploys streams - const chainId = await NodeTSNClient.getDefaultChainId( - "http://localhost:8484", - ); - if (!chainId) { - throw new Error("Chain id not found"); - } - - const client = new NodeTSNClient({ - endpoint: "http://localhost:8484", - walletProvider, - chainId, - autoAuthenticate: true, - }); - const streamId = StreamId.fromString( - "st39830c44932bc42a3bffef72310948", - ).throw(); - const stream = 
client.loadStream(client.ownStreamLocator(streamId)); - const record = await (await stream).getRecord({}); - expect(record.length).toBeGreaterThan(0); - }); - - it("insert records", async () => { - const chainId = await NodeTSNClient.getDefaultChainId( - "http://localhost:8484", - ); - if (!chainId) { - throw new Error("Chain id not found"); - } - const client = new NodeTSNClient({ - endpoint: "http://localhost:8484", - walletProvider, - chainId, - }); - await using cleanup = new AsyncDisposableStack(); - const streamId = await StreamId.generate("test"); - cleanup.defer(async () => { - await client.destroyStream(streamId, true).catch(() => {}); - }); - - // deploy a stream - await client.deployStream(streamId, "primitive", true); - - const primitiveStream = client.loadPrimitiveStream({ - streamId, - dataProvider: client.address(), - }); - - { - const tx = await primitiveStream.initializeStream(); - if (!tx.data?.tx_hash) { - throw new Error("Tx hash not found"); - } - await client.waitForTx(tx.data.tx_hash); - } - - { - const tx = await primitiveStream.insertRecords([ - { dateValue: "2024-01-01", value: "100" }, - ]); - if (!tx.data?.tx_hash) { - throw new Error("Tx hash not found"); - } - await client.waitForTx(tx.data.tx_hash); - } - }); - - it("composed stream", async () => { - const chainId = await NodeTSNClient.getDefaultChainId( - "http://localhost:8484", - ); - if (!chainId) { - throw new Error("Chain id not found"); - } - const client = new NodeTSNClient({ - endpoint: "http://localhost:8484", - walletProvider, - chainId, - }); - await using cleanup = new AsyncDisposableStack(); - const streamId = await StreamId.generate("test"); - cleanup.defer(async () => { - await client.destroyStream(streamId, true).catch(() => {}); - }); - - // deploy a composed stream - await client.deployStream(streamId, "composed", true); - - const composedStream = client.loadComposedStream({ - streamId, - dataProvider: client.address(), - }); - - // Initialize the composed stream - { - const tx = await composedStream.initializeStream(); - if (!tx.data?.tx_hash) { - throw new Error("Tx hash not found"); - } - await client.waitForTx(tx.data.tx_hash); - } - - // Set taxonomy - { - const tx = await composedStream.setTaxonomy({ - taxonomyItems: [ - { - childStream: { - streamId: StreamId.fromString("test-child").throw(), - dataProvider: client.address(), - }, - weight: "1", - }, - ], - startDate: "2024-01-01", - }); - if (!tx.data?.tx_hash) { - throw new Error("Tx hash not found"); - } - await client.waitForTx(tx.data.tx_hash); - } - - // Get taxonomies - const taxonomies = await composedStream.describeTaxonomies({ - latestVersion: true, - }); - expect(taxonomies.length).toBeGreaterThan(0); - }); }); diff --git a/src/contracts-api/composedStream.ts b/src/contracts-api/composedStream.ts index 47d5ced..48a2c50 100644 --- a/src/contracts-api/composedStream.ts +++ b/src/contracts-api/composedStream.ts @@ -2,11 +2,11 @@ import { KwilSigner, NodeKwil, WebKwil } from "@kwilteam/kwil-js"; import { ActionInput } from "@kwilteam/kwil-js/dist/core/action"; import { GenericResponse } from "@kwilteam/kwil-js/dist/core/resreq"; import { TxReceipt } from "@kwilteam/kwil-js/dist/core/tx"; -import { StreamType } from "./contractValues"; import { DateString } from "../types/other"; import { StreamLocator } from "../types/stream"; import { EthereumAddress } from "../util/EthereumAddress"; import { StreamId } from "../util/StreamId"; +import { StreamType } from "./contractValues"; import { Stream } from "./stream"; export const 
ErrorStreamNotComposed = "stream is not a composed stream"; @@ -93,17 +93,17 @@ export class ComposedStream extends Stream { .mapRight((records) => { const taxonomyItems: Map = records.reduce( (acc, record) => { - acc.set(record.start_date, [ - { - childStream: { - streamId: StreamId.fromString(record.child_stream_id).throw(), - dataProvider: EthereumAddress.fromString( - record.child_data_provider, - ).throw(), - }, - weight: record.weight, + const currentArray = acc.get(record.start_date) || []; + currentArray.push({ + childStream: { + streamId: StreamId.fromString(record.child_stream_id).throw(), + dataProvider: EthereumAddress.fromString( + record.child_data_provider, + ).throw(), }, - ]); + weight: record.weight, + }); + acc.set(record.start_date, currentArray); return acc; }, new Map(), diff --git a/src/contracts-api/stream.ts b/src/contracts-api/stream.ts index 0807584..e366b46 100644 --- a/src/contracts-api/stream.ts +++ b/src/contracts-api/stream.ts @@ -170,6 +170,7 @@ export class Stream { * Returns the records of the stream within the given date range */ public async getRecord(input: GetRecordInput): Promise { + // TODO: change value to string when kwil-js is updated const result = await this.call<{ date_value: string; value: string }[]>( "get_record", [ @@ -278,7 +279,7 @@ export class Stream { ActionInput.fromObject({ $key: key, $value: value, - $value_type: MetadataKeyValueMap[key], + $val_type: MetadataKeyValueMap[key], }), ]); } diff --git a/tests/integration/composedStream.test.ts b/tests/integration/composedStream.test.ts new file mode 100644 index 0000000..c69140b --- /dev/null +++ b/tests/integration/composedStream.test.ts @@ -0,0 +1,174 @@ +import { describe, expect } from "vitest"; +import NodeTSNClient from "../../src/client/nodeClient"; +import { StreamId } from "../../src/util/StreamId"; +import { testWithDefaultWallet } from "./utils"; + +describe.sequential( + "ComposedStream Integration Tests", + { timeout: 90000 }, + () => { + // Skip in CI, because it needs a local node + testWithDefaultWallet.skipIf(process.env.CI); + + testWithDefaultWallet( + "should deploy, initialize and use a composed stream", + async ({ defaultClient }) => { + // Generate unique stream IDs for composed and child streams + const composedStreamId = await StreamId.generate( + "test-composed-stream", + ); + const childAStreamId = await StreamId.generate( + "test-composed-stream-child-a", + ); + const childBStreamId = await StreamId.generate( + "test-composed-stream-child-b", + ); + + const allStreamIds = [composedStreamId, childAStreamId, childBStreamId]; + + try { + // Deploy child streams with initial data + // Child A: [2020-01-01: 1, 2020-01-02: 2, 2020-01-30: 3, 2020-02-01: 4, 2020-02-02: 5] + await deployPrimitiveStreamWithData(defaultClient, childAStreamId, [ + { dateValue: "2020-01-01", value: "1.000000000000000000" }, + { dateValue: "2020-01-02", value: "2.000000000000000000" }, + { dateValue: "2020-01-30", value: "3.000000000000000000" }, + { dateValue: "2020-02-01", value: "4.000000000000000000" }, + { dateValue: "2020-02-02", value: "5.000000000000000000" }, + ]); + + // Child B: [2020-01-01: 3, 2020-01-02: 4, 2020-01-30: 5, 2020-02-01: 6, 2020-02-02: 7] + await deployPrimitiveStreamWithData(defaultClient, childBStreamId, [ + { dateValue: "2020-01-01", value: "3.000000000000000000" }, + { dateValue: "2020-01-02", value: "4.000000000000000000" }, + { dateValue: "2020-01-30", value: "5.000000000000000000" }, + { dateValue: "2020-02-01", value: "6.000000000000000000" }, + { 
dateValue: "2020-02-02", value: "7.000000000000000000" }, + ]); + + // Deploy composed stream + const deployReceipt = await defaultClient.deployStream( + composedStreamId, + "composed", + true, + ); + expect(deployReceipt.status).toBe(200); + + // Load the composed stream + const composedStream = defaultClient.loadComposedStream({ + streamId: composedStreamId, + dataProvider: defaultClient.address(), + }); + + // Initialize the composed stream + const initTx = await composedStream.initializeStream(); + if (!initTx.data?.tx_hash) { + throw new Error("Init tx hash not found"); + } + await defaultClient.waitForTx(initTx.data.tx_hash); + + // Set taxonomy with weights + // Child A weight: 1, Child B weight: 2 + const setTaxonomyTx = await composedStream.setTaxonomy({ + taxonomyItems: [ + { + childStream: { + streamId: childAStreamId, + dataProvider: defaultClient.address(), + }, + weight: "1", + }, + { + childStream: { + streamId: childBStreamId, + dataProvider: defaultClient.address(), + }, + weight: "2", + }, + ], + startDate: "2020-01-30", + }); + if (!setTaxonomyTx.data?.tx_hash) { + throw new Error("Set taxonomy tx hash not found"); + } + await defaultClient.waitForTx(setTaxonomyTx.data.tx_hash); + + // Verify taxonomies + const taxonomies = await composedStream.describeTaxonomies({ + latestVersion: true, + }); + expect(taxonomies.length).toBe(1); + expect(taxonomies[0].startDate).toBe("2020-01-30"); + expect(taxonomies[0].taxonomyItems.length).toBe(2); + + // Query records after the taxonomy start date + const records = await composedStream.getRecord({ + dateFrom: "2020-02-01", + dateTo: "2020-02-02", + }); + + // Verify records + // Formula: (value_A * weight_A + value_B * weight_B) / (weight_A + weight_B) + // 2020-02-01: (4 * 1 + 6 * 2) / (1 + 2) = 5.333... + // 2020-02-02: (5 * 1 + 7 * 2) / (1 + 2) = 6.333... 
+ expect(records.length).toBe(2); + expect(parseFloat(records[0].value)).toBeCloseTo(5.333333, 5); + expect(parseFloat(records[1].value)).toBeCloseTo(6.333333, 5); + + // Query index values + const index = await composedStream.getIndex({ + dateFrom: "2020-01-30", + dateTo: "2020-02-01", + baseDate: "2020-01-30", + }); + + // Verify index values + expect(index.length).toBe(2); + expect(parseFloat(index[0].value)).toBe(100); // Base date is always 100 + expect(parseFloat(index[1].value)).toBeCloseTo(124.444444, 5); // Percentage change from base date + + // Query first record + const firstRecord = await composedStream.getFirstRecord({}); + expect(firstRecord).not.toBeNull(); + expect(parseFloat(firstRecord!.value)).toBeCloseTo(2.333333, 5); + expect(firstRecord!.dateValue).toBe("2020-01-01"); + } finally { + // Cleanup: destroy all streams + for (const streamId of allStreamIds) { + await defaultClient.destroyStream(streamId, true).catch(() => {}); + } + } + }, + ); + }, +); + +// Helper function to deploy and initialize a primitive stream with data +async function deployPrimitiveStreamWithData( + client: NodeTSNClient, + streamId: StreamId, + data: { dateValue: string; value: string }[], +) { + // Deploy primitive stream + const deployReceipt = await client.deployStream(streamId, "primitive", true); + expect(deployReceipt.status).toBe(200); + + // Load and initialize the stream + const primitiveStream = client.loadPrimitiveStream({ + streamId, + dataProvider: client.address(), + }); + + const initTx = await primitiveStream.initializeStream(); + if (!initTx.data?.tx_hash) { + throw new Error("Init tx hash not found"); + } + await client.waitForTx(initTx.data.tx_hash); + + // Insert records + const insertTx = await primitiveStream.insertRecords(data); + if (!insertTx.data?.tx_hash) { + throw new Error("Insert tx hash not found"); + } + await client.waitForTx(insertTx.data.tx_hash); +} diff --git a/tests/integration/getAllStreams.test.ts b/tests/integration/getAllStreams.test.ts new file mode 100644 index 0000000..9eabf92 --- /dev/null +++ b/tests/integration/getAllStreams.test.ts @@ -0,0 +1,119 @@ +import { describe, expect } from "vitest"; +import { StreamId } from "../../src/util/StreamId"; +import { StreamType } from "../../src/contracts-api/contractValues"; +import { testWithDefaultWallet, waitForTxSuccess } from "./utils"; +import NodeTSNClient from "../../src/client/nodeClient"; + +describe.sequential("Get All Streams", { timeout: 90000 }, () => { + testWithDefaultWallet.skipIf(process.env.CI)( + "should list all streams", + async ({ defaultClient }) => { + await using disposables = new AsyncDisposableStack(); + + // Create unique stream IDs + const primitiveStreamId = await StreamId.generate("test-list-primitive"); + const composedStreamId = await StreamId.generate("test-list-composed"); + + // Deploy streams and add them to the disposable stack + disposables.defer(async () => { + await defaultClient + .destroyStream(primitiveStreamId, true) + .catch((e) => { + console.error(e); + }); + await defaultClient.destroyStream(composedStreamId, true).catch((e) => { + console.error(e); + }); + }); + await createAndInitStream( + defaultClient, + primitiveStreamId, + StreamType.Primitive, + ); + await createAndInitStream( + defaultClient, + composedStreamId, + StreamType.Composed, + ); + + // Get all streams + const streams = await defaultClient.getAllStreams(); + + // Verify streams are listed + expect(streams.length).toBeGreaterThan(1); + + // Find our test streams in the list + const 
foundPrimitive = streams.find( + (s) => s.streamId.getId() === primitiveStreamId.getId(), + ); + const foundComposed = streams.find( + (s) => s.streamId.getId() === composedStreamId.getId(), + ); + + expect(foundPrimitive).toBeDefined(); + expect(foundComposed).toBeDefined(); + + // Verify stream types + if (foundPrimitive) { + const primitiveType = await defaultClient + .loadStream(foundPrimitive) + .getType(); + expect(primitiveType).toBe(StreamType.Primitive); + } + + if (foundComposed) { + const composedType = await defaultClient + .loadStream(foundComposed) + .getType(); + expect(composedType).toBe(StreamType.Composed); + } + }, + ); + + testWithDefaultWallet.skipIf(process.env.CI)( + "should list streams for specific owner", + async ({ defaultClient }) => { + await using disposables = new AsyncDisposableStack(); + + const streamId = await StreamId.generate("test-list-owner"); + disposables.defer(async () => { + await defaultClient.destroyStream(streamId, true); + }); + await createAndInitStream(defaultClient, streamId, StreamType.Primitive); + + // Get streams for owner + const streams = await defaultClient.getAllStreams( + defaultClient.address(), + ); + + // Verify our test stream is in the list + const found = streams.find( + (s) => s.streamId.getId() === streamId.getId(), + ); + expect(found).toBeDefined(); + + // Verify stream belongs to owner + if (found) { + expect(found.dataProvider.getAddress()).toBe( + defaultClient.address().getAddress(), + ); + } + }, + ); +}); + +// Helper function to create and initialize a stream +async function createAndInitStream( + client: NodeTSNClient, + streamId: StreamId, + type: StreamType, +) { + await client.deployStream(streamId, type, true); + const stream = client.loadStream({ + streamId, + dataProvider: client.address(), + }); + const tx = await stream.initializeStream(); + await waitForTxSuccess(tx, client); + return stream; +} diff --git a/tests/integration/permissions.test.ts b/tests/integration/permissions.test.ts new file mode 100644 index 0000000..01f6bb8 --- /dev/null +++ b/tests/integration/permissions.test.ts @@ -0,0 +1,224 @@ +import { describe, expect } from "vitest"; +import { EthereumAddress } from "../../src/util/EthereumAddress"; +import { visibility } from "../../src/util/visibility"; +import { StreamId } from "../../src/util/StreamId"; +import { createTestContexts, waitForTxSuccess } from "./utils"; +import { GenericResponse } from "@kwilteam/kwil-js/dist/core/resreq"; +import { TxReceipt } from "@kwilteam/kwil-js/dist/core/tx"; + +// Define roles and their private keys for permission tests +const PERMISSION_ROLES = { + owner: "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef", + reader: "0x1111111111111111111111111111111111111111111111111111111111111111", +} as const; + +// Create permission-specific test context +const tsnTest = createTestContexts(PERMISSION_ROLES); + +describe.sequential("Permissions", { timeout: 90000 }, () => { + // Skip in CI, because it needs a local node + tsnTest.skipIf(process.env.CI); + + tsnTest( + "should manage primitive stream permissions", + async ({ ownerClient, readerClient, readerWallet }) => { + // Generate a unique stream ID + const streamId = await StreamId.generate("test-permissions-primitive"); + const streamLocator = ownerClient.ownStreamLocator(streamId); + + // Clean up after test + try { + // Deploy and initialize primitive stream with test data + await ownerClient.deployStream(streamId, "primitive", true); + const primitiveStream = 
+          ownerClient.loadPrimitiveStream(streamLocator);
+
+        // tx is used to wait for tx success on each call
+        let tx: GenericResponse<TxReceipt>;
+
+        tx = await primitiveStream.initializeStream();
+        await waitForTxSuccess(tx, ownerClient);
+
+        // Insert test data
+        tx = await primitiveStream.insertRecords([
+          { dateValue: "2024-01-01", value: "100.000000000000000000" },
+        ]);
+        await waitForTxSuccess(tx, ownerClient);
+
+        // Load stream for reader
+        const readerPrimitiveStream =
+          readerClient.loadPrimitiveStream(streamLocator);
+
+        // Test public read access
+        const publicRecords = await readerPrimitiveStream.getRecord({
+          dateFrom: "2024-01-01",
+          dateTo: "2024-01-01",
+        });
+        expect(publicRecords.length).toBe(1);
+        expect(publicRecords[0].value).toBe("100.000000000000000000");
+
+        // Set stream to private
+        tx = await primitiveStream.setReadVisibility(visibility.private);
+        await waitForTxSuccess(tx, ownerClient);
+
+        // Verify owner can still read
+        const ownerRecords = await primitiveStream.getRecord({
+          dateFrom: "2024-01-01",
+          dateTo: "2024-01-01",
+        });
+        expect(ownerRecords.length).toBe(1);
+
+        // Verify reader cannot read when private
+        await expect(
+          readerPrimitiveStream.getRecord({
+            dateFrom: "2024-01-01",
+            dateTo: "2024-01-01",
+          }),
+        ).rejects.toThrow();
+
+        // Allow reader access
+        const readerAddress = new EthereumAddress(readerWallet.address);
+        tx = await primitiveStream.allowReadWallet(readerAddress);
+        await waitForTxSuccess(tx, ownerClient);
+
+        // Verify reader can now read
+        const allowedRecords = await readerPrimitiveStream.getRecord({
+          dateFrom: "2024-01-01",
+          dateTo: "2024-01-01",
+        });
+        expect(allowedRecords.length).toBe(1);
+
+        // Disable reader access
+        tx = await primitiveStream.disableReadWallet(readerAddress);
+        await waitForTxSuccess(tx, ownerClient);
+
+        // Verify reader cannot read after being disabled
+        await expect(
+          readerPrimitiveStream.getRecord({
+            dateFrom: "2024-01-01",
+            dateTo: "2024-01-01",
+          }),
+        ).rejects.toThrow();
+      } finally {
+        await ownerClient.destroyStream(streamId, true).catch(() => {});
+      }
+    },
+  );
+
+  tsnTest(
+    "should manage composed stream permissions",
+    async ({ ownerClient, readerClient, readerWallet }) => {
+      // Generate stream IDs for both primitive and composed streams
+      const primitiveStreamId = await StreamId.generate(
+        "test-permissions-primitive-child",
+      );
+      const composedStreamId = await StreamId.generate(
+        "test-permissions-composed",
+      );
+
+      try {
+        // Deploy and initialize primitive stream with test data
+        await ownerClient.deployStream(primitiveStreamId, "primitive", true);
+        const primitiveStream = ownerClient.loadPrimitiveStream(
+          ownerClient.ownStreamLocator(primitiveStreamId),
+        );
+
+        // tx is used to wait for tx success on each call
+        let tx: GenericResponse<TxReceipt>;
+
+        tx = await primitiveStream.initializeStream();
+        await waitForTxSuccess(tx, ownerClient);
+        tx = await primitiveStream.insertRecords([
+          { dateValue: "2024-01-01", value: "100.000000000000000000" },
+        ]);
+        await waitForTxSuccess(tx, ownerClient);
+        // Deploy and initialize composed stream
+        await ownerClient.deployStream(composedStreamId, "composed", true);
+        const composedStream = ownerClient.loadComposedStream(
+          ownerClient.ownStreamLocator(composedStreamId),
+        );
+        tx = await composedStream.initializeStream();
+        await waitForTxSuccess(tx, ownerClient);
+
+        // Set taxonomy using primitive stream
+        tx = await composedStream.setTaxonomy({
+          taxonomyItems: [
+            {
+              childStream: ownerClient.ownStreamLocator(primitiveStreamId),
+              weight: "1",
+            },
], + startDate: "2024-01-01", + }); + await waitForTxSuccess(tx, ownerClient); + + // Load streams for reader + const readerComposedStream = readerClient.loadComposedStream( + ownerClient.ownStreamLocator(composedStreamId), + ); + + // Test public access + const publicRecords = await readerComposedStream.getRecord({ + dateFrom: "2024-01-01", + dateTo: "2024-01-01", + }); + expect(publicRecords.length).toBe(1); + + // Set primitive stream compose visibility to private + tx = await primitiveStream.setComposeVisibility(visibility.private); + await waitForTxSuccess(tx, ownerClient); + + // Verify composed stream fails when child is private + await expect( + readerComposedStream.getRecord({ + dateFrom: "2024-01-01", + dateTo: "2024-01-01", + }), + ).rejects.toThrow(); + + // Allow composed stream to read primitive stream + tx = await primitiveStream.allowComposeStream( + ownerClient.ownStreamLocator(composedStreamId), + ); + await waitForTxSuccess(tx, ownerClient); + + // Verify composed stream works when allowed + const allowedRecords = await readerComposedStream.getRecord({ + dateFrom: "2024-01-01", + dateTo: "2024-01-01", + }); + expect(allowedRecords.length).toBe(1); + + // Set composed stream read visibility to private + tx = await composedStream.setReadVisibility(visibility.private); + await waitForTxSuccess(tx, ownerClient); + + // Verify reader cannot access private composed stream + await expect( + readerComposedStream.getRecord({ + dateFrom: "2024-01-01", + dateTo: "2024-01-01", + }), + ).rejects.toThrow(); + + // Allow reader to access composed stream + const readerAddress = new EthereumAddress(readerWallet.address); + tx = await composedStream.allowReadWallet(readerAddress); + await waitForTxSuccess(tx, ownerClient); + + // Verify reader can access when allowed + const finalRecords = await readerComposedStream.getRecord({ + dateFrom: "2024-01-01", + dateTo: "2024-01-01", + }); + expect(finalRecords.length).toBe(1); + } finally { + await ownerClient.destroyStream(primitiveStreamId, true).catch((e) => { + console.log("Failed to destroy primitive stream", e); + }); + await ownerClient.destroyStream(composedStreamId, true).catch((e) => { + console.log("Failed to destroy composed stream", e); + }); + } + }, + ); +}); diff --git a/tests/integration/primitiveStream.test.ts b/tests/integration/primitiveStream.test.ts new file mode 100644 index 0000000..2a61e76 --- /dev/null +++ b/tests/integration/primitiveStream.test.ts @@ -0,0 +1,83 @@ +import { describe, expect } from "vitest"; +import { StreamId } from "../../src/util/StreamId"; +import { testWithDefaultWallet } from "./utils"; + +describe.sequential( + "PrimitiveStream Integration Tests", + { timeout: 30000 }, + () => { + // Skip in CI, because it needs a local node + testWithDefaultWallet.skipIf(process.env.CI); + + testWithDefaultWallet( + "should deploy, initialize, write to, and read from a primitive stream", + async ({ defaultClient }) => { + // Generate a unique stream ID + const streamId = await StreamId.generate("test-primitive-stream"); + + try { + // Deploy a primitive stream + const deployReceipt = await defaultClient.deployStream( + streamId, + "primitive", + true, + ); + expect(deployReceipt.status).toBe(200); + + // Load the deployed stream + const primitiveStream = defaultClient.loadPrimitiveStream({ + streamId, + dataProvider: defaultClient.address(), + }); + + // Initialize the stream + const initTx = await primitiveStream.initializeStream(); + if (!initTx.data?.tx_hash) { + throw new Error("Init tx hash not found"); + 
+          }
+          await defaultClient.waitForTx(initTx.data.tx_hash);
+
+          // Insert a record
+          const insertTx = await primitiveStream.insertRecords([
+            { dateValue: "2020-01-01", value: "1" },
+          ]);
+          if (!insertTx.data?.tx_hash) {
+            throw new Error("Insert tx hash not found");
+          }
+          await defaultClient.waitForTx(insertTx.data.tx_hash);
+
+          // Query records
+          const records = await primitiveStream.getRecord({
+            dateFrom: "2020-01-01",
+            dateTo: "2021-01-01",
+          });
+
+          // Verify record content
+          expect(records.length).toBe(1);
+          expect(records[0].value).toBe("1.000000000000000000");
+          expect(records[0].dateValue).toBe("2020-01-01");
+
+          // Query index
+          const index = await primitiveStream.getIndex({
+            dateFrom: "2020-01-01",
+            dateTo: "2021-01-01",
+          });
+
+          // Verify index content
+          expect(index.length).toBe(1);
+          expect(index[0].value).toBe("100.000000000000000000");
+          expect(index[0].dateValue).toBe("2020-01-01");
+
+          // Query first record
+          const firstRecord = await primitiveStream.getFirstRecord({});
+          expect(firstRecord).not.toBeNull();
+          expect(firstRecord?.value).toBe("1.000000000000000000");
+          expect(firstRecord?.dateValue).toBe("2020-01-01");
+        } finally {
+          // Cleanup: destroy the stream after test
+          await defaultClient.destroyStream(streamId, true);
+        }
+      },
+    );
+  },
+);
diff --git a/tests/integration/utils.ts b/tests/integration/utils.ts
new file mode 100644
index 0000000..e768c0d
--- /dev/null
+++ b/tests/integration/utils.ts
@@ -0,0 +1,93 @@
+import { ethers } from "ethers";
+import { NodeTSNClient } from "../../src/client/nodeClient";
+import { test } from "vitest";
+import { GenericResponse } from "@kwilteam/kwil-js/dist/core/resreq";
+import { TxReceipt } from "@kwilteam/kwil-js/dist/core/tx";
+
+export const TEST_ENDPOINT = "http://localhost:8484";
+
+export const testWithDefaultWallet = createTestContexts({
+  default: "0x0000000000000000000000000000000000000000100000000100000000000001",
+});
+
+export interface WalletContext {
+  wallet: ethers.Wallet;
+  client: NodeTSNClient;
+}
+
+type Fixtures<T extends Record<string, any>> = Parameters<
+  typeof test.extend<T>
+>[0];
+
+/**
+ * Creates a test context for a specific wallet
+ * @param privateKey The private key for the wallet
+ * @returns A context containing the wallet and its client
+ */
+export async function createWalletContext(
+  privateKey: string,
+): Promise<WalletContext> {
+  const wallet = new ethers.Wallet(privateKey);
+  const chainId = await NodeTSNClient.getDefaultChainId(TEST_ENDPOINT);
+
+  if (!chainId) throw new Error("Chain id not found");
+
+  const client = new NodeTSNClient({
+    endpoint: TEST_ENDPOINT,
+    walletProvider: {
+      getAddress: () => wallet.address,
+      getSigner: () => wallet,
+    },
+    chainId,
+  });
+
+  return { wallet, client };
+}
+
+/**
+ * Creates a test extension for multiple wallet roles
+ * @param roles Map of role names to private keys
+ * @returns A test extension with wallet contexts for each role
+ */
+export function createTestContexts<T extends string>(roles: Record<T, string>) {
+  type ContextType = {
+    [K in T as `${K}Wallet`]: ethers.Wallet;
+  } & {
+    [K in T as `${K}Client`]: NodeTSNClient;
+  };
+
+  const testExtension = {} as Fixtures<ContextType>;
+
+  // Create wallet and client fixtures for each role
+  Object.entries(roles).forEach(([role, privateKey]) => {
+    // Add wallet fixture
+    testExtension[`${role}Wallet` as keyof ContextType] = async (
+      {},
+      use: any,
+    ) => {
+      const { wallet } = await createWalletContext(privateKey as string);
+      await use(wallet);
+    };
+
+    // Add client fixture
+    testExtension[`${role}Client` as keyof ContextType] = async (
+      { [`${role}Wallet` as keyof ContextType]: wallet },
+      use: any,
+    ) => {
+      const { client } = await createWalletContext(privateKey as string);
+      await use(client);
+    };
+  });
+
+  return test.extend(testExtension);
+}
+
+export async function waitForTxSuccess(
+  tx: GenericResponse<TxReceipt>,
+  client: NodeTSNClient,
+) {
+  if (!tx.data?.tx_hash) {
+    throw new Error("Tx hash not found");
+  }
+  return client.waitForTx(tx.data.tx_hash);
+}
diff --git a/tsconfig.json b/tsconfig.json
index ace6d42..52a041c 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -9,7 +9,7 @@
     "moduleResolution": "Bundler",
    "skipLibCheck": true,
     "outDir": "dist",
-    "rootDir": "src",
+    "rootDir": ".",
     "resolveJsonModule": true,
     "isolatedModules": true,
     "noEmit": true,
@@ -20,6 +20,6 @@
       "vitest/importMeta"
     ]
   },
-  "include": ["src/**/*"],
+  "include": ["src/**/*", "tests/**/*"],
   "exclude": ["node_modules", "dist"]
 }
\ No newline at end of file
diff --git a/vitest.config.ts b/vitest.config.ts
index 155a0cd..7c6d550 100644
--- a/vitest.config.ts
+++ b/vitest.config.ts
@@ -6,5 +6,7 @@ export default defineConfig({
   test: {
     includeSource: ['src/**/*.{js,ts}'],
     setupFiles: ['disposablestack/auto'],
+    maxConcurrency: 1, // Disable concurrency to avoid nonce errors
+    fileParallelism: false,
   },
 })