Skip to content

Commit

Permalink
Stream Client Package Upgrades (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
mzkrasner authored Dec 5, 2024
1 parent 8804a2c commit 93856f1
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 22 deletions.
33 changes: 33 additions & 0 deletions packages/stream-client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
import { type CeramicClient, getCeramicClient } from '@ceramic-sdk/http-client'
import type { DID } from 'dids'

export type StreamState = {
/** Multibase encoding of the stream id */
id: string
/** CID of the event that produced this state */
event_cid: string
/** Controller of the stream */
controller: string
/** Dimensions of the stream, each value is multibase encoded */
dimensions: Record<string, Uint8Array>
/** Multibase encoding of the data of the stream. Content is stream type specific */
data: string
}

export type StreamClientParams = {
/** Ceramic HTTP client instance or Ceramic One server URL */
ceramic: CeramicClient | string
Expand All @@ -22,6 +35,26 @@ export class StreamClient {
return this.#ceramic
}

/**
* Get the state of a stream by its ID
* @param streamId - Multibase encoded stream ID
* @returns The state of the stream
*/
async getStreamState(streamId: string): Promise<StreamState> {
const { data, error } = await this.#ceramic.api.GET(
'/streams/{stream_id}',
{
params: { path: { stream_id: streamId } },
},
)

if (error != null) {
throw new Error(error.message)
}

return data
}

/** Utility method used to access the provided DID or the one attached to the instance, throws if neither is provided */
getDID(provided?: DID): DID {
if (provided != null) {
Expand Down
58 changes: 57 additions & 1 deletion packages/stream-client/test/lib.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { CeramicClient } from '@ceramic-sdk/http-client'
import { jest } from '@jest/globals'
import { DID } from 'dids'

import { StreamClient } from '../src/index.js'

describe('StreamClient', () => {
Expand Down Expand Up @@ -38,4 +38,60 @@ describe('StreamClient', () => {
expect(client.getDID(did)).toBe(did)
})
})
describe('getStreamState() method', () => {
test('fetches the state of a stream by its ID', async () => {
const streamId = 'streamId123'
const mockStreamState = {
id: streamId,
controller: 'did:example:123',
data: 'someEncodedData',
event_cid: 'someCid',
dimensions: {},
}

// Mock CeramicClient and its API
const mockGet = jest.fn(() =>
Promise.resolve({
data: mockStreamState,
error: null,
}),
)
const mockCeramicClient = {
api: { GET: mockGet },
} as unknown as CeramicClient

const client = new StreamClient({ ceramic: mockCeramicClient })
const state = await client.getStreamState(streamId)

expect(state).toEqual(mockStreamState)
expect(mockGet).toHaveBeenCalledWith('/streams/{stream_id}', {
params: { path: { stream_id: streamId } },
})
})

test('throws an error if the stream is not found', async () => {
const streamId = 'invalidStreamId'
const mockError = { message: 'Stream not found' }

// Mock CeramicClient and its API
const mockGet = jest.fn(() =>
Promise.resolve({
data: null,
error: mockError,
}),
)
const mockCeramicClient = {
api: { GET: mockGet },
} as unknown as CeramicClient

const client = new StreamClient({ ceramic: mockCeramicClient })

await expect(client.getStreamState(streamId)).rejects.toThrow(
'Stream not found',
)
expect(mockGet).toHaveBeenCalledWith('/streams/{stream_id}', {
params: { path: { stream_id: streamId } },
})
})
})
})
10 changes: 10 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion tests/c1-integration/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
},
"dependencies": {
"@ceramic-sdk/events": "workspace:^",
"@ceramic-sdk/stream-client": "workspace:^",
"@ceramic-sdk/identifiers": "workspace:^",
"@ceramic-sdk/flight-sql-client": "workspace:^",
"@ceramic-sdk/http-client": "workspace:^",
Expand All @@ -31,13 +32,16 @@
"@ceramic-sdk/model-instance-protocol": "workspace:^",
"@ceramic-sdk/model-protocol": "workspace:^",
"@didtools/key-did": "^1.0.0",
"@didtools/codecs": "^3.0.0",
"apache-arrow": "18.0.0",
"@jest/environment": "^29.7.0",
"@types/cross-spawn": "^6.0.0",
"cross-spawn": "^7.0.6",
"modern-spawn": "^1.0.0"
},
"devDependencies": {},
"devDependencies": {
"multiformats": "^13.3.0"
},
"jest": {
"extensionsToTreatAsEsm": [".ts"],
"moduleNameMapper": {
Expand Down
2 changes: 2 additions & 0 deletions tests/c1-integration/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const DEFAULT_ENVIRONMENT = {
CERAMIC_ONE_LOG_FORMAT: 'single-line',
CERAMIC_ONE_NETWORK: 'in-memory',
CERAMIC_ONE_STORE_DIR: '/',
CERAMIC_ONE_AGGREGATOR: 'true',
CERAMIC_ONE_OBJECT_STORE_URL: 'file://./generated',
}

export type EnvironmentOptions = {
Expand Down
13 changes: 7 additions & 6 deletions tests/c1-integration/test/classes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@ import CeramicOneContainer, { type EnvironmentOptions } from '../src'

const authenticatedDID = await getAuthenticatedDID(new Uint8Array(32))

const OPTIONS: EnvironmentOptions = {
containerName: 'ceramic-test-document',
apiPort: 5203,
flightSqlPort: 5204,
const CONTAINER_OPTS: EnvironmentOptions = {
containerName: 'ceramic-test-classes',
apiPort: 5222,
flightSqlPort: 5223,
testPort: 5223,
}

describe('stream classes', () => {
let c1Container: CeramicOneContainer
const client = new CeramicClient({
url: `http://127.0.0.1:${OPTIONS.apiPort}`,
url: `http://127.0.0.1:${CONTAINER_OPTS.apiPort}`,
})

beforeAll(async () => {
c1Container = await CeramicOneContainer.startContainer(OPTIONS)
c1Container = await CeramicOneContainer.startContainer(CONTAINER_OPTS)
}, 10000)

test('create and update deterministic document', async () => {
Expand Down
12 changes: 6 additions & 6 deletions tests/c1-integration/test/document.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
} from '@ceramic-sdk/model-protocol'
import { getAuthenticatedDID } from '@didtools/key-did'
import CeramicOneContainer, { type EnvironmentOptions } from '../src'
import ContainerWrapper from '../src/withContainer'

const authenticatedDID = await getAuthenticatedDID(new Uint8Array(32))

Expand All @@ -35,20 +34,21 @@ const testModel: ModelDefinition = {
},
}

const OPTIONS: EnvironmentOptions = {
const CONTAINER_OPTS: EnvironmentOptions = {
containerName: 'ceramic-test-document',
apiPort: 5211,
flightSqlPort: 5212,
apiPort: 5222,
flightSqlPort: 5223,
testPort: 5223,
}

describe('model integration test', () => {
let c1Container: CeramicOneContainer
const client = new CeramicClient({
url: `http://127.0.0.1:${OPTIONS.apiPort}`,
url: `http://127.0.0.1:${CONTAINER_OPTS.apiPort}`,
})

beforeAll(async () => {
c1Container = await CeramicOneContainer.startContainer(OPTIONS)
c1Container = await CeramicOneContainer.startContainer(CONTAINER_OPTS)
}, 10000)

test('create model and documents using the model', async () => {
Expand Down
5 changes: 2 additions & 3 deletions tests/c1-integration/test/flight.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,14 @@ describe('flight sql', () => {
expect(withSchema).not.toBe(noSchema)
})

// disabled until server support is implemented
test.skip('prepared stmt', async () => {
test('prepared stmt', async () => {
const client = await createFlightSqlClient(OPTIONS)
const buffer = await client.preparedStatement(
'SELECT * from conclusion_events where stream_type = $1',
new Array(['$1', '3']),
)
const data = tableFromIPC(buffer)
console.log(JSON.stringify(data))
expect(data).toBeDefined()
})

afterAll(async () => {
Expand Down
11 changes: 6 additions & 5 deletions tests/c1-integration/test/model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ const testModel: ModelDefinition = {
},
}

const OPTIONS: EnvironmentOptions = {
const CONTAINER_OPTS: EnvironmentOptions = {
containerName: 'ceramic-test-model',
apiPort: 5201,
flightSqlPort: 5202,
apiPort: 5222,
flightSqlPort: 5223,
testPort: 5223,
}

describe('model integration test', () => {
let c1Container: CeramicOneContainer
const client = new CeramicClient({
url: `http://127.0.0.1:${OPTIONS.apiPort}`,
url: `http://127.0.0.1:${CONTAINER_OPTS.apiPort}`,
})

beforeAll(async () => {
c1Container = await CeramicOneContainer.startContainer(OPTIONS)
c1Container = await CeramicOneContainer.startContainer(CONTAINER_OPTS)
}, 10000)

test('create model', async () => {
Expand Down
88 changes: 88 additions & 0 deletions tests/c1-integration/test/stream-client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { InitEventPayload, SignedEvent, signEvent } from '@ceramic-sdk/events'
import {
type ClientOptions,
type FlightSqlClient,
createFlightSqlClient,
} from '@ceramic-sdk/flight-sql-client'
import { CeramicClient } from '@ceramic-sdk/http-client'
import { StreamID } from '@ceramic-sdk/identifiers'
import { StreamClient } from '@ceramic-sdk/stream-client'
import { asDIDString } from '@didtools/codecs'
import { getAuthenticatedDID } from '@didtools/key-did'
import { tableFromIPC } from 'apache-arrow'
import { CID } from 'multiformats'
import type { EnvironmentOptions } from '../src'
import CeramicOneContainer from '../src'

const CONTAINER_OPTS: EnvironmentOptions = {
containerName: 'ceramic-test-stream-client',
apiPort: 5222,
flightSqlPort: 5223,
testPort: 5223,
}

const authenticatedDID = await getAuthenticatedDID(new Uint8Array(32))

const OPTIONS: ClientOptions = {
headers: new Array(),
username: undefined,
password: undefined,
token: undefined,
tls: false,
host: '127.0.0.1',
port: CONTAINER_OPTS.flightSqlPort,
}

async function getClient(): Promise<FlightSqlClient> {
return createFlightSqlClient(OPTIONS)
}

describe('stream client', () => {
let c1Container: CeramicOneContainer
const ceramicClient = new CeramicClient({
url: `http://127.0.0.1:${CONTAINER_OPTS.apiPort}`,
})
let streamId: StreamID
let cid: CID
beforeAll(async () => {
c1Container = await CeramicOneContainer.startContainer(CONTAINER_OPTS)

// create a new event
const model = StreamID.fromString(
'kjzl6hvfrbw6c5he7fxl3oakeckm2kchkqboqug08inkh1tmfqpd8v3oceriml2',
)
const eventPayload: InitEventPayload = {
data: {
body: 'This is a simple message',
},
header: {
controllers: [asDIDString(authenticatedDID.id)],
model,
sep: 'test',
},
}
const encodedPayload = InitEventPayload.encode(eventPayload)
const signedEvent = await signEvent(authenticatedDID, encodedPayload)
cid = await ceramicClient.postEventType(SignedEvent, signedEvent)

// obtain the stream ID
const client = await getClient()
const buffer = await client.query('SELECT * FROM conclusion_events LIMIT 1')
const data = tableFromIPC(buffer)
const row = data.get(0)
const streamCid = CID.decode(row?.stream_cid).toString()
streamId = new StreamID(3, streamCid)
}, 10000)

test('gets a stream', async () => {
const client = new StreamClient({ ceramic: ceramicClient })
console.log(streamId.toString())
const streamState = await client.getStreamState(streamId.toString())
expect(streamState).toBeDefined()
expect(streamState.id.toString()).toEqual(streamId.toString())
}, 10000)

afterAll(async () => {
await c1Container.teardown()
})
})

0 comments on commit 93856f1

Please sign in to comment.