Skip to content

EventBridge Runtime

shenlin edited this page Jan 26, 2023 · 19 revisions

Status

  • Current State: Proposed
  • Authors: shenlin
  • Mailing List discussion: [email protected]
  • Pull Request: #PR_NUMBER
  • Released: <released_version>

Background & Motivation

What do we need to do

  • Will we add a new module? -- Yes.
  • Will we add new APIs? -- Yes.
  • Will we add new features? -- Yes.

Why should we do that

  • Are there any problems with our current project? There are several problems in current RocketMQ EventBridge:
    1. 开源EB使用时,需要同时部署rocketmq和rocketmq-connect,依赖过重;目前依赖的组件包括:EventBridge + RocketMQ + RocketMQ Connect Runtime + OpenMessagingConnector;
    2. 缺少自己的Runtime,无法围绕开源标杆场景(SDK-> EB-> HTTP)提供细粒度的服务支持,包括:

全链路追踪:对标传统同步调用,EDA场景下,支持全链路追踪查询是必不可少的功能,考虑与开源生态做好集成,包括Zipkin、Skywalking 、OpenTelemetry 等等; 异常策略、重试、死信:灵活的配置异常处理、重试、死信; 监控报警:是否有丰富的metrics,是否支持监控报警,是用户是否敢于用于生产的重要参考指标,考虑集成Prometheus; 弹性伸缩:部署上,考虑与K8S做更多的集成,利用好K8S做好资源管理,进行弹性伸缩; 反压与流控:核心目的是保护好下游,集成Sentinel 、Hystrix等; 使用友好:更丰富的SDK、与Spring boot 生态更好的融合,让用户使用成本更低;

  • What can we benefit from the proposed changes?
    1. 降低外部服务的依赖;
    2. 提供更好的EDA集成体验;

Goals

  • What problem is this proposal designed to solve?

    1. 设计 Runtime的运行链路;
    2. 设计 Runtime的SPI;
    3. 设计 Runtime的管理;

Non-Goals

  • What problem is this proposal NOT designed to solve?

    1. Runtime的生命周期管理,包括如何创建/修改/删除;
  • Are there any limits to this proposal?

    Nothing specific.

Changes

Architecture

  • 根据用户配置的Rule,监听Rule关联的BUS,一旦BUS上收到事件,则将事件推送到Rule指定的Target;
  • 如果Rule上配置了过滤和转换,则在推送前,对事件进行前置处理;

image

Push Worker 运行链路

  • 监听MQ组件通过long pooling,时刻监听分配给当前Worker的Rule关联的BUS;
  • 当监听到新的事件产生,则立即将事件写入本地Buffer,监听组件继续监听后续的事件;(事件推送不阻塞监听线程)
  • Poll组件从Buffer中获取事件,并且根据上下文信息(Rule),对事件进行过滤和转换,然后发送给SinkConnector;
  • Put可以是异步操作,也可以是同步操作;Runtime每次提交位点之前,会调用PreCommit获取当前事件成功提交的offset,然后通过MQ监听组件,提交位点;

image

Push Worker 如何监听EventBus

这里存在两种方式:

  • 方案1-独立模式:每一个Rule单独创建一个MQ的订阅,去监听Rule关联的BUS是否有事件产生;每一个Rule都是一个独立的任务;

image

  • 方案2-共享模式:Push Worker中由一个统一的组件,去监听分配给当前Worker的所有Rule关联的BUS;如果监听的BUS有事件产生,则将事件分发给指定的Rule,进行过滤和转换,最终通过Sink Connector推送给Target端;

image

由于EventRule不同于Streaming的场景,大部分时候,Rule监听的BUS可能没有事件;为了降低监听MQ的成本,推荐方案2: Push Worker中由一个统一的组件,去监听分配给当前Worker的所有Rule,共享资源。

Push Manager

问题分析

  • 如果一个PushWorker需要加载所有的Rule,则当Rule越来越多时,PushWorker会成为瓶颈。 解决方案
  • Push Worker Cluster只需要加载分配给他的Rule,按照Rule的定义,监听指定的BUS,一旦BUS上有事件产生,则将事件按照过滤规则进行过滤,并推送到目标端Target。 Push Manger的职责
  • 所有的Push Worker 生命周期由Push Manger统一管理;
  • Rule如何分配给Push Worker,由Push Manger统一来管理;

image

Push Manager如何管理和分配Worker与Rule-Target

  • 通过现有的event_target_runner表格,将EventRule分配给指定的Push Worker运行

image

  • 增加event_push_worker表管理所有的worker节点 image Push Manager 根据每个Worker的负载(如CPU、Memeory、Push延迟时间、线程池剩余大小)等Metrics信息给Rule-Target分配Worker,前期可以根据每个Worker分配的Rule- Target数量均匀分配;

Push Worker如何获取分配给他的Rule-Target

  • Worker可以提前创建好,然后配置到event_push_worker表中。如果底层基础设置支持自动创建容器的能力,则可以通过PushManager调用基础设施服务,动态创建Worker,并自动注册到event_push_worker表。
  • 所有创建好的Worker都有一个系统变量:worker-name,来指定当前worker的信息。Worker在启动的时候,可以根据worker-name来获取分配给他的Rule-Target。
  • 另外,运行过程中Rule-Target信息会发生变化,这个时候,PushManger需要将最新的变化的Rule-Target信息,通知给对应的Worker,Worker收到通知后,重新加载最新的Rule-Target。

Interface Design/Change

  • Method signature changes -- Add some async function in the Store module.
  • Method behavior changes -- Nothing specific.
  • CLI command changes -- Nothing specific.
  • Log format or content changes -- Nothing specific.

问题分析:

  • EventBridge是一个开放的架构,事件网关接收到的事件不一定会存储到RocketMQ,所以PushWorker在设计的时候,需要支持监听不同的MQ,因此这里我们需要定义一套标准的SPI,基于SPI实现不同的MQ监听组件;
  • PushWorker在BUS上监听到事件之后,会对事件进行过滤和转换,最终将事件推送到Rule中指定的目标端。推送之前,还会对事件进行过滤和转换,这些过程都需要定义好SPI,以便后期进行扩展,同时方便生态合作伙伴进行共建。

解决方案: 目前,OpenMessaging Connect API 已经定义了完整的SPI,并拥有一定的生态。所以,我们考虑复用OpenMessaging Connect API作为我们标准的SPI。

image

Compatibility, Deprecation, and Migration Plan

This proposal needs to modify the API specification of MessageStore to add asynchronous interfaces. This only involves changes inside the server side, without relying on a specific client version or changing the way that client is used. It is completely transparent to applications connected to RocketMQ.

Implementation Outline

We split this proposal into several tasks:

  • Task1: Add asynchronous interfaces to MessageStore.
  • Task2: Implement tiered storage.
  • Task3: Implement tiered storage metadata migration to access messages from other Brokers.

Rejected Alternatives

None.

How do alternatives solve the issue you proposed?

None.

Clone this wiki locally