-
Notifications
You must be signed in to change notification settings - Fork 61
/
configuration.py
56 lines (50 loc) · 1.87 KB
/
configuration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
'''Provides an object model for a our config file'''
import json
import logging
from voluptuous import Schema, Required, Any, Optional
LOGGER = logging.getLogger(__name__)
CONFIG_CONTRACT = Schema({
Required('tables'): [{
Required('path'): str,
Required('name'): str,
Required('pattern'): str,
Required('start_date'): str,
Required('key_properties'): [str],
Required('format'): Any('csv', 'excel', 'json', 'jsonl', 'parquet', 'detect'),
Optional('encoding'): str,
Optional('invalid_format_action'): Any('ignore','fail'),
Optional('universal_newlines'): bool,
Optional('skip_initial'): int,
Optional('selected'): bool,
Optional('field_names'): [str],
Optional('search_prefix'): str,
Optional('worksheet_name'): str,
Optional('delimiter'): str,
Optional('quotechar'): str,
Optional('json_path'): str,
Optional('sample_rate'): int,
Optional('max_sampling_read'): int,
Optional('max_records_per_run'): int,
Optional('max_sampled_files'): int,
Optional('prefer_number_vs_integer'): bool,
Optional('prefer_schema_as_string'): bool,
Optional('schema_overrides'): {
str: {
Required('type'): Any(Any('null','string','integer','number','date-time','object'),
[Any('null','string','integer','number','date-time','object')])
}
}
}]
})
class Config():
@classmethod
def dump(cls, config_json, ostream):
json.dump(config_json, ostream, indent=2)
@classmethod
def validate(cls, config_json):
CONFIG_CONTRACT(config_json)
return config_json
@classmethod
def load(cls, filename):
with open(filename) as fp: # pylint: disable=invalid-name
return Config.validate(json.load(fp))