-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathairbyte_dto_factory.py
288 lines (247 loc) · 10.3 KB
/
airbyte_dto_factory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
class SourceDto:
    """
    Data transfer object describing a Source-type Airbyte abstraction.

    ``tags`` is carried alongside the Airbyte fields but is not part of the
    dict produced by ``to_payload``.
    """

    def __init__(self):
        # Identifiers start unset (assigned left to right: definition, source, workspace).
        self.source_definition_id = self.source_id = self.workspace_id = None
        self.connection_configuration = {}
        # Instance name first, then connector name.
        self.name = self.source_name = None
        self.tags = []

    def get_identity(self):
        """Return the ``(source_id, name)`` pair identifying this source."""
        return (self.source_id, self.name)

    def to_payload(self):
        """
        Serialize this dto into a camelCase-keyed payload dict (tags excluded).
        """
        return {
            'sourceDefinitionId': self.source_definition_id,
            'sourceId': self.source_id,
            'workspaceId': self.workspace_id,
            'connectionConfiguration': self.connection_configuration,
            'name': self.name,
            'sourceName': self.source_name,
        }
class DestinationDto:
    """
    Data transfer object describing a Destination-type Airbyte abstraction.

    ``tags`` is carried alongside the Airbyte fields but is not part of the
    dict produced by ``to_payload``.
    """

    def __init__(self):
        # Identifiers start unset (assigned left to right: definition, destination, workspace).
        self.destination_definition_id = self.destination_id = self.workspace_id = None
        self.connection_configuration = {}
        # Instance name first, then connector name.
        self.name = self.destination_name = None
        self.tags = []

    def get_identity(self):
        """Return the ``(destination_id, name)`` pair identifying this destination."""
        return (self.destination_id, self.name)

    def to_payload(self):
        """
        Serialize this dto into a camelCase-keyed payload dict (tags excluded).
        """
        return {
            'destinationDefinitionId': self.destination_definition_id,
            'destinationId': self.destination_id,
            'workspaceId': self.workspace_id,
            'connectionConfiguration': self.connection_configuration,
            'name': self.name,
            'destinationName': self.destination_name,
        }
class ConnectionDto:
    """
    Data transfer object describing a Connection-type Airbyte abstraction.

    source_name / destination_name are carried on the dto but are not part of
    the dict produced by ``to_payload``.
    """

    def __init__(self):
        self.connection_id = None
        self.name = 'default'
        self.prefix = ''
        # Endpoints start unset (assigned left to right).
        self.source_id = self.source_name = None
        self.destination_id = self.destination_name = None
        # Kept as two distinct dicts on purpose; do not chain these assignments.
        self.sync_catalog = {}  # sync_catalog['streams'] is a list of dicts {stream:, config:}
        self.schedule = {}
        self.namespace_definition = None
        self.status = 'active'

    def get_identity(self):
        """Return the ``(connection_id, name)`` pair identifying this connection."""
        return (self.connection_id, self.name)

    def to_payload(self):
        """
        Serialize this dto into a camelCase-keyed payload dict.
        """
        return {
            'connectionId': self.connection_id,
            'sourceId': self.source_id,
            'destinationId': self.destination_id,
            'name': self.name,
            'prefix': self.prefix,
            'schedule': self.schedule,
            'status': self.status,
            'syncCatalog': self.sync_catalog,
            'namespaceDefinition': self.namespace_definition,
        }
class ConnectionGroupDto:
    """
    Data transfer object for a connection group: a named template describing a set of connections.

    Airbyte itself has no such abstraction. This dto is only read, never written back
    to .yml or sent to the client, so it has no ``to_payload``. Instead,
    ``to_incomplete_connection_dict`` exposes the fields needed to start a new connection.
    """

    def __init__(self):
        self.group_name = None
        self.prefix = ''
        self.source_tags = None
        self.destination_tags = None
        self.sync_catalog = {}  # sync_catalog['streams'] is a list of dicts {stream:, config:}
        self.schedule = {}
        self.status = 'active'

    def to_incomplete_connection_dict(self):
        """
        Return the partial connection dict consumed by AirbyteDtoFactory.build_connection_dto.

        The group name becomes the connection 'name'. Source and destination ids
        are not included.
        """
        return dict(
            name=self.group_name,
            prefix=self.prefix,
            syncCatalog=self.sync_catalog,
            schedule=self.schedule,
            status=self.status,
        )
class StreamDto:
    """
    Data transfer object for a single stream within a connection's sync catalog.

    NOTE(review): presumably models the ``stream`` half of a sync-catalog entry
    (see ConnectionDto.sync_catalog); confirm against callers.
    """

    def __init__(self):
        # Every field starts empty; list/dict fields get fresh per-instance objects.
        self.name = None
        self.json_schema = {}
        self.supported_sync_modes = []
        self.source_defined_cursor = None
        self.default_cursor_field = []
        self.source_defined_primary_key = []
        self.namespace = None
class StreamConfigDto:
    """
    Data transfer object for the configuration of one stream within a connection.

    NOTE(review): presumably models the ``config`` half of a sync-catalog entry
    (see ConnectionDto.sync_catalog); confirm against callers.
    """

    def __init__(self):
        # Every field starts empty; list fields get fresh per-instance objects.
        self.sync_mode = None
        self.cursor_field = []
        self.destination_sync_mode = None
        self.primary_key = []
        self.alias_name = None
        self.selected = None
class WorkspaceDto:
    """
    Placeholder data transfer object for Workspace-type Airbyte abstractions.

    It currently defines no attributes or methods.
    """
class AirbyteDtoFactory:
    """
    Builds data transfer objects, each modeling an abstraction inside Airbyte
    """

    # (input key, ConnectionDto attribute) pairs copied by build_connection_dto
    # whenever the key is present in the input dict; absent keys keep the dto defaults.
    _CONNECTION_FIELDS = (
        ('prefix', 'prefix'),
        ('connectionId', 'connection_id'),
        ('sourceId', 'source_id'),
        ('sourceName', 'source_name'),
        ('destinationId', 'destination_id'),
        ('destinationName', 'destination_name'),
        ('name', 'name'),
        ('syncCatalog', 'sync_catalog'),
        ('status', 'status'),
        ('namespaceDefinition', 'namespace_definition'),
        ('schedule', 'schedule'),
    )

    def __init__(self, source_definitions, destination_definitions):
        """
        :param source_definitions: dict whose 'sourceDefinitions' list is searched by
            build_source_dto; entries are read for 'name' and 'sourceDefinitionId'
        :param destination_definitions: dict whose 'destinationDefinitions' list is searched
            by build_destination_dto; entries are read for 'name' and 'destinationDefinitionId'
        """
        self.source_definitions = source_definitions
        self.destination_definitions = destination_definitions

    @staticmethod
    def _find_definition_id(definitions, name, id_key):
        """Return definition[id_key] of the last definition whose 'name' equals name, else None."""
        definition_id = None
        for definition in definitions:
            if definition['name'] == name:
                definition_id = definition[id_key]
        return definition_id

    @staticmethod
    def _copy_first_secret(configuration, secret_entry, candidate_keys):
        """Overwrite the first of candidate_keys present in configuration with secret_entry's value."""
        for key in candidate_keys:
            if key in configuration:
                configuration[key] = secret_entry[key]
                return

    def populate_secrets(self, secrets, new_dtos):
        """
        Injects secret values into the connection configurations of new dtos, in place.

        Only a key already present in a dto's connection_configuration is replaced:
        'access_token' (else 'token') for sources, 'password' (else 'credentials_json')
        for destinations.

        :param secrets: dict with optional 'sources' / 'destinations' sections, each keyed
            by source_name / destination_name; a missing or empty section (or None
            secrets) means there is nothing to inject
        :param new_dtos: dict with optional 'sources' (SourceDto list) and
            'destinations' (DestinationDto list)
        :raises KeyError: if a matching secret entry lacks the field being replaced
        """
        # TODO: Find a better way to deal with unpredictable naming in secrets v2
        secrets = secrets or {}
        if 'sources' in new_dtos:
            # `or {}` also covers a section present but empty (None) in the secrets file
            source_secrets = secrets.get('sources') or {}
            for source in new_dtos['sources']:
                if source.source_name in source_secrets:
                    self._copy_first_secret(source.connection_configuration,
                                            source_secrets[source.source_name],
                                            ('access_token', 'token'))
        if 'destinations' in new_dtos:
            destination_secrets = secrets.get('destinations') or {}
            for destination in new_dtos['destinations']:
                if destination.destination_name in destination_secrets:
                    self._copy_first_secret(destination.connection_configuration,
                                            destination_secrets[destination.destination_name],
                                            ('password', 'credentials_json'))

    def build_source_dto(self, source: dict) -> 'SourceDto':
        """
        Builds a SourceDto object from a dict representing a source

        'name' and 'sourceName' are required. If 'sourceDefinitionId' is absent, it is
        looked up by matching 'sourceName' against the known source definitions.
        """
        r = SourceDto()
        if 'connectionConfiguration' in source:
            r.connection_configuration = source['connectionConfiguration']
        r.name = source['name']
        r.source_name = source['sourceName']
        if 'sourceDefinitionId' in source:
            r.source_definition_id = source['sourceDefinitionId']
        else:
            # TODO: handle the case where no sourceDefinitionId matches the provided source
            # name (source_definition_id currently stays None)
            r.source_definition_id = self._find_definition_id(
                self.source_definitions['sourceDefinitions'], r.source_name, 'sourceDefinitionId')
        if 'sourceId' in source:
            r.source_id = source['sourceId']
        if 'workspaceId' in source:
            r.workspace_id = source['workspaceId']
        if 'tags' in source:
            r.tags = source['tags']
        return r

    def build_destination_dto(self, destination) -> 'DestinationDto':
        """
        Builds a DestinationDto object from a dict representing a destination

        'name' and 'destinationName' are required. If 'destinationDefinitionId' is absent,
        it is looked up by matching 'destinationName' against the known destination definitions.
        """
        r = DestinationDto()
        # Optional, matching build_source_dto; the dto default ({}) is kept when absent.
        if 'connectionConfiguration' in destination:
            r.connection_configuration = destination['connectionConfiguration']
        r.destination_name = destination['destinationName']
        r.name = destination['name']
        if 'destinationDefinitionId' in destination:
            r.destination_definition_id = destination['destinationDefinitionId']
        else:
            r.destination_definition_id = self._find_definition_id(
                self.destination_definitions['destinationDefinitions'],
                r.destination_name, 'destinationDefinitionId')
        if 'destinationId' in destination:
            r.destination_id = destination['destinationId']
        if 'workspaceId' in destination:
            r.workspace_id = destination['workspaceId']
        if 'tags' in destination:
            r.tags = destination['tags']
        return r

    def build_connection_dto(self, connection) -> 'ConnectionDto':
        """
        Builds a ConnectionDto from a dict representing a connection

        Every field is optional. A 'connectionId' indicates the connection already exists
        in an Airbyte deployment. Absent keys leave the ConnectionDto defaults in place.
        """
        r = ConnectionDto()
        for key, attribute in self._CONNECTION_FIELDS:
            if key in connection:
                setattr(r, attribute, connection[key])
        return r

    def build_connection_group_dto(self, connection_group) -> 'ConnectionGroupDto':
        """
        Builds a ConnectionGroupDto from a dict representing a connection_group
        Note: unlike the other DTO classes, ConnectionGroupDto doesn't represent an abstraction inside Airbyte

        'groupName', 'schedule', 'status', 'sourceTags', 'destinationTags' and 'prefix'
        are required; 'syncCatalog' is optional.
        """
        r = ConnectionGroupDto()
        r.group_name = connection_group['groupName']
        if 'syncCatalog' in connection_group:
            r.sync_catalog = connection_group['syncCatalog']
        r.schedule = connection_group['schedule']
        r.status = connection_group['status']
        r.source_tags = connection_group['sourceTags']
        r.destination_tags = connection_group['destinationTags']
        r.prefix = connection_group['prefix']
        return r