The library provides a worker-queue implementation on top of Java and a database.
The fintech company YooMoney uses db-queue in cases where reliability is a must-have requirement.
The project uses Semantic Versioning.
The library is available on Maven Central:
implementation 'ru.yoomoney.tech:db-queue:15.0.0'
There are several reasons to choose db-queue:
- You need a simple, efficient and flexible task processing tool that supports delayed job execution.
- You already have a database and don't want to introduce additional tools into your infrastructure.
- Your load is moderate. db-queue can handle more than 1000 rps on a single table. Moreover, you can shard your database to get horizontal scalability.
- You require strong guarantees for task delivery, processing and consistency.
- Persistent worker queue.
- Support for PostgreSQL, Oracle, MSSQL, H2.
- At-least-once task processing semantics.
- Delayed task execution.
- Strongly typed API (TaskPayloadTransformer)
- Task processing modes (ProcessingSettings)
- Task polling settings (PollSettings)
- Retry strategies in case of errors (FailureSettings)
- Retry strategies when you need to postpone tasks (ReenqueueSettings)
- Tracing support via Brave (ExampleTracingConfiguration)
- Task event listeners to build up monitoring (TaskLifecycleListener, ThreadLifecycleListener)
- Configuration reload at runtime (QueueService#updateQueueConfigs)
- Reading queue configuration from a file and dynamic reloading when the file changes (QueueConfigsReader, QueueConfigsReloader).
- Storing queue tasks in separate tables (QueueLocation).
- Storing queue tasks in separate databases (QueueShard).
- And many other features
The library provides one-time tasks, that is, tasks that are executed once. If you need recurring (periodic) tasks, that is, tasks that are executed periodically, take a look at the db-queue-scheduler library.
- You have a task that you want to process later.
- You tell QueueProducer to schedule the task.
- QueueProducer chooses a database shard.
- QueueProducer converts the task payload to string representation through TaskPayloadTransformer.
- QueueProducer inserts the task in the database through QueueDao.
- ... the task has been selected from database at specified time according to queue settings ...
- The task payload is converted to typed representation through TaskPayloadTransformer.
- The task is passed to the QueueConsumer instance in order to be processed.
- You process the task and return processing result.
- ... the task is updated according to processing result and queue settings ...
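The steps above can be sketched with an in-memory stand-in for the database. This is only an illustration of the flow, not the library's code: `WorkflowSketch`, `Transformer`, `InMemoryQueue`, `Row` and `pollOnce` are hypothetical names made up for this example, and a plain list replaces the tasks table.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Illustrative only: none of these types belong to db-queue; all names are hypothetical.
class WorkflowSketch {

    /** Plays the role of a payload transformer: typed value <-> string. */
    interface Transformer<T> {
        String fromObject(T payload);
        T toObject(String payload);
    }

    /** Example transformer for Integer payloads. */
    static final Transformer<Integer> INT_TRANSFORMER = new Transformer<Integer>() {
        @Override public String fromObject(Integer payload) { return String.valueOf(payload); }
        @Override public Integer toObject(String payload) { return Integer.valueOf(payload); }
    };

    /** One stored task; the payload is kept as a string, like a table column. */
    static final class Row {
        final long id;
        final String payload;
        final Instant nextProcessAt;

        Row(long id, String payload, Instant nextProcessAt) {
            this.id = id;
            this.payload = payload;
            this.nextProcessAt = nextProcessAt;
        }
    }

    /** A list stands in for the tasks table. */
    static final class InMemoryQueue<T> {
        private final List<Row> table = new ArrayList<>();
        private final Transformer<T> transformer;
        private long nextId = 1;

        InMemoryQueue(Transformer<T> transformer) {
            this.transformer = transformer;
        }

        /** Producer side: serialize the payload and store it with its due time. */
        long enqueue(T payload, Instant executeAt) {
            long id = nextId++;
            table.add(new Row(id, transformer.fromObject(payload), executeAt));
            return id;
        }

        /**
         * Consumer side: pick the first due task, deserialize it and hand it over.
         * The task is removed only when the consumer reports success, so a failed
         * task stays in the table and is picked again (at-least-once).
         * Returns true if a task was picked.
         */
        boolean pollOnce(Instant now, Predicate<T> consumer) {
            for (Iterator<Row> it = table.iterator(); it.hasNext(); ) {
                Row row = it.next();
                if (!row.nextProcessAt.isAfter(now)) {
                    if (consumer.test(transformer.toObject(row.payload))) {
                        it.remove();
                    }
                    return true;
                }
            }
            return false;
        }

        int size() {
            return table.size();
        }
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        InMemoryQueue<Integer> queue = new InMemoryQueue<>(INT_TRANSFORMER);
        queue.enqueue(42, now.plusSeconds(60));
        System.out.println(queue.pollOnce(now, p -> true));                 // task is not due yet
        System.out.println(queue.pollOnce(now.plusSeconds(60), p -> true)); // task is due and processed
    }
}
```

In the real library the table lives in your database and the update after processing follows the queue settings; the sketch only shows why a task that was not processed successfully is seen again.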
- Simple configuration: ExampleBasicConfiguration.
- Tracing configuration: ExampleTracingConfiguration.
The main steps to configure the library:
- Specify a queue configuration through QueueConfig.
- Implement QueueProducer interface.
- Implement QueueConsumer interface.
- Create QueueService.
- Register QueueConsumer in QueueService.
- Start queues through QueueService.
As of now the library supports PostgreSQL, MSSQL, Oracle and H2 as the backing database. However, the library architecture makes
it easy to add other relational databases that support transactions and the "for update skip locked" feature,
for example MySQL.
Feel free to add support for other databases via a pull request.
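To show why "for update skip locked" matters, here is a minimal sketch of a PostgreSQL statement that lets several workers claim tasks from one table without blocking each other. It is not the query db-queue runs internally: the class `PickQuerySketch`, the method `pickTaskSql` and the one-minute retry interval are assumptions made for this example; only the column names come from the table definitions in this document.

```java
// Illustrative only: this is not the query db-queue runs internally.
class PickQuerySketch {

    /**
     * Builds a PostgreSQL statement that claims one due task.
     * tableName must come from trusted configuration, never from user input,
     * because identifiers cannot be bound as JDBC parameters.
     */
    static String pickTaskSql(String tableName) {
        return "WITH picked AS ("
                + " SELECT id FROM " + tableName
                + " WHERE queue_name = ? AND next_process_at <= now()"
                + " ORDER BY next_process_at, id"
                + " LIMIT 1"
                // Rows locked by other workers are skipped instead of waited for.
                + " FOR UPDATE SKIP LOCKED)"
                + " UPDATE " + tableName + " q"
                // Move the task into the future so it becomes due again if this worker crashes.
                + " SET next_process_at = now() + interval '1 minute',"
                + " attempt = attempt + 1,"
                + " total_attempt = total_attempt + 1"
                + " FROM picked WHERE q.id = picked.id"
                + " RETURNING q.id, q.payload, q.attempt";
    }

    public static void main(String[] args) {
        System.out.println(pickTaskSql("queue_tasks"));
    }
}
```

A database without an equivalent of SKIP LOCKED would make concurrent workers wait on the same row, which is why the feature is listed as a requirement for new backends.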
For PostgreSQL, create the table (with index) where tasks will be stored.
CREATE TABLE queue_tasks (
id BIGSERIAL PRIMARY KEY,
queue_name TEXT NOT NULL,
payload TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
next_process_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
attempt INTEGER DEFAULT 0,
reenqueue_attempt INTEGER DEFAULT 0,
total_attempt INTEGER DEFAULT 0
);
CREATE INDEX queue_tasks_name_time_desc_idx
ON queue_tasks USING btree (queue_name, next_process_at, id DESC);
You should always analyze your database workload before applying these recommendations. The right settings depend heavily on your hardware and load.
- Autovacuum
You need to make autovacuum more aggressive in order to eliminate dead tuples. Dead tuples lead to excessive page reads because they occupy space that could otherwise be reused by live tuples. Autovacuum can be configured in many ways; for example, you can set the scale factor to 1% or even lower.
Our production settings for frequently updated tasks tables are:
CREATE TABLE queue_tasks (...) WITH (
autovacuum_vacuum_cost_delay=5,
autovacuum_vacuum_cost_limit=500,
autovacuum_vacuum_scale_factor=0.0001)
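To get a feeling for what the scale factor does: PostgreSQL starts autovacuum on a table once the number of dead tuples exceeds `autovacuum_vacuum_threshold + autovacuum_vacuum_scale_factor * number of tuples`. The small calculation below compares the PostgreSQL defaults (threshold 50, scale factor 0.2) with the scale factor shown above. The table size of 1,000,000 rows and the names `AutovacuumThresholdSketch` and `deadTuplesBeforeVacuum` are assumptions chosen for illustration.

```java
// Illustrative arithmetic only; the table size is an assumed example value.
class AutovacuumThresholdSketch {

    /** Number of dead tuples after which autovacuum is triggered for a table. */
    static long deadTuplesBeforeVacuum(long baseThreshold, double scaleFactor, long tableTuples) {
        return baseThreshold + Math.round(scaleFactor * tableTuples);
    }

    public static void main(String[] args) {
        long tuples = 1_000_000L;
        // PostgreSQL defaults: threshold 50, scale factor 0.2
        System.out.println(deadTuplesBeforeVacuum(50, 0.2, tuples));
        // Scale factor from the settings above, default threshold
        System.out.println(deadTuplesBeforeVacuum(50, 0.0001, tuples));
    }
}
```

For a table of that size the default settings wait for about 200,050 dead tuples, while a scale factor of 0.0001 triggers a vacuum after about 150.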
For MSSQL, create the table (with index) where tasks will be stored.
CREATE TABLE queue_tasks (
id INT IDENTITY(1,1) NOT NULL,
queue_name VARCHAR(128) NOT NULL, -- TEXT columns cannot be index key columns in SQL Server
payload TEXT,
created_at DATETIMEOFFSET NOT NULL DEFAULT SYSDATETIMEOFFSET(),
next_process_at DATETIMEOFFSET NOT NULL DEFAULT SYSDATETIMEOFFSET(),
attempt INTEGER NOT NULL DEFAULT 0,
reenqueue_attempt INTEGER NOT NULL DEFAULT 0,
total_attempt INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (id)
);
CREATE INDEX queue_tasks_name_time_desc_idx
ON queue_tasks (queue_name, next_process_at, id DESC);
For Oracle, create the table (with index) where tasks will be stored.
CREATE TABLE queue_tasks (
id NUMBER(38) NOT NULL PRIMARY KEY,
queue_name VARCHAR2(128) NOT NULL,
payload CLOB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
next_process_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
attempt NUMBER(38) DEFAULT 0,
reenqueue_attempt NUMBER(38) DEFAULT 0,
total_attempt NUMBER(38) DEFAULT 0
);
CREATE INDEX queue_tasks_name_time_desc_idx
ON queue_tasks (queue_name, next_process_at, id DESC);
Create a sequence and specify its name through QueueLocation.Builder.withIdSequence(String) or id-sequence in the file config.
CREATE SEQUENCE tasks_seq;
For H2, create the table (with index) that the library needs in order to work.
CREATE TABLE queue_tasks (
id BIGSERIAL PRIMARY KEY,
queue_name VARCHAR(100) NOT NULL,
payload VARCHAR(100),
created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
next_process_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
attempt INTEGER DEFAULT 0,
reenqueue_attempt INTEGER DEFAULT 0,
total_attempt INTEGER DEFAULT 0
);
CREATE INDEX queue_tasks_name_time_desc_idx
ON queue_tasks (queue_name, next_process_at, id DESC);
The library is divided into several modules. Each module contains a minimal set of dependencies so that it can be integrated easily into any project.
- core module provides base logic and requires org.slf4j:slf4j-api library
- spring module provides access to database and requires Spring Framework: spring-jdbc and spring-tx. Other features of Spring ecosystem are not in use.
- brave module provides tracing support with help of Brave
- example module provides integration testing across all modules. It might help to figure out how to use the library in your code.
You should provide implementation for interfaces in that package. The package contains classes which are involved in processing or enqueueing tasks.
Queue settings.
Additional classes for managing storage.
Registration and configuration.
- internal
Internal classes. Not for public use.
Backward compatibility for classes in that package may be broken in any release.