Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): Use StreamrClientError instead of custom errors #2927

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions packages/sdk/src/StreamrClientError.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { StreamMessage } from './exports'
import { MessageID } from './protocol/MessageID'

export type StreamrClientErrorCode =
Expand All @@ -9,22 +10,28 @@ export type StreamrClientErrorCode =
'CLIENT_DESTROYED' |
'PIPELINE_ERROR' |
'UNSUPPORTED_OPERATION' |
'UNKNOWN_DATA_FORMAT' |
'INVALID_MESSAGE_CONTENT' |
'INVALID_STREAM_METADATA' |
'INVALID_SIGNATURE' |
'INVALID_PARTITION' |
'DECRYPT_ERROR' |
'STORAGE_NODE_ERROR' |
'UNKNOWN_ERROR'

export class StreamrClientError extends Error {

public readonly code: StreamrClientErrorCode
public readonly messageId?: MessageID

constructor(message: string, code: StreamrClientErrorCode) {
super(message)
constructor(message: string, code: StreamrClientErrorCode, streamMessage?: StreamMessage) {
super(streamMessage === undefined ? message : `${message} (messageId=${formMessageIdDescription(streamMessage.messageId)})`)
this.code = code
this.name = this.constructor.name
this.messageId = streamMessage?.messageId
}
}

export const formMessageIdDescription = (messageId: MessageID): string => {
const formMessageIdDescription = (messageId: MessageID): string => {
return JSON.stringify(messageId)
}
12 changes: 4 additions & 8 deletions packages/sdk/src/encryption/EncryptionUtil.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import crypto, { CipherKey } from 'crypto'
import { StreamMessage, StreamMessageAESEncrypted } from '../protocol/StreamMessage'
import { StreamMessageAESEncrypted } from '../protocol/StreamMessage'
import { StreamrClientError } from '../StreamrClientError'
import { GroupKey } from './GroupKey'
import { formMessageIdDescription, StreamrClientError } from '../StreamrClientError'

export const createDecryptError = (message: string, streamMessage: StreamMessage): StreamrClientError => {
return new StreamrClientError(`${message} (messageId=${formMessageIdDescription(streamMessage.messageId)})`, 'DECRYPT_ERROR')
}

export const INITIALIZATION_VECTOR_LENGTH = 16

Expand Down Expand Up @@ -52,15 +48,15 @@ export class EncryptionUtil {
try {
content = this.decryptWithAES(streamMessage.content, groupKey.data)
} catch {
throw createDecryptError('AES decryption failed', streamMessage)
throw new StreamrClientError('AES decryption failed', 'DECRYPT_ERROR', streamMessage)
}

let newGroupKey: GroupKey | undefined = undefined
if (streamMessage.newGroupKey) {
try {
newGroupKey = groupKey.decryptNextGroupKey(streamMessage.newGroupKey)
} catch {
throw createDecryptError('Could not decrypt new encryption key', streamMessage)
throw new StreamrClientError('Could not decrypt new encryption key', 'DECRYPT_ERROR', streamMessage)
}
}

Expand Down
5 changes: 3 additions & 2 deletions packages/sdk/src/encryption/decrypt.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { DestroySignal } from '../DestroySignal'
import { createDecryptError, EncryptionUtil } from '../encryption/EncryptionUtil'
import { EncryptionUtil } from '../encryption/EncryptionUtil'
import { GroupKey } from '../encryption/GroupKey'
import { GroupKeyManager } from '../encryption/GroupKeyManager'
import { EncryptionType, StreamMessage, StreamMessageAESEncrypted } from '../protocol/StreamMessage'
import { StreamrClientError } from '../StreamrClientError'

// TODO if this.destroySignal.isDestroyed() is true, would it make sense to reject the promise
// and not to return the original encrypted message?
Expand All @@ -26,7 +27,7 @@ export const decrypt = async (
if (destroySignal.isDestroyed()) {
return streamMessage
}
throw createDecryptError(`Could not get encryption key ${streamMessage.groupKeyId}`, streamMessage)
throw new StreamrClientError(`Could not get encryption key ${streamMessage.groupKeyId}`, 'DECRYPT_ERROR', streamMessage)
}
if (destroySignal.isDestroyed()) {
return streamMessage
Expand Down
12 changes: 0 additions & 12 deletions packages/sdk/src/protocol/InvalidJsonError.ts

This file was deleted.

12 changes: 4 additions & 8 deletions packages/sdk/src/protocol/StreamMessage.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { StreamID, StreamPartID, UserID, binaryToUtf8 } from '@streamr/utils'
import { StreamrClientError } from '../StreamrClientError'
import { EncryptedGroupKey } from './EncryptedGroupKey'
import { InvalidJsonError } from './InvalidJsonError'
import { MessageID } from './MessageID'
import { MessageRef } from './MessageRef'
import { StreamMessageError } from './StreamMessageError'
import { ValidationError } from './ValidationError'
import { validateIsDefined } from './validations'

Expand Down Expand Up @@ -155,14 +154,11 @@ export class StreamMessage implements StreamMessageOptions {
try {
return JSON.parse(binaryToUtf8(this.content))
} catch (err: any) {
throw new InvalidJsonError(
this.getStreamId(),
err,
this,
)
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
throw new StreamrClientError(`Unable to parse JSON: ${err}`, 'INVALID_MESSAGE_CONTENT', this)
}
} else {
throw new StreamMessageError(`Unsupported contentType: ${this.contentType}`, this)
throw new StreamrClientError(`Unknown content type: ${this.contentType}`, 'UNKNOWN_DATA_FORMAT', this)
}
}

Expand Down
12 changes: 0 additions & 12 deletions packages/sdk/src/protocol/StreamMessageError.ts

This file was deleted.

6 changes: 3 additions & 3 deletions packages/sdk/src/signature/SignatureValidator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { toEthereumAddress, toUserIdRaw, verifySignature } from '@streamr/utils'
import { Lifecycle, scoped } from 'tsyringe'
import { ERC1271ContractFacade } from '../contracts/ERC1271ContractFacade'
import { SignatureType, StreamMessage } from '../protocol/StreamMessage'
import { StreamMessageError } from '../protocol/StreamMessageError'
import { StreamrClientError } from '../StreamrClientError'
import { createLegacySignaturePayload } from './createLegacySignaturePayload'
import { createSignaturePayload } from './createSignaturePayload'

Expand All @@ -24,10 +24,10 @@ export class SignatureValidator {
success = await this.validate(streamMessage)
} catch (err) {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
throw new StreamMessageError(`An error occurred during address recovery from signature: ${err}`, streamMessage)
throw new StreamrClientError(`An error occurred during address recovery from signature: ${err}`, 'INVALID_SIGNATURE', streamMessage)
}
if (!success) {
throw new StreamMessageError('Signature validation failed', streamMessage)
throw new StreamrClientError('Signature validation failed', 'INVALID_SIGNATURE', streamMessage)
}
}

Expand Down
16 changes: 9 additions & 7 deletions packages/sdk/src/subscribe/messagePipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import { StreamRegistry } from '../contracts/StreamRegistry'
import { GroupKeyManager } from '../encryption/GroupKeyManager'
import { decrypt } from '../encryption/decrypt'
import { StreamMessage } from '../protocol/StreamMessage'
import { StreamMessageError } from '../protocol/StreamMessageError'

import { SignatureValidator } from '../signature/SignatureValidator'
import { LoggerFactory } from '../utils/LoggerFactory'
import { PushPipeline } from '../utils/PushPipeline'
import { validateStreamMessage } from '../utils/validateStreamMessage'
import { MsgChainUtil } from './MsgChainUtil'
import { Resends } from './Resends'
import { OrderMessages } from './ordering/OrderMessages'
import { StreamrClientError } from '../StreamrClientError'
import { MessageID } from '../protocol/MessageID'

export interface MessagePipelineOptions {
streamPartId: StreamPartID
Expand All @@ -34,13 +36,13 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin

const logger = opts.loggerFactory.createLogger(module)

const onError = (error: Error | StreamMessageError, streamMessage?: StreamMessage) => {
const onError = (error: Error | StreamrClientError, streamMessage?: StreamMessage) => {
if (streamMessage) {
ignoreMessages.add(streamMessage)
ignoreMessages.add(streamMessage.messageId)
}

if (error && 'streamMessage' in error && error.streamMessage) {
ignoreMessages.add(error.streamMessage)
if (error && 'messageId' in error && error.messageId) {
ignoreMessages.add(error.messageId)
}

throw error
Expand Down Expand Up @@ -70,7 +72,7 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin
// collect messages that fail validation/parsing, do not push out of pipeline
// NOTE: we let failed messages be processed and only removed at end so they don't
// end up acting as gaps that we repeatedly try to fill.
const ignoreMessages = new WeakSet()
const ignoreMessages = new WeakSet<MessageID>()
messageStream.onError.listen(onError)
if (opts.config.orderMessages) {
// order messages and fill gaps
Expand Down Expand Up @@ -109,7 +111,7 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin
})
// ignore any failed messages
.filter((streamMessage: StreamMessage) => {
return !ignoreMessages.has(streamMessage)
return !ignoreMessages.has(streamMessage.messageId)
})
return messageStream
}
16 changes: 10 additions & 6 deletions packages/sdk/src/utils/validateStreamMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { UserID } from '@streamr/utils'
import { StreamRegistry } from '../contracts/StreamRegistry'
import { convertBytesToGroupKeyRequest, convertBytesToGroupKeyResponse } from '../protocol/oldStreamMessageBinaryUtils'
import { StreamMessage, StreamMessageType } from '../protocol/StreamMessage'
import { StreamMessageError } from '../protocol/StreamMessageError'
import { SignatureValidator } from '../signature/SignatureValidator'
import { getPartitionCount } from '../StreamMetadata'
import { StreamrClientError } from '../StreamrClientError'

export const validateStreamMessage = async (
msg: StreamMessage,
Expand Down Expand Up @@ -56,7 +56,7 @@ const doValidate = async (
streamRegistry
)
default:
throw new StreamMessageError(`Unknown message type: ${streamMessage.messageType}!`, streamMessage)
throw new StreamrClientError(`Unknown message type: ${streamMessage.messageType}!`, 'UNKNOWN_DATA_FORMAT', streamMessage)
}
}

Expand All @@ -68,12 +68,16 @@ const validateMessage = async (
const streamMetadata = await streamRegistry.getStreamMetadata(streamId)
const partitionCount = getPartitionCount(streamMetadata)
if (streamMessage.getStreamPartition() < 0 || streamMessage.getStreamPartition() >= partitionCount) {
throw new StreamMessageError(`Partition ${streamMessage.getStreamPartition()} is out of range (0..${partitionCount - 1})`, streamMessage)
throw new StreamrClientError(
`Partition ${streamMessage.getStreamPartition()} is out of range (0..${partitionCount - 1})`,
'INVALID_PARTITION',
streamMessage
)
}
const sender = streamMessage.getPublisherId()
const isPublisher = await streamRegistry.isStreamPublisher(streamId, sender)
if (!isPublisher) {
throw new StreamMessageError(`${sender} is not a publisher on stream ${streamId}`, streamMessage)
throw new StreamrClientError(`${sender} is not a publisher on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
}
}

Expand All @@ -86,10 +90,10 @@ const validateGroupKeyMessage = async (
const streamId = streamMessage.getStreamId()
const isPublisher = await streamRegistry.isStreamPublisher(streamId, expectedPublisherId)
if (!isPublisher) {
throw new StreamMessageError(`${expectedPublisherId} is not a publisher on stream ${streamId}`, streamMessage)
throw new StreamrClientError(`${expectedPublisherId} is not a publisher on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
}
const isSubscriber = await streamRegistry.isStreamSubscriber(streamId, expectedSubscriberId)
if (!isSubscriber) {
throw new StreamMessageError(`${expectedSubscriberId} is not a subscriber on stream ${streamId}`, streamMessage)
throw new StreamrClientError(`${expectedSubscriberId} is not a subscriber on stream ${streamId}`, 'MISSING_PERMISSION', streamMessage)
}
}
9 changes: 4 additions & 5 deletions packages/sdk/test/unit/Decrypt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { GroupKeyManager } from '../../src/encryption/GroupKeyManager'
import { decrypt } from '../../src/encryption/decrypt'
import { createGroupKeyManager, createMockMessage } from '../test-utils/utils'
import { EncryptionType, StreamMessage, StreamMessageAESEncrypted } from './../../src/protocol/StreamMessage'
import { formMessageIdDescription } from '../../src/StreamrClientError'
import { StreamrClientError } from '../../src/StreamrClientError'

describe('Decrypt', () => {

Expand Down Expand Up @@ -55,9 +55,8 @@ describe('Decrypt', () => {
msg as StreamMessageAESEncrypted,
groupKeyManager,
destroySignal)
}).rejects.toThrowStreamrClientError({
code: 'DECRYPT_ERROR',
message: `Could not get encryption key ${groupKey.id} (messageId=${formMessageIdDescription(msg.messageId)})`
})
}).rejects.toThrowStreamrClientError(
new StreamrClientError(`Could not get encryption key ${groupKey.id}`, 'DECRYPT_ERROR', msg)
)
})
})
9 changes: 4 additions & 5 deletions packages/sdk/test/unit/EncryptionUtil.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { GroupKey } from '../../src/encryption/GroupKey'
import { createMockMessage } from '../test-utils/utils'
import { EncryptedGroupKey } from './../../src/protocol/EncryptedGroupKey'
import { StreamMessage, StreamMessageAESEncrypted } from './../../src/protocol/StreamMessage'
import { formMessageIdDescription } from '../../src/StreamrClientError'
import { StreamrClientError } from '../../src/StreamrClientError'

const STREAM_ID = toStreamID('streamId')

Expand Down Expand Up @@ -62,9 +62,8 @@ describe('EncryptionUtil', () => {
...msg,
newGroupKey: new EncryptedGroupKey('mockId', hexToBinary('0x1234'))
}) as StreamMessageAESEncrypted
expect(() => EncryptionUtil.decryptStreamMessage(msg2, key)).toThrowStreamrClientError({
code: 'DECRYPT_ERROR',
message: `Could not decrypt new encryption key (messageId=${formMessageIdDescription(msg2.messageId)})`
})
expect(() => EncryptionUtil.decryptStreamMessage(msg2, key)).toThrowStreamrClientError(
new StreamrClientError('Could not decrypt new encryption key', 'DECRYPT_ERROR', msg2)
)
})
})
9 changes: 5 additions & 4 deletions packages/sdk/test/unit/SignatureValidator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { ERC1271ContractFacade } from '../../src/contracts/ERC1271ContractFacade
import { MessageRef } from '../../src/protocol/MessageRef'
import { SignatureValidator } from '../../src/signature/SignatureValidator'
import { createSignaturePayload } from '../../src/signature/createSignaturePayload'
import { StreamrClientError } from './../../src/StreamrClientError'
import { MessageID } from './../../src/protocol/MessageID'
import { ContentType, EncryptionType, SignatureType, StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage'

Expand Down Expand Up @@ -172,8 +173,8 @@ describe('SignatureValidator', () => {

it('not passing signature validation scenario', async () => {
erc1271ContractFacade.isValidSignature.mockResolvedValueOnce(false)
await expect(signatureValidator.assertSignatureIsValid(message)).rejects.toEqual(
new Error('Signature validation failed')
await expect(signatureValidator.assertSignatureIsValid(message)).rejects.toThrowStreamrClientError(
new StreamrClientError('Signature validation failed', 'INVALID_SIGNATURE', message)
)
expect(erc1271ContractFacade.isValidSignature).toHaveBeenCalledWith(
message.getPublisherId(),
Expand All @@ -184,8 +185,8 @@ describe('SignatureValidator', () => {

it('failing signature validation scenario', async () => {
erc1271ContractFacade.isValidSignature.mockRejectedValueOnce(new Error('random issue'))
await expect(signatureValidator.assertSignatureIsValid(message)).rejects.toEqual(
new Error('An error occurred during address recovery from signature: Error: random issue')
await expect(signatureValidator.assertSignatureIsValid(message)).rejects.toThrowStreamrClientError(
new StreamrClientError('An error occurred during address recovery from signature: Error: random issue', 'INVALID_SIGNATURE', message)
)
expect(erc1271ContractFacade.isValidSignature).toHaveBeenCalledWith(
message.getPublisherId(),
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/unit/messagePipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ describe('messagePipeline', () => {
const output = await collect(pipeline)
expect(onError).toHaveBeenCalledTimes(1)
const error = onError.mock.calls[0][0]
expect(error.message).toContain('Invalid JSON')
expect(error.message).toContain('Unable to parse JSON')
expect(output).toEqual([])
})

Expand Down
Loading
Loading