diff --git a/dir.yaml b/dir.yaml index 308d342fe..b834c9f86 100644 --- a/dir.yaml +++ b/dir.yaml @@ -332,10 +332,10 @@ path: data-integration/s3 - title_en: Azure Event Hubs title_cn: Azure Event Hubs - path: data-integration/data-bridge-azure-event-hub + path: data-integration/data-bridge-azure-event-hub - title_en: Cassandra title_cn: Cassandra - path: data-integration/data-bridge-cassa + path: data-integration/data-bridge-cassa - title_en: ClickHouse title_cn: ClickHouse path: data-integration/data-bridge-clickhouse @@ -356,7 +356,7 @@ path: data-integration/data-bridge-greptimedb - title_en: HStreamDB title_cn: HStreamDB - path: data-integration/data-bridge-hstreamdb + path: data-integration/data-bridge-hstreamdb - title_en: HTTP Server title_cn: HTTP Server path: data-integration/data-bridge-webhook @@ -584,6 +584,8 @@ - title_en: Durable MQTT Sessions and Messages title_cn: MQTT 会话与消息持久化 path: durability/durability_introduction + children: + - durability/managing-replication - title_en: Multi-Protocol Gateway title_cn: 多协议网关 path: gateway/gateway diff --git a/en_US/durability/durability_introduction.md b/en_US/durability/durability_introduction.md index 7532cfb45..f44678fc1 100644 --- a/en_US/durability/durability_introduction.md +++ b/en_US/durability/durability_introduction.md @@ -1,55 +1,92 @@ # Durable MQTT Sessions and Messages -Starting from release v5.5.x, EMQX contains an embedded durable storage for MQTT sessions and messages. This page introduces the session persistence feature EMQX and how it's capability ensures the resumption of sessions after an EMQX node restart, marking a significant enhancement for IoT applications requiring high reliability. +Starting from release v5.7, EMQX contains an embedded durable storage for MQTT sessions and messages. +This page gives a high-level introduction of the session durability feature in EMQX and how it ensures the resumption of sessions after restart of EMQX nodes. 
-::: warning Important Notice
-This feature is in the public alpha phase. We advise against its use in production environments containing critical data until further notice.
+## Types of Client Sessions in EMQX
 
-During the alpha phase, there may be changes to the on-disk representation of MQTT messages and sessions. Consequently, data recorded in one version of EMQX may not be readable after upgrading to a subsequent version. This caution will become obsolete with the final release.
+According to the MQTT standard, client sessions facilitate the management of client connections and states within the MQTT broker.
+Informally, EMQX separates client sessions into two logical categories:
 
-:::
+- **Persistent**: persistent sessions are kept by the broker after the client's connection terminates, and can be resumed if the client reconnects to the broker within the session expiry interval. Messages sent to the subscribed topics while the client was offline are delivered upon reconnection.
+- **Ephemeral**: ephemeral sessions exist only for the duration of the client's connection to EMQX. When a client with an ephemeral session disconnects, all session information, including subscriptions and undelivered messages, is discarded.
 
-## Understand Client Sessions in EMQX
+A client session is considered persistent in two cases:
 
-In EMQX, client sessions facilitate the management of client connections and states within the MQTT broker. Starting from version 5.5, EMQX offers two different implementations for managing the client session: regular client sessions and persistent client sessions.
+1. For clients using the MQTT 5 protocol, the
+   [Session Expiry Interval](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048) property of the `CONNECT` or `DISCONNECT` packet is set to a value greater than zero.
 
-::: tip
+2. For clients using the MQTT 3.* protocol, the
+   [Clean Session](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030) flag is set to 0,
+   and the `mqtt.session_expiry_interval` configuration parameter is set to a value greater than 0.
 
-For more information on sessions, see [MQTT Persistent Session and Clean Session Explained](https://www.emqx.com/en/blog/mqtt-session).
 
 ## Session Implementation
 
-:::
+EMQX contains two alternative implementations of client sessions, optimized for different use cases:
 
-### Ephemeral Client Sessions
+- **RAM**
+- **Durable**
 
-Ephemeral client sessions are temporary and exist for the duration of the client's connection to EMQX. When a client with an ephemeral session disconnects, all session information, including subscriptions and undelivered messages, is discarded.
+The choice of implementation depends on the type of the session (persistent or ephemeral) and the value of the `session_persistence.enable` configuration parameter, set globally or per zone.
 
-### Persistent Client Sessions
+The implementation is chosen according to the following rule:
 
-Persistent client sessions are designed to retain session information even after the client disconnects. This includes subscriptions and messages that are marked for delivery but have not yet been sent to or acknowledged by the client. When a client reconnects with the same session ID, it can resume its session state, receiving any messages that were retained during its absence.
+| `session_persistence.enable` | Ephemeral | Persistent |
+|------------------------------|-----------|------------|
+| `false`                      | RAM       | RAM        |
+| `true`                       | RAM       | durable    |
 
-Persistent sessions in EMQX are determined based on the client's protocol version and session settings:
 
 ### RAM Client Sessions
 
-- For the clients using MQTT 5 protocol, [Session Expiry Interval](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048) property of `CONNECT` or `DISCONNECT` packet is set to a value greater than 0.
+This implementation is used by default; all EMQX releases before 5.7 used this implementation exclusively.
+As the name suggests, the state of RAM sessions is kept entirely in volatile memory.
 
-- For the clients using MQTT 3.* protocol, [Clean Session](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030) flag is set to 0, and `mqtt.session_expiry_interval` configuration parameter is set to a value greater than 0.
+RAM client sessions have the following advantages:
 
-The session persistence feature only affects persistent client sessions. Client sessions not meeting the above criteria remain unaffected by this feature and are processed as usual.
+- Generally, RAM sessions have very high throughput and low latency.
+- Messages are dispatched to clients using RAM sessions immediately.
 
-## Enable Session Persistence
+They also have some disadvantages:
 
-The session persistence feature in EMQX is disabled by default. It can be enabled by setting `session_persistence.enable` configuration parameter to `true`.
+- Since RAM is volatile, session data is lost when the EMQX node holding the session is stopped or restarted.
+- RAM sessions store undelivered messages in a memory queue.
+  To avoid running out of memory, EMQX puts a limit on the size of this queue; when the number of undelivered messages reaches the limit, new messages are discarded.
+  This can lead to message loss.
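+
+As a sketch (the value below is illustrative, not a recommendation), the memory queue limit described above can be adjusted in `emqx.conf` via the `mqtt.max_mqueue_len` parameter:
+
+```
+mqtt {
+  # Maximum number of undelivered messages queued per RAM session.
+  # Messages arriving after the queue is full are discarded (illustrative value).
+  max_mqueue_len = 10000
+}
+```
+
+Raising this limit trades higher memory consumption for a lower chance of dropping messages for slow or briefly offline persistent clients.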
-## How Session Persistence Works
+### Durable Client Sessions
 
-EMQX introduces a unique method to ensure message durability for persistent sessions. When a client with a persistent session subscribes to a topic filter, EMQX designates topics matching the filter as "durable." This ensures that, aside from routing MQTT PUBLISH messages from these topics to active sessions, the broker also saves these messages on disk. EMQX integrates directly with a built-in RocksDB database as the storage backend.
+Durable sessions are an implementation of client sessions introduced in EMQX v5.7.
+The state of durable sessions, as well as the messages routed to them, is stored on disk.
+This session implementation is disabled by default. It can be enabled by setting the `session_persistence.enable` configuration parameter to `true`.
 
-The session persistence feature ensures robust durability and high availability by consistently replicating message and session data across multiple nodes within an EMQX cluster. The configurable *replication factor* determines the number of replicas for each message or session, enabling users to customize the balance between durability and performance to meet their specific requirements.
+The session persistence feature ensures robust durability and high availability by consistently replicating session metadata and the MQTT messages sent to durable sessions across multiple nodes within an EMQX cluster.
+The configurable *replication factor* determines the number of replicas for each message or session, enabling users to customize the balance between durability and performance to meet their specific requirements.
 
-Each durable MQTT message is stored exactly once on each replica, regardless of the number of persistent sessions subscribing to the matching topic filter and whether those sessions are currently connected or not. This enables an efficient fan-out of messages to the persistent sessions.
+This implementation has the following advantages:
+- Durable sessions can be resumed after EMQX nodes are stopped or restarted.
+- They store MQTT messages in a shared, replicated durable storage instead of a per-session memory queue, so the RAM cost of durable sessions (online or offline) in the broker is lower.
+
+They also have certain disadvantages:
+- Since the messages have to be stored on disk, the overall throughput of the system is expected to be lower.
+- Currently, durable sessions have higher latency than RAM sessions, because MQTT messages are both written and read in batches.
+  While batching improves throughput, it increases end-to-end latency (the delay before published messages are seen by the client).
+
+## How Durable Sessions Work
+
+EMQX introduces a unique approach to managing message durability that allows RAM and durable sessions to coexist and minimizes the cost of storing messages.
+When a durable session subscribes to a topic filter, EMQX marks topics matching the filter as "durable." This ensures that, aside from routing MQTT PUBLISH messages from these topics to RAM sessions, the broker also saves such messages to the durable storage.
+
+Strong durability and high availability guarantees are achieved by replicating the message and session data across multiple nodes in an EMQX cluster in a consistent manner. The *replication factor*, which determines the number of copies each message or session should have, can be adjusted to achieve the desired balance between reliability and performance.
+
+Each durable MQTT message is stored exactly once on each replica, regardless of the number of persistent sessions subscribing to the matching topic filter and whether those sessions are currently connected or not. This enables an efficient fan-out of messages to the persistent sessions and minimizes the volume of data that has to be written to disk.
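+
+As a minimal sketch (in `emqx.conf` syntax), the behavior described above is switched on with the `session_persistence.enable` parameter introduced earlier:
+
+```
+session_persistence {
+  # Use the durable implementation for persistent sessions;
+  # ephemeral sessions remain RAM-based regardless of this setting.
+  enable = true
+}
+```
+
+Note that this only changes how *persistent* sessions are handled; clients that connect with a zero session expiry interval still get RAM sessions.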
 ### Durable Storage Architecture
 
-The architecture of EMQX’s durable storage is organized into a hierarchical structure comprising shards, generations, and streams.
+The architecture of EMQX’s durable storage is organized into a hierarchical structure comprising storages, shards, generations, and streams.
+
+#### Storage
+
+A storage encapsulates all data of a certain type, such as MQTT messages or MQTT sessions.
 
 #### Shard
 
@@ -59,7 +96,7 @@ A shard is also a unit of replication, and EMQX ensures that each shard is consi
 
 #### Generation
 
-Messages within a shard are further segmented into generations corresponding to specific time frames. New messages are written only for the current generation, while the previous generations are only accessible for reading. EMQX cleans up old messages by deleting old generations. EMQX manages the data lifecycle by deleting older generations, with the retention period defined by the `session_persistence.message_retention_period` parameter.
+Messages within a shard are further segmented into generations corresponding to specific time frames. New messages are written only into the current generation, while the previous generations are only accessible for reading. EMQX cleans up old messages by deleting old generations in their entirety. The retention period of the older generations is defined by the `session_persistence.message_retention_period` parameter.
 
 Different generations can organize the data differently, according to the *storage layout* specification. Currently, only one layout is supported, optimized for managing the high throughput of wildcard subscriptions spanning a large number of topics and single-topic subscriptions. Future updates will introduce additional layouts to optimize for the different types of workloads, such as prioritizing low latency over high throughput for certain applications.
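+
+As a sketch (the duration below is illustrative, not a recommendation), the retention period that drives generation cleanup can be tuned in `emqx.conf`:
+
+```
+session_persistence {
+  # Generations older than this period are deleted in their entirety,
+  # together with the messages they contain (illustrative value).
+  message_retention_period = 1d
+}
+```
+
+Shorter retention periods reduce disk usage, at the cost of offline clients losing messages that outlive the retention window.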
@@ -67,13 +104,12 @@ Different generations can organize the data differently, according to the *stora Messages in each shard and generation are split into streams. Streams serve as units of message serialization in EMQX. Streams can contain messages from multiple topics. Various storage layouts can employ different strategies for mapping topics into streams. -Persistent sessions interact with this structure by fetching messages in batches from the streams, with the batch size adjustable via the `session_persistence.max_batch_size` parameter. This comprehensive system ensures that EMQX can efficiently handle durable message storage and retrieval. +Persistent sessions interact with this structure by fetching messages in batches from the streams, with the batch size adjustable via the `session_persistence.batch_size` parameter. ### Session Persistence Across Cluster -Each node within an EMQX cluster is assigned a unique site ID, which serves as a stable identifier, independent of the Erlang node name (`emqx@...`). Site IDs are persistent, and they are randomly generated at the first startup of the node. This stability maintains the integrity of persistent sessions and messages, especially in scenarios where nodes might undergo name modifications or reconfigurations. - -By associating persistent sessions and messages with a unique site ID rather than just the node's name, EMQX ensures that these sessions can be reliably managed and recovered, even if the underlying node details change. +Each node within an EMQX cluster is assigned a unique *Site ID*, which serves as a stable identifier, independent of the Erlang node name (`emqx@...`). Site IDs are persistent, and they are randomly generated at the first startup of the node. +This stability maintains the integrity of the data, especially in scenarios where nodes might undergo name modifications or reconfigurations. 
 Administrators can manage and monitor persistent sessions across the cluster by using the `emqx_ctl ds info` CLI command to view the status of different sites.
 
@@ -85,5 +121,4 @@ The storage requirements can be estimated according to the following guidelines:
 
 - **Message Storage**: The space required for storing messages on each replica is proportional to the rate of incoming messages multiplied by the duration specified by the `session_persistence.message_retention_period` parameter. This parameter dictates how long messages are retained, influencing the total storage needed.
 - **Session Metadata Storage**: The amount of storage for session metadata is proportional to the number of sessions multiplied by the number of streams to which they are subscribed.
-- **Stream Calculation**: The number of streams is proportional to the number of shards.
-
+- **Stream Calculation**: The number of streams is proportional to the number of shards. It also depends (in a non-linear fashion) on the number of topics. EMQX automatically combines topics that have a similar structure into the same stream, ensuring that the number of streams doesn't grow too fast with the number of topics, minimizing the volume of metadata stored per session.
diff --git a/en_US/durability/managing-replication.md b/en_US/durability/managing-replication.md
index 1523a71e2..9a8f2c4b2 100644
--- a/en_US/durability/managing-replication.md
+++ b/en_US/durability/managing-replication.md
@@ -1,36 +1,44 @@
-# Managing Replication
+# Replication Management
 
-If you plan to run EMQX in a single-node configuration, you can skip this section. However, if you later decide to scale your deployment to a multi-node cluster, the information and procedures described here will be essential.
+This document describes how to set up data replication and high availability for the EMQX durable storage.
+It is relevant when setting up a new EMQX cluster with session persistence enabled, or upgrading an existing cluster to enable session persistence.
 
 ## Initial Cluster Setup
 
-Suppose you are setting up a new EMQX cluster with session persistence enabled, or upgrading an existing cluster to enable session persistence. In that case, there are a few configuration parameters that influence how the cluster initially sets up durable storage and starts replicating data. Be sure to review these parameters as changing them will not take any effect once the durable storage is initialized.
+There are a few configuration parameters that influence how the cluster initially sets up durable storage and starts replicating data. These parameters can't be changed at runtime, and modifying them will not take effect once the durable storage is initialized.
 
 ### Replication Factor
 
-The replication factor, controlled with `durable_storage.messages.replication_factor` configuration parameter, determines the number of replicas each shard should have across the cluster. The default value is 3. As a rule of thumb, the replication factor should be set to an odd number. This is because the replication factor is used to determine the quorum size needed to consider a write operation successful. The larger the replication factor, the more copies of the data are stored across the cluster, thus increasing durability and availability. This is at the expense of increased storage and network overhead, because more communication is needed to achieve consensus.
+The replication factor, controlled with the `durable_storage.messages.replication_factor` configuration parameter, determines the number of replicas each shard should have across the cluster. The default value is 3. As a rule of thumb, the replication factor should be set to an odd number, because it is used to determine the quorum size needed to consider a write operation successful. The larger the replication factor, the more copies of the data are stored across the cluster, thus improving high availability.
+However, this comes at the expense of increased storage and network overhead, because more communication is needed to achieve consensus.
 
-Note that the replication factor is not strictly enforced in smaller clusters. For example, in a two-node cluster, the default replication factor is effectively 2, as each shard is replicated on both nodes, and there is no point in replicating it further. In general, EMQX allocates the replicas of each shard to different sites in the cluster, so that no two replicas are stored on the same node.
+Note that the replication factor is not strictly enforced in smaller clusters. For example, in a two-node cluster, the real replication factor is effectively 2, as each shard is replicated on both nodes, and there is no point in replicating it further. In general, EMQX allocates the replicas of each shard to different sites in the cluster, so that no two replicas are stored on the same node.
 
 ### Number of Shards
 
-This in turn determines the number of independent partitions, called _shards_, that the durable storage is divided into. It's controlled by the `durable_storage.messages.n_shards` parameter, which is 16 by default. If you plan to have a large number of nodes in the cluster, you may want to increase this value to ensure that the data is more evenly distributed and that the cluster has more opportunities to parallelize read and write operations. However, increasing the number of shards also inevitably increases the load on subscribers' side, as they need to query more shards to retrieve the data they are interested in and manage noticeably heavier subscription state.
+The built-in durable storage is split into shards, which are replicated independently from each other.
+A larger number of shards allows publishing and consuming MQTT messages from the durable storage to proceed with greater parallelism.
+However, each shard requires system resources, such as file descriptors.
+Additionally, a larger number of shards increases the volume of metadata stored per session.
 
-Keep in mind that it's not possible to change the number of shards once the durable storage is initialized. Make a decision wisely.
+This number is controlled by the `durable_storage.messages.n_shards` parameter.
+The number of shards can't be changed once the durable storage in the EMQX cluster has been initialized.
 
 ### Number of Sites
 
 The number of sites, as controlled by the `durable_storage.messages.n_sites` configuration parameter, determines the _minimum_ number of sites that must be online in the cluster for the durable storage to even initialize and start accepting writes. Once the minimum number of sites is online, the durable storage will start allocating shards to the available sites in a fair manner.
 
-The default value is 1, which actually means that (depending on clustering strategy) each node may initially consider itself an only site responsible for storing the data, and initialize the durable storage accordingly. This is obviously a conflicting situation in a multi-node cluster. In this case once the cluster is formed, a single view of the cluster will "win", which will cause the other nodes to abandon the data they have stored so far. Despite this is unlikely to happen in practice, it's still a good idea to set the number of sites to an initial cluster size to avoid such situations.
+The default value is 1, which effectively means that (depending on the clustering strategy) each node may initially consider itself the only site responsible for storing the data, and initialize the durable storage accordingly.
+This default is optimized for the base case of a single-node EMQX cluster, but it creates a conflict in a multi-node cluster: once the cluster is formed, a single view of the cluster will eventually take precedence, which will cause the other nodes to abandon the data they have stored so far.
+Therefore, in a multi-node cluster it's recommended to set the number of sites to the initial cluster size to avoid such situations.
 
-Again, once the durable storage is initialized, changes to this parameter will not take any effect.
+Once the durable storage is initialized, changes to this parameter will not take effect.
 
 ## Changing Existing Cluster
 
 At some point an existing cluster may need to be reconfigured. Changes to required capacity, durability, or client traffic, decommissioning old nodes and replacing them with new ones are most common reasons for reconfiguration. This can be achieved by joining new sites to the set of sites the durable storage is replicated to, or by removing the sites that are no longer needed.
 
-You can always look up the current allocation of shards through `emqx ctl` CLI. The `ds` subcommand stands for "durable storage".
+One can always look up the current allocation of shards through the `emqx ctl` CLI. The `ds` subcommand stands for "durable storage".
 
 ```shell
 $ emqx ctl ds info
 SITES:
@@ -49,7 +57,7 @@ ok
 
 Responsibility over some of the shard replicas will be transferred to this new site, and it will start replicating the data.
 
-Depending on the amount of data stored in the cluster, the process of joining a new site may take some time. This does not compromise availability of the durable storage, but may affect the performance of the cluster, because a lot of data may be shuffled around in the background.
+Depending on the amount of data stored in the cluster, the process of joining a new site may take some time. This does not compromise availability of the durable storage, but may affect the performance of the cluster, because the data must be transferred between the sites in the background.
 
 Any changes to the set of durable storage sites are durably stored as well, so that node restarts or network partitions do not affect the outcome. Cluster will eventually reach the desired state in a consistent manner.
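+
+Putting the initial-setup parameters from the previous section together, a sketch of an `emqx.conf` fragment for a hypothetical three-node cluster could look as follows (values are illustrative, not recommendations):
+
+```
+durable_storage.messages {
+  # Number of replicas per shard; odd values keep the quorum size well-defined.
+  replication_factor = 3
+  # Number of independent partitions; cannot be changed after initialization.
+  n_shards = 16
+  # Minimum number of online sites required before the storage initializes;
+  # set to the initial cluster size in a multi-node deployment.
+  n_sites = 3
+}
+```
+
+Since none of these parameters take effect after the durable storage has been initialized, they should be settled before the cluster first starts with session persistence enabled.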
@@ -61,15 +69,15 @@ $ emqx ctl ds leave emqx_durable_storage
 ok
 ```
 
-This process is similar to adding a site, and may take a lot of time as well.
+This process is similar to adding a site, and may take time and resources as well.
 
-However, removing a site may cause the effective replication factor to drop below the configured value. For example, if the replication factor is 3 and one of the sites in a 3-node cluster is removed, the replication factor will effectively drop to 2. This is a bit risky, so if you plan to replace a node, it's a good idea to add a new site first, and only then remove the old one. Or alternatively, perform both operations simultaneously.
+However, removing a site may cause the effective replication factor to drop below the configured value. For example, if the replication factor is 3 and one of the sites in a 3-node cluster is removed, the replication factor will effectively drop to 2. This could be risky in situations where the goal is to permanently replace a site, so it's recommended to add the new site first, and only then decommission the old one. Or, alternatively, perform both operations simultaneously.
 
-### Assiging Sites
+### Assigning Sites
 
-If you plan to conduct a series of changes to the set of sites holding the durable storage replicas (as in the aforementioned example), it may be a good idea to assign the sites in a single operation.
+A series of changes to the set of sites holding the durable storage replicas (as in the aforementioned example) can be performed in a single operation.
 
 ```shell
 $ emqx ctl ds set_replicas emqx_durable_storage ...
 ```
 
-This will help to minimize amount of data that needs to be transferred between the sites, while at the same time ensuring that the replication factor is maintained if possible.
+This will help to minimize the volume of data transferred between the sites, while at the same time ensuring that the replication factor is maintained if possible.
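+
+For example (the site IDs below are hypothetical placeholders; use the actual IDs reported by `emqx ctl ds info`), replacing one decommissioned site while keeping the rest of the replica set intact could look like:
+
+```shell
+# List the desired final set of sites in a single operation;
+# sites missing from the list are removed, new ones are joined.
+$ emqx ctl ds set_replicas emqx_durable_storage 5C6028D6CE9459C7 D8894F95DC86DFDB F4E92DEA197C8EBC
+```
+
+Because the old and the new replica sets are reconciled together, data is copied to the incoming site while it is drained from the outgoing one, rather than in two separate full transfers.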
diff --git a/zh_CN/durability/managing-replication.md b/zh_CN/durability/managing-replication.md new file mode 100644 index 000000000..464090415 --- /dev/null +++ b/zh_CN/durability/managing-replication.md @@ -0,0 +1 @@ +# TODO