cdc_schema.py
#!/usr/bin/env python3
# Copyright (c) 2016 MariaDB Corporation Ab
# Copyright (c) 2023 MariaDB plc, Finnish Branch
#
# Use of this software is governed by the Business Source License included
# in the LICENSE.TXT file and at www.mariadb.com/bsl11.
#
# Change Date: 2027-04-10
#
# On the date above, in accordance with the Business Source License, use
# of this software will be governed by version 2 or later of the General
# Public License.
#
# This program requires the MySQL Connector/Python to work
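# (the driver is typically installed from PyPI with: pip install mysql-connector-python)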
#
import mysql.connector as mysql
import json
import sys
import argparse

parser = argparse.ArgumentParser(
    description="CDC Schema Generator", conflict_handler="resolve",
    epilog="""This program generates CDC schema files for a specific table or all the
tables in a database. The schema files need to be generated if the binary log
files do not contain the CREATE TABLE events that define the table layout.""")
parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost")
parser.add_argument("-P", "--port", dest="port", help="Port where the connection is made", default="3306")
parser.add_argument("-u", "--user", dest="user", help="Username used when connecting", default="")
parser.add_argument("-p", "--password", dest="password", help="Password used when connecting", default="")
parser.add_argument("DATABASE", help="Generate Avro schemas for this database")
opts = parser.parse_args(sys.argv[1:])

def parse_field(row):
    """Map one DESCRIBE row to an Avro field definition."""
    res = dict()
    parts = row[1].lower().split('(')
    name = parts[0]
    res["real_type"] = name

    # Extract the length from types like varchar(255); for enum, set and
    # decimal the parenthesized part is not a plain length, so skip it.
    if len(parts) > 1 and name not in ["enum", "set", "decimal"]:
        res["length"] = int(parts[1].split(')')[0])
    else:
        res["length"] = -1

    # Map the MySQL/MariaDB column type to an Avro type
    avro_type = "string"
    if name in ("date", "datetime", "time", "timestamp", "year", "tinytext", "text",
                "mediumtext", "longtext", "char", "varchar", "enum", "set"):
        avro_type = "string"
    elif name in ("tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary"):
        avro_type = "bytes"
    elif name in ("int", "smallint", "mediumint", "integer", "tinyint", "short", "bit"):
        avro_type = "int"
    elif name in ("float",):
        avro_type = "float"
    elif name in ("double", "decimal"):
        avro_type = "double"
    elif name in ("long", "bigint"):
        avro_type = "long"

    res["type"] = ["null", avro_type]
    res["name"] = row[0].lower()
    return res
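
# For example (a sketch of the mapping, assuming a column defined as `id int(11)`):
# the DESCRIBE row ("id", "int(11)", ...) would produce
#   {"real_type": "int", "length": 11, "type": ["null", "int"], "name": "id"}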

# Connect to the server and generate one Avro schema file per table in the
# requested database.
try:
    conn = mysql.connect(user=opts.user, password=opts.password, host=opts.host, port=opts.port)
    cursor = conn.cursor()

    cursor.execute("SHOW TABLES FROM {}".format(opts.DATABASE))
    tables = [res[0] for res in cursor]

    for t in tables:
        schema = dict(namespace="MaxScaleChangeDataSchema.avro", type="record",
                      name="ChangeRecord", fields=[])
        cursor.execute("DESCRIBE {}.{}".format(opts.DATABASE, t))

        for res in cursor:
            schema["fields"].append(parse_field(res))

        with open("{}.{}.000001.avsc".format(opts.DATABASE, t), 'w') as dest:
            dest.write(json.dumps(schema))

    cursor.close()
    conn.close()

except Exception as e:
    print(e, file=sys.stderr)
    sys.exit(1)
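
# Example usage (a sketch; the host, credentials and the database/table names
# below are placeholders):
#
#   ./cdc_schema.py --host 127.0.0.1 --port 3306 --user myuser --password mypasswd mydb
#
# For a table mydb.t1 this writes mydb.t1.000001.avsc containing something like:
#
#   {"namespace": "MaxScaleChangeDataSchema.avro", "type": "record",
#    "name": "ChangeRecord", "fields": [{"real_type": "int", "length": 11,
#    "type": ["null", "int"], "name": "id"}]}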