-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathtest_integration.py
88 lines (79 loc) · 3.71 KB
/
test_integration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# Copyright ClusterHQ Inc. See LICENSE file for details.
"""
Test supported configurations of the installer.
To run these tests, you must place a `terraform.tfvars.json` file in your home
directory thusly:
luke@tiny:~$ cat ~/terraform.tfvars.json
{"aws_access_key": "XXX",
"aws_secret_key": "YYY",
"aws_region": "us-west-1",
"aws_availability_zone": "us-west-1b",
"aws_key_name": "luke2",
"private_key_path": "/Users/luke/Downloads/luke2.pem"}
"""
from twisted.trial.unittest import TestCase
import os
from subprocess import check_output
from twisted.python.filepath import FilePath
import yaml, json
# Terraform variables file that must exist in the user's home directory (see
# the module docstring for its expected contents).
SECRETS_FILE = FilePath(
    os.path.join(os.path.expanduser("~"), "terraform.tfvars.json"))

# Parse the secrets once at import time; the ``with`` block makes sure the
# file handle is closed again instead of being left to the garbage collector.
with SECRETS_FILE.open() as _secrets_fp:
    SECRETS = json.load(_secrets_fp)

# Private key used for every SSH connection to the provisioned nodes.
KEY = FilePath(SECRETS["private_key_path"])

# URL whose content is piped into ``sh`` to install the tools under test.
GET_FLOCKER = "https://get.flocker.io/"

# Prefix put in front of every flocker-* tool name.
# set to empty string if you want to test against local install of tools
# (faster RTT than building docker image every time)
UFT = "uft-"
class UnofficialFlockerInstallerTests(TestCase):
    """
    Complete spin-up tests.

    Each test provisions nodes, installs and configures the cluster on them,
    writes a file into a flocker-backed Docker volume on one node, reads it
    back from a second node, and finally destroys the volume and the nodes.
    """
    # Slow builds because we're provisioning VMs.
    timeout = 60 * 60

    def _run_shell(self, steps, values):
        """
        Run several shell commands as a single ``&&``-chained command line.

        :param steps: list of shell command templates; they may contain
            ``%(name)s`` placeholders.
        :param values: dict used to fill in the placeholders.
        :return: the status value returned by ``os.system`` (0 on success).
        """
        return os.system(" && ".join(steps) % values)

    def _check_shell(self, description, steps, values):
        """
        Like ``_run_shell``, but fail the test if the chain did not succeed.

        Without this check a broken provisioning step only shows up later as
        an unrelated-looking error (missing directory, missing YAML key).

        :param description: short text naming the stage, used in the failure
            message.
        :param steps: see ``_run_shell``.
        :param values: see ``_run_shell``.
        """
        status = self._run_shell(steps, values)
        self.assertEqual(
            0, status,
            "%s failed (os.system returned %r)" % (description, status))

    def _run_integration_test(self, configuration):
        """
        Spin up a cluster for ``configuration``, exercise it, and tear it down.

        :param configuration: name passed to ``flocker-get-nodes`` as a
            ``--<configuration>`` flag, e.g. ``"ubuntu-aws"``.
        """
        test_dir = FilePath(self.mktemp())
        test_dir.makedirs()
        v = dict(testdir=test_dir.path, get_flocker=GET_FLOCKER,
                 configuration=configuration, uft=UFT, key=KEY.path)
        destroy_nodes = "FORCE_DESTROY=yes %(uft)sflocker-destroy-nodes"
        cleaned_up = False
        try:
            self._check_shell("installing the tools and sample files", [
                "curl -sSL %(get_flocker)s | sh",
                "cd %(testdir)s",
                "chmod 0600 %(key)s",
                "%(uft)sflocker-sample-files",
            ], v)
            SECRETS_FILE.copyTo(
                test_dir.child("terraform").child("terraform.tfvars.json"))
            self._check_shell("provisioning and configuring the cluster", [
                "cd %(testdir)s",
                "%(uft)sflocker-get-nodes --%(configuration)s",
                "%(uft)sflocker-install cluster.yml",
                "%(uft)sflocker-config cluster.yml",
                "%(uft)sflocker-plugin-install cluster.yml",
                'echo "sleeping 130 seconds to let cluster settle..."',
                "sleep 130",
            ], v)

            # safe_load is enough for a plain configuration file and does not
            # construct arbitrary Python objects; the handle is closed for us.
            with test_dir.child("cluster.yml").open() as config_file:
                cluster_config = yaml.safe_load(config_file)
            node1 = cluster_config['agent_nodes'][0]
            node2 = cluster_config['agent_nodes'][1]
            self.assertNotEqual(node1, node2)
            node1public = node1['public']
            node2public = node2['public']

            # Write through the volume on the first node ...
            print(runSSHRaw(
                node1public,
                'docker run -v foo:/data --volume-driver=flocker busybox '
                'sh -c \\"echo hello \\> /data/foo\\"'))
            # ... and read the same volume back on the second node.
            output = runSSHRaw(
                node2public,
                'docker run -v foo:/data --volume-driver=flocker busybox '
                'cat /data/foo')
            # check_output returns a byte string; the bytes literal compares
            # correctly on Python 2 (where it is a plain str) and Python 3.
            self.assertTrue(output.strip().endswith(b"hello"))

            # The dataset id is taken from the first column of the
            # second-to-last line of the volume listing.
            destroy_volume = (
                "%(uft)sflocker-volumes destroy --dataset="
                "$(%(uft)sflocker-volumes list|tail -n 2 |head -n 1"
                "|awk -F ' ' '{print $1}')")
            # Poll until the listing is exactly two lines long (presumably
            # the output of an empty listing -- confirm against the tool).
            wait_for_deletion = (
                'while [ $(%(uft)sflocker-volumes list |wc -l) != "2" ]; '
                'do echo waiting for volumes to be deleted; sleep 1; done')
            result = self._run_shell([
                "cd %(testdir)s",
                destroy_volume,
                wait_for_deletion,
                destroy_nodes,
            ], v)
            cleaned_up = result == 0
        finally:
            # Whatever went wrong above, make a last attempt to destroy the
            # nodes so that they are not left running.
            if not cleaned_up:
                self._run_shell(["cd %(testdir)s", destroy_nodes], v)

    def test_ubuntu_aws(self):
        """
        Run the full integration test for the ``ubuntu-aws`` configuration.
        """
        return self._run_integration_test("ubuntu-aws")
def runSSHRaw(ip, command, username="root"):
    """
    Run ``command`` on a remote host over SSH and return its output.

    The command line is executed through the local shell, so ``command`` is
    subject to local shell parsing before it reaches the remote side. Host
    key checking is disabled and the key at ``KEY.path`` is used to log in.

    :param ip: address of the host to connect to.
    :param command: command line to run remotely.
    :param username: remote user to log in as.
    :return: whatever ``check_output`` captured from the ``ssh`` process.
    """
    ssh_template = (
        'ssh -o LogLevel=error -o UserKnownHostsFile=/dev/null '
        '-o StrictHostKeyChecking=no -i %s %s@%s %s')
    ssh_command_line = ssh_template % (KEY.path, username, ip, command)
    return check_output(ssh_command_line, shell=True)