forked from tozny/e3db-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
integration.py
executable file
·81 lines (57 loc) · 2.59 KB
/
integration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#! /usr/bin/env python
# run integration tests
import sys
import e3db
# Load default config in ~/.tozny/e3db.json
conf = e3db.Config.load()
# Now create a client using that configuration.
client = e3db.Client(conf)
record_data = {'test': 'Supercalifragilisticexpialidocious'}
def read(record_types):
# read records from every sdk type
did_read = {}
for record in client.query(record_type=record_types):
print(("Confirming %s (%s)" % (record.meta.record_id, record.meta.record_type)))
assert record.data['test'] == record_data['test'], "(actual) %s, (expected) %s; (%s)" % (record.data['test'], record_data['test'], record.meta.record_type)
assert record.data == record_data, "(actual) %s, (expected) %s; (%s)" % (record.data, record_data, record.meta.record_type)
assert record.meta.plain == {}, "(actual) %s, (expected) %s; (%s)" % (record.meta.plain, {}, record.meta.record_type)
did_read[record.meta.record_type] = True
assert len(did_read) > 0, "Failed to read any records."
assert all([x in list(did_read.keys()) for x in record_types]), "Failed to read all record types. (actual) %s, (expected) %s" % (list(did_read.keys()), record_types)
def write(record_type):
record = client.write(record_type, record_data)
print(("Wrote %s (%s)." % (record.meta.record_id, record.meta.record_type)))
def usage(err):
if err:
print(err)
print('''
integration.py <command>
Read, write and delete integration test records. This script uses
the default credentials found at ~/.tozny/e3db.json.
where <command> is one of:
read [type[, type, ...]]
Read records of the given type(s). Multiple types should be
provided in a comma separated list. An error will be raised if no
record exists for a given type.
write
Write a test record with the type 'python'.
''')
sys.exit(1)
if __name__ == '__main__':
if len(sys.argv) == 1:
usage("Please provide a command")
writer_record_type = 'python'
method = sys.argv[1]
if method == 'write':
write(writer_record_type)
elif method == 'read':
# create a list of record types we should read
if len(sys.argv) < 3:
usage("Please provide record types in a comma-separated list.")
if len(sys.argv) > 3:
usage("Please provide only a comma-separated list of record types.")
reader_record_types = sys.argv[2].split(',')
if len(reader_record_types) == 0:
usage("Please provide some record types to read.")
read(reader_record_types)
sys.exit(0)