from __future__ import with_statement
import ConfigParser
import copy
import errno
import glob
import logging
import os
import pprint
import re
import shutil
import signal
import subprocess
import sys
import tempfile
import thread
import threading
import time
import traceback
import types
import unittest.case
from collections import OrderedDict
from subprocess import CalledProcessError
from unittest import TestCase
import cassandra
import ccmlib.repository
from cassandra import ConsistencyLevel
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster as PyCluster
from cassandra.cluster import NoHostAvailable
from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import RetryPolicy, WhiteListRoundRobinPolicy
from ccmlib.cluster import Cluster
from ccmlib.cluster_factory import ClusterFactory
from ccmlib.common import get_version_from_build, is_win
from nose.exc import SkipTest
from nose.tools import assert_greater_equal
from six import print_
from plugins.dtestconfig import _CONFIG as CONFIG
# We don't want test files to know about the plugins module, so we import
# constants here and re-export them.
from plugins.dtestconfig import GlobalConfigObject
from tools.context import log_filter
from tools.funcutils import merge_dicts
LOG_SAVED_DIR = "logs"
try:
os.mkdir(LOG_SAVED_DIR)
except OSError:
pass
LAST_LOG = os.path.join(LOG_SAVED_DIR, "last")
LAST_TEST_DIR = 'last_test_dir'
DEFAULT_DIR = './'
config = ConfigParser.RawConfigParser()
if len(config.read(os.path.expanduser('~/.cassandra-dtest'))) > 0:
if config.has_option('main', 'default_dir'):
DEFAULT_DIR = os.path.expanduser(config.get('main', 'default_dir'))
CASSANDRA_DIR = os.environ.get('CASSANDRA_DIR', DEFAULT_DIR)
NO_SKIP = os.environ.get('SKIP', '').lower() in ('no', 'false')
DEBUG = os.environ.get('DEBUG', '').lower() in ('yes', 'true')
TRACE = os.environ.get('TRACE', '').lower() in ('yes', 'true')
KEEP_LOGS = os.environ.get('KEEP_LOGS', '').lower() in ('yes', 'true')
KEEP_TEST_DIR = os.environ.get('KEEP_TEST_DIR', '').lower() in ('yes', 'true')
PRINT_DEBUG = os.environ.get('PRINT_DEBUG', '').lower() in ('yes', 'true')
OFFHEAP_MEMTABLES = os.environ.get('OFFHEAP_MEMTABLES', '').lower() in ('yes', 'true')
NUM_TOKENS = os.environ.get('NUM_TOKENS', '256')
RECORD_COVERAGE = os.environ.get('RECORD_COVERAGE', '').lower() in ('yes', 'true')
IGNORE_REQUIRE = os.environ.get('IGNORE_REQUIRE', '').lower() in ('yes', 'true')
DATADIR_COUNT = os.environ.get('DATADIR_COUNT', '3')
ENABLE_ACTIVE_LOG_WATCHING = os.environ.get('ENABLE_ACTIVE_LOG_WATCHING', '').lower() in ('yes', 'true')
RUN_STATIC_UPGRADE_MATRIX = os.environ.get('RUN_STATIC_UPGRADE_MATRIX', '').lower() in ('yes', 'true')
# default values for configuration from the configuration plugin
_default_config = GlobalConfigObject(
vnodes=True,
)
if CONFIG is None:
CONFIG = _default_config
DISABLE_VNODES = not CONFIG.vnodes
if os.environ.get('DISABLE_VNODES', '').lower() in ('yes', 'true'):
print 'DISABLE_VNODES environment variable deprecated. Use `./run_dtests.py --vnodes false` instead.'
CURRENT_TEST = ""
logging.basicConfig(filename=os.path.join(LOG_SAVED_DIR, "dtest.log"),
filemode='w',
format='%(asctime)s,%(msecs)d %(name)s %(current_test)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=logging.DEBUG)
LOG = logging.getLogger('dtest')
# set python-driver log level to INFO by default for dtest
logging.getLogger('cassandra').setLevel(logging.INFO)
def get_sha(repo_dir):
try:
output = subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=repo_dir).strip()
prefix = 'github:apache/'
local_repo_location = os.environ.get('LOCAL_GIT_REPO')
if local_repo_location is not None:
prefix = 'local:{}:'.format(local_repo_location) # local: slugs take the form 'local:/some/path/to/cassandra/:branch_name_or_sha'
return "{}{}".format(prefix, output)
except CalledProcessError as e:
if re.search('Not a git repository', e.message) is not None:
# we tried to get a sha, but repo_dir isn't a git repo. No big deal, must just be working from a non-git install.
return None
else:
# git call failed for some unknown reason
raise
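# Illustrative return values (hypothetical sha): without LOCAL_GIT_REPO set,
#   get_sha('/path/to/cassandra') -> 'github:apache/8b1336f0d...'
# and with LOCAL_GIT_REPO=/repos/cassandra,
#   get_sha('/repos/cassandra')   -> 'local:/repos/cassandra:8b1336f0d...'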
# There are times when we want to know the C* version we're testing against
# before we call Tester.setUp. In the general case, we can't know that -- the
# test method could use any version it wants for self.cluster. However, we can
# get the version from build.xml in the C* repository specified by
# CASSANDRA_VERSION or CASSANDRA_DIR. This should use the same resolution
# strategy as the actual checkout code in Tester.setUp; if it does not, that is
# a bug.
_cassandra_version_slug = os.environ.get('CASSANDRA_VERSION')
# Prefer CASSANDRA_VERSION if it's set in the environment. If not, use CASSANDRA_DIR
if _cassandra_version_slug:
# fetch but don't build the specified C* version
ccm_repo_cache_dir, _ = ccmlib.repository.setup(_cassandra_version_slug)
CASSANDRA_VERSION_FROM_BUILD = get_version_from_build(ccm_repo_cache_dir)
CASSANDRA_GITREF = get_sha(ccm_repo_cache_dir) # will be set None when not a git repo
else:
CASSANDRA_VERSION_FROM_BUILD = get_version_from_build(CASSANDRA_DIR)
CASSANDRA_GITREF = get_sha(CASSANDRA_DIR)
# Determine the location of the libjemalloc shared library so that we can
# specify it through environment variables when starting Cassandra. This
# reduces startup time, making the dtests run faster.
def find_libjemalloc():
if is_win():
# let the normal bat script handle finding libjemalloc
return ""
this_dir = os.path.dirname(os.path.realpath(__file__))
script = os.path.join(this_dir, "findlibjemalloc.sh")
try:
p = subprocess.Popen([script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if stderr or not stdout:
return "-" # tells C* not to look for libjemalloc
else:
return stdout
except Exception as exc:
print "Failed to run script to prelocate libjemalloc ({}): {}".format(script, exc)
return ""
CASSANDRA_LIBJEMALLOC = find_libjemalloc()
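# CASSANDRA_LIBJEMALLOC takes one of three shapes: a path to the shared
# library (passed through to C*), "-" (tells C* not to look for libjemalloc),
# or "" (defer to Cassandra's own startup scripts, e.g. on Windows).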
# copy the initial environment variables so we can reset them later:
initial_environment = copy.deepcopy(os.environ)
class DtestTimeoutError(Exception):
pass
def reset_environment_vars():
os.environ.clear()
os.environ.update(initial_environment)
def warning(msg):
LOG.warning(msg, extra={"current_test": CURRENT_TEST})
if PRINT_DEBUG:
print "WARN: " + msg
def debug(msg):
LOG.debug(msg, extra={"current_test": CURRENT_TEST})
if PRINT_DEBUG:
print msg
debug("Python driver version in use: {}".format(cassandra.__version__))
def retry_till_success(fun, *args, **kwargs):
timeout = kwargs.pop('timeout', 60)
bypassed_exception = kwargs.pop('bypassed_exception', Exception)
deadline = time.time() + timeout
while True:
try:
return fun(*args, **kwargs)
except bypassed_exception:
if time.time() > deadline:
raise
else:
# brief pause before next attempt
time.sleep(0.25)
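# Example usage (illustrative; `ping` is a hypothetical helper). Positional
# args are forwarded to the function; 'timeout' and 'bypassed_exception' are
# consumed by the retry loop itself:
#   def ping(session):
#       return session.execute("SELECT release_version FROM system.local")
#   retry_till_success(ping, session, timeout=30, bypassed_exception=NoHostAvailable)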
class FlakyRetryPolicy(RetryPolicy):
"""
A retry policy that retries 5 times by default, but can be configured to
retry more times.
"""
def __init__(self, max_retries=5):
self.max_retries = max_retries
def on_read_timeout(self, *args, **kwargs):
if kwargs['retry_num'] < self.max_retries:
debug("Retrying read after timeout. Attempt #" + str(kwargs['retry_num']))
return (self.RETRY, None)
else:
return (self.RETHROW, None)
def on_write_timeout(self, *args, **kwargs):
if kwargs['retry_num'] < self.max_retries:
debug("Retrying write after timeout. Attempt #" + str(kwargs['retry_num']))
return (self.RETRY, None)
else:
return (self.RETHROW, None)
def on_unavailable(self, *args, **kwargs):
if kwargs['retry_num'] < self.max_retries:
debug("Retrying request after UE. Attempt #" + str(kwargs['retry_num']))
return (self.RETRY, None)
else:
return (self.RETHROW, None)
class Runner(threading.Thread):
def __init__(self, func):
threading.Thread.__init__(self)
self.__func = func
self.__error = None
self.__stopped = False
self.daemon = True
def run(self):
i = 0
while True:
if self.__stopped:
return
try:
self.__func(i)
except Exception as e:
self.__error = e
return
i = i + 1
def stop(self):
self.__stopped = True
self.join()
if self.__error is not None:
raise self.__error
def check(self):
if self.__error is not None:
raise self.__error
def make_execution_profile(retry_policy=FlakyRetryPolicy(), consistency_level=ConsistencyLevel.ONE, **kwargs):
return ExecutionProfile(retry_policy=retry_policy,
consistency_level=consistency_level,
**kwargs)
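# A sketch (illustrative values) of how the pieces above combine: build a
# profile that retries flaky operations more aggressively and reads at QUORUM.
#   profile = make_execution_profile(retry_policy=FlakyRetryPolicy(max_retries=10),
#                                    consistency_level=ConsistencyLevel.QUORUM)
#   profiles = {EXEC_PROFILE_DEFAULT: profile}
# Tester._create_session accepts such a dict via its execution_profiles parameter.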
class Tester(TestCase):
maxDiff = None
allow_log_errors = False # scan the log of each node for errors after every test.
cluster_options = None
def set_node_to_current_version(self, node):
version = os.environ.get('CASSANDRA_VERSION')
cdir = CASSANDRA_DIR
if version:
node.set_install_dir(version=version)
else:
node.set_install_dir(install_dir=cdir)
def init_config(self):
init_default_config(self.cluster, self.cluster_options)
def setUp(self):
self.set_current_tst_name()
kill_windows_cassandra_procs()
maybe_cleanup_cluster_from_last_test_file()
self.test_path = get_test_path()
self.cluster = create_ccm_cluster(self.test_path, name='test')
self.maybe_begin_active_log_watch()
maybe_setup_jacoco(self.test_path)
self.init_config()
write_last_test_file(self.test_path, self.cluster)
set_log_levels(self.cluster)
self.connections = []
self.runners = []
# this is intentionally spelled 'tst' instead of 'test' to avoid
# making unittest think it's a test method
def set_current_tst_name(self):
global CURRENT_TEST
CURRENT_TEST = self.id() + self._testMethodName
def maybe_begin_active_log_watch(self):
if ENABLE_ACTIVE_LOG_WATCHING:
if not self.allow_log_errors:
self.begin_active_log_watch()
def begin_active_log_watch(self):
"""
Calls into ccm to start actively watching logs.
In the event that errors are seen in logs, ccm will call back to _log_error_handler.
When the cluster is no longer in use, stop_active_log_watch should be called to end log watching.
(otherwise a 'daemon' thread will (needlessly) run until the process exits).
"""
# log watching happens in another thread, but we want it to halt the main
# thread's execution, which we have to do by registering a signal handler
signal.signal(signal.SIGINT, self._catch_interrupt)
self._log_watch_thread = self.cluster.actively_watch_logs_for_error(self._log_error_handler, interval=0.25)
def _log_error_handler(self, errordata):
"""
Callback handler used in conjunction with begin_active_log_watch.
When called, prepares exception instance, then will indirectly
cause _catch_interrupt to be called, which can raise the exception in the main
program thread.
        @param errordata is a dictionary mapping node name to failure list.
"""
# in some cases self.allow_log_errors may get set after proactive log checking has been enabled
# so we need to double-check first thing before proceeding
if self.allow_log_errors:
return
reportable_errordata = OrderedDict()
for nodename, errors in errordata.items():
filtered_errors = list(self.__filter_errors(['\n'.join(msg) for msg in errors]))
            if len(filtered_errors) != 0:
reportable_errordata[nodename] = filtered_errors
# no errors worthy of halting the test
if not reportable_errordata:
return
message = "Errors seen in logs for: {nodes}".format(nodes=", ".join(reportable_errordata.keys()))
for nodename, errors in reportable_errordata.items():
for error in errors:
message += "\n{nodename}: {error}".format(nodename=nodename, error=error)
try:
debug('Errors were just seen in logs, ending test (if not ending already)!')
print_("Error details: \n{message}".format(message=message))
self.test_is_ending # will raise AttributeError if not present
except AttributeError:
self.test_is_ending = True
self.exit_with_exception = AssertionError("Log error encountered during active log scanning, see stdout")
# thread.interrupt_main will SIGINT in the main thread, which we can
# catch to raise an exception with useful information
thread.interrupt_main()
"""
Finds files matching the glob pattern specified as argument on
the given keyspace in all nodes
"""
def glob_data_dirs(self, path, ks="ks"):
result = []
for node in self.cluster.nodelist():
for data_dir in node.data_directories():
ks_dir = os.path.join(data_dir, ks, path)
result.extend(glob.glob(ks_dir))
return result
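    # Example (illustrative directory pattern): collect every sstable data
    # file for keyspace 'ks' across all nodes and data directories:
    #   paths = self.glob_data_dirs(os.path.join('*', '*-Data.db'))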
def _catch_interrupt(self, signal, frame):
"""
Signal handler for registering on SIGINT.
If called will look for a stored exception and raise it to abort test.
If a stored exception is not present, this handler has likely caught a
user interrupt via CTRL-C, and will raise a KeyboardInterrupt.
"""
try:
# check if we have a persisted exception to fail with
raise self.exit_with_exception
except AttributeError:
# looks like this was just a plain CTRL-C event
raise KeyboardInterrupt()
def copy_logs(self, cluster, directory=None, name=None):
"""Copy the current cluster's log files somewhere, by default to LOG_SAVED_DIR with a name of 'last'"""
if directory is None:
directory = LOG_SAVED_DIR
if name is None:
name = LAST_LOG
else:
name = os.path.join(directory, name)
if not os.path.exists(directory):
os.mkdir(directory)
logs = [(node.name, node.logfilename(), node.debuglogfilename(), node.gclogfilename(), node.compactionlogfilename())
for node in self.cluster.nodes.values()]
        if len(logs) != 0:
basedir = str(int(time.time() * 1000)) + '_' + self.id()
logdir = os.path.join(directory, basedir)
os.mkdir(logdir)
for n, log, debuglog, gclog, compactionlog in logs:
if os.path.exists(log):
self.assertGreaterEqual(os.path.getsize(log), 0)
shutil.copyfile(log, os.path.join(logdir, n + ".log"))
if os.path.exists(debuglog):
self.assertGreaterEqual(os.path.getsize(debuglog), 0)
shutil.copyfile(debuglog, os.path.join(logdir, n + "_debug.log"))
if os.path.exists(gclog):
self.assertGreaterEqual(os.path.getsize(gclog), 0)
shutil.copyfile(gclog, os.path.join(logdir, n + "_gc.log"))
if os.path.exists(compactionlog):
self.assertGreaterEqual(os.path.getsize(compactionlog), 0)
shutil.copyfile(compactionlog, os.path.join(logdir, n + "_compaction.log"))
if os.path.exists(name):
os.unlink(name)
if not is_win():
os.symlink(basedir, name)
def cql_connection(self, node, keyspace=None, user=None,
password=None, compression=True, protocol_version=None, port=None, ssl_opts=None, **kwargs):
return self._create_session(node, keyspace, user, password, compression,
protocol_version, port=port, ssl_opts=ssl_opts, **kwargs)
def exclusive_cql_connection(self, node, keyspace=None, user=None,
password=None, compression=True, protocol_version=None, port=None, ssl_opts=None, **kwargs):
node_ip = get_ip_from_node(node)
wlrr = WhiteListRoundRobinPolicy([node_ip])
return self._create_session(node, keyspace, user, password, compression,
protocol_version, port=port, ssl_opts=ssl_opts, load_balancing_policy=wlrr, **kwargs)
def _create_session(self, node, keyspace, user, password, compression, protocol_version,
port=None, ssl_opts=None, execution_profiles=None, **kwargs):
node_ip = get_ip_from_node(node)
if not port:
port = get_port_from_node(node)
if protocol_version is None:
protocol_version = get_eager_protocol_version(node.cluster.version())
if user is not None:
auth_provider = get_auth_provider(user=user, password=password)
else:
auth_provider = None
profiles = {EXEC_PROFILE_DEFAULT: make_execution_profile(**kwargs)
} if not execution_profiles else execution_profiles
cluster = PyCluster([node_ip],
auth_provider=auth_provider,
compression=compression,
protocol_version=protocol_version,
port=port,
ssl_options=ssl_opts,
connect_timeout=10,
allow_beta_protocol_version=True,
execution_profiles=profiles)
session = cluster.connect(wait_for_all_pools=True)
if keyspace is not None:
session.set_keyspace(keyspace)
self.connections.append(session)
return session
def patient_cql_connection(self, node, keyspace=None,
user=None, password=None, timeout=30, compression=True,
protocol_version=None, port=None, ssl_opts=None, **kwargs):
"""
Returns a connection after it stops throwing NoHostAvailables due to not being ready.
If the timeout is exceeded, the exception is raised.
"""
if is_win():
timeout *= 2
expected_log_lines = ('Control connection failed to connect, shutting down Cluster:', '[control connection] Error connecting to ')
with log_filter('cassandra.cluster', expected_log_lines):
session = retry_till_success(
self.cql_connection,
node,
keyspace=keyspace,
user=user,
password=password,
timeout=timeout,
compression=compression,
protocol_version=protocol_version,
port=port,
ssl_opts=ssl_opts,
bypassed_exception=NoHostAvailable,
**kwargs
)
return session
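    # Example (illustrative): after restarting node1, wait up to 60 seconds
    # for it to accept CQL connections before querying it:
    #   node1 = self.cluster.nodelist()[0]
    #   session = self.patient_cql_connection(node1, timeout=60)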
def patient_exclusive_cql_connection(self, node, keyspace=None,
user=None, password=None, timeout=30, compression=True,
protocol_version=None, port=None, ssl_opts=None, **kwargs):
"""
Returns a connection after it stops throwing NoHostAvailables due to not being ready.
If the timeout is exceeded, the exception is raised.
"""
if is_win():
timeout *= 2
return retry_till_success(
self.exclusive_cql_connection,
node,
keyspace=keyspace,
user=user,
password=password,
timeout=timeout,
compression=compression,
protocol_version=protocol_version,
port=port,
ssl_opts=ssl_opts,
bypassed_exception=NoHostAvailable,
**kwargs
)
@classmethod
def tearDownClass(cls):
reset_environment_vars()
if os.path.exists(LAST_TEST_DIR):
with open(LAST_TEST_DIR) as f:
test_path = f.readline().strip('\n')
name = f.readline()
try:
cluster = ClusterFactory.load(test_path, name)
# Avoid waiting too long for node to be marked down
if KEEP_TEST_DIR:
cluster.stop(gently=RECORD_COVERAGE)
else:
cluster.remove()
os.rmdir(test_path)
except IOError:
# after a restart, /tmp will be emptied so we'll get an IOError when loading the old cluster here
pass
try:
os.remove(LAST_TEST_DIR)
except IOError:
# Ignore - see comment above
pass
def tearDown(self):
# test_is_ending prevents active log watching from being able to interrupt the test
# which we don't want to happen once tearDown begins
self.test_is_ending = True
reset_environment_vars()
for con in self.connections:
con.cluster.shutdown()
for runner in self.runners:
try:
runner.stop()
except:
pass
failed = did_fail()
try:
if not self.allow_log_errors and self.check_logs_for_errors():
failed = True
raise AssertionError('Unexpected error in log, see stdout')
finally:
try:
# save the logs for inspection
if failed or KEEP_LOGS:
self.copy_logs(self.cluster)
except Exception as e:
print "Error saving log:", str(e)
finally:
log_watch_thread = getattr(self, '_log_watch_thread', None)
cleanup_cluster(self.cluster, self.test_path, log_watch_thread)
def check_logs_for_errors(self):
for node in self.cluster.nodelist():
errors = list(self.__filter_errors(
['\n'.join(msg) for msg in node.grep_log_for_errors()]))
            if len(errors) != 0:
for error in errors:
print_("Unexpected error in {node_name} log, error: \n{error}".format(node_name=node.name, error=error))
return True
def go(self, func):
runner = Runner(func)
self.runners.append(runner)
runner.start()
return runner
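    # Example usage (illustrative CQL): run a background writer for the
    # duration of a disruptive operation, then surface any error it hit.
    # The Runner passes an incrementing iteration counter to the function:
    #   writer = self.go(lambda i: session.execute(
    #       "INSERT INTO ks.cf (key, c, v) VALUES ('k', 'c%d', 'v')" % i))
    #   ...  # restart nodes, run repair, etc.
    #   writer.check()  # re-raises any exception from the background thread
    #   writer.stop()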
def skip(self, msg):
if not NO_SKIP:
raise SkipTest(msg)
def __filter_errors(self, errors):
"""Filter errors, removing those that match self.ignore_log_patterns"""
if not hasattr(self, 'ignore_log_patterns'):
self.ignore_log_patterns = []
for e in errors:
for pattern in self.ignore_log_patterns:
if re.search(pattern, e):
break
else:
yield e
    # Disable docstring printing in nosetest output
def shortDescription(self):
return None
def get_jfr_jvm_args(self):
"""
@return The JVM arguments required for attaching flight recorder to a Java process.
"""
return ["-XX:+UnlockCommercialFeatures", "-XX:+FlightRecorder"]
def start_jfr_recording(self, nodes):
"""
Start Java flight recorder provided the cluster was started with the correct jvm arguments.
"""
for node in nodes:
p = subprocess.Popen(['jcmd', str(node.pid), 'JFR.start'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
debug(stdout)
debug(stderr)
def dump_jfr_recording(self, nodes):
"""
Save Java flight recorder results to file for analyzing with mission control.
"""
for node in nodes:
p = subprocess.Popen(['jcmd', str(node.pid), 'JFR.dump',
'recording=1', 'filename=recording_{}.jfr'.format(node.address())],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
debug(stdout)
debug(stderr)
def get_eager_protocol_version(cassandra_version):
"""
Returns the highest protocol version accepted
by the given C* version
"""
if cassandra_version >= '2.2':
protocol_version = 4
elif cassandra_version >= '2.1':
protocol_version = 3
elif cassandra_version >= '2.0':
protocol_version = 2
else:
protocol_version = 1
return protocol_version
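# Illustrative mapping: 2.2+ (including 3.x) -> protocol 4, 2.1.x -> 3,
# 2.0.x -> 2, anything older -> 1.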
# We default to UTF8Type because it's simpler to use in tests
def create_cf(session, name, key_type="varchar", speculative_retry=None, read_repair=None, compression=None,
gc_grace=None, columns=None, validation="UTF8Type", compact_storage=False):
additional_columns = ""
if columns is not None:
for k, v in columns.items():
additional_columns = "{}, {} {}".format(additional_columns, k, v)
if additional_columns == "":
query = 'CREATE COLUMNFAMILY %s (key %s, c varchar, v varchar, PRIMARY KEY(key, c)) WITH comment=\'test cf\'' % (name, key_type)
else:
query = 'CREATE COLUMNFAMILY %s (key %s PRIMARY KEY%s) WITH comment=\'test cf\'' % (name, key_type, additional_columns)
if compression is not None:
query = '%s AND compression = { \'sstable_compression\': \'%sCompressor\' }' % (query, compression)
else:
# if a compression option is omitted, C* will default to lz4 compression
query += ' AND compression = {}'
if read_repair is not None:
query = '%s AND read_repair_chance=%f AND dclocal_read_repair_chance=%f' % (query, read_repair, read_repair)
if gc_grace is not None:
query = '%s AND gc_grace_seconds=%d' % (query, gc_grace)
if speculative_retry is not None:
query = '%s AND speculative_retry=\'%s\'' % (query, speculative_retry)
if compact_storage:
query += ' AND COMPACT STORAGE'
session.execute(query)
time.sleep(0.2)
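# For example (illustrative call), create_cf(session, 'cf', columns={'v2': 'int'})
# issues roughly:
#   CREATE COLUMNFAMILY cf (key varchar PRIMARY KEY, v2 int)
#   WITH comment='test cf' AND compression = {}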
def create_ks(session, name, rf):
query = 'CREATE KEYSPACE %s WITH replication={%s}'
if isinstance(rf, types.IntType):
# we assume simpleStrategy
session.execute(query % (name, "'class':'SimpleStrategy', 'replication_factor':%d" % rf))
else:
assert_greater_equal(len(rf), 0, "At least one datacenter/rf pair is needed")
# we assume networkTopologyStrategy
options = (', ').join(['\'%s\':%d' % (d, r) for d, r in rf.iteritems()])
session.execute(query % (name, "'class':'NetworkTopologyStrategy', %s" % options))
session.execute('USE {}'.format(name))
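# Illustrative calls: an int replication factor assumes SimpleStrategy,
#   create_ks(session, 'ks', 2)
#     -> CREATE KEYSPACE ks WITH replication={'class':'SimpleStrategy', 'replication_factor':2}
# while a dict maps datacenters to rf for NetworkTopologyStrategy:
#   create_ks(session, 'ks', {'dc1': 3, 'dc2': 2})
#     -> CREATE KEYSPACE ks WITH replication={'class':'NetworkTopologyStrategy', 'dc1':3, 'dc2':2}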
def get_auth_provider(user, password):
return PlainTextAuthProvider(username=user, password=password)
def make_auth(user, password):
def private_auth(node_ip):
return {'username': user, 'password': password}
return private_auth
def get_port_from_node(node):
"""
Return the port that this node is listening on.
We only use this to connect the native driver,
so we only care about the binary port.
"""
try:
return node.network_interfaces['binary'][1]
except Exception:
raise RuntimeError("No network interface defined on this node object. {}".format(node.network_interfaces))
def get_ip_from_node(node):
if node.network_interfaces['binary']:
node_ip = node.network_interfaces['binary'][0]
else:
node_ip = node.network_interfaces['thrift'][0]
return node_ip
def kill_windows_cassandra_procs():
# On Windows, forcefully terminate any leftover previously running cassandra processes. This is a temporary
# workaround until we can determine the cause of intermittent hung-open tests and file-handles.
if is_win():
try:
import psutil
for proc in psutil.process_iter():
try:
pinfo = proc.as_dict(attrs=['pid', 'name', 'cmdline'])
except psutil.NoSuchProcess:
pass
else:
if (pinfo['name'] == 'java.exe' and '-Dcassandra' in pinfo['cmdline']):
print 'Found running cassandra process with pid: ' + str(pinfo['pid']) + '. Killing.'
psutil.Process(pinfo['pid']).kill()
except ImportError:
debug("WARN: psutil not installed. Cannot detect and kill "
"running cassandra processes - you may see cascading dtest failures.")
def get_test_path():
test_path = tempfile.mkdtemp(prefix='dtest-')
# ccm on cygwin needs absolute path to directory - it crosses from cygwin space into
# regular Windows space on wmic calls which will otherwise break pathing
if sys.platform == "cygwin":
process = subprocess.Popen(["cygpath", "-m", test_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
test_path = process.communicate()[0].rstrip()
return test_path
# nose will discover this as a test, so we manually make it not a test
get_test_path.__test__ = False
def create_ccm_cluster(test_path, name):
debug("cluster ccm directory: " + test_path)
version = os.environ.get('CASSANDRA_VERSION')
cdir = CASSANDRA_DIR
if version:
cluster = Cluster(test_path, name, cassandra_version=version)
else:
cluster = Cluster(test_path, name, cassandra_dir=cdir)
if DISABLE_VNODES:
cluster.set_configuration_options(values={'num_tokens': None})
else:
cluster.set_configuration_options(values={'initial_token': None, 'num_tokens': NUM_TOKENS})
if OFFHEAP_MEMTABLES:
cluster.set_configuration_options(values={'memtable_allocation_type': 'offheap_objects'})
cluster.set_datadir_count(DATADIR_COUNT)
cluster.set_environment_variable('CASSANDRA_LIBJEMALLOC', CASSANDRA_LIBJEMALLOC)
return cluster
def cleanup_cluster(cluster, test_path, log_watch_thread=None):
with log_filter('cassandra'): # quiet noise from driver when nodes start going down
if KEEP_TEST_DIR:
cluster.stop(gently=RECORD_COVERAGE)
else:
# when recording coverage the jvm has to exit normally
# or the coverage information is not written by the jacoco agent
# otherwise we can just kill the process
if RECORD_COVERAGE:
cluster.stop(gently=True)
# Cleanup everything:
try:
if log_watch_thread:
stop_active_log_watch(log_watch_thread)
finally:
debug("removing ccm cluster {name} at: {path}".format(name=cluster.name, path=test_path))
cluster.remove()
debug("clearing ssl stores from [{0}] directory".format(test_path))
for filename in ('keystore.jks', 'truststore.jks', 'ccm_node.cer'):
try:
os.remove(os.path.join(test_path, filename))
except OSError as e:
                # once we port to py3, which has better reporting for exceptions raised while
                # handling other exceptions, we should just assert e.errno == errno.ENOENT
if e.errno != errno.ENOENT: # ENOENT = no such file or directory
raise
os.rmdir(test_path)
cleanup_last_test_dir()
def cleanup_last_test_dir():
if os.path.exists(LAST_TEST_DIR):
os.remove(LAST_TEST_DIR)
def stop_active_log_watch(log_watch_thread):
"""
Joins the log watching thread, which will then exit.
Should be called after each test, ideally after nodes are stopped but before cluster files are removed.
Can be called multiple times without error.
If not called, log watching thread will remain running until the parent process exits.
"""
log_watch_thread.join(timeout=60)
def maybe_cleanup_cluster_from_last_test_file():
# cleaning up if a previous execution didn't trigger tearDown (which
# can happen if it is interrupted by KeyboardInterrupt)
if os.path.exists(LAST_TEST_DIR):
with open(LAST_TEST_DIR) as f:
test_path = f.readline().strip('\n')
name = f.readline()
try:
cluster = ClusterFactory.load(test_path, name)
# Avoid waiting too long for node to be marked down
cleanup_cluster(cluster, test_path)
except IOError:
# after a restart, /tmp will be emptied so we'll get an IOError when loading the old cluster here
pass
def init_default_config(cluster, cluster_options):
# the failure detector can be quite slow in such tests with quick start/stop
phi_values = {'phi_convict_threshold': 5}
timeout = 10000
if cluster_options is not None:
values = merge_dicts(cluster_options, phi_values)
else:
values = merge_dicts(phi_values, {
'read_request_timeout_in_ms': timeout,
'range_request_timeout_in_ms': timeout,
'write_request_timeout_in_ms': timeout,
'truncate_request_timeout_in_ms': timeout,
'request_timeout_in_ms': timeout
})
    # No more thrift in 4.0, and start_rpc doesn't exist anymore
if cluster.version() >= '4' and 'start_rpc' in values:
del values['start_rpc']
cluster.set_configuration_options(values)
debug("Done setting configuration options:\n" + pprint.pformat(cluster._config_options, indent=4))
def write_last_test_file(test_path, cluster):
with open(LAST_TEST_DIR, 'w') as f:
f.write(test_path + '\n')
f.write(cluster.name)
def set_log_levels(cluster):
if DEBUG:
cluster.set_log_level("DEBUG")
if TRACE:
cluster.set_log_level("TRACE")
if os.environ.get('DEBUG', 'no').lower() not in ('no', 'false', 'yes', 'true'):
classes_to_debug = os.environ.get('DEBUG').split(":")
cluster.set_log_level('DEBUG', None if len(classes_to_debug) == 0 else classes_to_debug)
if os.environ.get('TRACE', 'no').lower() not in ('no', 'false', 'yes', 'true'):
classes_to_trace = os.environ.get('TRACE').split(":")
cluster.set_log_level('TRACE', None if len(classes_to_trace) == 0 else classes_to_trace)
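# Example (illustrative logger names): beyond 'yes'/'true', DEBUG and TRACE
# also accept a colon-separated list of classes to raise to that level, e.g.
#   DEBUG=org.apache.cassandra.gms.Gossiper:org.apache.cassandra.service.StorageService nosetests ...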
def maybe_setup_jacoco(test_path, cluster_name='test'):
"""Setup JaCoCo code coverage support"""
if not RECORD_COVERAGE:
return
# use explicit agent and execfile locations
# or look for a cassandra build if they are not specified
cdir = CASSANDRA_DIR
agent_location = os.environ.get('JACOCO_AGENT_JAR', os.path.join(cdir, 'build/lib/jars/jacocoagent.jar'))
jacoco_execfile = os.environ.get('JACOCO_EXECFILE', os.path.join(cdir, 'build/jacoco/jacoco.exec'))
if os.path.isfile(agent_location):
debug("Jacoco agent found at {}".format(agent_location))
with open(os.path.join(
test_path, cluster_name, 'cassandra.in.sh'), 'w') as f:
f.write('JVM_OPTS="$JVM_OPTS -javaagent:{jar_path}=destfile={exec_file}"'
.format(jar_path=agent_location, exec_file=jacoco_execfile))
if os.path.isfile(jacoco_execfile):
debug("Jacoco execfile found at {}, execution data will be appended".format(jacoco_execfile))
else:
debug("Jacoco execfile will be created at {}".format(jacoco_execfile))
else:
debug("Jacoco agent not found or is not file. Execution will not be recorded.")
def did_fail():
if sys.exc_info() == (None, None, None):
return False
exc_class, _, _ = sys.exc_info()
return not issubclass(exc_class, unittest.case.SkipTest)
class ReusableClusterTester(Tester):
"""
A Tester designed for reusing the same cluster across multiple
test methods. This makes test suites with many small tests run
much, much faster. However, there are a couple of downsides:
First, test setup and teardown must be diligent about cleaning
up any data or schema elements that may interfere with other
tests.
Second, errors triggered by one test method may cascade
into other test failures. In an attempt to limit this, the
cluster will be restarted if a test fails or an exception is
caught. However, there may still be undetected problems in
Cassandra that cause cascading failures.
"""
test_path = None
cluster = None
cluster_options = None
@classmethod
def setUpClass(cls):
kill_windows_cassandra_procs()
maybe_cleanup_cluster_from_last_test_file()
cls.initialize_cluster()
def setUp(self):
self.set_current_tst_name()
self.connections = []