Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQL transaction log reader #7

Open
lbradstreet opened this issue Nov 21, 2015 · 12 comments
Open

SQL transaction log reader #7

lbradstreet opened this issue Nov 21, 2015 · 12 comments

Comments

@lbradstreet
Copy link
Member

This could mostly copy code from the datomic plugin's read-log.

Unfortunately this would have to be per database.

A reasonable looking Java library to help is:
https://github.com/shyiko/mysql-binlog-connector-java

I'd love to help a newbie with this one. Otherwise it may have to wait for client demand.

@mushketyk
Copy link
Contributor

@lbradstreet I would like to work on this task, but I need some help with this.
Could you give me more context please?

@sventech
Copy link

I think @ibradstreet is referring to this type of stream construction from DB replication logs:
http://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/

The project I referred to, by Martin Kleppmann and team, has implemented parsing for both MySQL and PostgreSQL logs. Very exciting stuff, but not as elegant as Onyx. :-)

@ghost
Copy link

ghost commented Apr 8, 2016

the project is still running?
i want to help and i have a lot of time to do it.

@lbradstreet
Copy link
Member Author

This might be a good watch if you're interested in looking at reading the binlog https://www.percona.com/live/data-performance-conference-2016/sessions/mining-mysqls-binary-log-apache-kafka-and-kafka-connect

@ghost
Copy link

ghost commented Apr 8, 2016

  • do you plan to use https://github.com/shyiko/mysql-binlog-connector-java and other cdc library for each dbms
    or implement it from scratch by introducing a general layer?
  • do you want to implement it inside this plugin to support break point?
    or implement it in another plugin for load streaming data from database in lower level?
  • can you introduce the related plugin read-log code location? i will spend the time on reading it.

thanks

@lbradstreet
Copy link
Member Author

Yes, I think we would implement it with https://github.com/shyiko/mysql-binlog-connector-java. I'm not sure how we would go about making it generic yet. I'd start by making it specific to MySQL and hopefully we can generalise later.

You may wish to read through the onyx-kafka code, since it also reads from a "log" of sorts. https://github.com/onyx-platform/onyx-kafka/blob/0.9.x/src/onyx/plugin/kafka.clj

I think implementing it in this plugin would be fine. It'd also be ok to make it a separate plugin to keep the dependencies down though. Maybe start with it in this plugin?

@ghost
Copy link

ghost commented Apr 11, 2016

i run a simple example to pull data from mysql binlog file on our production server:

(ns onyx-binlog.core                                                                                                                                                                                                                             
  (:require [clojure.core.async :as async :refer [>!! <!! timeout chan thread close!]]                                                                                                                                                           
            [clojure.java.data :as data :refer [from-java]])                                                                                                                                                                                     
  (:import [com.github.shyiko.mysql.binlog BinaryLogClient BinaryLogClient$EventListener]))


(def binlog-ch (chan 1000))
(let [client (BinaryLogClient. "10.0.146.10" 3306 "bigdatatongbu" "bigdatatongbu")
      listener (reify BinaryLogClient$EventListener (onEvent [this event] (>!! binlog-ch (from-java event) )))
      _ (doto client (.registerEventListener listener) (.connect (* 1000 5))) ]
  (while true  (prn (<!! binlog-ch))))

the related event data is:

{:data {:binlogFilename "mysql-bin.007113", :binlogPosition 122138115}, :header {:eventType "ROTATE", :dataLength 24, :headerLength 19, :serverId 10, :nextPosition 0, :position -43, :timestamp 0, :flags 32, :eventLength 43}}
{:data {:binlogVersion 4, :headerLength 19, :serverVersion "5.5.40-log"}, :header {:eventType "FORMAT_DESCRIPTION", :dataLength 84, :headerLength 19, :serverId 10, :nextPosition 0, :position -103, :timestamp 1460305443000, :flags 0, :eventLength 103}}
{:data {:database "", :errorCode 0, :executionTime 0, :sql "BEGIN", :threadId 3916942445}, :header {:eventType "QUERY", :dataLength 48, :headerLength 19, :serverId 11, :nextPosition 122138182, :position 122138115, :timestamp 1460363153000, :flags 8, :eventLength 67}}
{:data {:database "restaurant", :errorCode 0, :executionTime 0, :sql "UPDATE WxAccountTable SET ViewDate=CURDATE(),SerialID='271693972' WHERE openid='oWT18jitflzR3Be9zj2bkE8e3bCA'", :threadId 3916942445}, :header {:eventType "QUERY", :dataLength 167, :headerLength 19, :serverId 11, :nextPosition 122138368, :position 122138182, :timestamp 1460363153000, :flags 0, :eventLength 186}}
{:data {:xid 4109287098}, :header {:eventType "XID", :dataLength 8, :headerLength 19, :serverId 11, :nextPosition 122138395, :position 122138368, :timestamp 1460363153000, :flags 0, :eventLength 27}}
{:data {:database "", :errorCode 0, :executionTime 0, :sql "BEGIN", :threadId 3916942445}, :header {:eventType "QUERY", :dataLength 40, :headerLength 19, :serverId 11, :nextPosition 122138454, :position 122138395, :timestamp 1460363153000, :flags 8, :eventLength 59}}
{:data {:database "restaurant", :errorCode 0, :executionTime 0, :sql "INSERT INTO ReleationsTable (UserID, ShopID, Attention) VALUES('14390757', '19910', '1')", :threadId 3916942445}, :header {:eventType "QUERY", :dataLength 138, :headerLength 19, :serverId 11, :nextPosition 122138611, :position 122138454, :timestamp 1460363153000, :flags 0, :eventLength 157}}
{:data {:xid 4109287100}, :header {:eventType "XID", :dataLength 8, :headerLength 19, :serverId 11, :nextPosition 122138638, :position 122138611, :timestamp 1460363153000, :flags 0, :eventLength 27}}
{:data {:database "", :errorCode 0, :executionTime 0, :sql "BEGIN", :threadId 3913772817}, :header {:eventType "QUERY", :dataLength 40, :headerLength 19, :serverId 11, :nextPosition 122138697, :position 122138638, :timestamp 1460363153000, :flags 8, :eventLength 59}}
{:data {:database "restaurant", :errorCode 0, :executionTime 0, :sql "update ShopServiceStateTable  set ServiceOnline = 0,QueueState=1 where ShopID = 6960  and Service = 1", :threadId 3913772817}, :header {:eventType "QUERY", :dataLength 151, :headerLength 19, :serverId 11, :nextPosition 122138867, :position 122138697, :timestamp 1460363153000, :flags 0, :eventLength 170}}
{:data {:xid 4109287102}, :header {:eventType "XID", :dataLength 8, :headerLength 19, :serverId 11, :nextPosition 122138894, :position 122138867, :timestamp 1460363153000, :flags 0, :eventLength 27}}
{:data {:database "", :errorCode 0, :executionTime 0, :sql "BEGIN", :threadId 3916942446}, :header {:eventType "QUERY", :dataLength 48, :headerLength 19, :serverId 11, :nextPosition 122138961, :position 122138894, :timestamp 1460363153000, :flags 8, :eventLength 67}}
{:data {:database "restaurant", :errorCode 0, :executionTime 0, :sql "UPDATE ShopServiceStateTable SET LastTime=NOW(),ServiceOnline=1,Handled=0,State='1',OnLine=1,\r\n                    QueueState='1',ApState='0',ShopState='3',\r\n                    Lat='39.85825',Lng='116.43633',City='19',CityAreaID='61',\r\n                    BCID='351',TiyanDian='0',Cooperation='1',\r\n                    OpeningStart='36000',OpeningEnd='79200' \r\n                    WHERE ShopID='91033' AND Service='6'", :threadId 3916942446}, :header {:eventType "QUERY", :dataLength 476, :headerLength 19, :serverId 11, :nextPosition 122139456, :position 122138961, :timestamp 1460363153000, :flags 0, :eventLength 495}}

i think it's hard to integrated into the current plugin as the data model is not consistent.
maybe i need to dig into the replication mechanism to see how the log rebuilt the slave database,
so that produce the related command to pair write-plugin for vary db to be consistent?

can you suggest something?

@ghost
Copy link

ghost commented Apr 11, 2016

i also check the my pipe/maxwell libraryhttps://github.com/mardambey/mypipe mechanism.
it seem that binlog_format was configured with ROW mode. my production mode is set to MIXED mode.

so i will try to run the related ddl, dml to test the replication mechanism, but i also need a mechanism to get the updated data within a window.

Enabling MySQL binary logging

The following snippet of configuration will enable MySQL to generate binary logs the mypipe will be able to understand. Taken from my.cnf:

server-id = 112233
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row
The binary log format is required to be set to row for mypipe to work since we need to track individual changes to rows (insert, update, delete).

@lbradstreet
Copy link
Member Author

Interesting. Thanks for getting onto this. It looks like you're on the
right track. Which part of it specifically isn't consistent?

On 11 April 2016 at 19:54, larry luo [email protected] wrote:

i find the maxwell seems capture the delta data record by applying
operation on backend database.
so i want to find one kind of database that can apply dml and return the
merge diff before and after that can be more convenient.


You are receiving this because you were mentioned.
Reply to this email directly or view it on GitHub
#7 (comment)

@ghost
Copy link

ghost commented Apr 11, 2016

i think event action is not consistent vs data, but i can switch the log mode to low level .
the log contains the event, it contains related dml, ddl for replication rather than data itself when the mixed or statement mode open.
it will contains data just when the binary log format switch to row mode.
such the mypipe, maxwell must use the row mode to extract the record.

i will ask the dba to switch the database to row mode tomorrow,
but i think it has some impact on log size and some sql performance.
so i don't know whether dba will think it's reasonable action.

@ghost
Copy link

ghost commented Apr 12, 2016

i will develop the plugin based on row mode for mysql.
and so it only need to read the log event with related row data to streaming.

it is consistent with current read-log logic now.

but i think it's a good idea to build another plugin for log reader.
as log reader contains not only single table information, it related to whole database.
it can be extend to handle variety log data at the same logic at core
that abstract from reading log data only without other logic mess.

can you suggest something?

@ghost
Copy link

ghost commented Apr 14, 2016

i already write a simple example with messy and hardcode code
use mysql-bin-log sync one table to oracle in realtime.
https://github.com/larluo/9now/blob/master/workspace/onyx-binlog/src/onyx_binlog/core.clj

i will adapt it to the plugin, so can you give me some suggestion about the integration with the exist plugin as the bin-log is the database-level data.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants