Skip to content

Timeout Service

Oleg Galako edited this page Jun 5, 2018 · 14 revisions

Purpose of the service

Timeout Service accepts requests in form of events.Timer events and delivers events.TimerExpired events at the specified point of time. It is also possible to cancel a pending request.

Event schemas

The schemas are defined in system_event.proto.

events.Timer:

message Timer
{
  required common.Header header = 1;

  enum Action {
    CREATE = 1;
    CANCEL = 2;
  }

  optional Action action = 2 [default=CREATE];

  required string correlation_id = 3;

  // Payload to be sent
  optional bytes payload = 4;

  // to which exchange.
  optional string exchange = 5;

  // timestamp in second from epoch
  optional uint64 expire_at = 6;
}

events.TimerExpired:

message TimerExpired
{
  required common.Header header = 1;

  required string correlation_id = 2;
  optional string universe = 3;
}

These are the legacy schemas from Xray 1.0 and at the moment we don't change them even though we don't need all the fields defined there.

Description

The implementation is based on Kafka Streams. The idea is very simple:

  • Requests are stored in the local Key-value store (backed by RocksDB by default) with a string key starting with hex representation of the target time. This allows to query the store to get all the keys up to current time.
  • Punctuator runs every 100 ms (can be changed) and queries the local store for expired entries. For each of those it published the events.TimerExpired event and deletes the key from the local store.
  • There is another local store acting as index by correlationId so that it's possible to quickly find a key when a CANCEL request is received.

Kafka Streams not only allows for simple and transparent implementation but also adds easily configurable delivery guarantees (including exactly once, using Kafka transactions). The solution is automatically scalable because of consistent hashing approach used with Kafka topics (meaning that requests for the same entity will be handled by the same node and resulting events will be published to the same partition).

Configuration

The integrant config is self-descriptive:

:re.service/timeout {:encoder #ig/ref :re/encoder
                     :decoder #ig/ref :re/decoder
                     :config {:streams-config {"bootstrap.servers" ["kafka:9092"]
                                               "application.id" "timeout-service"
                                               "state.dir" "/tmp/kafka-streams"}
                              :input-topic "timer-requests"
                              :output-topic "dev"
                              :timers-store-name "timers"
                              :correlations-store-name "correlations"}}
Clone this wiki locally