Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sdk): Use Mapping instead of CachingMap #2903

Merged
merged 32 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1486e63
add maxSize and maxAge options to Mapping
teogeb Nov 22, 2024
57a273b
configure Mapping cache constraints
teogeb Nov 22, 2024
24d3475
use Mapping instead of CacheMap
teogeb Nov 22, 2024
dfdee70
support symbol in formLookupKey()
teogeb Nov 22, 2024
2102214
fix invalidateCache()
teogeb Nov 22, 2024
63ab5d2
improve Mapping#invalidate type safety
teogeb Nov 22, 2024
6068e8a
return type of Mapping#values()
teogeb Nov 22, 2024
8a992ca
add test
teogeb Nov 22, 2024
62aef39
simplify invalidateCache()
teogeb Nov 22, 2024
42e6897
no not use size constraints
teogeb Nov 25, 2024
1a64eb3
Merge branch 'main' into sdk-mapping-cache-constraints
teogeb Nov 25, 2024
3bcddf4
builder functions
teogeb Nov 25, 2024
d98f91a
revert field name
teogeb Nov 25, 2024
6ecc3b6
fix variable name
teogeb Nov 25, 2024
7ccb819
builder functions
teogeb Nov 25, 2024
eb30e6d
revert field name
teogeb Nov 26, 2024
2f31d6b
fix variable name
teogeb Nov 26, 2024
25bca38
Merge branch 'sdk-mapping-cache-constraints' into sdk-use-Mapping-ins…
teogeb Nov 26, 2024
011bbff
Merge branch 'main' into sdk-use-Mapping-instead-of-CacheMap-in-regis…
teogeb Nov 26, 2024
fee4695
simplify field types
teogeb Nov 26, 2024
4a35a1a
TypeScript v5.1 compatibility
teogeb Nov 26, 2024
e5bfdcf
tmp: use createCacheMap() in CachingMap tests
teogeb Dec 2, 2024
cb3602e
Revert "tmp: use createCacheMap() in CachingMap tests"
teogeb Dec 2, 2024
8ef1d9c
Merge branch 'main' into sdk-use-Mapping-instead-of-CacheMap-in-regis…
teogeb Dec 3, 2024
9715473
Merge branch 'main' into sdk-use-Mapping-instead-of-CacheMap-in-regis…
teogeb Dec 3, 2024
ec762dc
get() uses array
teogeb Dec 3, 2024
ce37b3a
valueFactory() uses array
teogeb Dec 3, 2024
92a2fe4
use factory functions
teogeb Dec 3, 2024
860e412
allow primitive key
teogeb Dec 3, 2024
3b42314
fix publicSubscribePermissionCache valueFactory
teogeb Dec 3, 2024
639e477
add test
teogeb Dec 3, 2024
6605289
fix invalidateCache()
teogeb Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 31 additions & 36 deletions packages/sdk/src/contracts/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ import {
isPublicPermissionQuery,
streamPermissionToSolidityType
} from '../permission'
import { CachingMap } from '../utils/CachingMap'
import { filter, map } from '../utils/GeneratorUtils'
import { LoggerFactory } from '../utils/LoggerFactory'
import { Mapping } from '../utils/Mapping'
import { ChainEventPoller } from './ChainEventPoller'
import { ContractFactory } from './ContractFactory'
import { ObservableContract, initContractEventGateway, waitForTx } from './contract'
Expand Down Expand Up @@ -102,13 +102,8 @@ const streamContractErrorProcessor = (err: any, streamId: StreamID, registry: st
}
}

const formCacheKeyPrefix = (streamId: StreamID): string => {
return `${streamId}|`
}

const invalidateCache = (cache: CachingMap<string, any, any>, streamId: StreamID): void => {
const matchTarget = (s: string) => s.startsWith(formCacheKeyPrefix(streamId))
cache.invalidate(matchTarget)
const invalidateCache = (cache: Mapping<[StreamID, ...any[]], any>, streamId: StreamID): void => {
cache.invalidate(([key]) => key === streamId)
}

@scoped(Lifecycle.ContainerScoped)
Expand All @@ -124,10 +119,10 @@ export class StreamRegistry {
private readonly config: Pick<StrictStreamrClientConfig, 'contracts' | 'cache' | '_timeouts'>
private readonly authentication: Authentication
private readonly logger: Logger
private readonly metadataCache: CachingMap<string, StreamMetadata, [StreamID]>
private readonly publisherCache: CachingMap<string, boolean, [StreamID, UserID]>
private readonly subscriberCache: CachingMap<string, boolean, [StreamID, UserID]>
private readonly publicSubscribePermissionCache: CachingMap<string, boolean, [StreamID]>
private readonly metadataCache: Mapping<[StreamID], StreamMetadata>
private readonly publisherCache: Mapping<[StreamID, UserID], boolean>
private readonly subscriberCache: Mapping<[StreamID, UserID], boolean>
private readonly publicSubscribePermissionCache: Mapping<[StreamID], boolean>

/** @internal */
constructor(
Expand Down Expand Up @@ -168,33 +163,33 @@ export class StreamRegistry {
}),
loggerFactory
})
this.metadataCache = new CachingMap((streamId: StreamID) => {
return this.getStreamMetadata_nonCached(streamId)
}, {
...config.cache,
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
this.metadataCache = new Mapping({
valueFactory: (streamId: StreamID) => {
return this.getStreamMetadata_nonCached(streamId)
},
...config.cache
})
this.publisherCache = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
}, {
...config.cache,
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
this.publisherCache = new Mapping({
valueFactory: (streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
},
...config.cache
})
this.subscriberCache = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
}, {
...config.cache,
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
this.subscriberCache = new Mapping({
valueFactory: (streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
},
...config.cache
})
this.publicSubscribePermissionCache = new CachingMap((streamId: StreamID) => {
return this.hasPermission({
streamId,
public: true,
permission: StreamPermission.SUBSCRIBE
})
}, {
...config.cache,
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
this.publicSubscribePermissionCache = new Mapping({
valueFactory: (streamId: StreamID) => {
return this.hasPermission({
streamId,
public: true,
permission: StreamPermission.SUBSCRIBE
})
},
...config.cache
})
}

Expand Down
29 changes: 14 additions & 15 deletions packages/sdk/src/contracts/StreamStorageRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { LoggerFactory } from '../utils/LoggerFactory'
import { ChainEventPoller } from './ChainEventPoller'
import { ContractFactory } from './ContractFactory'
import { initContractEventGateway, waitForTx } from './contract'
import { CachingMap } from '../utils/CachingMap'
import { Mapping } from '../utils/Mapping'

export interface StorageNodeAssignmentEvent {
readonly streamId: StreamID
Expand Down Expand Up @@ -45,7 +45,7 @@ export class StreamStorageRegistry {
private readonly config: Pick<StrictStreamrClientConfig, 'contracts' | '_timeouts'>
private readonly authentication: Authentication
private readonly logger: Logger
private readonly getStorageNodes_cached: CachingMap<string | typeof GET_ALL_STORAGE_NODES, EthereumAddress[], [string | undefined]>
private readonly storageNodesCache: Mapping<[StreamID | typeof GET_ALL_STORAGE_NODES], EthereumAddress[]>

constructor(
streamIdBuilder: StreamIDBuilder,
Expand Down Expand Up @@ -78,13 +78,11 @@ export class StreamStorageRegistry {
)
}), config.contracts.pollInterval)
this.initStreamAssignmentEventListeners(eventEmitter, chainEventPoller, loggerFactory)
this.getStorageNodes_cached = new CachingMap((streamIdOrPath?: string) => {
return this.getStorageNodes_nonCached(streamIdOrPath)
}, {
...config.cache,
cacheKey: ([streamIdOrPath]): string | typeof GET_ALL_STORAGE_NODES => {
return streamIdOrPath ?? GET_ALL_STORAGE_NODES
}
this.storageNodesCache = new Mapping({
valueFactory: (query: StreamID | typeof GET_ALL_STORAGE_NODES) => {
return this.getStorageNodes_nonCached(query)
},
...config.cache
})
}

Expand Down Expand Up @@ -135,7 +133,7 @@ export class StreamStorageRegistry {
await this.connectToContract()
const ethersOverrides = await getEthersOverrides(this.rpcProviderSource, this.config)
await waitForTx(this.streamStorageRegistryContract!.addStorageNode(streamId, nodeAddress, ethersOverrides))
this.getStorageNodes_cached.invalidate((key) => key === streamId)
this.storageNodesCache.invalidate((key) => key[0] === streamId)
}

async removeStreamFromStorageNode(streamIdOrPath: string, nodeAddress: EthereumAddress): Promise<void> {
Expand All @@ -144,7 +142,7 @@ export class StreamStorageRegistry {
await this.connectToContract()
const ethersOverrides = await getEthersOverrides(this.rpcProviderSource, this.config)
await waitForTx(this.streamStorageRegistryContract!.removeStorageNode(streamId, nodeAddress, ethersOverrides))
this.getStorageNodes_cached.invalidate((key) => key === streamId)
this.storageNodesCache.invalidate((key) => key[0] === streamId)
}

async isStoredStream(streamIdOrPath: string, nodeAddress: EthereumAddress): Promise<boolean> {
Expand Down Expand Up @@ -192,13 +190,14 @@ export class StreamStorageRegistry {
}

async getStorageNodes(streamIdOrPath?: string): Promise<EthereumAddress[]> {
return this.getStorageNodes_cached.get(streamIdOrPath)
const query = (streamIdOrPath !== undefined) ? await this.streamIdBuilder.toStreamID(streamIdOrPath) : GET_ALL_STORAGE_NODES
return this.storageNodesCache.get(query)
}

private async getStorageNodes_nonCached(streamIdOrPath?: string): Promise<EthereumAddress[]> {
private async getStorageNodes_nonCached(query: StreamID | typeof GET_ALL_STORAGE_NODES): Promise<EthereumAddress[]> {
let queryResults: NodeQueryResult[]
if (streamIdOrPath !== undefined) {
const streamId = await this.streamIdBuilder.toStreamID(streamIdOrPath)
if (query !== GET_ALL_STORAGE_NODES) {
const streamId = query
this.logger.debug('Get storage nodes of stream', { streamId })
queryResults = await collect(this.theGraphClient.queryEntities<NodeQueryResult>(
(lastId: string, pageSize: number) => {
Expand Down
4 changes: 2 additions & 2 deletions packages/sdk/src/publish/MessageFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class MessageFactory {
private readonly streamId: StreamID
private readonly authentication: Authentication
private defaultPartition: number | undefined
private readonly defaultMessageChainIds: Mapping<[partition: number], string>
private readonly defaultMessageChainIds: Mapping<[number], string>
private readonly prevMsgRefs: Map<string, MessageRef> = new Map()
// eslint-disable-next-line max-len
private readonly streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidatePermissionCaches'>
Expand All @@ -53,7 +53,7 @@ export class MessageFactory {
this.groupKeyQueue = opts.groupKeyQueue
this.signatureValidator = opts.signatureValidator
this.messageSigner = opts.messageSigner
this.defaultMessageChainIds = new Mapping({
this.defaultMessageChainIds = new Mapping<[number], string>({
valueFactory: async () => {
return createRandomMsgChainId()
}
Expand Down
6 changes: 3 additions & 3 deletions packages/sdk/src/publish/Publisher.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { StreamID } from '@streamr/utils'
import isString from 'lodash/isString'
import pLimit from 'p-limit'
import { Lifecycle, inject, scoped } from 'tsyringe'
import { inject, Lifecycle, scoped } from 'tsyringe'
import { Authentication, AuthenticationInjectionToken } from '../Authentication'
import { NetworkNodeFacade } from '../NetworkNodeFacade'
import { StreamIDBuilder } from '../StreamIDBuilder'
Expand Down Expand Up @@ -43,8 +43,8 @@ const parseTimestamp = (metadata?: PublishMetadata): number => {
@scoped(Lifecycle.ContainerScoped)
export class Publisher {

private readonly messageFactories: Mapping<[streamId: StreamID], MessageFactory>
private readonly groupKeyQueues: Mapping<[streamId: StreamID], GroupKeyQueue>
private readonly messageFactories: Mapping<[StreamID], MessageFactory>
private readonly groupKeyQueues: Mapping<[StreamID], GroupKeyQueue>
private readonly concurrencyLimit = pLimit(1)
private readonly node: NetworkNodeFacade
private readonly streamRegistry: StreamRegistry
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/subscribe/ordering/OrderMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export class OrderMessages {
const chain = await this.chains.get(msg.getPublisherId(), msg.getMsgChainId())
chain.addMessage(msg)
}
await Promise.all(this.chains.values().map((chain) => chain.waitUntilIdle()))
await Promise.all([...this.chains.values()].map((chain) => chain.waitUntilIdle()))
this.outBuffer.endWrite()
} catch (err) {
this.outBuffer.endWrite(err)
Expand Down
54 changes: 32 additions & 22 deletions packages/sdk/src/utils/Mapping.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { formLookupKey } from './utils'
import LRU from '../../vendor/quick-lru'

type KeyType = (string | number)[]
type KeyType = (string | number | symbol)[]

interface BaseOptions<K extends KeyType, V> {
valueFactory: (...args: K) => Promise<V>
Expand All @@ -14,8 +14,8 @@ interface CacheMapOptions<K extends KeyType, V> extends BaseOptions<K, V> {

type LazyMapOptions<K extends KeyType, V> = BaseOptions<K, V>

// an wrapper object is used so that we can store undefined values
interface ValueWrapper<V> {
interface Item<K, V> {
key: K
value: V
}

Expand All @@ -26,7 +26,7 @@ interface ValueWrapper<V> {
*/
export class Mapping<K extends KeyType, V> {

private readonly delegate: Map<string, ValueWrapper<V>>
private readonly delegate: Map<string, Item<K, V>>
private readonly pendingPromises: Map<string, Promise<V>> = new Map()
private readonly opts: CacheMapOptions<K, V> | LazyMapOptions<K, V>

Expand All @@ -37,45 +37,55 @@ export class Mapping<K extends KeyType, V> {
**/
constructor(opts: CacheMapOptions<K, V> | LazyMapOptions<K, V>) {
if ('maxSize' in opts) {
this.delegate = new LRU<string, ValueWrapper<V>>({
this.delegate = new LRU<string, Item<K, V>>({
maxSize: opts.maxSize,
maxAge: opts.maxAge
})
} else {
this.delegate = new Map<string, ValueWrapper<V>>()
this.delegate = new Map<string, Item<K, V>>()
}
this.opts = opts
}

async get(...args: K): Promise<V> {
const key = formLookupKey(...args)
const pendingPromise = this.pendingPromises.get(key)
async get(...key: K): Promise<V> {
teogeb marked this conversation as resolved.
Show resolved Hide resolved
const lookupKey = formLookupKey(...key)
const pendingPromise = this.pendingPromises.get(lookupKey)
if (pendingPromise !== undefined) {
return await pendingPromise
} else {
let valueWrapper = this.delegate.get(key)
if (valueWrapper === undefined) {
const promise = this.opts.valueFactory(...args)
this.pendingPromises.set(key, promise)
let item = this.delegate.get(lookupKey)
if (item === undefined) {
const promise = this.opts.valueFactory(...key)
this.pendingPromises.set(lookupKey, promise)
let value
try {
value = await promise
} finally {
this.pendingPromises.delete(key)
this.pendingPromises.delete(lookupKey)
}
valueWrapper = { value }
this.delegate.set(key, valueWrapper)
item = { key, value }
this.delegate.set(lookupKey, item)
}
return valueWrapper.value
return item.value
}
}

values(): V[] {
const result: V[] = []
for (const wrapper of this.delegate.values()) {
result.push(wrapper.value)
set(key: K, value: V): void {
this.delegate.set(formLookupKey(...key), { key, value })
}

invalidate(predicate: (key: K) => boolean): void {
for (const [lookupKey, item] of this.delegate.entries()) {
if (predicate(item.key)) {
this.delegate.delete(lookupKey)
}
}
}

*values(): IterableIterator<V> {
for (const item of this.delegate.values()) {
yield item.value
}
return result
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/sdk/src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ export function generateClientId(): string {

// A unique internal identifier to some list of primitive values. Useful
// e.g. as a map key or a cache key.
export const formLookupKey = <K extends (string | number)[]>(...args: K): string => {
return args.join('|')
export const formLookupKey = <K extends (string | number | symbol)[]>(...args: K): string => {
return args.map((a) => a.toString()).join('|')
}

/** @internal */
Expand Down
13 changes: 13 additions & 0 deletions packages/sdk/test/unit/Mapping.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,17 @@ describe('Mapping', () => {
await mapping.get('foo', 1)
expect(valueFactory).toHaveBeenCalledTimes(2)
})

it('invalidate', async () => {
const mapping = createLazyMap({
valueFactory: async (p1: string, p2: number) => `${p1}${p2}`,
})
mapping.set(['foo', 1], 'foo1')
mapping.set(['bar', 1], 'bar1')
await mapping.get('foo', 2)
await mapping.get('bar', 2)
expect([...mapping.values()]).toIncludeSameMembers(['foo1', 'bar1', 'foo2', 'bar2'])
mapping.invalidate(([p1]) => (p1 === 'bar'))
expect([...mapping.values()]).toIncludeSameMembers(['foo1', 'foo2'])
})
})