Skip to content

Commit

Permalink
chore: Remove RepositoryInternals (#2987)
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody authored Oct 13, 2023
1 parent c2fcf53 commit 76fe5d8
Show file tree
Hide file tree
Showing 11 changed files with 570 additions and 702 deletions.
24 changes: 16 additions & 8 deletions packages/core/src/__tests__/ceramic-api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ describe('Ceramic API', () => {
const CONTENT0 = { myData: 0 }
const CONTENT1 = { myData: 1 }
// TODO (NET-1614): Extend with targeted payload comparison
const addIndexSpy = jest.spyOn(ceramic.repository._internals, 'indexStreamIfNeeded')
const addIndexSpy = jest.spyOn(ceramic.repository, '_indexStreamIfNeeded')
const model = await Model.create(ceramic, MODEL_DEFINITION)
// there's an extra call to indexStreamIfNeeded every time the anchor state
// is changed.
Expand All @@ -223,9 +223,11 @@ describe('Ceramic API', () => {
it('will fail to create stream over size limits', async () => {
const CONTENT0 = { myData: 'abcdefghijklmn' }
ModelInstanceDocument.MAX_DOCUMENT_SIZE = 10
const addIndexSpy = jest.spyOn(ceramic.repository, 'indexStreamIfNeeded')
const addIndexSpy = jest.spyOn(ceramic.repository, '_indexStreamIfNeeded')
const model = await Model.create(ceramic, MODEL_DEFINITION_BLOB)
expect(addIndexSpy).toBeCalledTimes(1)
// there's an extra call to indexStreamIfNeeded every time the anchor state
// is changed.
expect(addIndexSpy).toBeCalledTimes(2)
const midMetadata = { model: model.id }
await expect(ModelInstanceDocument.create(ceramic, CONTENT0, midMetadata)).rejects.toThrow(
/which exceeds maximum size/
Expand All @@ -237,7 +239,7 @@ describe('Ceramic API', () => {
const CONTENT0 = { myData: 'abcdef' }
const CONTENT1 = [{ op: 'replace', path: '/myData', value: 'abcdefgh' } as AddOperation]
ModelInstanceDocument.MAX_DOCUMENT_SIZE = 30
const addIndexSpy = jest.spyOn(ceramic.repository._internals, 'indexStreamIfNeeded')
const addIndexSpy = jest.spyOn(ceramic.repository, '_indexStreamIfNeeded')
const model = await Model.create(ceramic, MODEL_DEFINITION_BLOB)
// there's an extra call to indexStreamIfNeeded every time the anchor state
// is changed.
Expand All @@ -261,13 +263,19 @@ describe('Ceramic API', () => {
const CONTENT0 = { myData: 'abcdef' }
const CONTENT1 = [{ op: 'replace', path: '/myData', value: 'abcdefghijkl' } as AddOperation]
ModelInstanceDocument.MAX_DOCUMENT_SIZE = 20
const addIndexSpy = jest.spyOn(ceramic.repository, 'indexStreamIfNeeded')
const addIndexSpy = jest.spyOn(ceramic.repository, '_indexStreamIfNeeded')
const model = await Model.create(ceramic, MODEL_DEFINITION_BLOB)
expect(addIndexSpy).toBeCalledTimes(1)
// there's an extra call to indexStreamIfNeeded every time the anchor state
// is changed.
expect(addIndexSpy).toBeCalledTimes(2)
const midMetadata = { model: model.id }
const doc = await ModelInstanceDocument.create(ceramic, CONTENT0, midMetadata)
const doc = await ModelInstanceDocument.create(ceramic, CONTENT0, midMetadata, {
anchor: false,
pin: false,
})
expect(doc.content).toEqual(CONTENT0)
expect(addIndexSpy).toBeCalledTimes(2)
// TODO(WS1-1269): This should only add one more indexStreamIfNeeded call
expect(addIndexSpy).toBeCalledTimes(4)
await expect(doc.patch(CONTENT1)).rejects.toThrow(/which exceeds maximum size/)
addIndexSpy.mockRestore()
})
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/__tests__/ceramic-query-response.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe('Response to pubsub queries handling', () => {

ceramic = await createCeramic(ceramicIpfs)

handleUpdateSpy = jest.spyOn(ceramic.repository, 'handleUpdate')
handleUpdateSpy = jest.spyOn(ceramic.repository, 'handleUpdateFromNetwork')
originalPubsubPublish = ceramic.dispatcher.messageBus.pubsub.next.bind(
ceramic.dispatcher.messageBus.pubsub
)
Expand Down
16 changes: 8 additions & 8 deletions packages/core/src/__tests__/ceramic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,12 @@ describe('Ceramic integration', () => {
const ceramic2 = await createCeramic(ipfs2, false, 1)

const repository1 = ceramic1.repository
const addSpy1 = jest.spyOn(repository1._internals, 'add')
const loadSpy1 = jest.spyOn(repository1._internals, 'load')
const addSpy1 = jest.spyOn(repository1, '_registerRunningState')
const loadSpy1 = jest.spyOn(repository1, 'load')

const repository2 = ceramic2.repository
const addSpy2 = jest.spyOn(repository2._internals, 'add')
const loadSpy2 = jest.spyOn(repository2._internals, 'load')
const addSpy2 = jest.spyOn(repository2, '_registerRunningState')
const loadSpy2 = jest.spyOn(repository2, 'load')

const stream1 = await TileDocument.create<any>(ceramic1, { test: 456 }, null, {
publish: false,
Expand Down Expand Up @@ -372,12 +372,12 @@ describe('Ceramic integration', () => {
const ceramic2 = await createCeramic(ipfs2, false, 1)

const repository1 = ceramic1.repository
const addSpy1 = jest.spyOn(repository1._internals, 'add')
const loadSpy1 = jest.spyOn(repository1._internals, 'load')
const addSpy1 = jest.spyOn(repository1, '_registerRunningState')
const loadSpy1 = jest.spyOn(repository1, 'load')

const repository2 = ceramic2.repository
const addSpy2 = jest.spyOn(repository2._internals, 'add')
const loadSpy2 = jest.spyOn(repository2._internals, 'load')
const addSpy2 = jest.spyOn(repository2, '_registerRunningState')
const loadSpy2 = jest.spyOn(repository2, 'load')

const stream1 = await TileDocument.create<any>(ceramic1, { test: 456 })
expect(loadSpy1).toBeCalledTimes(1)
Expand Down
22 changes: 15 additions & 7 deletions packages/core/src/__tests__/dispatcher-mock-ipfs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ describe('Dispatcher with mock ipfs', () => {
it('handle message correctly without model', async () => {
async function register(state: StreamState) {
const runningState = new RunningState(state, false)
repository.add(runningState)
repository._registerRunningState(runningState)
dispatcher.messageBus.queryNetwork(runningState.id).subscribe()
return runningState
}
Expand All @@ -229,14 +229,14 @@ describe('Dispatcher with mock ipfs', () => {
const queryID = queryMessageSent.id

// Handle UPDATE message without model
dispatcher.repository.handleUpdate = jest.fn()
dispatcher.repository.handleUpdateFromNetwork = jest.fn()
await dispatcher.handleMessage({
typ: MsgType.UPDATE,
stream: FAKE_STREAM_ID,
tip: FAKE_CID,
model: null,
})
expect(dispatcher.repository.handleUpdate).toBeCalledWith(state$.id, FAKE_CID, null)
expect(dispatcher.repository.handleUpdateFromNetwork).toBeCalledWith(state$.id, FAKE_CID, null)

const continuationState = {
...initialState,
Expand All @@ -259,13 +259,17 @@ describe('Dispatcher with mock ipfs', () => {
// Handle RESPONSE message
const tips = new Map().set(FAKE_STREAM_ID.toString(), FAKE_CID2)
await dispatcher.handleMessage({ typ: MsgType.RESPONSE, id: queryID, tips: tips })
expect(dispatcher.repository.handleUpdate).toBeCalledWith(stream2.id, FAKE_CID2, undefined)
expect(dispatcher.repository.handleUpdateFromNetwork).toBeCalledWith(
stream2.id,
FAKE_CID2,
undefined
)
})

it('handle message correctly with model', async () => {
async function register(state: StreamState) {
const runningState = new RunningState(state, false)
repository.add(runningState)
repository._registerRunningState(runningState)
dispatcher.messageBus.queryNetwork(runningState.id).subscribe()
return runningState
}
Expand All @@ -282,14 +286,18 @@ describe('Dispatcher with mock ipfs', () => {
const state$ = await register(initialState)

// Handle UPDATE message with model
dispatcher.repository.handleUpdate = jest.fn()
dispatcher.repository.handleUpdateFromNetwork = jest.fn()
await dispatcher.handleMessage({
typ: MsgType.UPDATE,
stream: FAKE_STREAM_ID,
tip: FAKE_CID,
model: FAKE_MODEL,
})
expect(dispatcher.repository.handleUpdate).toBeCalledWith(state$.id, FAKE_CID, FAKE_MODEL)
expect(dispatcher.repository.handleUpdateFromNetwork).toBeCalledWith(
state$.id,
FAKE_CID,
FAKE_MODEL
)
})

test('init', async () => {
Expand Down
28 changes: 13 additions & 15 deletions packages/core/src/__tests__/state-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ describe('anchor', () => {
})

beforeEach(() => {
realHandleTip = ceramic.repository._internals.handleTip
realHandleTip = ceramic.repository._handleTip
})

afterEach(() => {
// Restore the handleTip function in case any of the tests modified it
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
ceramic.repository._internals.handleTip = realHandleTip
ceramic.repository._handleTip = realHandleTip
})

describe('With anchorOnRequest == true', () => {
Expand Down Expand Up @@ -96,7 +96,7 @@ describe('anchor', () => {
expect.objectContaining({ signature: SignatureStatus.SIGNED, anchorStatus: 0 })
)

await ceramic2.repository._internals.handleTip(streamState2, stream1.state.log[1].cid)
await ceramic2.repository._handleTip(streamState2, stream1.state.log[1].cid)

expect(stream2.state.log).toHaveLength(2)
expect(stream2.state).toEqual(stream1.state)
Expand All @@ -117,33 +117,33 @@ describe('anchor', () => {
const streamState2 = await ceramic2.repository.load(stream2.id, {})

retrieveCommitSpy.mockClear()
await ceramic2.repository._internals.handleTip(streamState2, stream1.state.log[1].cid)
await ceramic2.repository._handleTip(streamState2, stream1.state.log[1].cid)

expect(streamState2.state).toEqual(stream1.state)
// 2 IPFS retrievals - the signed commit and its linked commit payload for the commit to be
// applied
expect(retrieveCommitSpy).toBeCalledTimes(2)

// Now re-apply the same commit and don't expect any additional calls to IPFS
await ceramic2.repository._internals.handleTip(streamState2, stream1.state.log[1].cid)
await ceramic2.repository._internals.handleTip(streamState2, stream1.state.log[0].cid)
await ceramic2.repository._handleTip(streamState2, stream1.state.log[1].cid)
await ceramic2.repository._handleTip(streamState2, stream1.state.log[0].cid)
expect(retrieveCommitSpy).toBeCalledTimes(2)

// Add another update to stream 1
const moreNewContent = { foo: 'baz' }
await stream1.update(moreNewContent, null, { anchor: false })

retrieveCommitSpy.mockClear()
await ceramic2.repository._internals.handleTip(streamState2, stream1.state.log[2].cid)
await ceramic2.repository._handleTip(streamState2, stream1.state.log[2].cid)

expect(streamState2.state).toEqual(stream1.state)
// 2 IPFS retrievals - 1 each for linked commit/envelope for CID to be applied - since there is no lone genesis commit
// in the stream state.
expect(retrieveCommitSpy).toBeCalledTimes(2)

// Now re-apply the same commit and don't expect any additional calls to IPFS
await ceramic2.repository._internals.handleTip(streamState2, stream1.state.log[2].cid)
await ceramic2.repository._internals.handleTip(streamState2, stream1.state.log[1].cid)
await ceramic2.repository._handleTip(streamState2, stream1.state.log[2].cid)
await ceramic2.repository._handleTip(streamState2, stream1.state.log[1].cid)
expect(retrieveCommitSpy).toBeCalledTimes(2)

await ceramic2.close()
Expand Down Expand Up @@ -313,11 +313,10 @@ describe('anchor', () => {
})

test(`handleTip is retried until it returns`, async () => {
const internals = ceramic.repository._internals
const stream = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { anchor: false })
const stream$ = await ceramic.repository.load(stream.id, {})

const handleTipSpy = jest.spyOn(internals, 'handleTip')
const handleTipSpy = jest.spyOn(ceramic.repository, '_handleTip')

// Mock a throw as the first call
handleTipSpy.mockRejectedValueOnce(new Error('Handle tip failed'))
Expand All @@ -338,12 +337,11 @@ describe('anchor', () => {
})

test(`handleTip is retried up to three times within _handleAnchorCommit, if it doesn't return`, async () => {
const internals = ceramic.repository._internals
const stream = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { anchor: false })
const stream$ = await ceramic.repository.load(stream.id, {})

const fakeHandleTip = jest.fn() as unknown as typeof internals.handleTip
internals.handleTip = fakeHandleTip
const fakeHandleTip = jest.fn() as unknown as typeof ceramic.repository._handleTip
ceramic.repository._handleTip = fakeHandleTip

// Mock fakeHandleTip to always throw
fakeHandleTip.mockRejectedValue(new Error('Handle tip failed'))
Expand Down Expand Up @@ -445,7 +443,7 @@ describe('anchor', () => {
publishAnchorCommitSpy.mockImplementationOnce(
async (streamId: StreamID, commit: AnchorCommit) => {
const anchorCommit = await originalPublishAnchorCommit(streamId, commit)
await ceramic.repository.handleUpdate(streamId, anchorCommit)
await ceramic.repository.handleUpdateFromNetwork(streamId, anchorCommit)
return anchorCommit
}
)
Expand Down
4 changes: 1 addition & 3 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ export class Ceramic implements CeramicApi {
on: params.sync,
},
this.dispatcher,
this.repository.handleUpdate.bind(this.repository),
this.repository.handleUpdateFromNetwork.bind(this.repository),
this.repository.index,
this._logger
)
Expand Down Expand Up @@ -636,8 +636,6 @@ export class Ceramic implements CeramicApi {
)
this._logger.verbose(`Created stream ${streamId.toString()} from state`)

await this.repository.indexStreamIfNeeded(state$)

return stream
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ export class Dispatcher {
// Add tip to pubsub cache and continue processing
this.pubsubCache.set(tip.toString(), streamId.toString())

await this.repository.handleUpdate(streamId, tip, model)
await this.repository.handleUpdateFromNetwork(streamId, tip, model)
}

/**
Expand Down
Loading

0 comments on commit 76fe5d8

Please sign in to comment.