Not all partitions are imported into datomic from mysql #5

Open
uchouhan opened this issue Nov 17, 2015 · 6 comments

@uchouhan
When I run the following job (tried multiple times), it imports certain ranges of MySQL ids but not all of them: for example, ids 1-16538, 104023-164xxx, and 1060xxx-1640xxx made it, but the ones in between did not. I'm not sure if it's an issue with the partition setup in my code (I was building on the onyx example in the GitHub repo).

(ns datomic-mysql-transfer.core
  (:require [clojure.java.jdbc :as jdbc]
            [datomic.api :as d]
            [onyx.plugin.datomic]
            [onyx.plugin.sql]
            [onyx.api])
  (:import [com.mchange.v2.c3p0 ComboPooledDataSource]))

;;;;;;;; First, some set up work for SQL ;;;;;;;;;;;;;

;;; Def some top-level constants to use below

(def db-name "optimis_development")

(def classname "com.mysql.jdbc.Driver")

(def subprotocol "mysql")

(def subname (format "//127.0.0.1:3306/%s?zeroDateTimeBehavior=convertToNull" db-name))

(def user "root")

(def password "")

;;; Throughput knob that you can tune
(def batch-size 20)

;;; The table to read out of
(def table :patients)

;;; A monotonically increasing integer to partition the table by
(def id-column :id)

;;; JDBC spec to connect to MySQL
(def db-spec
  {:classname classname
   :subprotocol subprotocol
   :subname subname
   :user user
   :password password})

;;; Create a pool of connections for the virtual peers of Onyx to share
(defn pool [spec]
  {:datasource
   (doto (ComboPooledDataSource.)
     (.setDriverClass (:classname spec))
     (.setJdbcUrl (str "jdbc:" (:subprotocol spec) ":" (:subname spec)))
     (.setUser (:user spec))
     (.setPassword (:password spec))
     (.setMaxIdleTimeExcessConnections (* 30 60))
     (.setMaxIdleTime (* 3 60 60)))})

;;; Create the pool
(def conn-pool (pool db-spec))

;;;;;;;; Next, some set up work for Datomic ;;;;;;;;;;;;;

;;; The URI for the Datomic database that we'll write to
(def db-uri (str "datomic:sql://adb3?jdbc:mysql://localhost:3306/datomic?user=datomic&password=datomic"))

;;; The schema of the Datomic database: the semantic equivalent of the
;;; MySQL patients table.
(def schema
  [{:db/id #db/id [:db.part/db]
    :db/ident :com.optimispt/patients
    :db.install/_partition :db.part/db}
   {:db/id #db/id [:db.part/db]
    :db/ident :patient/id
    :db/valueType :db.type/long
    :db/unique :db.unique/identity
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id #db/id [:db.part/db]
    :db/ident :patient/practice-id
    :db/valueType :db.type/long
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id #db/id [:db.part/db]
    :db/ident :patient/photo-id
    :db/valueType :db.type/long
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id #db/id [:db.part/db]
    :db/ident :patient/suffix
    :db/valueType :db.type/string
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id #db/id [:db.part/db]
    :db/ident :patient/first-name
    :db/valueType :db.type/string
    :db/fulltext true
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id #db/id [:db.part/db]
    :db/ident :patient/middle-name
    :db/valueType :db.type/string
    :db/fulltext true
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id #db/id [:db.part/db]
    :db/ident :patient/last-name
    :db/valueType :db.type/string
    :db/fulltext true
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id #db/id [:db.part/db]
    :db/ident :patient/birth-date
    :db/valueType :db.type/instant
    :db/index true
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id #db/id [:db.part/db]
    :db/ident :patient/gender
    :db/valueType :db.type/string
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}
   {:db/id #db/id [:db.part/db]
    :db/ident :patient/ssn
    :db/valueType :db.type/string
    :db/cardinality :db.cardinality/one
    :db/fulltext true
    :db.install/_attribute :db.part/db}
   {:db/id #db/id [:db.part/db]
    :db/ident :patient/uuid
    :db/valueType :db.type/uuid
    :db/cardinality :db.cardinality/one
    :db.install/_attribute :db.part/db}])

;;; Create the DB, connect to it, and transact the schema
(d/create-database db-uri)

(def datomic-conn (d/connect db-uri))

@(d/transact datomic-conn schema)

;;;;;;;;;;;; Next, we run the Onyx job to transfer the data ;;;;;;;;;;;;;;
(def id (java.util.UUID/randomUUID))

(def env-config
  {:zookeeper/address "127.0.0.1:2188"
   :zookeeper/server? true
   :zookeeper.server/port 2188
   :onyx/id id})

(def peer-config
  {:zookeeper/address "127.0.0.1:2188"
   :onyx/id id
   :onyx.peer/job-scheduler :onyx.job-scheduler/balanced
   :onyx.messaging/impl :aeron
   :onyx.messaging/peer-port 40200
   :onyx.messaging/bind-addr "localhost"})

(def env (onyx.api/start-env env-config))

(def peer-group (onyx.api/start-peer-group peer-config))

;;; Partition the MySQL table by ID column, parallel read the rows,
;;; do a semantic transformation, write to Datomic.
(def workflow
  [[:partition-keys :read-rows]
   [:read-rows :prepare-datoms]
   [:prepare-datoms :write-to-datomic]])

(def n-peers (count (set (mapcat identity workflow))))

(def v-peers (onyx.api/start-peers n-peers peer-group))
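;;; Note: the cluster needs at least one virtual peer per task; n-peers
;;; derives that count (4 here) from the distinct task names in the workflow.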

(def catalog
  [{:onyx/name :partition-keys
    :onyx/plugin :onyx.plugin.sql/partition-keys
    :onyx/type :input
    :onyx/medium :sql
    :sql/classname classname
    :sql/subprotocol subprotocol
    :sql/subname subname
    :sql/user user
    :sql/password password
    :sql/table table
    :sql/id id-column
    :sql/rows-per-segment 1000
    :onyx/batch-size batch-size
    :onyx/max-peers 1
    :onyx/doc "Partitions a range of primary keys into subranges"}

   {:onyx/name :read-rows
    :onyx/fn :onyx.plugin.sql/read-rows
    :onyx/type :function
    :sql/classname classname
    :sql/subprotocol subprotocol
    :sql/subname subname
    :sql/user user
    :sql/password password
    :sql/table table
    :sql/id id-column
    :onyx/batch-size batch-size
    :onyx/doc "Reads rows of a SQL table bounded by a key range"}

   {:onyx/name :prepare-datoms
    :onyx/fn :datomic-mysql-transfer.core/prepare-datoms
    :onyx/type :function
    :onyx/batch-size batch-size
    :onyx/doc "Semantically transform the SQL rows to Datomic datoms"}

   {:onyx/name :write-to-datomic
    :onyx/plugin :onyx.plugin.datomic/write-bulk-datoms
    :onyx/type :output
    :onyx/medium :datomic
    :datomic/uri db-uri
    :datomic/partition :com.optimispt/patients
    :onyx/batch-size batch-size
    :onyx/doc "Transacts segments to storage"}])

(defn non-nil-datoms [segment]
  (into {}
        (remove #(nil? (val %))
                {:patient/id (:id segment)
                 :patient/practice-id (:practice_id segment)
                 :patient/suffix (:suffix segment)
                 :patient/first-name (:first_name segment)
                 :patient/middle-name (:middle_name segment)
                 :patient/last-name (:last_name segment)
                 :patient/birth-date (:birth_date segment)
                 :patient/gender (:gender segment)
                 :patient/ssn (:ssn segment)
                 :patient/photo-id (:photo_id segment)})))

;;; We need to prepare the datoms before we send them to the Datomic plugin.
;;; Set the temp ids and batch the segments into the :tx key.
(defn prepare-datoms [segment]
  {:tx [(merge {:db/id (d/tempid :com.optimispt/patients)}
               (non-nil-datoms segment))]})
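;;; For illustration, given a hypothetical row, prepare-datoms yields a
;;; single-transaction segment shaped like:
;;;   (prepare-datoms {:id 7 :first_name "Ada"})
;;;   ;; => {:tx [{:db/id #db/id[:com.optimispt/patients -100xxxx]
;;;   ;;           :patient/id 7
;;;   ;;           :patient/first-name "Ada"}]}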

(def lifecycles
[{:lifecycle/task :partition-keys
:lifecycle/calls :onyx.plugin.sql/partition-keys-calls}
{:lifecycle/task :read-rows
:lifecycle/calls :onyx.plugin.sql/read-rows-calls}
{:lifecycle/task :write-to-datomic
:lifecycle/calls :onyx.plugin.datomic/write-bulk-tx-calls}])

;;; And off we go!
(def job-id
(:job-id
(onyx.api/submit-job
peer-config
{:catalog catalog :workflow workflow :lifecycles lifecycles
:task-scheduler :onyx.task-scheduler/balanced})))

;;; Block until the job is done, then check Datomic
(onyx.api/await-job-completion peer-config job-id)
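
;;; A quick sanity check (just a sketch; reuses conn-pool and datomic-conn
;;; from above): compare the MySQL row count with the number of :patient/id
;;; values that actually landed in Datomic.
(prn {:mysql-rows (:c (first (jdbc/query conn-pool
                                         ["SELECT COUNT(*) AS c FROM patients"])))
      :datomic-rows (count (d/q '[:find ?e :where [?e :patient/id]]
                                (d/db datomic-conn)))})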

;;; Aaaaaand stop!
(doseq [v-peer v-peers]
  (onyx.api/shutdown-peer v-peer))

(onyx.api/shutdown-peer-group peer-group)

(onyx.api/shutdown-env env)

(shutdown-agents)

@uchouhan (Author)

cc: @MichaelDrogalis @lbradstreet

@MichaelDrogalis (Contributor)

I'll check it out after the Conj wraps up. Thanks!

@uchouhan (Author)

I added :onyx/batch-timeout with a higher value, but that does not make a difference. I also added a prn statement in the write-batch function:

(defrecord DatomicWriteBulkDatoms [conn]
  p-ext/Pipeline
  (read-batch
    [_ event]
    (function/read-batch event))

  (write-batch
    [_ event]
    ;; Transact each tx individually to avoid tempid conflicts.
    (doseq [tx (mapcat :leaves (:tree (:onyx.core/results event)))]
      (prn (:tx (:message tx)))
      @(d/transact conn (:tx (:message tx))))
    {:onyx.core/written? true})

  (seal-resource
    [_ _]
    {}))

and it seems to skip a significant number of ranges (as shown below):
[{:db/id #db/id[:com.optimispt/patients -1001007], :patient/id 1000}]
[{:db/id #db/id[:com.optimispt/patients -1001008], :patient/id 1001}]
[{:db/id #db/id[:com.optimispt/patients -1001009], :patient/id 1002}]
[{:db/id #db/id[:com.optimispt/patients -1001010], :patient/id 1003}]
[{:db/id #db/id[:com.optimispt/patients -1001011], :patient/id 1004}]
[{:db/id #db/id[:com.optimispt/patients -1036012], :patient/id 600009}]
[{:db/id #db/id[:com.optimispt/patients -1036013], :patient/id 600011}]
[{:db/id #db/id[:com.optimispt/patients -1036014], :patient/id 600013}]
[{:db/id #db/id[:com.optimispt/patients -1036015], :patient/id 600015}]
[{:db/id #db/id[:com.optimispt/patients -1036016], :patient/id 600017}]

A similar prn statement in the read-rows function verified that the SQL is generated for all ranges.
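
To see exactly which ids are missing, here is a minimal sketch (it reuses conn-pool and datomic-conn from the job above; missing-patient-ids is just an illustrative name):

(require '[clojure.set :as set])

(defn missing-patient-ids []
  (let [sql-ids (into (sorted-set) (map :id)
                      (jdbc/query conn-pool ["SELECT id FROM patients"]))
        datomic-ids (into (sorted-set) (map first)
                          (d/q '[:find ?id :where [_ :patient/id ?id]]
                               (d/db datomic-conn)))]
    (set/difference sql-ids datomic-ids)))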

@lbradstreet (Member)

Hi @uchouhan, can you confirm that these ranges are being generated by your tasks (i.e. in prepare-datoms)? If so, are they passed into the Datomic task, or does write-batch never see them?

[{:db/id #db/id[:com.optimispt/patients -1001007], :patient/id 1000}]
[{:db/id #db/id[:com.optimispt/patients -1001008], :patient/id 1001}]
[{:db/id #db/id[:com.optimispt/patients -1001009], :patient/id 1002}]
[{:db/id #db/id[:com.optimispt/patients -1001010], :patient/id 1003}]
[{:db/id #db/id[:com.optimispt/patients -1001011], :patient/id 1004}]
[{:db/id #db/id[:com.optimispt/patients -1036012], :patient/id 600009}]
[{:db/id #db/id[:com.optimispt/patients -1036013], :patient/id 600011}]
[{:db/id #db/id[:com.optimispt/patients -1036014], :patient/id 600013}]
[{:db/id #db/id[:com.optimispt/patients -1036015], :patient/id 600015}]
[{:db/id #db/id[:com.optimispt/patients -1036016], :patient/id 600017}]
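
One quick way to check, just as a sketch: prn the incoming id at the top of prepare-datoms and watch whether segments from the missing ranges ever arrive there:

(defn prepare-datoms [segment]
  (prn :prepare-datoms-saw (:id segment)) ;; temporary debug output
  {:tx [(merge {:db/id (d/tempid :com.optimispt/patients)}
               (non-nil-datoms segment))]})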

@lbradstreet (Member)

Please feel free to come onto https://gitter.im/onyx-platform/onyx and chat about this with us in real time. I'm also happy to work through the issue here.

@uchouhan (Author)

I've seen some of these warnings earlier, but not always:

15-Nov-19 00:12:38 Umang-104.local WARN [onyx.messaging.aeron] -
java.lang.Thread.run Thread.java: 745
uk.co.real_logic.agrona.concurrent.AgentRunner.run AgentRunner.java: 105
uk.co.real_logic.aeron.ClientConductor.doWork ClientConductor.java: 113
uk.co.real_logic.aeron.ClientConductor.doWork ClientConductor.java: 295
uk.co.real_logic.aeron.ClientConductor.onCheckTimeouts ClientConductor.java: 339
uk.co.real_logic.aeron.exceptions.ConductorServiceTimeoutException: Timeout between service calls over 10000000000ns
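
(For reference: 10000000000ns is 10s, which I believe is Aeron's default client liveness timeout, so this warning suggests the Aeron client conductor thread stalled for over 10s, e.g. during a long GC pause. I can't say for certain whether it's related to the missing ranges.)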
