-
Notifications
You must be signed in to change notification settings - Fork 12
Data Syncing: Receiving Changes
Naming convention for receiving handlers is Rabbit::Handler::GROUP_ID::TableSync
,
where GROUP_ID
represents first part of source exchange name.
Define handler class inherited from TableSync::ReceivingHandler
and named according to described convention.
You should use DSL inside the class.
Suppose we will synchronize models {Project, News, User} project {MainProject}, then:
class Rabbit::Handler::MainProject::TableSync < TableSync::ReceivingHandler
queue_as :custom_queue
receive "Project", to_table: :projects
receive "News", to_table: :news, events: :update do
after_commit_on_update do
NewsCache.reload
end
end
receive "User", to_table: :clients, events: %i[update destroy] do
mapping_overrides email: :project_user_email, id: :project_user_id
only :project_user_email, :project_user_id, :project_id
target_keys :project_id, :project_user_id
rest_key :project_user_rest
version_key :project_user_version
additional_data do |project_id:|
{ project_id: project_id }
end
default_values do
{ created_at: Time.current }
end
end
receive "User", to_model: CustomModel.new(:users) do
rest_key false
end
end
In this case:
-
TableSync
- RabbitMQ event type. -
MainProject
- event source. -
Rabbit::Handler
- module for our handlers of events from RabbitMQ (there might be others)
Method queue_as
allows you to set custom queue.
Receiving handler supports array of attributes in a single update or destroy event. Corresponding upsert-style logic in ActiveRecord and Sequel orm handlers are provided.
receive source, [to_table:, to_model:, events:, &block]
The method receives following arguments
-
source
- string, name of source model (required) -
to_table
- destination table name (required if not set to_model) -
to_model
- destination model (required if not set to_table) -
events
- array of supported events (optional) -
block
- configuration block with options (optional)
This method implements logic of mapping source
to to_table
(or to to_model
) and allows customizing
the event handling logic with provided block.
You can use one source
for a lot of to_table
or to_moel
.
Most of the options can be set as computed value or as a process.
option(value)
option do |key params|
value
end
Each of options can receive static value or code block which will be called for each event with the following arguments:
-
event
- type of event (:update
or:destroy
) -
model
- source model (Project
,News
,User
in example) -
version
- version of the data -
project_id
- id of project which is used in RabbitMQ -
raw_data
- raw data from event (before applyingmapping_overrides
,only
, etc.)
Blocks can receive any number of parameters from the list.
All specific key params will be explained in examples for each option.
Whitelist for receiving attributes.
only(instance of Array)
only do |row:|
return instance of Array
end
default value is taken through the call model.columns
Primary keys or unique keys.
target_keys(instance of Array)
target_keys do |data:|
return instance of Array
end
default value is taken through the call model.primary_keys
Name of jsonb column for attributes which are not included in the whitelist.
You can set the rest_key(false)
if you won't need the rest data.
rest_key(instance of Symbol)
rest_key do |row:, rest:|
return instance of Symbol
end
default value is :rest
Name of version column.
version_key(instance of Symbol)
version_key do |data:|
return instance of Symbol
end
default value is :version
Blacklist for receiving attributes.
except(instance of Array)
except do |row:|
return instance of Array
end
default value is []
Map for overriding receiving columns.
mapping_overrides(instance of Hash)
mapping_overrides do |row:|
return instance of Hash
end
default value is {}
Additional data for insert or update (e.g. project_id
).
additional_data(instance of Hash)
additional_data do |row:|
return instance of Hash
end
default value is {}
Values for insert if a row is not found.
default_values(instance of Hash)
default_values do |data:|
return instance of Hash
end
default value is {}
Return truthy value to skip the row.
skip(instance of TrueClass or FalseClass)
skip do |data:|
return instance of TrueClass or FalseClass
end
default value is false
Proc that is used to wrap the receiving logic by custom block of code.
wrap_receiving do |data:, target_keys:, version_key:, default_values: {}, event:, &receiving_logic|
receiving_logic.call
return makes no sense
end
event option is current fired event
default value is proc { |&block| block.call }
Perform code before updating data in the database.
before_update do |data:, target_keys:, version_key:, default_values:|
return makes no sense
end
before_update do |data:, target_keys:, version_key:, default_values:|
return makes no sense
end
Сan be defined several times. Execution order guaranteed.
Perform code after updated data was committed.
after_commit_on_update do |data:, target_keys:, version_key:, default_values:, results:|
return makes no sense
end
after_commit_on_update do |data:, target_keys:, version_key:, default_values:, results:|
return makes no sense
end
-
results
- returned value frommodel.upsert
Сan be defined several times. Execution order guaranteed.
Perform code before destroying data in database.
before_destroy do |data:, target_keys:, version_key:|
return makes no sense
end
before_destroy do |data:, target_keys:, version_key:|
return makes no sense
end
Сan be defined several times. Execution order guaranteed.
Perform code after destroyed data was committed.
after_commit_on_destroy do |data:, target_keys:, version_key:, results:|
return makes no sense
end
after_commit_on_destroy do |data:, target_keys:, version_key:, results:|
return makes no sense
end
-
results
- returned value frommodel.destroy
Сan be defined several times. Execution order guaranteed.
You can use custom model for receiving.
class Rabbit::Handler::MainProject::TableSync < TableSync::ReceivingHandler
receive "Project", to_model: CustomModel.new
end
This model has to implement next interface:
def columns
return all columns from table
end
def primary_keys
return primary keys from table
end
def upsert(data: Array, target_keys: Array, version_key: Symbol, default_values: Hash)
return array with updated rows
end
def destroy(data: Array, target_keys: Array, version_key: Symbol)
return array with delited rows
end
def transaction(&block)
block.call
return makes no sense
end
def after_commit(&block)
block.call
return makes no sense
end