From e5a86114982c216e7a07704d77ccd15e0484f902 Mon Sep 17 00:00:00 2001 From: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com> Date: Tue, 17 Sep 2024 01:14:14 -0500 Subject: [PATCH] feat: support bulk message RPC (#2313) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Why is this change needed? Reconciliation of messages historically from off-hub sources is achieved via `submitMessage`, which incurs a performance penalty when many of these calls are made in rapid succession. This change introduces a new `submitBulkMessages` RPC which allows many of the to-be reconciled messages to be submitted at once and handle via the more efficient rust `mergeMany` underlying call. ## Merge Checklist _Choose all relevant options below by adding an `x` now or at any time before submitting for review_ - [x] PR title adheres to the [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) standard - [x] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets) - [x] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore) - [x] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary. --- ## PR-Codex overview This PR adds a new gRPC method `SubmitBulkMessages` for submitting multiple messages at once and updates related proto files and service implementations. ### Detailed summary - Added `SubmitBulkMessages` gRPC method - Defined new message types: `SubmitBulkMessagesRequest`, `MessageError`, `BulkMessageResponse`, `SubmitBulkMessagesResponse` - Updated service implementations and proto files - Implemented server-side logic for `SubmitBulkMessages` - Added encoding and decoding functions for new message types > The following files were skipped due to too many changes: `packages/hub-web/src/generated/request_response.ts`, `packages/hub-nodejs/src/generated/request_response.ts`, `packages/core/src/protobufs/generated/request_response.ts` > ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your question}` --- .changeset/pink-drinks-brush.md | 8 + apps/hubble/src/rpc/server.ts | 71 +++- apps/hubble/www/docs/docs/api.md | 28 ++ .../protobufs/generated/request_response.ts | 302 ++++++++++++++++++ .../src/generated/request_response.ts | 302 ++++++++++++++++++ packages/hub-nodejs/src/generated/rpc.ts | 32 ++ .../hub-web/src/generated/request_response.ts | 302 ++++++++++++++++++ packages/hub-web/src/generated/rpc.ts | 38 +++ protobufs/schemas/request_response.proto | 21 ++ protobufs/schemas/rpc.proto | 2 + 10 files changed, 1105 insertions(+), 1 deletion(-) create mode 100644 .changeset/pink-drinks-brush.md diff --git a/.changeset/pink-drinks-brush.md b/.changeset/pink-drinks-brush.md new file mode 100644 index 0000000000..485727b52e --- /dev/null +++ b/.changeset/pink-drinks-brush.md @@ -0,0 +1,8 @@ +--- +"@farcaster/hub-nodejs": patch +"@farcaster/hub-web": patch +"@farcaster/core": patch +"@farcaster/hubble": patch +--- + +feat: support bulk message writing rpcs diff --git a/apps/hubble/src/rpc/server.ts b/apps/hubble/src/rpc/server.ts index 514b31281e..13d63038e0 100644 --- a/apps/hubble/src/rpc/server.ts +++ b/apps/hubble/src/rpc/server.ts @@ -54,6 +54,11 @@ import { StreamError, OnChainEventRequest, FidRequest, + getFarcasterTime, + MessageBundle, + SubmitBulkMessagesResponse, + BulkMessageResponse, + MessageError, } from "@farcaster/hub-nodejs"; import { err, ok, Result, ResultAsync } from "neverthrow"; import { APP_NICKNAME, APP_VERSION, HubInterface } from "../hubble.js"; @@ -70,7 +75,7 @@ import { STREAM_MESSAGE_BUFFER_SIZE, SLOW_CLIENT_GRACE_PERIOD_MS, } from "./bufferedStreamWriter.js"; -import { sleep } from "../utils/crypto.js"; +import { blake3Truncate160, sleep } from "../utils/crypto.js"; import { jumpConsistentHash } from "../utils/jumpConsistentHash.js"; import { SUBMIT_MESSAGE_RATE_LIMIT, rateLimitByIp } from "../utils/rateLimits.js"; import { statsd } from "../utils/statsd.js"; @@ -927,6 +932,70 @@ export default class Server { }, ); }, + submitBulkMessages: async (call, callback) => { + // Identify peer that is calling, if available. This is used for rate limiting. + const peer = Result.fromThrowable( + () => call.getPeer(), + (e) => e, + )().unwrapOr("unavailable"); + + // Check for rate limits + const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter); + if (rateLimitResult.isErr()) { + logger.warn({ peer }, "submitBulkMessages rate limited"); + callback(toServiceError(new HubError("unavailable", "API rate limit exceeded"))); + return; + } + + // Authentication + const authResult = authenticateUser(call.metadata, this.rpcUsers); + if (authResult.isErr()) { + logger.warn({ errMsg: authResult.error.message }, "gRPC submitBulkMessages failed"); + callback( + toServiceError(new HubError("unauthenticated", `gRPC authentication failed: ${authResult.error.message}`)), + ); + return; + } + + const submissionTime = getFarcasterTime(); + if (submissionTime.isErr()) { + callback(toServiceError(submissionTime.error)); + return; + } + + const { messages } = call.request; + const allHashes = Buffer.concat(messages.map((message) => message.hash ?? new Uint8Array())); + const bundleHash = blake3Truncate160(allHashes); + + const messageBundle = MessageBundle.create({ + messages: messages, + hash: bundleHash, + }); + const result = await this.hub?.submitMessageBundle(submissionTime.value, messageBundle, "rpc"); + callback( + null, + SubmitBulkMessagesResponse.create({ + messages: result?.map((m, i) => + m.match( + () => { + return BulkMessageResponse.create({ + message: messages[i], + }); + }, + (err: HubError) => { + return BulkMessageResponse.create({ + messageError: MessageError.create({ + hash: messages[i]?.hash ?? new Uint8Array([]), + errCode: err.errCode, + message: err.message, + }), + }); + }, + ), + ), + }), + ); + }, validateMessage: async (call, callback) => { const message = call.request; const result = await this.hub?.validateMessage(message); diff --git a/apps/hubble/www/docs/docs/api.md b/apps/hubble/www/docs/docs/api.md index 99385d1086..a73eb2a7c4 100644 --- a/apps/hubble/www/docs/docs/api.md +++ b/apps/hubble/www/docs/docs/api.md @@ -270,6 +270,34 @@ Used to subscribe to real-time event updates from the Farcaster Hub | Method Name | Request Type | Response Type | Description | | ------------- | ------------ | ------------- | ---------------------------- | | SubmitMessage | Message | Message | Submits a Message to the Hub | +| SubmitBulkMessages | [SubmitBulkMessagesRequest](#SubmitBulkMessagesRequest) | [SubmitBulkMessagesResponse](#SubmitBulkMessagesResponse) | Submits several Messages to the Hub | + +### SubmitBulkMessagesRequest + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| messages | [Message](#Message) | repeated | | + +### MessageError + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| hash | [bytes](#bytes) | | | +| errCode | [string](#string) | | | +| message | [string](#string) | | | + +### BulkMessageResponse + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| message | [Message](#Message) | | | +| message_error | [MessageError](#MessageError) | | | + +### SubmitBulkMessagesResponse + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| messages | [BulkMessageResponse](#BulkMessageResponse) | repeated | | ## 10. Username Proofs Service diff --git a/packages/core/src/protobufs/generated/request_response.ts b/packages/core/src/protobufs/generated/request_response.ts index f644a5608c..2670ddb630 100644 --- a/packages/core/src/protobufs/generated/request_response.ts +++ b/packages/core/src/protobufs/generated/request_response.ts @@ -347,6 +347,25 @@ export interface ValidationResponse { message: Message | undefined; } +export interface SubmitBulkMessagesRequest { + messages: Message[]; +} + +export interface MessageError { + hash: Uint8Array; + errCode: string; + message: string; +} + +export interface BulkMessageResponse { + message?: Message | undefined; + messageError?: MessageError | undefined; +} + +export interface SubmitBulkMessagesResponse { + messages: BulkMessageResponse[]; +} + export interface StreamSyncRequest { getInfo?: HubInfoRequest | undefined; getCurrentPeers?: Empty | undefined; @@ -3916,6 +3935,289 @@ export const ValidationResponse = { }, }; +function createBaseSubmitBulkMessagesRequest(): SubmitBulkMessagesRequest { + return { messages: [] }; +} + +export const SubmitBulkMessagesRequest = { + encode(message: SubmitBulkMessagesRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + for (const v of message.messages) { + Message.encode(v!, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): SubmitBulkMessagesRequest { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseSubmitBulkMessagesRequest(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.messages.push(Message.decode(reader, reader.uint32())); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): SubmitBulkMessagesRequest { + return { messages: Array.isArray(object?.messages) ? object.messages.map((e: any) => Message.fromJSON(e)) : [] }; + }, + + toJSON(message: SubmitBulkMessagesRequest): unknown { + const obj: any = {}; + if (message.messages) { + obj.messages = message.messages.map((e) => e ? Message.toJSON(e) : undefined); + } else { + obj.messages = []; + } + return obj; + }, + + create, I>>(base?: I): SubmitBulkMessagesRequest { + return SubmitBulkMessagesRequest.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): SubmitBulkMessagesRequest { + const message = createBaseSubmitBulkMessagesRequest(); + message.messages = object.messages?.map((e) => Message.fromPartial(e)) || []; + return message; + }, +}; + +function createBaseMessageError(): MessageError { + return { hash: new Uint8Array(), errCode: "", message: "" }; +} + +export const MessageError = { + encode(message: MessageError, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.hash.length !== 0) { + writer.uint32(10).bytes(message.hash); + } + if (message.errCode !== "") { + writer.uint32(18).string(message.errCode); + } + if (message.message !== "") { + writer.uint32(26).string(message.message); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): MessageError { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseMessageError(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.hash = reader.bytes(); + continue; + case 2: + if (tag != 18) { + break; + } + + message.errCode = reader.string(); + continue; + case 3: + if (tag != 26) { + break; + } + + message.message = reader.string(); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): MessageError { + return { + hash: isSet(object.hash) ? bytesFromBase64(object.hash) : new Uint8Array(), + errCode: isSet(object.errCode) ? String(object.errCode) : "", + message: isSet(object.message) ? String(object.message) : "", + }; + }, + + toJSON(message: MessageError): unknown { + const obj: any = {}; + message.hash !== undefined && + (obj.hash = base64FromBytes(message.hash !== undefined ? message.hash : new Uint8Array())); + message.errCode !== undefined && (obj.errCode = message.errCode); + message.message !== undefined && (obj.message = message.message); + return obj; + }, + + create, I>>(base?: I): MessageError { + return MessageError.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): MessageError { + const message = createBaseMessageError(); + message.hash = object.hash ?? new Uint8Array(); + message.errCode = object.errCode ?? ""; + message.message = object.message ?? ""; + return message; + }, +}; + +function createBaseBulkMessageResponse(): BulkMessageResponse { + return { message: undefined, messageError: undefined }; +} + +export const BulkMessageResponse = { + encode(message: BulkMessageResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.message !== undefined) { + Message.encode(message.message, writer.uint32(10).fork()).ldelim(); + } + if (message.messageError !== undefined) { + MessageError.encode(message.messageError, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): BulkMessageResponse { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseBulkMessageResponse(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.message = Message.decode(reader, reader.uint32()); + continue; + case 2: + if (tag != 18) { + break; + } + + message.messageError = MessageError.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): BulkMessageResponse { + return { + message: isSet(object.message) ? Message.fromJSON(object.message) : undefined, + messageError: isSet(object.messageError) ? MessageError.fromJSON(object.messageError) : undefined, + }; + }, + + toJSON(message: BulkMessageResponse): unknown { + const obj: any = {}; + message.message !== undefined && (obj.message = message.message ? Message.toJSON(message.message) : undefined); + message.messageError !== undefined && + (obj.messageError = message.messageError ? MessageError.toJSON(message.messageError) : undefined); + return obj; + }, + + create, I>>(base?: I): BulkMessageResponse { + return BulkMessageResponse.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): BulkMessageResponse { + const message = createBaseBulkMessageResponse(); + message.message = (object.message !== undefined && object.message !== null) + ? Message.fromPartial(object.message) + : undefined; + message.messageError = (object.messageError !== undefined && object.messageError !== null) + ? MessageError.fromPartial(object.messageError) + : undefined; + return message; + }, +}; + +function createBaseSubmitBulkMessagesResponse(): SubmitBulkMessagesResponse { + return { messages: [] }; +} + +export const SubmitBulkMessagesResponse = { + encode(message: SubmitBulkMessagesResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + for (const v of message.messages) { + BulkMessageResponse.encode(v!, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): SubmitBulkMessagesResponse { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseSubmitBulkMessagesResponse(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.messages.push(BulkMessageResponse.decode(reader, reader.uint32())); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): SubmitBulkMessagesResponse { + return { + messages: Array.isArray(object?.messages) ? object.messages.map((e: any) => BulkMessageResponse.fromJSON(e)) : [], + }; + }, + + toJSON(message: SubmitBulkMessagesResponse): unknown { + const obj: any = {}; + if (message.messages) { + obj.messages = message.messages.map((e) => e ? BulkMessageResponse.toJSON(e) : undefined); + } else { + obj.messages = []; + } + return obj; + }, + + create, I>>(base?: I): SubmitBulkMessagesResponse { + return SubmitBulkMessagesResponse.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): SubmitBulkMessagesResponse { + const message = createBaseSubmitBulkMessagesResponse(); + message.messages = object.messages?.map((e) => BulkMessageResponse.fromPartial(e)) || []; + return message; + }, +}; + function createBaseStreamSyncRequest(): StreamSyncRequest { return { getInfo: undefined, diff --git a/packages/hub-nodejs/src/generated/request_response.ts b/packages/hub-nodejs/src/generated/request_response.ts index f644a5608c..2670ddb630 100644 --- a/packages/hub-nodejs/src/generated/request_response.ts +++ b/packages/hub-nodejs/src/generated/request_response.ts @@ -347,6 +347,25 @@ export interface ValidationResponse { message: Message | undefined; } +export interface SubmitBulkMessagesRequest { + messages: Message[]; +} + +export interface MessageError { + hash: Uint8Array; + errCode: string; + message: string; +} + +export interface BulkMessageResponse { + message?: Message | undefined; + messageError?: MessageError | undefined; +} + +export interface SubmitBulkMessagesResponse { + messages: BulkMessageResponse[]; +} + export interface StreamSyncRequest { getInfo?: HubInfoRequest | undefined; getCurrentPeers?: Empty | undefined; @@ -3916,6 +3935,289 @@ export const ValidationResponse = { }, }; +function createBaseSubmitBulkMessagesRequest(): SubmitBulkMessagesRequest { + return { messages: [] }; +} + +export const SubmitBulkMessagesRequest = { + encode(message: SubmitBulkMessagesRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + for (const v of message.messages) { + Message.encode(v!, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): SubmitBulkMessagesRequest { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseSubmitBulkMessagesRequest(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.messages.push(Message.decode(reader, reader.uint32())); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): SubmitBulkMessagesRequest { + return { messages: Array.isArray(object?.messages) ? object.messages.map((e: any) => Message.fromJSON(e)) : [] }; + }, + + toJSON(message: SubmitBulkMessagesRequest): unknown { + const obj: any = {}; + if (message.messages) { + obj.messages = message.messages.map((e) => e ? Message.toJSON(e) : undefined); + } else { + obj.messages = []; + } + return obj; + }, + + create, I>>(base?: I): SubmitBulkMessagesRequest { + return SubmitBulkMessagesRequest.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): SubmitBulkMessagesRequest { + const message = createBaseSubmitBulkMessagesRequest(); + message.messages = object.messages?.map((e) => Message.fromPartial(e)) || []; + return message; + }, +}; + +function createBaseMessageError(): MessageError { + return { hash: new Uint8Array(), errCode: "", message: "" }; +} + +export const MessageError = { + encode(message: MessageError, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.hash.length !== 0) { + writer.uint32(10).bytes(message.hash); + } + if (message.errCode !== "") { + writer.uint32(18).string(message.errCode); + } + if (message.message !== "") { + writer.uint32(26).string(message.message); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): MessageError { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseMessageError(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.hash = reader.bytes(); + continue; + case 2: + if (tag != 18) { + break; + } + + message.errCode = reader.string(); + continue; + case 3: + if (tag != 26) { + break; + } + + message.message = reader.string(); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): MessageError { + return { + hash: isSet(object.hash) ? bytesFromBase64(object.hash) : new Uint8Array(), + errCode: isSet(object.errCode) ? String(object.errCode) : "", + message: isSet(object.message) ? String(object.message) : "", + }; + }, + + toJSON(message: MessageError): unknown { + const obj: any = {}; + message.hash !== undefined && + (obj.hash = base64FromBytes(message.hash !== undefined ? message.hash : new Uint8Array())); + message.errCode !== undefined && (obj.errCode = message.errCode); + message.message !== undefined && (obj.message = message.message); + return obj; + }, + + create, I>>(base?: I): MessageError { + return MessageError.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): MessageError { + const message = createBaseMessageError(); + message.hash = object.hash ?? new Uint8Array(); + message.errCode = object.errCode ?? ""; + message.message = object.message ?? ""; + return message; + }, +}; + +function createBaseBulkMessageResponse(): BulkMessageResponse { + return { message: undefined, messageError: undefined }; +} + +export const BulkMessageResponse = { + encode(message: BulkMessageResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.message !== undefined) { + Message.encode(message.message, writer.uint32(10).fork()).ldelim(); + } + if (message.messageError !== undefined) { + MessageError.encode(message.messageError, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): BulkMessageResponse { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseBulkMessageResponse(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.message = Message.decode(reader, reader.uint32()); + continue; + case 2: + if (tag != 18) { + break; + } + + message.messageError = MessageError.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): BulkMessageResponse { + return { + message: isSet(object.message) ? Message.fromJSON(object.message) : undefined, + messageError: isSet(object.messageError) ? MessageError.fromJSON(object.messageError) : undefined, + }; + }, + + toJSON(message: BulkMessageResponse): unknown { + const obj: any = {}; + message.message !== undefined && (obj.message = message.message ? Message.toJSON(message.message) : undefined); + message.messageError !== undefined && + (obj.messageError = message.messageError ? MessageError.toJSON(message.messageError) : undefined); + return obj; + }, + + create, I>>(base?: I): BulkMessageResponse { + return BulkMessageResponse.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): BulkMessageResponse { + const message = createBaseBulkMessageResponse(); + message.message = (object.message !== undefined && object.message !== null) + ? Message.fromPartial(object.message) + : undefined; + message.messageError = (object.messageError !== undefined && object.messageError !== null) + ? MessageError.fromPartial(object.messageError) + : undefined; + return message; + }, +}; + +function createBaseSubmitBulkMessagesResponse(): SubmitBulkMessagesResponse { + return { messages: [] }; +} + +export const SubmitBulkMessagesResponse = { + encode(message: SubmitBulkMessagesResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + for (const v of message.messages) { + BulkMessageResponse.encode(v!, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): SubmitBulkMessagesResponse { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseSubmitBulkMessagesResponse(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.messages.push(BulkMessageResponse.decode(reader, reader.uint32())); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): SubmitBulkMessagesResponse { + return { + messages: Array.isArray(object?.messages) ? object.messages.map((e: any) => BulkMessageResponse.fromJSON(e)) : [], + }; + }, + + toJSON(message: SubmitBulkMessagesResponse): unknown { + const obj: any = {}; + if (message.messages) { + obj.messages = message.messages.map((e) => e ? BulkMessageResponse.toJSON(e) : undefined); + } else { + obj.messages = []; + } + return obj; + }, + + create, I>>(base?: I): SubmitBulkMessagesResponse { + return SubmitBulkMessagesResponse.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): SubmitBulkMessagesResponse { + const message = createBaseSubmitBulkMessagesResponse(); + message.messages = object.messages?.map((e) => BulkMessageResponse.fromPartial(e)) || []; + return message; + }, +}; + function createBaseStreamSyncRequest(): StreamSyncRequest { return { getInfo: undefined, diff --git a/packages/hub-nodejs/src/generated/rpc.ts b/packages/hub-nodejs/src/generated/rpc.ts index f99eaea601..20666df059 100644 --- a/packages/hub-nodejs/src/generated/rpc.ts +++ b/packages/hub-nodejs/src/generated/rpc.ts @@ -45,6 +45,8 @@ import { StreamFetchResponse, StreamSyncRequest, StreamSyncResponse, + SubmitBulkMessagesRequest, + SubmitBulkMessagesResponse, SubscribeRequest, SyncIds, SyncStatusRequest, @@ -421,6 +423,18 @@ export const HubServiceService = { responseSerialize: (value: MessagesResponse) => Buffer.from(MessagesResponse.encode(value).finish()), responseDeserialize: (value: Buffer) => MessagesResponse.decode(value), }, + /** @http-api: none */ + submitBulkMessages: { + path: "/HubService/SubmitBulkMessages", + requestStream: false, + responseStream: false, + requestSerialize: (value: SubmitBulkMessagesRequest) => + Buffer.from(SubmitBulkMessagesRequest.encode(value).finish()), + requestDeserialize: (value: Buffer) => SubmitBulkMessagesRequest.decode(value), + responseSerialize: (value: SubmitBulkMessagesResponse) => + Buffer.from(SubmitBulkMessagesResponse.encode(value).finish()), + responseDeserialize: (value: Buffer) => SubmitBulkMessagesResponse.decode(value), + }, /** Sync Methods */ getInfo: { path: "/HubService/GetInfo", @@ -624,6 +638,8 @@ export interface HubServiceServer extends UntypedServiceImplementation { getAllLinkMessagesByFid: handleUnaryCall; /** @http-api: none */ getLinkCompactStateMessageByFid: handleUnaryCall; + /** @http-api: none */ + submitBulkMessages: handleUnaryCall; /** Sync Methods */ getInfo: handleUnaryCall; getCurrentPeers: handleUnaryCall; @@ -1192,6 +1208,22 @@ export interface HubServiceClient extends Client { options: Partial, callback: (error: ServiceError | null, response: MessagesResponse) => void, ): ClientUnaryCall; + /** @http-api: none */ + submitBulkMessages( + request: SubmitBulkMessagesRequest, + callback: (error: ServiceError | null, response: SubmitBulkMessagesResponse) => void, + ): ClientUnaryCall; + submitBulkMessages( + request: SubmitBulkMessagesRequest, + metadata: Metadata, + callback: (error: ServiceError | null, response: SubmitBulkMessagesResponse) => void, + ): ClientUnaryCall; + submitBulkMessages( + request: SubmitBulkMessagesRequest, + metadata: Metadata, + options: Partial, + callback: (error: ServiceError | null, response: SubmitBulkMessagesResponse) => void, + ): ClientUnaryCall; /** Sync Methods */ getInfo( request: HubInfoRequest, diff --git a/packages/hub-web/src/generated/request_response.ts b/packages/hub-web/src/generated/request_response.ts index f644a5608c..2670ddb630 100644 --- a/packages/hub-web/src/generated/request_response.ts +++ b/packages/hub-web/src/generated/request_response.ts @@ -347,6 +347,25 @@ export interface ValidationResponse { message: Message | undefined; } +export interface SubmitBulkMessagesRequest { + messages: Message[]; +} + +export interface MessageError { + hash: Uint8Array; + errCode: string; + message: string; +} + +export interface BulkMessageResponse { + message?: Message | undefined; + messageError?: MessageError | undefined; +} + +export interface SubmitBulkMessagesResponse { + messages: BulkMessageResponse[]; +} + export interface StreamSyncRequest { getInfo?: HubInfoRequest | undefined; getCurrentPeers?: Empty | undefined; @@ -3916,6 +3935,289 @@ export const ValidationResponse = { }, }; +function createBaseSubmitBulkMessagesRequest(): SubmitBulkMessagesRequest { + return { messages: [] }; +} + +export const SubmitBulkMessagesRequest = { + encode(message: SubmitBulkMessagesRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + for (const v of message.messages) { + Message.encode(v!, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): SubmitBulkMessagesRequest { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseSubmitBulkMessagesRequest(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.messages.push(Message.decode(reader, reader.uint32())); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): SubmitBulkMessagesRequest { + return { messages: Array.isArray(object?.messages) ? object.messages.map((e: any) => Message.fromJSON(e)) : [] }; + }, + + toJSON(message: SubmitBulkMessagesRequest): unknown { + const obj: any = {}; + if (message.messages) { + obj.messages = message.messages.map((e) => e ? Message.toJSON(e) : undefined); + } else { + obj.messages = []; + } + return obj; + }, + + create, I>>(base?: I): SubmitBulkMessagesRequest { + return SubmitBulkMessagesRequest.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): SubmitBulkMessagesRequest { + const message = createBaseSubmitBulkMessagesRequest(); + message.messages = object.messages?.map((e) => Message.fromPartial(e)) || []; + return message; + }, +}; + +function createBaseMessageError(): MessageError { + return { hash: new Uint8Array(), errCode: "", message: "" }; +} + +export const MessageError = { + encode(message: MessageError, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.hash.length !== 0) { + writer.uint32(10).bytes(message.hash); + } + if (message.errCode !== "") { + writer.uint32(18).string(message.errCode); + } + if (message.message !== "") { + writer.uint32(26).string(message.message); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): MessageError { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseMessageError(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.hash = reader.bytes(); + continue; + case 2: + if (tag != 18) { + break; + } + + message.errCode = reader.string(); + continue; + case 3: + if (tag != 26) { + break; + } + + message.message = reader.string(); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): MessageError { + return { + hash: isSet(object.hash) ? bytesFromBase64(object.hash) : new Uint8Array(), + errCode: isSet(object.errCode) ? String(object.errCode) : "", + message: isSet(object.message) ? String(object.message) : "", + }; + }, + + toJSON(message: MessageError): unknown { + const obj: any = {}; + message.hash !== undefined && + (obj.hash = base64FromBytes(message.hash !== undefined ? message.hash : new Uint8Array())); + message.errCode !== undefined && (obj.errCode = message.errCode); + message.message !== undefined && (obj.message = message.message); + return obj; + }, + + create, I>>(base?: I): MessageError { + return MessageError.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): MessageError { + const message = createBaseMessageError(); + message.hash = object.hash ?? new Uint8Array(); + message.errCode = object.errCode ?? ""; + message.message = object.message ?? ""; + return message; + }, +}; + +function createBaseBulkMessageResponse(): BulkMessageResponse { + return { message: undefined, messageError: undefined }; +} + +export const BulkMessageResponse = { + encode(message: BulkMessageResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.message !== undefined) { + Message.encode(message.message, writer.uint32(10).fork()).ldelim(); + } + if (message.messageError !== undefined) { + MessageError.encode(message.messageError, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): BulkMessageResponse { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseBulkMessageResponse(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.message = Message.decode(reader, reader.uint32()); + continue; + case 2: + if (tag != 18) { + break; + } + + message.messageError = MessageError.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): BulkMessageResponse { + return { + message: isSet(object.message) ? Message.fromJSON(object.message) : undefined, + messageError: isSet(object.messageError) ? MessageError.fromJSON(object.messageError) : undefined, + }; + }, + + toJSON(message: BulkMessageResponse): unknown { + const obj: any = {}; + message.message !== undefined && (obj.message = message.message ? Message.toJSON(message.message) : undefined); + message.messageError !== undefined && + (obj.messageError = message.messageError ? MessageError.toJSON(message.messageError) : undefined); + return obj; + }, + + create, I>>(base?: I): BulkMessageResponse { + return BulkMessageResponse.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): BulkMessageResponse { + const message = createBaseBulkMessageResponse(); + message.message = (object.message !== undefined && object.message !== null) + ? Message.fromPartial(object.message) + : undefined; + message.messageError = (object.messageError !== undefined && object.messageError !== null) + ? MessageError.fromPartial(object.messageError) + : undefined; + return message; + }, +}; + +function createBaseSubmitBulkMessagesResponse(): SubmitBulkMessagesResponse { + return { messages: [] }; +} + +export const SubmitBulkMessagesResponse = { + encode(message: SubmitBulkMessagesResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + for (const v of message.messages) { + BulkMessageResponse.encode(v!, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): SubmitBulkMessagesResponse { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseSubmitBulkMessagesResponse(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 10) { + break; + } + + message.messages.push(BulkMessageResponse.decode(reader, reader.uint32())); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): SubmitBulkMessagesResponse { + return { + messages: Array.isArray(object?.messages) ? object.messages.map((e: any) => BulkMessageResponse.fromJSON(e)) : [], + }; + }, + + toJSON(message: SubmitBulkMessagesResponse): unknown { + const obj: any = {}; + if (message.messages) { + obj.messages = message.messages.map((e) => e ? BulkMessageResponse.toJSON(e) : undefined); + } else { + obj.messages = []; + } + return obj; + }, + + create, I>>(base?: I): SubmitBulkMessagesResponse { + return SubmitBulkMessagesResponse.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): SubmitBulkMessagesResponse { + const message = createBaseSubmitBulkMessagesResponse(); + message.messages = object.messages?.map((e) => BulkMessageResponse.fromPartial(e)) || []; + return message; + }, +}; + function createBaseStreamSyncRequest(): StreamSyncRequest { return { getInfo: undefined, diff --git a/packages/hub-web/src/generated/rpc.ts b/packages/hub-web/src/generated/rpc.ts index d6b6cec6a0..b3be5450de 100644 --- a/packages/hub-web/src/generated/rpc.ts +++ b/packages/hub-web/src/generated/rpc.ts @@ -33,6 +33,8 @@ import { StreamFetchResponse, StreamSyncRequest, StreamSyncResponse, + SubmitBulkMessagesRequest, + SubmitBulkMessagesResponse, SubscribeRequest, SyncIds, SyncStatusRequest, @@ -165,6 +167,11 @@ export interface HubService { request: DeepPartial, metadata?: grpcWeb.grpc.Metadata, ): Promise; + /** @http-api: none */ + submitBulkMessages( + request: DeepPartial, + metadata?: grpcWeb.grpc.Metadata, + ): Promise; /** Sync Methods */ getInfo(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; getCurrentPeers(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; @@ -242,6 +249,7 @@ export class HubServiceClientImpl implements HubService { this.getAllUserDataMessagesByFid = this.getAllUserDataMessagesByFid.bind(this); this.getAllLinkMessagesByFid = this.getAllLinkMessagesByFid.bind(this); this.getLinkCompactStateMessageByFid = this.getLinkCompactStateMessageByFid.bind(this); + this.submitBulkMessages = this.submitBulkMessages.bind(this); this.getInfo = this.getInfo.bind(this); this.getCurrentPeers = this.getCurrentPeers.bind(this); this.stopSync = this.stopSync.bind(this); @@ -437,6 +445,13 @@ export class HubServiceClientImpl implements HubService { return this.rpc.unary(HubServiceGetLinkCompactStateMessageByFidDesc, FidRequest.fromPartial(request), metadata); } + submitBulkMessages( + request: DeepPartial, + metadata?: grpcWeb.grpc.Metadata, + ): Promise { + return this.rpc.unary(HubServiceSubmitBulkMessagesDesc, SubmitBulkMessagesRequest.fromPartial(request), metadata); + } + getInfo(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise { return this.rpc.unary(HubServiceGetInfoDesc, HubInfoRequest.fromPartial(request), metadata); } @@ -1278,6 +1293,29 @@ export const HubServiceGetLinkCompactStateMessageByFidDesc: UnaryMethodDefinitio } as any, }; +export const HubServiceSubmitBulkMessagesDesc: UnaryMethodDefinitionish = { + methodName: "SubmitBulkMessages", + service: HubServiceDesc, + requestStream: false, + responseStream: false, + requestType: { + serializeBinary() { + return SubmitBulkMessagesRequest.encode(this).finish(); + }, + } as any, + responseType: { + deserializeBinary(data: Uint8Array) { + const value = SubmitBulkMessagesResponse.decode(data); + return { + ...value, + toObject() { + return value; + }, + }; + }, + } as any, +}; + export const HubServiceGetInfoDesc: UnaryMethodDefinitionish = { methodName: "GetInfo", service: HubServiceDesc, diff --git a/protobufs/schemas/request_response.proto b/protobufs/schemas/request_response.proto index f77aeaafa3..ff9a91e83c 100644 --- a/protobufs/schemas/request_response.proto +++ b/protobufs/schemas/request_response.proto @@ -273,6 +273,27 @@ message ValidationResponse { Message message = 2; } +message SubmitBulkMessagesRequest { + repeated Message messages = 1; +} + +message MessageError { + bytes hash = 1; + string errCode = 2; + string message = 3; +} + +message BulkMessageResponse { + oneof response { + Message message = 1; + MessageError message_error = 2; + } +} + +message SubmitBulkMessagesResponse { + repeated BulkMessageResponse messages = 1; +} + message StreamSyncRequest { oneof request { HubInfoRequest get_info = 1; diff --git a/protobufs/schemas/rpc.proto b/protobufs/schemas/rpc.proto index 9aa7d63d21..79f6df6d05 100644 --- a/protobufs/schemas/rpc.proto +++ b/protobufs/schemas/rpc.proto @@ -93,6 +93,8 @@ service HubService { rpc GetAllLinkMessagesByFid(FidTimestampRequest) returns (MessagesResponse); // @http-api: none rpc GetLinkCompactStateMessageByFid(FidRequest) returns (MessagesResponse); + // @http-api: none + rpc SubmitBulkMessages(SubmitBulkMessagesRequest) returns (SubmitBulkMessagesResponse); // Sync Methods rpc GetInfo(HubInfoRequest) returns (HubInfoResponse);