From 7944af3c62a9426d7bf3df387db7a72e7f513fb7 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Mon, 17 Jul 2023 13:13:12 +0300 Subject: [PATCH] feat: Add black formatter to the project (#1544) Add black formatter and run it on pytests --- .pre-commit-config.yaml | 5 + pyproject.toml | 12 + tests/dragonfly/__init__.py | 30 ++- tests/dragonfly/cluster_test.py | 140 +++++----- tests/dragonfly/conftest.py | 85 +++--- tests/dragonfly/connection_test.py | 91 ++++--- tests/dragonfly/eval_test.py | 85 +++--- tests/dragonfly/generic_test.py | 21 +- tests/dragonfly/json_test.py | 22 +- tests/dragonfly/list_family_test.py | 13 +- tests/dragonfly/pymemcached_test.py | 3 + tests/dragonfly/redis_replication_test.py | 62 +++-- tests/dragonfly/replication_test.py | 315 ++++++++++++---------- tests/dragonfly/search_test.py | 123 +++++---- tests/dragonfly/sentinel_test.py | 65 +++-- tests/dragonfly/server_family_test.py | 15 +- tests/dragonfly/shutdown_test.py | 11 +- tests/dragonfly/snapshot_test.py | 37 ++- tests/dragonfly/utility.py | 170 ++++++------ tests/integration/async.py | 20 +- tests/integration/generate_sets.py | 40 +-- 21 files changed, 796 insertions(+), 569 deletions(-) create mode 100644 pyproject.toml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f293f1855436..739bd77b8b94 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -24,3 +24,8 @@ repos: hooks: - id: clang-format name: Clang formatting + + - repo: https://github.com/psf/black + rev: 23.7.0 + hooks: + - id: black diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 000000000000..f03575d468a4 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,12 @@ +[tool.black] +line-length = 100 +include = '\.py$' +extend-exclude = ''' +/( + | .git + | .__pycache__ + | build-dbg + | build-opt + | helio +)/ +''' diff --git a/tests/dragonfly/__init__.py b/tests/dragonfly/__init__.py index 3b1c2ee44c88..1fb69a76a7c6 100644 --- a/tests/dragonfly/__init__.py +++ b/tests/dragonfly/__init__.py @@ -34,7 +34,7 @@ def __init__(self, params: DflyParams, args): self.args = args self.params = params self.proc = None - self._client : Optional[RedisClient] = None + self._client: Optional[RedisClient] = None def client(self) -> RedisClient: return RedisClient(port=self.port) @@ -70,8 +70,7 @@ def _start(self): return base_args = [f"--{v}" for v in self.params.args] all_args = self.format_args(self.args) + base_args - print( - f"Starting instance on {self.port} with arguments {all_args} from {self.params.path}") + print(f"Starting instance on {self.port} with arguments {all_args} from {self.params.path}") run_cmd = [self.params.path, *all_args] if self.params.gdb: @@ -82,8 +81,7 @@ def _check_status(self): if not self.params.existing_port: return_code = self.proc.poll() if return_code is not None: - raise Exception( - f"Failed to start instance, return code {return_code}") + raise Exception(f"Failed to start instance, return code {return_code}") def __getitem__(self, k): return self.args.get(k) @@ -93,11 +91,13 @@ def port(self) -> int: if self.params.existing_port: return self.params.existing_port return int(self.args.get("port", "6379")) + @property def admin_port(self) -> int: if self.params.existing_admin_port: return self.params.existing_admin_port return int(self.args.get("admin_port", "16379")) + @property def mc_port(self) -> int: if self.params.existing_mc_port: @@ -107,7 +107,7 @@ def mc_port(self) -> int: @staticmethod def format_args(args): out = [] - for (k, v) in args.items(): + for k, v in args.items(): out.append(f"--{k}") if v is not None: out.append(str(v)) @@ -118,7 +118,10 @@ async def metrics(self): resp = await session.get(f"http://localhost:{self.port}/metrics") data = await resp.text() await session.close() - return {metric_family.name : metric_family for metric_family in text_string_to_metric_families(data)} + return { + metric_family.name: metric_family + for metric_family in text_string_to_metric_families(data) + } class DflyInstanceFactory: @@ -142,7 +145,7 @@ def create(self, **kwargs) -> DflyInstance: return instance def start_all(self, instances): - """ Start multiple instances in parallel """ + """Start multiple instances in parallel""" for instance in instances: instance._start() @@ -162,17 +165,17 @@ def __str__(self): def dfly_args(*args): - """ Used to define a singular set of arguments for dragonfly test """ + """Used to define a singular set of arguments for dragonfly test""" return pytest.mark.parametrize("df_factory", args, indirect=True) def dfly_multi_test_args(*args): - """ Used to define multiple sets of arguments to test multiple dragonfly configurations """ + """Used to define multiple sets of arguments to test multiple dragonfly configurations""" return pytest.mark.parametrize("df_factory", args, indirect=True) -class PortPicker(): - """ A simple port manager to allocate available ports for tests """ +class PortPicker: + """A simple port manager to allocate available ports for tests""" def __init__(self): self.next_port = 5555 @@ -185,5 +188,6 @@ def get_available_port(self): def is_port_available(self, port): import socket + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - return s.connect_ex(('localhost', port)) != 0 + return s.connect_ex(("localhost", port)) != 0 diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index f00cf85b8b97..211530f10718 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -13,9 +13,9 @@ async def push_config(config, admin_connections): print("Pushing config ", config) - await asyncio.gather(*(c_admin.execute_command( - "DFLYCLUSTER", "CONFIG", config) - for c_admin in admin_connections)) + await asyncio.gather( + *(c_admin.execute_command("DFLYCLUSTER", "CONFIG", config) for c_admin in admin_connections) + ) async def get_node_id(admin_connection): @@ -38,15 +38,13 @@ async def test_cluster_commands_fails_when_not_emulate(self, async_client: aiore @dfly_args({"cluster_mode": "emulated"}) class TestEmulated: def test_cluster_slots_command(self, cluster_client: redis.RedisCluster): - expected = {(0, 16383): {'primary': ( - '127.0.0.1', 6379), 'replicas': []}} + expected = {(0, 16383): {"primary": ("127.0.0.1", 6379), "replicas": []}} res = cluster_client.execute_command("CLUSTER SLOTS") assert expected == res def test_cluster_help_command(self, cluster_client: redis.RedisCluster): # `target_nodes` is necessary because CLUSTER HELP is not mapped on redis-py - res = cluster_client.execute_command( - "CLUSTER HELP", target_nodes=redis.RedisCluster.RANDOM) + res = cluster_client.execute_command("CLUSTER HELP", target_nodes=redis.RedisCluster.RANDOM) assert "HELP" in res assert "SLOTS" in res @@ -61,31 +59,31 @@ def test_cluster_pipeline(self, cluster_client: redis.RedisCluster): @dfly_args({"cluster_mode": "emulated", "cluster_announce_ip": "127.0.0.2"}) class TestEmulatedWithAnnounceIp: def test_cluster_slots_command(self, cluster_client: redis.RedisCluster): - expected = {(0, 16383): {'primary': ( - '127.0.0.2', 6379), 'replicas': []}} + expected = {(0, 16383): {"primary": ("127.0.0.2", 6379), "replicas": []}} res = cluster_client.execute_command("CLUSTER SLOTS") assert expected == res -def verify_slots_result(ip: str, port: int, answer: list, rep_ip: str = None, rep_port: int = None) -> bool: +def verify_slots_result( + ip: str, port: int, answer: list, rep_ip: str = None, rep_port: int = None +) -> bool: def is_local_host(ip: str) -> bool: - return ip == '127.0.0.1' or ip == 'localhost' + return ip == "127.0.0.1" or ip == "localhost" - assert answer[0] == 0 # start shard - assert answer[1] == 16383 # last shard + assert answer[0] == 0 # start shard + assert answer[1] == 16383 # last shard if rep_ip is not None: assert len(answer) == 4 # the network info rep_info = answer[3] assert len(rep_info) == 3 - ip_addr = str(rep_info[0], 'utf-8') - assert ip_addr == rep_ip or ( - is_local_host(ip_addr) and is_local_host(ip)) + ip_addr = str(rep_info[0], "utf-8") + assert ip_addr == rep_ip or (is_local_host(ip_addr) and is_local_host(ip)) assert rep_info[1] == rep_port else: assert len(answer) == 3 info = answer[2] assert len(info) == 3 - ip_addr = str(info[0], 'utf-8') + ip_addr = str(info[0], "utf-8") assert ip_addr == ip or (is_local_host(ip_addr) and is_local_host(ip)) assert info[1] == port return True @@ -94,7 +92,7 @@ def is_local_host(ip: str) -> bool: @dfly_args({"proactor_threads": 4, "cluster_mode": "emulated"}) async def test_cluster_slots_in_replicas(df_local_factory): master = df_local_factory.create(port=BASE_PORT) - replica = df_local_factory.create(port=BASE_PORT+1, logtostdout=True) + replica = df_local_factory.create(port=BASE_PORT + 1, logtostdout=True) df_local_factory.start_all([master, replica]) @@ -103,45 +101,46 @@ async def test_cluster_slots_in_replicas(df_local_factory): res = await c_replica.execute_command("CLUSTER SLOTS") assert len(res) == 1 - assert verify_slots_result( - ip="127.0.0.1", port=replica.port, answer=res[0]) + assert verify_slots_result(ip="127.0.0.1", port=replica.port, answer=res[0]) res = await c_master.execute_command("CLUSTER SLOTS") - assert verify_slots_result( - ip="127.0.0.1", port=master.port, answer=res[0]) + assert verify_slots_result(ip="127.0.0.1", port=master.port, answer=res[0]) # Connect replica to master rc = await c_replica.execute_command(f"REPLICAOF localhost {master.port}") - assert str(rc, 'utf-8') == "OK" + assert str(rc, "utf-8") == "OK" await asyncio.sleep(0.5) res = await c_replica.execute_command("CLUSTER SLOTS") assert verify_slots_result( - ip="127.0.0.1", port=master.port, answer=res[0], rep_ip="127.0.0.1", rep_port=replica.port) + ip="127.0.0.1", port=master.port, answer=res[0], rep_ip="127.0.0.1", rep_port=replica.port + ) res = await c_master.execute_command("CLUSTER SLOTS") assert verify_slots_result( - ip="127.0.0.1", port=master.port, answer=res[0], rep_ip="127.0.0.1", rep_port=replica.port) + ip="127.0.0.1", port=master.port, answer=res[0], rep_ip="127.0.0.1", rep_port=replica.port + ) @dfly_args({"cluster_mode": "emulated", "cluster_announce_ip": "127.0.0.2"}) async def test_cluster_info(async_client): res = await async_client.execute_command("CLUSTER INFO") assert len(res) == 16 - assert res == {'cluster_current_epoch': '1', - 'cluster_known_nodes': '1', - 'cluster_my_epoch': '1', - 'cluster_size': '1', - 'cluster_slots_assigned': '16384', - 'cluster_slots_fail': '0', - 'cluster_slots_ok': '16384', - 'cluster_slots_pfail': '0', - 'cluster_state': 'ok', - 'cluster_stats_messages_meet_received': '0', - 'cluster_stats_messages_ping_received': '1', - 'cluster_stats_messages_ping_sent': '1', - 'cluster_stats_messages_pong_received': '1', - 'cluster_stats_messages_pong_sent': '1', - 'cluster_stats_messages_received': '1', - 'cluster_stats_messages_sent': '1' - } + assert res == { + "cluster_current_epoch": "1", + "cluster_known_nodes": "1", + "cluster_my_epoch": "1", + "cluster_size": "1", + "cluster_slots_assigned": "16384", + "cluster_slots_fail": "0", + "cluster_slots_ok": "16384", + "cluster_slots_pfail": "0", + "cluster_state": "ok", + "cluster_stats_messages_meet_received": "0", + "cluster_stats_messages_ping_received": "1", + "cluster_stats_messages_ping_sent": "1", + "cluster_stats_messages_pong_received": "1", + "cluster_stats_messages_pong_sent": "1", + "cluster_stats_messages_received": "1", + "cluster_stats_messages_sent": "1", + } @dfly_args({"cluster_mode": "emulated", "cluster_announce_ip": "127.0.0.2"}) @@ -149,14 +148,14 @@ async def test_cluster_info(async_client): async def test_cluster_nodes(async_client): res = await async_client.execute_command("CLUSTER NODES") assert len(res) == 1 - info = res['127.0.0.2:6379'] + info = res["127.0.0.2:6379"] assert res is not None - assert info['connected'] == True - assert info['epoch'] == '0' - assert info['flags'] == 'myself,master' - assert info['last_ping_sent'] == '0' - assert info['slots'] == [['0', '16383']] - assert info['master_id'] == "-" + assert info["connected"] == True + assert info["epoch"] == "0" + assert info["flags"] == "myself,master" + assert info["last_ping_sent"] == "0" + assert info["slots"] == [["0", "16383"]] + assert info["master_id"] == "-" """ @@ -166,11 +165,13 @@ async def test_cluster_nodes(async_client): intended. Also add keys to each of them that are *not* moved, and see that they are unaffected by the move. """ + + @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_slot_ownership_changes(df_local_factory): # Start and configure cluster with 2 nodes nodes = [ - df_local_factory.create(port=BASE_PORT+i, admin_port=BASE_PORT+i+1000) + df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2) ] @@ -214,7 +215,10 @@ async def test_cluster_slot_ownership_changes(df_local_factory): ] """ - await push_config(config.replace('LAST_SLOT_CUTOFF', '5259').replace('NEXT_SLOT_CUTOFF', '5260'), c_nodes_admin) + await push_config( + config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"), + c_nodes_admin, + ) # Slot for "KEY1" is 5259 @@ -243,7 +247,10 @@ async def test_cluster_slot_ownership_changes(df_local_factory): print("Moving ownership over 5259 ('KEY1') to other node") - await push_config(config.replace('LAST_SLOT_CUTOFF', '5258').replace('NEXT_SLOT_CUTOFF', '5259'), c_nodes_admin) + await push_config( + config.replace("LAST_SLOT_CUTOFF", "5258").replace("NEXT_SLOT_CUTOFF", "5259"), + c_nodes_admin, + ) # node0 should have removed "KEY1" as it no longer owns it assert await c_nodes[0].execute_command("DBSIZE") == 1 @@ -292,8 +299,8 @@ async def test_cluster_slot_ownership_changes(df_local_factory): @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_replica_sets_non_owned_keys(df_local_factory): # Start and configure cluster with 1 master and 1 replica, both own all slots - master = df_local_factory.create(port=BASE_PORT, admin_port=BASE_PORT+1000) - replica = df_local_factory.create(port=BASE_PORT+1, admin_port=BASE_PORT+1001) + master = df_local_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000) + replica = df_local_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 1001) df_local_factory.start_all([master, replica]) c_master = aioredis.Redis(port=master.port) @@ -404,8 +411,8 @@ async def test_cluster_replica_sets_non_owned_keys(df_local_factory): @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_flush_slots_after_config_change(df_local_factory): # Start and configure cluster with 1 master and 1 replica, both own all slots - master = df_local_factory.create(port=BASE_PORT, admin_port=BASE_PORT+1000) - replica = df_local_factory.create(port=BASE_PORT+1, admin_port=BASE_PORT+1001) + master = df_local_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000) + replica = df_local_factory.create(port=BASE_PORT + 1, admin_port=BASE_PORT + 1001) df_local_factory.start_all([master, replica]) c_master = aioredis.Redis(port=master.port) @@ -453,7 +460,7 @@ async def test_cluster_flush_slots_after_config_change(df_local_factory): resp = await c_master_admin.execute_command("dflycluster", "getslotinfo", "slots", "0") assert resp[0][0] == 0 slot_0_size = resp[0][2] - print(f'Slot 0 size = {slot_0_size}') + print(f"Slot 0 size = {slot_0_size}") assert slot_0_size > 0 config = f""" @@ -512,7 +519,7 @@ async def test_cluster_flush_slots_after_config_change(df_local_factory): async def test_cluster_native_client(df_local_factory): # Start and configure cluster with 3 masters and 3 replicas masters = [ - df_local_factory.create(port=BASE_PORT+i, admin_port=BASE_PORT+i+1000) + df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(3) ] df_local_factory.start_all(masters) @@ -521,7 +528,7 @@ async def test_cluster_native_client(df_local_factory): master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin)) replicas = [ - df_local_factory.create(port=BASE_PORT+100+i, admin_port=BASE_PORT+i+1100) + df_local_factory.create(port=BASE_PORT + 100 + i, admin_port=BASE_PORT + i + 1100) for i in range(3) ] df_local_factory.start_all(replicas) @@ -597,22 +604,23 @@ async def test_cluster_native_client(df_local_factory): client = aioredis.RedisCluster(decode_responses=True, host="localhost", port=masters[0].port) - assert await client.set('key0', 'value') == True - assert await client.get('key0') == 'value' + assert await client.set("key0", "value") == True + assert await client.get("key0") == "value" async def test_random_keys(): for i in range(100): - key = 'key' + str(random.randint(0, 100_000)) - assert await client.set(key, 'value') == True - assert await client.get(key) == 'value' + key = "key" + str(random.randint(0, 100_000)) + assert await client.set(key, "value") == True + assert await client.get(key) == "value" await test_random_keys() await asyncio.gather(*(wait_available_async(c) for c in c_replicas)) # Make sure that getting a value from a replica works as well. replica_response = await client.execute_command( - 'get', 'key0', target_nodes=aioredis.RedisCluster.REPLICAS) - assert 'value' in replica_response.values() + "get", "key0", target_nodes=aioredis.RedisCluster.REPLICAS + ) + assert "value" in replica_response.values() # Push new config config = f""" diff --git a/tests/dragonfly/conftest.py b/tests/dragonfly/conftest.py index d47be272f6d5..f16b871681d4 100644 --- a/tests/dragonfly/conftest.py +++ b/tests/dragonfly/conftest.py @@ -21,7 +21,7 @@ from . import DflyInstance, DflyInstanceFactory, DflyParams, PortPicker, dfly_args from .utility import DflySeederFactory, gen_certificate -logging.getLogger('asyncio').setLevel(logging.WARNING) +logging.getLogger("asyncio").setLevel(logging.WARNING) DATABASE_INDEX = 1 @@ -55,7 +55,6 @@ def df_seeder_factory(request) -> DflySeederFactory: if seed is None: seed = random.randrange(sys.maxsize) - random.seed(int(seed)) print(f"--- Random seed: {seed}, check: {random.randrange(100)} ---") @@ -68,8 +67,7 @@ def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory: Create an instance factory with supplied params. """ scripts_dir = os.path.dirname(os.path.abspath(__file__)) - path = os.environ.get("DRAGONFLY_PATH", os.path.join( - scripts_dir, '../../build-dbg/dragonfly')) + path = os.environ.get("DRAGONFLY_PATH", os.path.join(scripts_dir, "../../build-dbg/dragonfly")) args = request.param if request.param else {} existing = request.config.getoption("--existing-port") @@ -83,7 +81,7 @@ def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory: existing_port=int(existing) if existing else None, existing_admin_port=int(existing_admin) if existing_admin else None, existing_mc_port=int(existing_mc) if existing_mc else None, - env=test_env + env=test_env, ) factory = DflyInstanceFactory(params, args) @@ -121,15 +119,15 @@ def df_server(df_factory: DflyInstanceFactory) -> DflyInstance: # TODO: Investigate spurious open connection with cluster client # if not instance['cluster_mode']: - # TODO: Investigate adding fine grain control over the pool by - # by adding a cache ontop of the clients connection pool and then evict - # properly with client.connection_pool.disconnect() avoiding non synced - # side effects - # assert clients_left == [] + # TODO: Investigate adding fine grain control over the pool by + # by adding a cache ontop of the clients connection pool and then evict + # properly with client.connection_pool.disconnect() avoiding non synced + # side effects + # assert clients_left == [] # else: # print("Cluster clients left: ", len(clients_left)) - if instance['cluster_mode']: + if instance["cluster_mode"]: print("Cluster clients left: ", len(clients_left)) @@ -160,8 +158,7 @@ def cluster_client(df_server): """ Return a cluster client to the default instance with all entries flushed. """ - client = redis.RedisCluster(decode_responses=True, host="localhost", - port=df_server.port) + client = redis.RedisCluster(decode_responses=True, host="localhost", port=df_server.port) client.client_setname("default-cluster-fixture") client.flushall() @@ -171,11 +168,17 @@ def cluster_client(df_server): @pytest_asyncio.fixture(scope="function") async def async_pool(df_server: DflyInstance): - pool = aioredis.ConnectionPool(host="localhost", port=df_server.port, - db=DATABASE_INDEX, decode_responses=True, max_connections=32) + pool = aioredis.ConnectionPool( + host="localhost", + port=df_server.port, + db=DATABASE_INDEX, + decode_responses=True, + max_connections=32, + ) yield pool await pool.disconnect(inuse_connections=True) + @pytest_asyncio.fixture(scope="function") async def async_client(async_pool): """ @@ -197,25 +200,35 @@ def pytest_addoption(parser): --existing-admin-port - to provide an admin port to an existing process instead of starting a new instance --rand-seed - to set the global random seed """ + parser.addoption("--gdb", action="store_true", default=False, help="Run instances in gdb") + parser.addoption("--df", action="append", default=[], help="Add arguments to dragonfly") parser.addoption( - '--gdb', action='store_true', default=False, help='Run instances in gdb' + "--log-seeder", action="store", default=None, help="Store last generator commands in file" ) parser.addoption( - '--df', action='append', default=[], help='Add arguments to dragonfly' + "--rand-seed", + action="store", + default=None, + help="Set seed for global random. Makes seeder predictable", ) parser.addoption( - '--log-seeder', action='store', default=None, help='Store last generator commands in file' + "--existing-port", + action="store", + default=None, + help="Provide a port to the existing process for the test", ) parser.addoption( - '--rand-seed', action='store', default=None, help='Set seed for global random. Makes seeder predictable' + "--existing-admin-port", + action="store", + default=None, + help="Provide an admin port to the existing process for the test", ) - parser.addoption( - '--existing-port', action='store', default=None, help='Provide a port to the existing process for the test') - parser.addoption( - '--existing-admin-port', action='store', default=None, help='Provide an admin port to the existing process for the test') parser.addoption( - '--existing-mc-port', action='store', default=None, help='Provide a port to the existing memcached process for the test' + "--existing-mc-port", + action="store", + default=None, + help="Provide a port to the existing memcached process for the test", ) @@ -251,11 +264,15 @@ def with_tls_server_args(tmp_dir, gen_ca_cert): tls_server_req = os.path.join(tmp_dir, "df-req.pem") tls_server_cert = os.path.join(tmp_dir, "df-cert.pem") - gen_certificate(gen_ca_cert["ca_key"], gen_ca_cert["ca_cert"], tls_server_req, tls_server_key, tls_server_cert) + gen_certificate( + gen_ca_cert["ca_key"], + gen_ca_cert["ca_cert"], + tls_server_req, + tls_server_key, + tls_server_cert, + ) - args = {"tls": "", - "tls_key_file": tls_server_key, - "tls_cert_file": tls_server_cert} + args = {"tls": "", "tls_key_file": tls_server_key, "tls_cert_file": tls_server_cert} return args @@ -272,11 +289,15 @@ def with_tls_client_args(tmp_dir, gen_ca_cert): tls_client_req = os.path.join(tmp_dir, "client-req.pem") tls_client_cert = os.path.join(tmp_dir, "client-cert.pem") - gen_certificate(gen_ca_cert["ca_key"], gen_ca_cert["ca_cert"], tls_client_req, tls_client_key, tls_client_cert) + gen_certificate( + gen_ca_cert["ca_key"], + gen_ca_cert["ca_cert"], + tls_client_req, + tls_client_key, + tls_client_cert, + ) - args = {"ssl": True, - "ssl_keyfile": tls_client_key, - "ssl_certfile": tls_client_cert} + args = {"ssl": True, "ssl_keyfile": tls_client_key, "ssl_certfile": tls_client_cert} return args diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 04d198e86520..5ada888975f9 100644 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -9,6 +9,7 @@ BASE_PORT = 1111 + async def run_monitor_eval(monitor, expected): async with monitor as mon: count = 0 @@ -29,33 +30,32 @@ async def run_monitor_eval(monitor, expected): return False return True -''' + +""" Test issue https://github.com/dragonflydb/dragonfly/issues/756 Monitor command do not return when we have lua script issue -''' +""" @pytest.mark.asyncio async def test_monitor_command_lua(async_pool): - expected = ["EVAL return redis", - "EVAL return redis", "SET foo2"] + expected = ["EVAL return redis", "EVAL return redis", "SET foo2"] conn = aioredis.Redis(connection_pool=async_pool) monitor = conn.monitor() cmd1 = aioredis.Redis(connection_pool=async_pool) - future = asyncio.create_task(run_monitor_eval( - monitor=monitor, expected=expected)) + future = asyncio.create_task(run_monitor_eval(monitor=monitor, expected=expected)) await asyncio.sleep(0.1) try: res = await cmd1.eval(r'return redis.call("GET", "bar")', 0) - assert False # this will return an error + assert False # this will return an error except Exception as e: assert "script tried accessing undeclared key" in str(e) try: - res = await cmd1.eval(r'return redis.call("SET", KEYS[1], ARGV[1])', 1, 'foo2', 'bar2') + res = await cmd1.eval(r'return redis.call("SET", KEYS[1], ARGV[1])', 1, "foo2", "bar2") except Exception as e: print(f"EVAL error: {e}") assert False @@ -66,12 +66,12 @@ async def test_monitor_command_lua(async_pool): assert status -''' +""" Test the monitor command. Open connection which is used for monitoring Then send on other connection commands to dragonfly instance Make sure that we are getting the commands in the monitor context -''' +""" @pytest.mark.asyncio @@ -101,9 +101,11 @@ async def process_cmd(monitor, key, value): if "select" not in response["command"].lower(): success = verify_response(response, key, value) if not success: - print( - f"failed to verify message {response} for {key}/{value}") - return False, f"failed on the verification of the message {response} at {key}: {value}" + print(f"failed to verify message {response} for {key}/{value}") + return ( + False, + f"failed on the verification of the message {response} at {key}: {value}", + ) else: return True, None except asyncio.TimeoutError: @@ -146,11 +148,11 @@ async def run_monitor(messages: dict, pool: aioredis.ConnectionPool): return False, f"monitor result: {status}: {message}, set command success {success}" -''' +""" Run test in pipeline mode. This is mostly how this is done with python - its more like a transaction that the connections is running all commands in its context -''' +""" @pytest.mark.asyncio @@ -194,12 +196,12 @@ async def run_pipeline_mode(async_client: aioredis.Redis, messages): return True, "all command processed successfully" -''' +""" Test the pipeline command Open connection to the subscriber and publish on the other end messages Make sure that we are able to send all of them and that we are getting the expected results on the subscriber side -''' +""" @pytest.mark.asyncio @@ -232,7 +234,10 @@ async def run_pubsub(async_client, messages, channel_name): if status and success: return True, "successfully completed all" else: - return False, f"subscriber result: {status}: {message}, publisher publish: success {success}" + return ( + False, + f"subscriber result: {status}: {message}, publisher publish: success {success}", + ) async def run_multi_pubsub(async_client, messages, channel_name): @@ -241,7 +246,8 @@ async def run_multi_pubsub(async_client, messages, channel_name): await s.subscribe(channel_name) tasks = [ - asyncio.create_task(reader(s, messages, random.randint(0, len(messages)))) for s in subs] + asyncio.create_task(reader(s, messages, random.randint(0, len(messages)))) for s in subs + ] success = True @@ -260,18 +266,18 @@ async def run_multi_pubsub(async_client, messages, channel_name): if success: for status, message in results: if not status: - return False, f"failed to process {message}" + return False, f"failed to process {message}" return True, "success" else: return False, "failed to publish" -''' +""" Test with multiple subscribers for a channel We want to stress this to see if we have any issue with the pub sub code since we are "sharing" the message across multiple connections internally -''' +""" @pytest.mark.asyncio @@ -279,6 +285,7 @@ async def test_multi_pubsub(async_client): def generate(max): for i in range(max): yield f"this is message number {i} from the publisher on the channel" + messages = [a for a in generate(500)] state, message = await run_multi_pubsub(async_client, messages, "my-channel") @@ -288,8 +295,13 @@ def generate(max): @pytest.mark.asyncio async def test_subscribers_with_active_publisher(df_server: DflyInstance, max_connections=100): # TODO: I am not how to customize the max connections for the pool. - async_pool = aioredis.ConnectionPool(host="localhost", port=df_server.port, - db=0, decode_responses=True, max_connections=max_connections) + async_pool = aioredis.ConnectionPool( + host="localhost", + port=df_server.port, + db=0, + decode_responses=True, + max_connections=max_connections, + ) async def publish_worker(): client = aioredis.Redis(connection_pool=async_pool) @@ -322,12 +334,12 @@ async def subscribe_worker(): async def test_big_command(df_server, size=8 * 1024): - reader, writer = await asyncio.open_connection('127.0.0.1', df_server.port) + reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port) writer.write(f"SET a {'v'*size}\n".encode()) await writer.drain() - assert 'OK' in (await reader.readline()).decode() + assert "OK" in (await reader.readline()).decode() writer.close() await writer.wait_closed() @@ -335,9 +347,8 @@ async def test_big_command(df_server, size=8 * 1024): async def test_subscribe_pipelined(async_client: aioredis.Redis): pipe = async_client.pipeline(transaction=False) - pipe.execute_command('subscribe channel').execute_command( - 'subscribe channel') - await pipe.echo('bye bye').execute() + pipe.execute_command("subscribe channel").execute_command("subscribe channel") + await pipe.echo("bye bye").execute() async def test_subscribe_in_pipeline(async_client: aioredis.Redis): @@ -349,8 +360,8 @@ async def test_subscribe_in_pipeline(async_client: aioredis.Redis): pipe.echo("three") res = await pipe.execute() - assert res == ['one', ['subscribe', 'ch1', 1], - 'two', ['subscribe', 'ch2', 2], 'three'] + assert res == ["one", ["subscribe", "ch1", 1], "two", ["subscribe", "ch2", 2], "three"] + """ This test makes sure that Dragonfly can receive blocks of pipelined commands even @@ -376,9 +387,13 @@ async def test_subscribe_in_pipeline(async_client: aioredis.Redis): MGET m7 m8 m9\n """ -PACKET3 = """ +PACKET3 = ( + """ PING -""" * 500 + "ECHO DONE\n" +""" + * 500 + + "ECHO DONE\n" +) async def test_parser_while_script_running(async_client: aioredis.Redis, df_server: DflyInstance): @@ -386,7 +401,7 @@ async def test_parser_while_script_running(async_client: aioredis.Redis, df_serv # Use a raw tcp connection for strict control of sent commands # Below we send commands while the previous ones didn't finish - reader, writer = await asyncio.open_connection('localhost', df_server.port) + reader, writer = await asyncio.open_connection("localhost", df_server.port) # Send first pipeline packet, last commands is a long executing script writer.write(PACKET1.format(sha=sha).encode()) @@ -409,7 +424,9 @@ async def test_parser_while_script_running(async_client: aioredis.Redis, df_serv @dfly_args({"proactor_threads": 1}) async def test_large_cmd(async_client: aioredis.Redis): MAX_ARR_SIZE = 65535 - res = await async_client.hset('foo', mapping={f"key{i}": f"val{i}" for i in range(MAX_ARR_SIZE // 2)}) + res = await async_client.hset( + "foo", mapping={f"key{i}": f"val{i}" for i in range(MAX_ARR_SIZE // 2)} + ) assert res == MAX_ARR_SIZE // 2 res = await async_client.mset({f"key{i}": f"val{i}" for i in range(MAX_ARR_SIZE // 2)}) @@ -421,7 +438,9 @@ async def test_large_cmd(async_client: aioredis.Redis): @pytest.mark.asyncio async def test_reject_non_tls_connections_on_tls(with_tls_server_args, df_local_factory): - server = df_local_factory.create(no_tls_on_admin_port="true", admin_port=1111, port=1211, **with_tls_server_args) + server = df_local_factory.create( + no_tls_on_admin_port="true", admin_port=1111, port=1211, **with_tls_server_args + ) server.start() client = aioredis.Redis(port=server.port) diff --git a/tests/dragonfly/eval_test.py b/tests/dragonfly/eval_test.py index 943e597a1359..1a62fc409bf2 100644 --- a/tests/dragonfly/eval_test.py +++ b/tests/dragonfly/eval_test.py @@ -75,14 +75,12 @@ """ -def DJANGO_CACHEOPS_SCHEMA(vs): return { - "table_1": [ - {"f-1": f'v-{vs[0]}'}, {"f-2": f'v-{vs[1]}'} - ], - "table_2": [ - {"f-1": f'v-{vs[2]}'}, {"f-2": f'v-{vs[3]}'} - ] -} +def DJANGO_CACHEOPS_SCHEMA(vs): + return { + "table_1": [{"f-1": f"v-{vs[0]}"}, {"f-2": f"v-{vs[1]}"}], + "table_2": [{"f-1": f"v-{vs[2]}"}, {"f-2": f"v-{vs[3]}"}], + } + """ Test the main caching script of https://github.com/Suor/django-cacheops. @@ -91,21 +89,25 @@ def DJANGO_CACHEOPS_SCHEMA(vs): return { """ -@dfly_multi_test_args({'default_lua_flags': 'allow-undeclared-keys', 'proactor_threads': 4}, - {'default_lua_flags': 'allow-undeclared-keys disable-atomicity', 'proactor_threads': 4}) +@dfly_multi_test_args( + {"default_lua_flags": "allow-undeclared-keys", "proactor_threads": 4}, + {"default_lua_flags": "allow-undeclared-keys disable-atomicity", "proactor_threads": 4}, +) async def test_django_cacheops_script(async_client, num_keys=500): script = async_client.register_script(DJANGO_CACHEOPS_SCRIPT) - data = [(f'k-{k}', [random.randint(0, 10) for _ in range(4)]) - for k in range(num_keys)] + data = [(f"k-{k}", [random.randint(0, 10) for _ in range(4)]) for k in range(num_keys)] for k, vs in data: schema = DJANGO_CACHEOPS_SCHEMA(vs) - assert await script(keys=['', k, ''], args=['a' * 10, json.dumps(schema, sort_keys=True), 100]) == 'OK' + assert ( + await script(keys=["", k, ""], args=["a" * 10, json.dumps(schema, sort_keys=True), 100]) + == "OK" + ) # Check schema was built correctly base_schema = DJANGO_CACHEOPS_SCHEMA([0] * 4) for table, fields in base_schema.items(): - schema = await async_client.smembers(f'schemes:{table}') + schema = await async_client.smembers(f"schemes:{table}") fields = set.union(*(set(part.keys()) for part in fields)) assert schema == fields @@ -114,9 +116,9 @@ async def test_django_cacheops_script(async_client, num_keys=500): assert await async_client.exists(k) for table, fields in DJANGO_CACHEOPS_SCHEMA(vs).items(): for sub_schema in fields: - conj_key = f'conj:{table}:' + \ - '&'.join("{}={}".format(f, v) - for f, v in sub_schema.items()) + conj_key = f"conj:{table}:" + "&".join( + "{}={}".format(f, v) for f, v in sub_schema.items() + ) assert await async_client.sismember(conj_key, k) @@ -158,24 +160,29 @@ async def test_django_cacheops_script(async_client, num_keys=500): """ -@dfly_multi_test_args({'default_lua_flags': 'allow-undeclared-keys', 'proactor_threads': 4}, - {'default_lua_flags': 'allow-undeclared-keys disable-atomicity', 'proactor_threads': 4}) +@dfly_multi_test_args( + {"default_lua_flags": "allow-undeclared-keys", "proactor_threads": 4}, + {"default_lua_flags": "allow-undeclared-keys disable-atomicity", "proactor_threads": 4}, +) async def test_golang_asynq_script(async_pool, num_queues=10, num_tasks=100): async def enqueue_worker(queue): client = aioredis.Redis(connection_pool=async_pool) enqueue = client.register_script(ASYNQ_ENQUEUE_SCRIPT) - task_ids = 2*list(range(num_tasks)) + task_ids = 2 * list(range(num_tasks)) random.shuffle(task_ids) - res = [await enqueue(keys=[f"asynq:{{{queue}}}:t:{task_id}", f"asynq:{{{queue}}}:pending"], - args=[f"{task_id}", task_id, int(time.time())]) - for task_id in task_ids] + res = [ + await enqueue( + keys=[f"asynq:{{{queue}}}:t:{task_id}", f"asynq:{{{queue}}}:pending"], + args=[f"{task_id}", task_id, int(time.time())], + ) + for task_id in task_ids + ] assert sum(res) == num_tasks # Start filling the queues - jobs = [asyncio.create_task(enqueue_worker( - f"q-{queue}")) for queue in range(num_queues)] + jobs = [asyncio.create_task(enqueue_worker(f"q-{queue}")) for queue in range(num_queues)] collected = 0 @@ -185,15 +192,19 @@ async def dequeue_worker(): dequeue = client.register_script(ASYNQ_DEQUE_SCRIPT) while collected < num_tasks * num_queues: - #pct = round(collected/(num_tasks*num_queues), 2) - #print(f'\r \r{pct}', end='', flush=True) + # pct = round(collected/(num_tasks*num_queues), 2) + # print(f'\r \r{pct}', end='', flush=True) for queue in (f"q-{queue}" for queue in range(num_queues)): prefix = f"asynq:{{{queue}}}:t:" - msg = await dequeue(keys=[f"asynq:{{{queue}}}:"+t for t in ["pending", "paused", "active", "lease"]], - args=[int(time.time()), prefix]) + msg = await dequeue( + keys=[ + f"asynq:{{{queue}}}:" + t for t in ["pending", "paused", "active", "lease"] + ], + args=[int(time.time()), prefix], + ) if msg is not None: collected += 1 - assert await client.hget(prefix+msg, 'state') == 'active' + assert await client.hget(prefix + msg, "state") == "active" # Run many contending workers await asyncio.gather(*(dequeue_worker() for _ in range(num_queues * 2))) @@ -204,19 +215,19 @@ async def dequeue_worker(): ERROR_CALL_SCRIPT_TEMPLATE = [ "redis.{}('LTRIM', 'l', 'a', 'b')", # error only on evaluation - "redis.{}('obviously wrong')" # error immediately on preprocessing + "redis.{}('obviously wrong')", # error immediately on preprocessing ] @dfly_args({"proactor_threads": 1}) @pytest.mark.asyncio async def test_eval_error_propagation(async_client): - CMDS = ['call', 'pcall', 'acall', 'apcall'] + CMDS = ["call", "pcall", "acall", "apcall"] for cmd, template in itertools.product(CMDS, ERROR_CALL_SCRIPT_TEMPLATE): - does_abort = 'p' not in cmd + does_abort = "p" not in cmd try: - await async_client.eval(template.format(cmd), 1, 'l') + await async_client.eval(template.format(cmd), 1, "l") if does_abort: assert False, "Eval must have thrown an error: " + cmd except aioredis.RedisError as e: @@ -230,12 +241,12 @@ async def test_global_eval_in_multi(async_client: aioredis.Redis): return redis.call('GET', 'any-key'); """ - await async_client.set('any-key', 'works') + await async_client.set("any-key", "works") pipe = async_client.pipeline(transaction=True) - pipe.set('another-key', 'ok') + pipe.set("another-key", "ok") pipe.eval(GLOBAL_SCRIPT, 0) res = await pipe.execute() print(res) - assert res[1] == 'works' + assert res[1] == "works" diff --git a/tests/dragonfly/generic_test.py b/tests/dragonfly/generic_test.py index c25d0d8c5975..76aa1dd85b1d 100644 --- a/tests/dragonfly/generic_test.py +++ b/tests/dragonfly/generic_test.py @@ -8,22 +8,24 @@ from .utility import batch_fill_data, gen_test_data -@dfly_multi_test_args({'keys_output_limit': 512}, {'keys_output_limit': 1024}) +@dfly_multi_test_args({"keys_output_limit": 512}, {"keys_output_limit": 1024}) class TestKeys: async def test_max_keys(self, async_client: aioredis.Redis, df_server): - max_keys = df_server['keys_output_limit'] + max_keys = df_server["keys_output_limit"] pipe = async_client.pipeline() batch_fill_data(pipe, gen_test_data(max_keys * 3)) await pipe.execute() keys = await async_client.keys() - assert len(keys) in range(max_keys, max_keys+512) + assert len(keys) in range(max_keys, max_keys + 512) + @pytest.fixture(scope="function") def export_dfly_password() -> str: - pwd = 'flypwd' - os.environ['DFLY_PASSWORD'] = pwd + pwd = "flypwd" + os.environ["DFLY_PASSWORD"] = pwd yield pwd - del os.environ['DFLY_PASSWORD'] + del os.environ["DFLY_PASSWORD"] + async def test_password(df_local_factory, export_dfly_password): dfly = df_local_factory.create() @@ -38,7 +40,7 @@ async def test_password(df_local_factory, export_dfly_password): dfly.stop() # --requirepass should take precedence over environment variable - requirepass = 'requirepass' + requirepass = "requirepass" dfly = df_local_factory.create(requirepass=requirepass) dfly.start() @@ -61,6 +63,7 @@ async def test_password(df_local_factory, export_dfly_password): end """ + @dfly_args({"proactor_threads": 1}) async def test_txq_ooo(async_client: aioredis.Redis, df_server): async def task1(k, h): @@ -77,4 +80,6 @@ async def task2(k, n): pipe.blpop(k, 0.001) await pipe.execute() - await asyncio.gather(task1('i1', 2), task1('i2', 3), task2('l1', 2), task2('l1', 2), task2('l1', 5)) + await asyncio.gather( + task1("i1", 2), task1("i2", 3), task2("l1", 2), task2("l1", 2), task2("l1", 5) + ) diff --git a/tests/dragonfly/json_test.py b/tests/dragonfly/json_test.py index 3cfd3f777463..62be35381a22 100755 --- a/tests/dragonfly/json_test.py +++ b/tests/dragonfly/json_test.py @@ -4,15 +4,9 @@ from .utility import * from json import JSONDecoder, JSONEncoder -jane = { - 'name': "Jane", - 'Age': 33, - 'Location': "Chawton" -} +jane = {"name": "Jane", "Age": 33, "Location": "Chawton"} -json_num = { - "a": {"a": 1, "b": 2, "c": 3} -} +json_num = {"a": {"a": 1, "b": 2, "c": 3}} async def get_set_json(connection: aioredis.Redis, key, value, path="$"): @@ -30,8 +24,8 @@ async def test_basic_json_get_set(async_client: aioredis.Redis): the_type = await async_client.type(key_name) assert the_type == "ReJSON-RL" assert len(result) == 1 - assert result[0]['name'] == 'Jane' - assert result[0]['Age'] == 33 + assert result[0]["name"] == "Jane" + assert result[0]["Age"] == 33 async def test_access_json_value_as_string(async_client: aioredis.Redis): @@ -76,18 +70,16 @@ async def test_update_value(async_client: aioredis.Redis): # make sure that we have valid JSON here the_type = await async_client.type(key_name) assert the_type == "ReJSON-RL" - result = await get_set_json(async_client, value="0", - key=key_name, path="$.a.*") + result = await get_set_json(async_client, value="0", key=key_name, path="$.a.*") assert len(result) == 3 # make sure that all the values under 'a' where set to 0 - assert result == ['0', '0', '0'] + assert result == ["0", "0", "0"] # Ensure that after we're changing this into STRING type, it will no longer work await async_client.set(key_name, "some random value") assert await async_client.type(key_name) == "string" try: - await get_set_json(async_client, value="0", key=key_name, - path="$.a.*") + await get_set_json(async_client, value="0", key=key_name, path="$.a.*") assert False, "should not be able to modify JSON value as string" except redis.exceptions.ResponseError as e: assert e.args[0] == "WRONGTYPE Operation against a key holding the wrong kind of value" diff --git a/tests/dragonfly/list_family_test.py b/tests/dragonfly/list_family_test.py index 6c30e9b92961..12e02b861ed1 100644 --- a/tests/dragonfly/list_family_test.py +++ b/tests/dragonfly/list_family_test.py @@ -4,20 +4,19 @@ import pytest -@pytest.mark.parametrize('index', range(50)) +@pytest.mark.parametrize("index", range(50)) class TestBlPop: async def async_blpop(client: aioredis.Redis): - return await client.blpop( - ['list1{t}', 'list2{t}', 'list2{t}', 'list1{t}'], 0.5) + return await client.blpop(["list1{t}", "list2{t}", "list2{t}", "list1{t}"], 0.5) async def blpop_mult_keys(async_client: aioredis.Redis, key: str, val: str): task = asyncio.create_task(TestBlPop.async_blpop(async_client)) await async_client.lpush(key, val) result = await asyncio.wait_for(task, 3) assert result[1] == val - watched = await async_client.execute_command('DEBUG WATCHED') - assert watched == ['awaked', [], 'watched', []] + watched = await async_client.execute_command("DEBUG WATCHED") + assert watched == ["awaked", [], "watched", []] async def test_blpop_multiple_keys(self, async_client: aioredis.Redis, index): - await TestBlPop.blpop_mult_keys(async_client, 'list1{t}', 'a') - await TestBlPop.blpop_mult_keys(async_client, 'list2{t}', 'b') + await TestBlPop.blpop_mult_keys(async_client, "list1{t}", "a") + await TestBlPop.blpop_mult_keys(async_client, "list2{t}", "b") diff --git a/tests/dragonfly/pymemcached_test.py b/tests/dragonfly/pymemcached_test.py index dfcdc613fb54..cb26c223ad78 100644 --- a/tests/dragonfly/pymemcached_test.py +++ b/tests/dragonfly/pymemcached_test.py @@ -8,12 +8,14 @@ def test_add_get(memcached_connection): assert memcached_connection.add(b"key", b"data", noreply=False) assert memcached_connection.get(b"key") == b"data" + @dfly_args({"memcached_port": 11211}) def test_add_set(memcached_connection): assert memcached_connection.add(b"key", b"data", noreply=False) memcached_connection.set(b"key", b"other") assert memcached_connection.get(b"key") == b"other" + @dfly_args({"memcached_port": 11211}) def test_set_add(memcached_connection): memcached_connection.set(b"key", b"data") @@ -23,6 +25,7 @@ def test_set_add(memcached_connection): memcached_connection.set(b"key", b"other") assert memcached_connection.get(b"key") == b"other" + @dfly_args({"memcached_port": 11211}) def test_mixed_reply(memcached_connection): memcached_connection.set(b"key", b"data", noreply=True) diff --git a/tests/dragonfly/redis_replication_test.py b/tests/dragonfly/redis_replication_test.py index dc4e9f599fed..5e3484431100 100644 --- a/tests/dragonfly/redis_replication_test.py +++ b/tests/dragonfly/redis_replication_test.py @@ -12,13 +12,17 @@ def __init__(self, port): self.proc = None def start(self): - self.proc = subprocess.Popen(["redis-server-6.2.11", - f"--port {self.port}", - "--save ''", - "--appendonly no", - "--protected-mode no", - "--repl-diskless-sync yes", - "--repl-diskless-sync-delay 0"]) + self.proc = subprocess.Popen( + [ + "redis-server-6.2.11", + f"--port {self.port}", + "--save ''", + "--appendonly no", + "--protected-mode no", + "--repl-diskless-sync yes", + "--repl-diskless-sync-delay 0", + ] + ) print(self.proc.args) def stop(self): @@ -28,6 +32,7 @@ def stop(self): except Exception as e: pass + # Checks that master redis and dragonfly replica are synced by writing a random key to master # and waiting for it to exist in replica. Foreach db in 0..dbcount-1. async def await_synced(master_port, replica_port, dbcount=1): @@ -58,7 +63,7 @@ async def await_synced_all(c_master, c_replicas): async def check_data(seeder, replicas, c_replicas): capture = await seeder.capture() - for (replica, c_replica) in zip(replicas, c_replicas): + for replica, c_replica in zip(replicas, c_replicas): await wait_available_async(c_replica) assert await seeder.compare(capture, port=replica.port) @@ -84,7 +89,9 @@ def redis_server(port_picker) -> RedisServer: @pytest.mark.parametrize("t_replicas, seeder_config", full_sync_replication_specs) -async def test_replication_full_sync(df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config, port_picker): +async def test_replication_full_sync( + df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config, port_picker +): master = redis_server c_master = aioredis.Redis(port=master.port) assert await c_master.ping() @@ -93,7 +100,8 @@ async def test_replication_full_sync(df_local_factory, df_seeder_factory, redis_ await seeder.run(target_deviation=0.1) replica = df_local_factory.create( - port=port_picker.get_available_port(), proactor_threads=t_replicas[0]) + port=port_picker.get_available_port(), proactor_threads=t_replicas[0] + ) replica.start() c_replica = aioredis.Redis(port=replica.port) assert await c_replica.ping() @@ -105,6 +113,7 @@ async def test_replication_full_sync(df_local_factory, df_seeder_factory, redis_ capture = await seeder.capture() assert await seeder.compare(capture, port=replica.port) + stable_sync_replication_specs = [ ([1], dict(keys=100, dbcount=1, unsupported_types=[ValueType.JSON])), ([1], dict(keys=10_000, dbcount=2, unsupported_types=[ValueType.JSON])), @@ -115,13 +124,16 @@ async def test_replication_full_sync(df_local_factory, df_seeder_factory, redis_ @pytest.mark.parametrize("t_replicas, seeder_config", stable_sync_replication_specs) -async def test_replication_stable_sync(df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config, port_picker): +async def test_replication_stable_sync( + df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config, port_picker +): master = redis_server c_master = aioredis.Redis(port=master.port) assert await c_master.ping() replica = df_local_factory.create( - port=port_picker.get_available_port(), proactor_threads=t_replicas[0]) + port=port_picker.get_available_port(), proactor_threads=t_replicas[0] + ) replica.start() c_replica = aioredis.Redis(port=replica.port) assert await c_replica.ping() @@ -150,7 +162,9 @@ async def test_replication_stable_sync(df_local_factory, df_seeder_factory, redi @pytest.mark.parametrize("t_replicas, seeder_config", replication_specs) -async def test_redis_replication_all(df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config, port_picker): +async def test_redis_replication_all( + df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config, port_picker +): master = redis_server c_master = aioredis.Redis(port=master.port) assert await c_master.ping() @@ -178,11 +192,11 @@ async def run_replication(c_replica): await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) await wait_available_async(c_replica) - await asyncio.gather(*(asyncio.create_task(run_replication(c)) - for c in c_replicas)) + await asyncio.gather(*(asyncio.create_task(run_replication(c)) for c in c_replicas)) # Wait for streaming to finish - assert not stream_task.done( + assert ( + not stream_task.done() ), "Weak testcase. Increase number of streamed iterations to surpass full sync" seeder.stop() await stream_task @@ -206,7 +220,15 @@ async def run_replication(c_replica): @pytest.mark.parametrize("t_replicas, t_disconnect, seeder_config", master_disconnect_cases) -async def test_disconnect_master(df_local_factory, df_seeder_factory, redis_server, t_replicas, t_disconnect, seeder_config, port_picker): +async def test_disconnect_master( + df_local_factory, + df_seeder_factory, + redis_server, + t_replicas, + t_disconnect, + seeder_config, + port_picker, +): master = redis_server c_master = aioredis.Redis(port=master.port) assert await c_master.ping() @@ -234,11 +256,11 @@ async def run_replication(c_replica): await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) await wait_available_async(c_replica) - await asyncio.gather(*(asyncio.create_task(run_replication(c)) - for c in c_replicas)) + await asyncio.gather(*(asyncio.create_task(run_replication(c)) for c in c_replicas)) # Wait for streaming to finish - assert not stream_task.done( + assert ( + not stream_task.done() ), "Weak testcase. Increase number of streamed iterations to surpass full sync" seeder.stop() await stream_task diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 10a5351487d9..ba5903f1eed7 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -35,10 +35,12 @@ @pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_replicas, seeder_config", replication_cases) -async def test_replication_all(df_local_factory, df_seeder_factory, t_master, t_replicas, seeder_config): +async def test_replication_all( + df_local_factory, df_seeder_factory, t_master, t_replicas, seeder_config +): master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master) replicas = [ - df_local_factory.create(port=BASE_PORT+i+1, proactor_threads=t) + df_local_factory.create(port=BASE_PORT + i + 1, proactor_threads=t) for i, t in enumerate(t_replicas) ] @@ -63,11 +65,11 @@ async def test_replication_all(df_local_factory, df_seeder_factory, t_master, t_ async def run_replication(c_replica): await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) - await asyncio.gather(*(asyncio.create_task(run_replication(c)) - for c in c_replicas)) + await asyncio.gather(*(asyncio.create_task(run_replication(c)) for c in c_replicas)) # Wait for streaming to finish - assert not stream_task.done( + assert ( + not stream_task.done() ), "Weak testcase. Increase number of streamed iterations to surpass full sync" await stream_task @@ -87,7 +89,7 @@ async def check_replica_finished_exec(c_replica, c_master): syncid, r_offset = await c_replica.execute_command("DEBUG REPLICA OFFSET") m_offset = await c_master.execute_command("DFLY REPLICAOFFSET") - print(" offset", syncid.decode(), r_offset, m_offset) + print(" offset", syncid.decode(), r_offset, m_offset) return r_offset == m_offset @@ -98,18 +100,16 @@ async def check_all_replicas_finished(c_replicas, c_master): while len(waiting_for) > 0: await asyncio.sleep(1.0) - tasks = (asyncio.create_task(check_replica_finished_exec(c, c_master)) - for c in waiting_for) + tasks = (asyncio.create_task(check_replica_finished_exec(c, c_master)) for c in waiting_for) finished_list = await asyncio.gather(*tasks) # Remove clients that finished from waiting list - waiting_for = [c for (c, finished) in zip( - waiting_for, finished_list) if not finished] + waiting_for = [c for (c, finished) in zip(waiting_for, finished_list) if not finished] async def check_data(seeder, replicas, c_replicas): capture = await seeder.capture() - for (replica, c_replica) in zip(replicas, c_replicas): + for replica, c_replica in zip(replicas, c_replicas): await wait_available_async(c_replica) assert await seeder.compare(capture, port=replica.port) @@ -140,22 +140,30 @@ async def check_data(seeder, replicas, c_replicas): # stable state heavy (8, [], [4] * 4, [], 4_000), # disconnect only - (8, [], [], [4] * 4, 4_000) + (8, [], [], [4] * 4, 4_000), ] -@pytest.mark.skip(reason='Failing on github regression action') +@pytest.mark.skip(reason="Failing on github regression action") @pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys", disconnect_cases) -async def test_disconnect_replica(df_local_factory: DflyInstanceFactory, df_seeder_factory, t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys): +async def test_disconnect_replica( + df_local_factory: DflyInstanceFactory, + df_seeder_factory, + t_master, + t_crash_fs, + t_crash_ss, + t_disonnect, + n_keys, +): master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master) replicas = [ - (df_local_factory.create(port=BASE_PORT+i+1, proactor_threads=t), crash_fs) + (df_local_factory.create(port=BASE_PORT + i + 1, proactor_threads=t), crash_fs) for i, (t, crash_fs) in enumerate( chain( zip(t_crash_fs, repeat(DISCONNECT_CRASH_FULL_SYNC)), zip(t_crash_ss, repeat(DISCONNECT_CRASH_STABLE_SYNC)), - zip(t_disonnect, repeat(DISCONNECT_NORMAL_STABLE_SYNC)) + zip(t_disonnect, repeat(DISCONNECT_NORMAL_STABLE_SYNC)), ) ) ] @@ -168,15 +176,11 @@ async def test_disconnect_replica(df_local_factory: DflyInstanceFactory, df_seed df_local_factory.start_all([replica for replica, _ in replicas]) c_replicas = [ - (replica, aioredis.Redis(port=replica.port), crash_type) - for replica, crash_type in replicas + (replica, aioredis.Redis(port=replica.port), crash_type) for replica, crash_type in replicas ] def replicas_of_type(tfunc): - return [ - args for args in c_replicas - if tfunc(args[2]) - ] + return [args for args in c_replicas if tfunc(args[2])] # Start data fill loop seeder = df_seeder_factory.create(port=master.port, keys=n_keys, dbcount=2) @@ -187,7 +191,7 @@ async def full_sync(replica, c_replica, crash_type): c_replica = aioredis.Redis(port=replica.port) await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) if crash_type == 0: - await asyncio.sleep(random.random()/100+0.01) + await asyncio.sleep(random.random() / 100 + 0.01) await c_replica.connection_pool.disconnect() replica.stop(kill=True) else: @@ -211,8 +215,7 @@ async def stable_sync(replica, c_replica, crash_type): await c_replica.connection_pool.disconnect() replica.stop(kill=True) - await asyncio.gather(*(stable_sync(*args) for args - in replicas_of_type(lambda t: t == 1))) + await asyncio.gather(*(stable_sync(*args) for args in replicas_of_type(lambda t: t == 1))) # Check master survived all crashes assert await c_master.ping() @@ -241,8 +244,7 @@ async def disconnect(replica, c_replica, crash_type): await asyncio.sleep(random.random() / 100) await c_replica.execute_command("REPLICAOF NO ONE") - await asyncio.gather(*(disconnect(*args) for args - in replicas_of_type(lambda t: t == 2))) + await asyncio.gather(*(disconnect(*args) for args in replicas_of_type(lambda t: t == 2))) await asyncio.sleep(0.5) @@ -282,10 +284,12 @@ async def disconnect(replica, c_replica, crash_type): @pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_replicas, n_random_crashes, n_keys", master_crash_cases) -async def test_disconnect_master(df_local_factory, df_seeder_factory, t_master, t_replicas, n_random_crashes, n_keys): +async def test_disconnect_master( + df_local_factory, df_seeder_factory, t_master, t_replicas, n_random_crashes, n_keys +): master = df_local_factory.create(port=1111, proactor_threads=t_master) replicas = [ - df_local_factory.create(port=BASE_PORT+i+1, proactor_threads=t) + df_local_factory.create(port=BASE_PORT + i + 1, proactor_threads=t) for i, t in enumerate(t_replicas) ] @@ -309,8 +313,12 @@ async def start_master(): await start_master() # Crash master during full sync, but with all passing initial connection phase - await asyncio.gather(*(c_replica.execute_command("REPLICAOF localhost " + str(master.port)) - for c_replica in c_replicas)) + await asyncio.gather( + *( + c_replica.execute_command("REPLICAOF localhost " + str(master.port)) + for c_replica in c_replicas + ) + ) await crash_master_fs() await asyncio.sleep(1 + len(replicas) * 0.5) @@ -338,24 +346,25 @@ async def start_master(): await wait_available_async(c_replica) assert await seeder.compare(capture, port=replica.port) + """ Test re-connecting replica to different masters. """ -rotating_master_cases = [ - (4, [4, 4, 4, 4], dict(keys=2_000, dbcount=4)) -] +rotating_master_cases = [(4, [4, 4, 4, 4], dict(keys=2_000, dbcount=4))] @pytest.mark.asyncio @pytest.mark.parametrize("t_replica, t_masters, seeder_config", rotating_master_cases) -async def test_rotating_masters(df_local_factory, df_seeder_factory, t_replica, t_masters, seeder_config): - replica = df_local_factory.create( - port=BASE_PORT, proactor_threads=t_replica) - masters = [df_local_factory.create( - port=BASE_PORT+i+1, proactor_threads=t) for i, t in enumerate(t_masters)] - seeders = [df_seeder_factory.create( - port=m.port, **seeder_config) for m in masters] +async def test_rotating_masters( + df_local_factory, df_seeder_factory, t_replica, t_masters, seeder_config +): + replica = df_local_factory.create(port=BASE_PORT, proactor_threads=t_replica) + masters = [ + df_local_factory.create(port=BASE_PORT + i + 1, proactor_threads=t) + for i, t in enumerate(t_masters) + ] + seeders = [df_seeder_factory.create(port=m.port, **seeder_config) for m in masters] df_local_factory.start_all([replica] + masters) @@ -398,7 +407,7 @@ async def test_cancel_replication_immediately(df_local_factory, df_seeder_factor COMMANDS_TO_ISSUE = 40 replica = df_local_factory.create(port=BASE_PORT) - masters = [df_local_factory.create(port=BASE_PORT+i+1) for i in range(4)] + masters = [df_local_factory.create(port=BASE_PORT + i + 1) for i in range(4)] seeders = [df_seeder_factory.create(port=m.port) for m in masters] df_local_factory.start_all([replica] + masters) @@ -443,7 +452,7 @@ async def replicate(index): @pytest.mark.asyncio async def test_flushall(df_local_factory): master = df_local_factory.create(port=BASE_PORT, proactor_threads=4) - replica = df_local_factory.create(port=BASE_PORT+1, proactor_threads=2) + replica = df_local_factory.create(port=BASE_PORT + 1, proactor_threads=2) master.start() replica.start() @@ -465,8 +474,7 @@ def gen_test_data(start, end): # flushall pipe.flushall() # Set simple keys n_keys..n_keys*2 on master - batch_fill_data(client=pipe, gen=gen_test_data( - n_keys, n_keys*2), batch_size=3) + batch_fill_data(client=pipe, gen=gen_test_data(n_keys, n_keys * 2), batch_size=3) await pipe.execute() # Check replica finished executing the replicated commands @@ -480,7 +488,7 @@ def gen_test_data(start, end): assert all(v is None for v in vals) # Check replica keys n_keys..n_keys*2-1 exist - for i in range(n_keys, n_keys*2): + for i in range(n_keys, n_keys * 2): pipe.get(f"key-{i}") vals = await pipe.execute() assert all(v is not None for v in vals) @@ -494,11 +502,11 @@ def gen_test_data(start, end): @dfly_args({"proactor_threads": 4}) @pytest.mark.asyncio async def test_rewrites(df_local_factory): - CLOSE_TIMESTAMP = (int(time.time()) + 100) + CLOSE_TIMESTAMP = int(time.time()) + 100 CLOSE_TIMESTAMP_MS = CLOSE_TIMESTAMP * 1000 master = df_local_factory.create(port=BASE_PORT) - replica = df_local_factory.create(port=BASE_PORT+1) + replica = df_local_factory.create(port=BASE_PORT + 1) master.start() replica.start() @@ -513,16 +521,16 @@ async def test_rewrites(df_local_factory): m_replica = c_replica.monitor() async def get_next_command(): - mcmd = (await m_replica.next_command())['command'] + mcmd = (await m_replica.next_command())["command"] # skip select command - if (mcmd == "SELECT 0"): + if mcmd == "SELECT 0": print("Got:", mcmd) - mcmd = (await m_replica.next_command())['command'] + mcmd = (await m_replica.next_command())["command"] print("Got:", mcmd) return mcmd async def is_match_rsp(rx): - mcmd = (await get_next_command()) + mcmd = await get_next_command() print("Regex:", rx) return re.match(rx, mcmd) @@ -531,14 +539,14 @@ async def skip_cmd(): async def check(cmd, rx): await c_master.execute_command(cmd) - match = (await is_match_rsp(rx)) + match = await is_match_rsp(rx) assert match async def check_list(cmd, rx_list): print("master cmd:", cmd) await c_master.execute_command(cmd) for rx in rx_list: - match = (await is_match_rsp(rx)) + match = await is_match_rsp(rx) assert match async def check_list_ooo(cmd, rx_list): @@ -546,7 +554,7 @@ async def check_list_ooo(cmd, rx_list): await c_master.execute_command(cmd) expected_cmds = len(rx_list) for i in range(expected_cmds): - mcmd = (await get_next_command()) + mcmd = await get_next_command() # check command matches one regex from list match_rx = list(filter(lambda rx: re.match(rx, mcmd), rx_list)) assert len(match_rx) == 1 @@ -556,7 +564,7 @@ async def check_expire(key): ttl1 = await c_master.ttl(key) ttl2 = await c_replica.ttl(key) await skip_cmd() - assert abs(ttl1-ttl2) <= 1 + assert abs(ttl1 - ttl2) <= 1 async with m_replica: # CHECK EXPIRE, PEXPIRE, PEXPIRE turn into EXPIREAT @@ -650,10 +658,16 @@ async def check_expire(key): await c_master.set("renamekey", "1000", px=50000) await skip_cmd() # Check RENAME turns into DEL SET and PEXPIREAT - await check_list_ooo("RENAME renamekey renamed", [r"DEL renamekey", r"SET renamed 1000", r"PEXPIREAT renamed (.*?)"]) + await check_list_ooo( + "RENAME renamekey renamed", + [r"DEL renamekey", r"SET renamed 1000", r"PEXPIREAT renamed (.*?)"], + ) await check_expire("renamed") # Check RENAMENX turns into DEL SET and PEXPIREAT - await check_list_ooo("RENAMENX renamed renamekey", [r"DEL renamed", r"SET renamekey 1000", r"PEXPIREAT renamekey (.*?)"]) + await check_list_ooo( + "RENAMENX renamed renamekey", + [r"DEL renamed", r"SET renamekey 1000", r"PEXPIREAT renamekey (.*?)"], + ) await check_expire("renamekey") @@ -666,7 +680,7 @@ async def check_expire(key): @pytest.mark.asyncio async def test_expiry(df_local_factory, n_keys=1000): master = df_local_factory.create(port=BASE_PORT) - replica = df_local_factory.create(port=BASE_PORT+1, logtostdout=True) + replica = df_local_factory.create(port=BASE_PORT + 1, logtostdout=True) df_local_factory.start_all([master, replica]) @@ -700,10 +714,9 @@ async def test_expiry(df_local_factory, n_keys=1000): c_master_db = aioredis.Redis(port=master.port, db=i) pipe = c_master_db.pipeline(transaction=is_multi) # Set simple keys n_keys..n_keys*2 on master - start_key = n_keys*(i+1) + start_key = n_keys * (i + 1) end_key = start_key + n_keys - batch_fill_data(client=pipe, gen=gen_test_data( - end_key, start_key), batch_size=20) + batch_fill_data(client=pipe, gen=gen_test_data(end_key, start_key), batch_size=20) await pipe.execute() @@ -771,14 +784,15 @@ async def test_expiry(df_local_factory, n_keys=1000): """ -@pytest.mark.skip(reason='Failing') +@pytest.mark.skip(reason="Failing") @pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_replicas, num_ops, num_keys, num_par, flags", script_cases) async def test_scripts(df_local_factory, t_master, t_replicas, num_ops, num_keys, num_par, flags): - master = df_local_factory.create( - port=BASE_PORT, proactor_threads=t_master) - replicas = [df_local_factory.create( - port=BASE_PORT+i+1, proactor_threads=t) for i, t in enumerate(t_replicas)] + master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master) + replicas = [ + df_local_factory.create(port=BASE_PORT + i + 1, proactor_threads=t) + for i, t in enumerate(t_replicas) + ] df_local_factory.start_all([master] + replicas) @@ -788,16 +802,15 @@ async def test_scripts(df_local_factory, t_master, t_replicas, num_ops, num_keys await c_replica.execute_command(f"REPLICAOF localhost {master.port}") await wait_available_async(c_replica) - script = script_test_s1.format( - flags=f'#!lua flags={flags}' if flags else '') + script = script_test_s1.format(flags=f"#!lua flags={flags}" if flags else "") sha = await c_master.script_load(script) - key_sets = [ - [f'{i}-{j}' for j in range(num_keys)] for i in range(num_par) - ] + key_sets = [[f"{i}-{j}" for j in range(num_keys)] for i in range(num_par)] - rsps = await asyncio.gather(*(c_master.evalsha(sha, len(keys), *keys, num_ops) for keys in key_sets)) - assert rsps == [b'OK'] * num_par + rsps = await asyncio.gather( + *(c_master.evalsha(sha, len(keys), *keys, num_ops) for keys in key_sets) + ) + assert rsps == [b"OK"] * num_par await check_all_replicas_finished(c_replicas, c_master) @@ -805,17 +818,18 @@ async def test_scripts(df_local_factory, t_master, t_replicas, num_ops, num_keys for key_set in key_sets: for j, k in enumerate(key_set): l = await c_replica.lrange(k, 0, -1) - assert l == [f'{j}'.encode()] * num_ops + assert l == [f"{j}".encode()] * num_ops @dfly_args({"proactor_threads": 4}) @pytest.mark.asyncio async def test_auth_master(df_local_factory, n_keys=20): - masterpass = 'requirepass' - replicapass = 'replicapass' + masterpass = "requirepass" + replicapass = "replicapass" master = df_local_factory.create(port=BASE_PORT, requirepass=masterpass) replica = df_local_factory.create( - port=BASE_PORT+1, logtostdout=True, masterauth=masterpass, requirepass=replicapass) + port=BASE_PORT + 1, logtostdout=True, masterauth=masterpass, requirepass=replicapass + ) df_local_factory.start_all([master, replica]) @@ -845,7 +859,7 @@ async def test_auth_master(df_local_factory, n_keys=20): @dfly_args({"proactor_threads": 2}) async def test_script_transfer(df_local_factory): master = df_local_factory.create(port=BASE_PORT) - replica = df_local_factory.create(port=BASE_PORT+1) + replica = df_local_factory.create(port=BASE_PORT + 1) df_local_factory.start_all([master, replica]) @@ -879,28 +893,38 @@ async def test_script_transfer(df_local_factory): @pytest.mark.asyncio async def test_role_command(df_local_factory, n_keys=20): master = df_local_factory.create(port=BASE_PORT) - replica = df_local_factory.create(port=BASE_PORT+1, logtostdout=True) + replica = df_local_factory.create(port=BASE_PORT + 1, logtostdout=True) df_local_factory.start_all([master, replica]) c_master = aioredis.Redis(port=master.port) c_replica = aioredis.Redis(port=replica.port) - assert await c_master.execute_command("role") == [b'master', []] + assert await c_master.execute_command("role") == [b"master", []] await c_replica.execute_command(f"REPLICAOF localhost {master.port}") await wait_available_async(c_replica) assert await c_master.execute_command("role") == [ - b'master', [[b'127.0.0.1', bytes(str(replica.port), 'ascii'), b'stable_sync']]] + b"master", + [[b"127.0.0.1", bytes(str(replica.port), "ascii"), b"stable_sync"]], + ] assert await c_replica.execute_command("role") == [ - b'replica', b'localhost', bytes(str(master.port), 'ascii'), b'stable_sync'] + b"replica", + b"localhost", + bytes(str(master.port), "ascii"), + b"stable_sync", + ] # This tests that we react fast to socket shutdowns and don't hang on # things like the ACK or execution fibers. master.stop() await asyncio.sleep(0.1) assert await c_replica.execute_command("role") == [ - b'replica', b'localhost', bytes(str(master.port), 'ascii'), b'connecting'] + b"replica", + b"localhost", + bytes(str(master.port), "ascii"), + b"connecting", + ] await c_master.connection_pool.disconnect() await c_replica.connection_pool.disconnect() @@ -943,7 +967,8 @@ async def assert_lag_condition(inst, client, condition): async def test_replication_info(df_local_factory, df_seeder_factory, n_keys=2000): master = df_local_factory.create(port=BASE_PORT) replica = df_local_factory.create( - port=BASE_PORT+1, logtostdout=True, replication_acks_interval=100) + port=BASE_PORT + 1, logtostdout=True, replication_acks_interval=100 + ) df_local_factory.start_all([master, replica]) c_master = aioredis.Redis(port=master.port) c_replica = aioredis.Redis(port=replica.port) @@ -974,18 +999,15 @@ async def test_replication_info(df_local_factory, df_seeder_factory, n_keys=2000 @pytest.mark.asyncio async def test_flushall_in_full_sync(df_local_factory, df_seeder_factory): - master = df_local_factory.create( - port=BASE_PORT, proactor_threads=4, logtostdout=True) - replica = df_local_factory.create( - port=BASE_PORT+1, proactor_threads=2, logtostdout=True) + master = df_local_factory.create(port=BASE_PORT, proactor_threads=4, logtostdout=True) + replica = df_local_factory.create(port=BASE_PORT + 1, proactor_threads=2, logtostdout=True) # Start master master.start() c_master = aioredis.Redis(port=master.port) # Fill master with test data - seeder = df_seeder_factory.create( - port=master.port, keys=100_000, dbcount=1) + seeder = df_seeder_factory.create(port=master.port, keys=100_000, dbcount=1) await seeder.run(target_deviation=0.1) # Start replica @@ -998,7 +1020,7 @@ async def get_sync_mode(c_replica): return result[3] async def is_full_sync_mode(c_replica): - return await get_sync_mode(c_replica) == b'full_sync' + return await get_sync_mode(c_replica) == b"full_sync" # Wait for full sync to start while not await is_full_sync_mode(c_replica): @@ -1010,12 +1032,10 @@ async def is_full_sync_mode(c_replica): await c_master.execute_command("FLUSHALL") if not await is_full_sync_mode(c_replica): - logging.error( - "!!! Full sync finished too fast. Adjust test parameters !!!") + logging.error("!!! Full sync finished too fast. Adjust test parameters !!!") return - post_seeder = df_seeder_factory.create( - port=master.port, keys=10, dbcount=1) + post_seeder = df_seeder_factory.create(port=master.port, keys=10, dbcount=1) await post_seeder.run(target_deviation=0.1) await check_all_replicas_finished([c_replica], c_master) @@ -1047,28 +1067,26 @@ async def is_full_sync_mode(c_replica): @pytest.mark.asyncio async def test_readonly_script(df_local_factory): - master = df_local_factory.create( - port=BASE_PORT, proactor_threads=2, logtostdout=True) - replica = df_local_factory.create( - port=BASE_PORT+1, proactor_threads=2, logtostdout=True) + master = df_local_factory.create(port=BASE_PORT, proactor_threads=2, logtostdout=True) + replica = df_local_factory.create(port=BASE_PORT + 1, proactor_threads=2, logtostdout=True) df_local_factory.start_all([master, replica]) c_master = aioredis.Redis(port=master.port) c_replica = aioredis.Redis(port=replica.port) - await c_master.set('WORKS', 'YES') + await c_master.set("WORKS", "YES") await c_replica.execute_command(f"REPLICAOF localhost {master.port}") await wait_available_async(c_replica) - await c_replica.eval(READONLY_SCRIPT, 3, 'A', 'B', 'WORKS') == 'YES' + await c_replica.eval(READONLY_SCRIPT, 3, "A", "B", "WORKS") == "YES" try: - await c_replica.eval(WRITE_SCRIPT, 1, 'A') + await c_replica.eval(WRITE_SCRIPT, 1, "A") assert False except aioredis.ResponseError as roe: - assert 'READONLY ' in str(roe) + assert "READONLY " in str(roe) take_over_cases = [ @@ -1082,16 +1100,15 @@ async def test_readonly_script(df_local_factory): @pytest.mark.parametrize("master_threads, replica_threads", take_over_cases) @pytest.mark.asyncio async def test_take_over_counters(df_local_factory, master_threads, replica_threads): - master = df_local_factory.create(proactor_threads=master_threads, - port=BASE_PORT, - # vmodule="journal_slice=2,dflycmd=2,main_service=1", - logtostderr=True) - replica1 = df_local_factory.create( - port=BASE_PORT+1, proactor_threads=replica_threads) - replica2 = df_local_factory.create( - port=BASE_PORT+2, proactor_threads=replica_threads) - replica3 = df_local_factory.create( - port=BASE_PORT+3, proactor_threads=replica_threads) + master = df_local_factory.create( + proactor_threads=master_threads, + port=BASE_PORT, + # vmodule="journal_slice=2,dflycmd=2,main_service=1", + logtostderr=True, + ) + replica1 = df_local_factory.create(port=BASE_PORT + 1, proactor_threads=replica_threads) + replica2 = df_local_factory.create(port=BASE_PORT + 2, proactor_threads=replica_threads) + replica3 = df_local_factory.create(port=BASE_PORT + 3, proactor_threads=replica_threads) df_local_factory.start_all([master, replica1, replica2, replica3]) c_master = master.client() c1 = replica1.client() @@ -1130,8 +1147,10 @@ async def delayed_takeover(): await asyncio.sleep(1) await c1.execute_command(f"REPLTAKEOVER 5") - _, _, *results = await asyncio.gather(delayed_takeover(), block_during_takeover(), *[counter(f"key{i}") for i in range(16)]) - assert await c1.execute_command("role") == [b'master', []] + _, _, *results = await asyncio.gather( + delayed_takeover(), block_during_takeover(), *[counter(f"key{i}") for i in range(16)] + ) + assert await c1.execute_command("role") == [b"master", []] for key, client_value in results: replicated_value = await c1.get(key) @@ -1142,18 +1161,20 @@ async def delayed_takeover(): @pytest.mark.parametrize("master_threads, replica_threads", take_over_cases) @pytest.mark.asyncio -async def test_take_over_seeder(request, df_local_factory, df_seeder_factory, master_threads, replica_threads): - tmp_file_name = ''.join(random.choices(string.ascii_letters, k=10)) - master = df_local_factory.create(proactor_threads=master_threads, - port=BASE_PORT, - dbfilename=f"dump_{tmp_file_name}", - logtostderr=True) - replica = df_local_factory.create( - port=BASE_PORT+1, proactor_threads=replica_threads) +async def test_take_over_seeder( + request, df_local_factory, df_seeder_factory, master_threads, replica_threads +): + tmp_file_name = "".join(random.choices(string.ascii_letters, k=10)) + master = df_local_factory.create( + proactor_threads=master_threads, + port=BASE_PORT, + dbfilename=f"dump_{tmp_file_name}", + logtostderr=True, + ) + replica = df_local_factory.create(port=BASE_PORT + 1, proactor_threads=replica_threads) df_local_factory.start_all([master, replica]) - seeder = df_seeder_factory.create( - port=master.port, keys=1000, dbcount=5, stop_on_failure=False) + seeder = df_seeder_factory.create(port=master.port, keys=1000, dbcount=5, stop_on_failure=False) c_master = master.client() c_replica = replica.client() @@ -1171,7 +1192,7 @@ async def seed(): await c_replica.execute_command(f"REPLTAKEOVER 5") seeder.stop() - assert await c_replica.execute_command("role") == [b'master', []] + assert await c_replica.execute_command("role") == [b"master", []] # Need to wait a bit to give time to write the shutdown snapshot await asyncio.sleep(1) @@ -1188,15 +1209,11 @@ async def seed(): @pytest.mark.asyncio async def test_take_over_timeout(df_local_factory, df_seeder_factory): - master = df_local_factory.create(proactor_threads=2, - port=BASE_PORT, - logtostderr=True) - replica = df_local_factory.create( - port=BASE_PORT+1, proactor_threads=2) + master = df_local_factory.create(proactor_threads=2, port=BASE_PORT, logtostderr=True) + replica = df_local_factory.create(port=BASE_PORT + 1, proactor_threads=2) df_local_factory.start_all([master, replica]) - seeder = df_seeder_factory.create( - port=master.port, keys=1000, dbcount=5, stop_on_failure=False) + seeder = df_seeder_factory.create(port=master.port, keys=1000, dbcount=5, stop_on_failure=False) c_master = master.client() c_replica = replica.client() @@ -1217,8 +1234,16 @@ async def test_take_over_timeout(df_local_factory, df_seeder_factory): seeder.stop() await fill_task - assert await c_master.execute_command("role") == [b'master', [[b'127.0.0.1', bytes(str(replica.port), 'ascii'), b'stable_sync']]] - assert await c_replica.execute_command("role") == [b'replica', b'localhost', bytes(str(master.port), 'ascii'), b'stable_sync'] + assert await c_master.execute_command("role") == [ + b"master", + [[b"127.0.0.1", bytes(str(replica.port), "ascii"), b"stable_sync"]], + ] + assert await c_replica.execute_command("role") == [ + b"replica", + b"localhost", + bytes(str(master.port), "ascii"), + b"stable_sync", + ] await disconnect_clients(c_master, c_replica) @@ -1230,11 +1255,18 @@ async def test_take_over_timeout(df_local_factory, df_seeder_factory): @pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_replica", replication_cases) -async def test_no_tls_on_admin_port(df_local_factory, df_seeder_factory, t_master, t_replica, with_tls_server_args): +async def test_no_tls_on_admin_port( + df_local_factory, df_seeder_factory, t_master, t_replica, with_tls_server_args +): # 1. Spin up dragonfly without tls, debug populate master = df_local_factory.create( - no_tls_on_admin_port="true", admin_port=ADMIN_PORT, **with_tls_server_args, port=BASE_PORT, proactor_threads=t_master) + no_tls_on_admin_port="true", + admin_port=ADMIN_PORT, + **with_tls_server_args, + port=BASE_PORT, + proactor_threads=t_master, + ) master.start() c_master = aioredis.Redis(port=master.admin_port) await c_master.execute_command("DEBUG POPULATE 100") @@ -1244,7 +1276,12 @@ async def test_no_tls_on_admin_port(df_local_factory, df_seeder_factory, t_maste # 2. Spin up a replica and initiate a REPLICAOF replica = df_local_factory.create( - no_tls_on_admin_port="true", admin_port=ADMIN_PORT + 1, **with_tls_server_args, port=BASE_PORT + 1, proactor_threads=t_replica) + no_tls_on_admin_port="true", + admin_port=ADMIN_PORT + 1, + **with_tls_server_args, + port=BASE_PORT + 1, + proactor_threads=t_replica, + ) replica.start() c_replica = aioredis.Redis(port=replica.admin_port) res = await c_replica.execute_command("REPLICAOF localhost " + str(master.admin_port)) diff --git a/tests/dragonfly/search_test.py b/tests/dragonfly/search_test.py index ce70ff337ebc..b8c29552d17f 100644 --- a/tests/dragonfly/search_test.py +++ b/tests/dragonfly/search_test.py @@ -14,29 +14,46 @@ from redis.commands.search.indexDefinition import IndexDefinition, IndexType TEST_DATA = [ - {"title": "First article", "content": "Long description", - "views": 100, "topic": "world, science"}, - - {"title": "Second article", "content": "Small text", - "views": 200, "topic": "national, policits"}, - - {"title": "Third piece", "content": "Brief description", - "views": 300, "topic": "health, lifestyle"}, - - {"title": "Last piece", "content": "Interesting text", - "views": 400, "topic": "world, business"}, + { + "title": "First article", + "content": "Long description", + "views": 100, + "topic": "world, science", + }, + { + "title": "Second article", + "content": "Small text", + "views": 200, + "topic": "national, policits", + }, + { + "title": "Third piece", + "content": "Brief description", + "views": 300, + "topic": "health, lifestyle", + }, + { + "title": "Last piece", + "content": "Interesting text", + "views": 400, + "topic": "world, business", + }, ] -BASIC_TEST_SCHEMA = [TextField("title"), TextField( - "content"), NumericField("views"), TagField("topic")] +BASIC_TEST_SCHEMA = [ + TextField("title"), + TextField("content"), + NumericField("views"), + TagField("topic"), +] async def index_test_data(async_client: aioredis.Redis, itype: IndexType, prefix=""): for i, e in enumerate(TEST_DATA): if itype == IndexType.HASH: - await async_client.hset(prefix+str(i), mapping=e) + await async_client.hset(prefix + str(i), mapping=e) else: - await async_client.json().set(prefix+str(i), "$", e) + await async_client.json().set(prefix + str(i), "$", e) def doc_to_str(doc): @@ -44,10 +61,10 @@ def doc_to_str(doc): doc = doc.__dict__ doc = dict(doc) # copy to remove fields - doc.pop('id', None) - doc.pop('payload', None) + doc.pop("id", None) + doc.pop("payload", None) - return '//'.join(sorted(doc)) + return "//".join(sorted(doc)) def contains_test_data(res, td_indices): @@ -66,7 +83,7 @@ def contains_test_data(res, td_indices): @dfly_args({"proactor_threads": 4}) @pytest.mark.parametrize("index_type", [IndexType.HASH, IndexType.JSON]) async def test_basic(async_client: aioredis.Redis, index_type): - i1 = async_client.ft("i1-"+str(index_type)) + i1 = async_client.ft("i1-" + str(index_type)) await index_test_data(async_client, index_type) await i1.create_index(BASIC_TEST_SCHEMA, definition=IndexDefinition(index_type=index_type)) @@ -105,21 +122,27 @@ async def test_basic(async_client: aioredis.Redis, index_type): async def knn_query(idx, query, vector): params = {"vec": np.array(vector, dtype=np.float32).tobytes()} result = await idx.search(query, params) - return {doc['id'] for doc in result.docs} + return {doc["id"] for doc in result.docs} @dfly_args({"proactor_threads": 4}) @pytest.mark.parametrize("index_type", [IndexType.HASH, IndexType.JSON]) async def test_knn(async_client: aioredis.Redis, index_type): - i2 = async_client.ft("i2-"+str(index_type)) - - vector_field = VectorField("pos", "FLAT", { - "TYPE": "FLOAT32", - "DIM": 1, - "DISTANCE_METRIC": "L2", - }) - - await i2.create_index([TagField("even"), vector_field], definition=IndexDefinition(index_type=index_type)) + i2 = async_client.ft("i2-" + str(index_type)) + + vector_field = VectorField( + "pos", + "FLAT", + { + "TYPE": "FLOAT32", + "DIM": 1, + "DISTANCE_METRIC": "L2", + }, + ) + + await i2.create_index( + [TagField("even"), vector_field], definition=IndexDefinition(index_type=index_type) + ) pipe = async_client.pipeline() for i in range(100): @@ -131,13 +154,18 @@ async def test_knn(async_client: aioredis.Redis, index_type): pipe.json().set(f"k{i}", "$", {"even": even, "pos": [float(i)]}) await pipe.execute() - assert await knn_query(i2, "* => [KNN 3 @pos VEC]", [50.0]) == {'k49', 'k50', 'k51'} + assert await knn_query(i2, "* => [KNN 3 @pos VEC]", [50.0]) == {"k49", "k50", "k51"} - assert await knn_query(i2, "@even:{yes} => [KNN 3 @pos VEC]", [20.0]) == {'k18', 'k20', 'k22'} + assert await knn_query(i2, "@even:{yes} => [KNN 3 @pos VEC]", [20.0]) == {"k18", "k20", "k22"} - assert await knn_query(i2, "@even:{no} => [KNN 4 @pos VEC]", [30.0]) == {'k27', 'k29', 'k31', 'k33'} + assert await knn_query(i2, "@even:{no} => [KNN 4 @pos VEC]", [30.0]) == { + "k27", + "k29", + "k31", + "k33", + } - assert await knn_query(i2, "@even:{yes} => [KNN 3 @pos VEC]", [10.0] == {'k8', 'k10', 'k12'}) + assert await knn_query(i2, "@even:{yes} => [KNN 3 @pos VEC]", [10.0] == {"k8", "k10", "k12"}) NUM_DIMS = 10 @@ -147,21 +175,24 @@ async def test_knn(async_client: aioredis.Redis, index_type): @dfly_args({"proactor_threads": 4}) @pytest.mark.parametrize("index_type", [IndexType.HASH, IndexType.JSON]) async def test_multidim_knn(async_client: aioredis.Redis, index_type): - vector_field = VectorField("pos", "FLAT", { - "TYPE": "FLOAT32", - "DIM": NUM_DIMS, - "DISTANCE_METRIC": "L2", - }) - - i3 = async_client.ft("i3-"+str(index_type)) + vector_field = VectorField( + "pos", + "FLAT", + { + "TYPE": "FLOAT32", + "DIM": NUM_DIMS, + "DISTANCE_METRIC": "L2", + }, + ) + + i3 = async_client.ft("i3-" + str(index_type)) await i3.create_index([vector_field], definition=IndexDefinition(index_type=index_type)) def rand_point(): return np.random.uniform(0, 10, NUM_DIMS).astype(np.float32) # Generate points and send to DF - points = [rand_point() - for _ in range(NUM_POINTS)] + points = [rand_point() for _ in range(NUM_POINTS)] points = list(enumerate(points)) pipe = async_client.pipeline(transaction=False) @@ -175,10 +206,12 @@ def rand_point(): # Run 10 random queries for _ in range(10): center = rand_point() - limit = random.randint(1, NUM_POINTS/10) + limit = random.randint(1, NUM_POINTS / 10) - expected_ids = [f"k{i}" for i, point in sorted( - points, key=lambda p: np.linalg.norm(center - p[1]))[:limit]] + expected_ids = [ + f"k{i}" + for i, point in sorted(points, key=lambda p: np.linalg.norm(center - p[1]))[:limit] + ] got_ids = await knn_query(i3, f"* => [KNN {limit} @pos VEC]", center) diff --git a/tests/dragonfly/sentinel_test.py b/tests/dragonfly/sentinel_test.py index 47c30bcfc041..8bfe95f8e867 100644 --- a/tests/dragonfly/sentinel_test.py +++ b/tests/dragonfly/sentinel_test.py @@ -14,26 +14,28 @@ # Output is expected be of even number of lines where each pair of consecutive lines results in a single key value pair. # If new_dict_key is not empty, encountering it in the output will start a new dictionary, this let us return multiple # dictionaries, for example in the 'slaves' command, one dictionary for each slave. -def stdout_as_list_of_dicts(cp: subprocess.CompletedProcess, new_dict_key =""): +def stdout_as_list_of_dicts(cp: subprocess.CompletedProcess, new_dict_key=""): lines = cp.stdout.splitlines() res = [] d = None - if (new_dict_key == ''): + if new_dict_key == "": d = dict() res.append(d) for i in range(0, len(lines), 2): - if (lines[i]) == new_dict_key: # assumes output never has '' as a key + if (lines[i]) == new_dict_key: # assumes output never has '' as a key d = dict() res.append(d) d[lines[i]] = lines[i + 1] return res + def wait_for(func, pred, timeout_sec, timeout_msg=""): while not pred(func()): assert timeout_sec > 0, timeout_msg timeout_sec = timeout_sec - 1 time.sleep(1) + async def await_for(func, pred, timeout_sec, timeout_msg=""): done = False while not done: @@ -60,21 +62,26 @@ def start(self): config = [ f"port {self.port}", f"sentinel monitor {self.default_deployment} 127.0.0.1 {self.initial_master_port} 1", - f"sentinel down-after-milliseconds {self.default_deployment} 3000" - ] + f"sentinel down-after-milliseconds {self.default_deployment} 3000", + ] self.config_file.write_text("\n".join(config)) logging.info(self.config_file.read_text()) - self.proc = subprocess.Popen(["redis-server", f"{self.config_file.absolute()}", "--sentinel"]) + self.proc = subprocess.Popen( + ["redis-server", f"{self.config_file.absolute()}", "--sentinel"] + ) def stop(self): self.proc.terminate() self.proc.wait(timeout=10) - def run_cmd(self, args, sentinel_cmd=True, capture_output=False, assert_ok=True) -> subprocess.CompletedProcess: + def run_cmd( + self, args, sentinel_cmd=True, capture_output=False, assert_ok=True + ) -> subprocess.CompletedProcess: run_args = ["redis-cli", "-p", f"{self.port}"] - if sentinel_cmd: run_args = run_args + ["sentinel"] + if sentinel_cmd: + run_args = run_args + ["sentinel"] run_args = run_args + args cp = subprocess.run(run_args, capture_output=capture_output, text=True) if assert_ok: @@ -84,8 +91,10 @@ def run_cmd(self, args, sentinel_cmd=True, capture_output=False, assert_ok=True) def wait_ready(self): wait_for( lambda: self.run_cmd(["ping"], sentinel_cmd=False, assert_ok=False), - lambda cp:cp.returncode == 0, - timeout_sec=10, timeout_msg="Timeout waiting for sentinel to become ready.") + lambda cp: cp.returncode == 0, + timeout_sec=10, + timeout_msg="Timeout waiting for sentinel to become ready.", + ) def master(self, deployment="") -> dict: if deployment == "": @@ -108,10 +117,17 @@ def live_master_port(self, deployment=""): def failover(self, deployment=""): if deployment == "": deployment = self.default_deployment - self.run_cmd(["failover", deployment,]) + self.run_cmd( + [ + "failover", + deployment, + ] + ) -@pytest.fixture(scope="function") # Sentinel has state which we don't want carried over form test to test. +@pytest.fixture( + scope="function" +) # Sentinel has state which we don't want carried over form test to test. def sentinel(tmp_dir, port_picker) -> Sentinel: s = Sentinel(port_picker.get_available_port(), port_picker.get_available_port(), tmp_dir) s.start() @@ -138,9 +154,11 @@ async def test_failover(df_local_factory, sentinel, port_picker): # Verify sentinel picked up replica. await await_for( - lambda: sentinel.master(), - lambda m: m["num-slaves"] == "1", - timeout_sec=15, timeout_msg="Timeout waiting for sentinel to pick up replica.") + lambda: sentinel.master(), + lambda m: m["num-slaves"] == "1", + timeout_sec=15, + timeout_msg="Timeout waiting for sentinel to pick up replica.", + ) sentinel.failover() @@ -148,7 +166,8 @@ async def test_failover(df_local_factory, sentinel, port_picker): await await_for( lambda: sentinel.live_master_port(), lambda p: p == replica.port, - timeout_sec=10, timeout_msg="Timeout waiting for sentinel to report replica as master." + timeout_sec=10, + timeout_msg="Timeout waiting for sentinel to report replica as master.", ) assert sentinel.slaves()[0]["port"] == str(master.port) @@ -159,7 +178,8 @@ async def test_failover(df_local_factory, sentinel, port_picker): await await_for( lambda: master_client.get("key"), lambda val: val == b"value", - 15, "Timeout waiting for key to exist in replica." + 15, + "Timeout waiting for key to exist in replica.", ) except AssertionError: syncid, r_offset = await master_client.execute_command("DEBUG REPLICA OFFSET") @@ -197,9 +217,11 @@ async def test_master_failure(df_local_factory, sentinel, port_picker): # Verify sentinel picked up replica. await await_for( - lambda: sentinel.master(), - lambda m: m["num-slaves"] == "1", - timeout_sec=15, timeout_msg="Timeout waiting for sentinel to pick up replica.") + lambda: sentinel.master(), + lambda m: m["num-slaves"] == "1", + timeout_sec=15, + timeout_msg="Timeout waiting for sentinel to pick up replica.", + ) # Simulate master failure. master.stop() @@ -208,7 +230,8 @@ async def test_master_failure(df_local_factory, sentinel, port_picker): await await_for( lambda: sentinel.live_master_port(), lambda p: p == replica.port, - timeout_sec=300, timeout_msg="Timeout waiting for sentinel to report replica as master." + timeout_sec=300, + timeout_msg="Timeout waiting for sentinel to report replica as master.", ) # Verify we can now write to replica. diff --git a/tests/dragonfly/server_family_test.py b/tests/dragonfly/server_family_test.py index 6e050338d78d..dffc2f454ae9 100644 --- a/tests/dragonfly/server_family_test.py +++ b/tests/dragonfly/server_family_test.py @@ -5,7 +5,7 @@ def test_quit(connection): connection.send_command("QUIT") - assert connection.read_response() == b'OK' + assert connection.read_response() == b"OK" with pytest.raises(redis.exceptions.ConnectionError) as e: connection.read_response() @@ -16,7 +16,7 @@ def test_quit_after_sub(connection): connection.read_response() connection.send_command("QUIT") - assert connection.read_response() == b'OK' + assert connection.read_response() == b"OK" with pytest.raises(redis.exceptions.ConnectionError) as e: connection.read_response() @@ -30,7 +30,7 @@ async def test_multi_exec(async_client: aioredis.Redis): assert val == [True, "bar"] -''' +""" see https://github.com/dragonflydb/dragonfly/issues/457 For now we would not allow for eval command inside multi As this would create to level transactions (in effect recursive call @@ -38,7 +38,8 @@ async def test_multi_exec(async_client: aioredis.Redis): When this issue is fully fixed, this test would failed, and then it should change to match the fact that we supporting this operation. For now we are expecting to get an error -''' +""" + @pytest.mark.skip("Skip until we decided on correct behaviour of eval inside multi") async def test_multi_eval(async_client: aioredis.Redis): @@ -76,10 +77,12 @@ async def test_client_list(df_factory): instance.stop() await disconnect_clients(client, admin_client) + async def test_scan(async_client: aioredis.Redis): - ''' + """ make sure that the scan command is working with python - ''' + """ + def gen_test_data(): for i in range(10): yield f"key-{i}", f"value-{i}" diff --git a/tests/dragonfly/shutdown_test.py b/tests/dragonfly/shutdown_test.py index b740c11507c9..eb889c56a799 100644 --- a/tests/dragonfly/shutdown_test.py +++ b/tests/dragonfly/shutdown_test.py @@ -9,13 +9,16 @@ BASIC_ARGS = {"dir": "{DRAGONFLY_TMP}/"} -@pytest.mark.skip(reason='Currently we can not guarantee that on shutdown if command is executed and value is written we response before breaking the connection') +@pytest.mark.skip( + reason="Currently we can not guarantee that on shutdown if command is executed and value is written we response before breaking the connection" +) @dfly_args({"proactor_threads": "4"}) -class TestDflyAutoLoadSnapshot(): +class TestDflyAutoLoadSnapshot: """ Test automatic loading of dump files on startup with timestamp. When command is executed if a value is written we should send the response before shutdown """ + @pytest.mark.asyncio async def test_gracefull_shutdown(self, df_local_factory): df_args = {"dbfilename": "dump", **BASIC_ARGS, "port": 1111} @@ -39,7 +42,9 @@ async def delayed_takeover(): await client.execute_command("SHUTDOWN") await client.connection_pool.disconnect() - _, *results = await asyncio.gather(delayed_takeover(), *[counter(f"key{i}") for i in range(16)]) + _, *results = await asyncio.gather( + delayed_takeover(), *[counter(f"key{i}") for i in range(16)] + ) df_server.start() client = aioredis.Redis(port=df_server.port) diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index 6b8675708d4d..8135f34c1eda 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -18,9 +18,10 @@ def setup(self, tmp_dir: Path): self.tmp_dir = tmp_dir def get_main_file(self, pattern): - def is_main(f): return "summary" in f if pattern.endswith( - "dfs") else True - files = glob.glob(str(self.tmp_dir.absolute()) + '/' + pattern) + def is_main(f): + return "summary" in f if pattern.endswith("dfs") else True + + files = glob.glob(str(self.tmp_dir.absolute()) + "/" + pattern) possible_mains = list(filter(is_main, files)) assert len(possible_mains) == 1, possible_mains return possible_mains[0] @@ -29,6 +30,7 @@ def is_main(f): return "summary" in f if pattern.endswith( @dfly_args({**BASIC_ARGS, "dbfilename": "test-rdb-{{timestamp}}"}) class TestRdbSnapshot(SnapshotTestBase): """Test single file rdb snapshot""" + @pytest.fixture(autouse=True) def setup(self, tmp_dir: Path): super().setup(tmp_dir) @@ -51,6 +53,7 @@ async def test_snapshot(self, df_seeder_factory, async_client, df_server): @dfly_args({**BASIC_ARGS, "dbfilename": "test-rdbexact.rdb", "nodf_snapshot_format": None}) class TestRdbSnapshotExactFilename(SnapshotTestBase): """Test single file rdb snapshot without a timestamp""" + @pytest.fixture(autouse=True) def setup(self, tmp_dir: Path): super().setup(tmp_dir) @@ -74,6 +77,7 @@ async def test_snapshot(self, df_seeder_factory, async_client, df_server): @dfly_args({**BASIC_ARGS, "dbfilename": "test-dfs"}) class TestDflySnapshot(SnapshotTestBase): """Test multi file snapshot""" + @pytest.fixture(autouse=True) def setup(self, tmp_dir: Path): self.tmp_dir = tmp_dir @@ -88,16 +92,20 @@ async def test_snapshot(self, df_seeder_factory, async_client, df_server): # save + flush + load await async_client.execute_command("SAVE DF") assert await async_client.flushall() - await async_client.execute_command("DEBUG LOAD " + super().get_main_file("test-dfs-summary.dfs")) + await async_client.execute_command( + "DEBUG LOAD " + super().get_main_file("test-dfs-summary.dfs") + ) assert await seeder.compare(start_capture) + # We spawn instances manually, so reduce memory usage of default to minimum @dfly_args({"proactor_threads": "1"}) class TestDflyAutoLoadSnapshot(SnapshotTestBase): """Test automatic loading of dump files on startup with timestamp""" + @pytest.fixture(autouse=True) def setup(self, tmp_dir: Path): self.tmp_dir = tmp_dir @@ -115,8 +123,8 @@ def setup(self, tmp_dir: Path): @pytest.mark.parametrize("save_type, dbfilename", cases) async def test_snapshot(self, df_local_factory, save_type, dbfilename): df_args = {"dbfilename": dbfilename, **BASIC_ARGS, "port": 1111} - if save_type == 'rdb': - df_args['nodf_snapshot_format'] = "" + if save_type == "rdb": + df_args["nodf_snapshot_format"] = "" df_server = df_local_factory.create(**df_args) df_server.start() @@ -135,6 +143,7 @@ async def test_snapshot(self, df_local_factory, save_type, dbfilename): @dfly_args({**BASIC_ARGS, "dbfilename": "test-periodic", "save_schedule": "*:*"}) class TestPeriodicSnapshot(SnapshotTestBase): """Test periodic snapshotting""" + @pytest.fixture(autouse=True) def setup(self, tmp_dir: Path): super().setup(tmp_dir) @@ -142,7 +151,8 @@ def setup(self, tmp_dir: Path): @pytest.mark.asyncio async def test_snapshot(self, df_seeder_factory, df_server): seeder = df_seeder_factory.create( - port=df_server.port, keys=10, multi_transaction_probability=0) + port=df_server.port, keys=10, multi_transaction_probability=0 + ) await seeder.run(target_deviation=0.5) time.sleep(60) @@ -154,14 +164,14 @@ async def test_snapshot(self, df_seeder_factory, df_server): class TestPathEscapes(SnapshotTestBase): """Test that we don't allow path escapes. We just check that df_server.start() fails because we don't have a much better way to test that.""" + @pytest.fixture(autouse=True) def setup(self, tmp_dir: Path): super().setup(tmp_dir) @pytest.mark.asyncio async def test_snapshot(self, df_local_factory): - df_server = df_local_factory.create( - dbfilename="../../../../etc/passwd") + df_server = df_local_factory.create(dbfilename="../../../../etc/passwd") try: df_server.start() assert False, "Server should not start correctly" @@ -172,6 +182,7 @@ async def test_snapshot(self, df_local_factory): @dfly_args({**BASIC_ARGS, "dbfilename": "test-shutdown"}) class TestDflySnapshotOnShutdown(SnapshotTestBase): """Test multi file snapshot""" + @pytest.fixture(autouse=True) def setup(self, tmp_dir: Path): self.tmp_dir = tmp_dir @@ -192,15 +203,17 @@ async def test_snapshot(self, df_seeder_factory, df_server): assert await seeder.compare(start_capture) + @dfly_args({**BASIC_ARGS, "dbfilename": "test-info-persistence"}) class TestDflyInfoPersistenceLoadingField(SnapshotTestBase): """Test is_loading field on INFO PERSISTENCE during snapshot loading""" + @pytest.fixture(autouse=True) def setup(self, tmp_dir: Path): self.tmp_dir = tmp_dir def extract_is_loading_field(self, res): - matcher = b'loading:' + matcher = b"loading:" start = res.find(matcher) pos = start + len(matcher) return chr(res[pos]) @@ -211,9 +224,9 @@ async def test_snapshot(self, df_seeder_factory, df_server): await seeder.run(target_deviation=0.05) a_client = aioredis.Redis(port=df_server.port) - #Wait for snapshot to finish loading and try INFO PERSISTENCE + # Wait for snapshot to finish loading and try INFO PERSISTENCE await wait_available_async(a_client) res = await a_client.execute_command("INFO PERSISTENCE") - assert '0' == self.extract_is_loading_field(res) + assert "0" == self.extract_is_loading_field(res) await a_client.connection_pool.disconnect() diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index 386c2303e49d..c3c07c845b69 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -30,7 +30,7 @@ def eprint(*args, **kwargs): def gen_test_data(n, start=0, seed=None): for i in range(start, n): - yield "k-"+str(i), "v-"+str(i) + ("-"+str(seed) if seed else "") + yield "k-" + str(i), "v-" + str(i) + ("-" + str(seed) if seed else "") def batch_fill_data(client, gen, batch_size=100): @@ -43,16 +43,16 @@ async def wait_available_async(client: aioredis.Redis): its = 0 while True: try: - await client.get('key') + await client.get("key") return except aioredis.ResponseError as e: - if ("MOVED" in str(e)): + if "MOVED" in str(e): # MOVED means we *can* serve traffic, but 'key' does not belong to an owned slot return assert "Can not execute during LOADING" in str(e) # Print W to indicate test is waiting for replica - print('W', end='', flush=True) + print("W", end="", flush=True) await asyncio.sleep(0.01) its += 1 @@ -141,7 +141,8 @@ def randomize_key(self, t=None, pop=False): def generate_val(self, t: ValueType): """Generate filler value of configured size for type t""" - def rand_str(k=3, s=''): + + def rand_str(k=3, s=""): # Use small k value to reduce mem usage and increase number of ops return s.join(random.choices(string.ascii_letters, k=k)) @@ -150,19 +151,21 @@ def rand_str(k=3, s=''): return (rand_str(self.val_size),) elif t == ValueType.LIST: # Random sequence k-letter elements for LPUSH - return tuple(rand_str() for _ in range(self.val_size//4)) + return tuple(rand_str() for _ in range(self.val_size // 4)) elif t == ValueType.SET: # Random sequence of k-letter elements for SADD - return tuple(rand_str() for _ in range(self.val_size//4)) + return tuple(rand_str() for _ in range(self.val_size // 4)) elif t == ValueType.HSET: # Random sequence of k-letter keys + int and two start values for HSET - elements = ((rand_str(), random.randint(0, self.val_size)) - for _ in range(self.val_size//5)) - return ('v0', 0, 'v1', 0) + tuple(itertools.chain(*elements)) + elements = ( + (rand_str(), random.randint(0, self.val_size)) for _ in range(self.val_size // 5) + ) + return ("v0", 0, "v1", 0) + tuple(itertools.chain(*elements)) elif t == ValueType.ZSET: # Random sequnce of k-letter keys and int score for ZSET - elements = ((random.randint(0, self.val_size), rand_str()) - for _ in range(self.val_size//4)) + elements = ( + (random.randint(0, self.val_size), rand_str()) for _ in range(self.val_size // 4) + ) return tuple(itertools.chain(*elements)) elif t == ValueType.JSON: @@ -170,9 +173,8 @@ def rand_str(k=3, s=''): # - arr (array of random strings) # - ints (array of objects {i:random integer}) # - i (random integer) - ints = [{"i": random.randint(0, 100)} - for i in range(self.val_size//6)] - strs = [rand_str() for _ in range(self.val_size//6)] + ints = [{"i": random.randint(0, 100)} for i in range(self.val_size // 6)] + strs = [rand_str() for _ in range(self.val_size // 6)] return "$", json.dumps({"arr": strs, "ints": ints, "i": random.randint(0, 100)}) else: assert False, "Invalid ValueType" @@ -187,8 +189,9 @@ def gen_shrink_cmd(self): return None, 0 return f"PEXPIRE k{key} {random.randint(0, 50)}", -1 else: - keys_gen = (self.randomize_key(pop=True) - for _ in range(random.randint(1, self.max_multikey))) + keys_gen = ( + self.randomize_key(pop=True) for _ in range(random.randint(1, self.max_multikey)) + ) keys = [f"k{k}" for k, _ in keys_gen if k is not None] if len(keys) == 0: @@ -196,19 +199,19 @@ def gen_shrink_cmd(self): return "DEL " + " ".join(keys), -len(keys) UPDATE_ACTIONS = [ - ('APPEND {k} {val}', ValueType.STRING), - ('SETRANGE {k} 10 {val}', ValueType.STRING), - ('LPUSH {k} {val}', ValueType.LIST), - ('LPOP {k}', ValueType.LIST), - ('SADD {k} {val}', ValueType.SET), - ('SPOP {k}', ValueType.SET), - ('HSETNX {k} v0 {val}', ValueType.HSET), - ('HINCRBY {k} v1 1', ValueType.HSET), - ('ZPOPMIN {k} 1', ValueType.ZSET), - ('ZADD {k} 0 {val}', ValueType.ZSET), - ('JSON.NUMINCRBY {k} $..i 1', ValueType.JSON), - ('JSON.ARRPOP {k} $.arr', ValueType.JSON), - ('JSON.ARRAPPEND {k} $.arr "{val}"', ValueType.JSON) + ("APPEND {k} {val}", ValueType.STRING), + ("SETRANGE {k} 10 {val}", ValueType.STRING), + ("LPUSH {k} {val}", ValueType.LIST), + ("LPOP {k}", ValueType.LIST), + ("SADD {k} {val}", ValueType.SET), + ("SPOP {k}", ValueType.SET), + ("HSETNX {k} v0 {val}", ValueType.HSET), + ("HINCRBY {k} v1 1", ValueType.HSET), + ("ZPOPMIN {k} 1", ValueType.ZSET), + ("ZADD {k} 0 {val}", ValueType.ZSET), + ("JSON.NUMINCRBY {k} $..i 1", ValueType.JSON), + ("JSON.ARRPOP {k} $.arr", ValueType.JSON), + ('JSON.ARRAPPEND {k} $.arr "{val}"', ValueType.JSON), ] def gen_update_cmd(self): @@ -217,16 +220,16 @@ def gen_update_cmd(self): """ cmd, t = random.choice(self.UPDATE_ACTIONS) k, _ = self.randomize_key(t) - val = ''.join(random.choices(string.ascii_letters, k=3)) + val = "".join(random.choices(string.ascii_letters, k=3)) return cmd.format(k=f"k{k}", val=val) if k is not None else None, 0 GROW_ACTINONS = { - ValueType.STRING: 'MSET', - ValueType.LIST: 'LPUSH', - ValueType.SET: 'SADD', - ValueType.HSET: 'HMSET', - ValueType.ZSET: 'ZADD', - ValueType.JSON: 'JSON.SET' + ValueType.STRING: "MSET", + ValueType.LIST: "LPUSH", + ValueType.SET: "SADD", + ValueType.HSET: "HMSET", + ValueType.ZSET: "ZADD", + ValueType.JSON: "JSON.SET", } def gen_grow_cmd(self): @@ -241,14 +244,13 @@ def gen_grow_cmd(self): count = 1 keys = (self.add_key(t) for _ in range(count)) - payload = itertools.chain( - *((f"k{k}",) + self.generate_val(t) for k in keys)) + payload = itertools.chain(*((f"k{k}",) + self.generate_val(t) for k in keys)) filtered_payload = filter(lambda p: p is not None, payload) return (self.GROW_ACTINONS[t],) + tuple(filtered_payload), count def make(self, action): - """ Create command for action and return it together with number of keys added (removed)""" + """Create command for action and return it together with number of keys added (removed)""" if action == SizeChange.SHRINK: return self.gen_shrink_cmd() elif action == SizeChange.NO_CHANGE: @@ -269,8 +271,7 @@ def size_change_probs(self): return [ max(self.base_diff_prob - self.diff_speed * dist, self.min_diff_prob), 1.0, - max(self.base_diff_prob + 2 * - self.diff_speed * dist, self.min_diff_prob) + max(self.base_diff_prob + 2 * self.diff_speed * dist, self.min_diff_prob), ] def generate(self): @@ -280,15 +281,14 @@ def generate(self): while len(cmds) < self.batch_size: # Re-calculating changes in small groups if len(changes) == 0: - changes = random.choices( - list(SizeChange), weights=self.size_change_probs(), k=20) + changes = random.choices(list(SizeChange), weights=self.size_change_probs(), k=20) cmd, delta = self.make(changes.pop()) if cmd is not None: cmds.append(cmd) self.key_cnt += delta - return cmds, self.key_cnt/self.key_cnt_target + return cmds, self.key_cnt / self.key_cnt_target class DataCapture: @@ -311,7 +311,7 @@ def _print_diff(self, other): printed = 0 diff = difflib.ndiff(self.entries, other.entries) for line in diff: - if line.startswith(' '): + if line.startswith(" "): continue eprint(line) if printed >= 20: @@ -344,10 +344,20 @@ class DflySeeder: assert await seeder.compare(capture, port=1112) """ - def __init__(self, port=6379, keys=1000, val_size=50, batch_size=100, max_multikey=5, dbcount=1, multi_transaction_probability=0.3, log_file=None, unsupported_types=[], stop_on_failure=True): - self.gen = CommandGenerator( - keys, val_size, batch_size, max_multikey, unsupported_types - ) + def __init__( + self, + port=6379, + keys=1000, + val_size=50, + batch_size=100, + max_multikey=5, + dbcount=1, + multi_transaction_probability=0.3, + log_file=None, + unsupported_types=[], + stop_on_failure=True, + ): + self.gen = CommandGenerator(keys, val_size, batch_size, max_multikey, unsupported_types) self.port = port self.dbcount = dbcount self.multi_transaction_probability = multi_transaction_probability @@ -356,7 +366,7 @@ def __init__(self, port=6379, keys=1000, val_size=50, batch_size=100, max_multik self.log_file = log_file if self.log_file is not None: - open(self.log_file, 'w').close() + open(self.log_file, "w").close() async def run(self, target_ops=None, target_deviation=None): """ @@ -366,11 +376,11 @@ async def run(self, target_ops=None, target_deviation=None): print(f"Running ops:{target_ops} deviation:{target_deviation}") self.stop_flag = False queues = [asyncio.Queue(maxsize=3) for _ in range(self.dbcount)] - producer = asyncio.create_task(self._generator_task( - queues, target_ops=target_ops, target_deviation=target_deviation)) + producer = asyncio.create_task( + self._generator_task(queues, target_ops=target_ops, target_deviation=target_deviation) + ) consumers = [ - asyncio.create_task(self._executor_task(i, queue)) - for i, queue in enumerate(queues) + asyncio.create_task(self._executor_task(i, queue)) for i, queue in enumerate(queues) ] time_start = time.time() @@ -388,7 +398,7 @@ def stop(self): self.stop_flag = True def reset(self): - """ Reset internal state. Needs to be called after flush or restart""" + """Reset internal state. Needs to be called after flush or restart""" self.gen.reset() async def capture(self, port=None): @@ -398,9 +408,9 @@ async def capture(self, port=None): port = self.port keys = sorted(list(self.gen.keys_and_types())) - captures = await asyncio.gather(*( - self._capture_db(port=port, target_db=db, keys=keys) for db in range(self.dbcount) - )) + captures = await asyncio.gather( + *(self._capture_db(port=port, target_db=db, keys=keys) for db in range(self.dbcount)) + ) return captures async def compare(self, initial_captures, port=6379): @@ -408,7 +418,9 @@ async def compare(self, initial_captures, port=6379): print(f"comparing capture to {port}") target_captures = await self.capture(port=port) - for db, target_capture, initial_capture in zip(range(self.dbcount), target_captures, initial_captures): + for db, target_capture, initial_capture in zip( + range(self.dbcount), target_captures, initial_captures + ): print(f"comparing capture to {port}, db: {db}") if not initial_capture.compare(target_capture): eprint(f">>> Inconsistent data on port {port}, db {db}") @@ -433,14 +445,14 @@ async def _generator_task(self, queues, target_ops=None, target_deviation=None): file = None if self.log_file: - file = open(self.log_file, 'a') + file = open(self.log_file, "a") def should_run(): if self.stop_flag: return False if target_ops is not None and submitted >= target_ops: return False - if target_deviation is not None and abs(1-deviation) < target_deviation: + if target_deviation is not None and abs(1 - deviation) < target_deviation: return False return True @@ -455,7 +467,7 @@ def stringify_cmd(cmd): blob, deviation = self.gen.generate() is_multi_transaction = random.random() < self.multi_transaction_probability tx_data = (blob, is_multi_transaction) - cpu_time += (time.time() - start_time) + cpu_time += time.time() - start_time await asyncio.gather(*(q.put(tx_data) for q in queues)) submitted += len(blob) @@ -463,14 +475,12 @@ def stringify_cmd(cmd): if file is not None: pattern = "MULTI\n{}\nEXEC\n" if is_multi_transaction else "{}\n" - file.write(pattern.format('\n'.join(stringify_cmd(cmd) - for cmd in blob))) + file.write(pattern.format("\n".join(stringify_cmd(cmd) for cmd in blob))) - print('.', end='', flush=True) + print(".", end="", flush=True) await asyncio.sleep(0.0) - print("\ncpu time", cpu_time, "batches", - batches, "commands", submitted) + print("\ncpu time", cpu_time, "batches", batches, "commands", submitted) await asyncio.gather(*(q.put(None) for q in queues)) for q in queues: @@ -512,20 +522,19 @@ async def _executor_task(self, db, queue): ValueType.LIST: lambda pipe, k: pipe.lrange(k, 0, -1), ValueType.SET: lambda pipe, k: pipe.smembers(k), ValueType.HSET: lambda pipe, k: pipe.hgetall(k), - ValueType.ZSET: lambda pipe, k: pipe.zrange( - k, start=0, end=-1, withscores=True), - ValueType.JSON: lambda pipe, k: pipe.execute_command( - "JSON.GET", k, "$") + ValueType.ZSET: lambda pipe, k: pipe.zrange(k, start=0, end=-1, withscores=True), + ValueType.JSON: lambda pipe, k: pipe.execute_command("JSON.GET", k, "$"), } CAPTURE_EXTRACTORS = { ValueType.STRING: lambda res, tostr: (tostr(res),), ValueType.LIST: lambda res, tostr: (tostr(s) for s in res), ValueType.SET: lambda res, tostr: sorted(tostr(s) for s in res), - ValueType.HSET: lambda res, tostr: sorted(tostr(k)+"="+tostr(v) for k, v in res.items()), - ValueType.ZSET: lambda res, tostr: ( - tostr(s)+"-"+str(f) for (s, f) in res), - ValueType.JSON: lambda res, tostr: (tostr(res),) + ValueType.HSET: lambda res, tostr: sorted( + tostr(k) + "=" + tostr(v) for k, v in res.items() + ), + ValueType.ZSET: lambda res, tostr: (tostr(s) + "-" + str(f) for (s, f) in res), + ValueType.JSON: lambda res, tostr: (tostr(res),), } async def _capture_entries(self, client, keys): @@ -540,8 +549,7 @@ def tostr(b): results = await pipe.execute() for (k, t), res in zip(group, results): - out = f"{t.name} k{k}: " + \ - ' '.join(self.CAPTURE_EXTRACTORS[t](res, tostr)) + out = f"{t.name} k{k}: " + " ".join(self.CAPTURE_EXTRACTORS[t](res, tostr)) entries.append(out) return entries @@ -563,11 +571,13 @@ async def disconnect_clients(*clients): await asyncio.gather(*(c.connection_pool.disconnect() for c in clients)) -def gen_certificate(ca_key_path, ca_certificate_path, certificate_request_path, private_key_path, certificate_path): +def gen_certificate( + ca_key_path, ca_certificate_path, certificate_request_path, private_key_path, certificate_path +): # Generate Dragonfly's private key and certificate signing request (CSR) step1 = rf'openssl req -newkey rsa:4096 -nodes -keyout {private_key_path} -out {certificate_request_path} -subj "/C=GR/ST=SKG/L=Thessaloniki/O=KK/OU=Comp/CN=Gr/emailAddress=does_not_exist@gmail.com"' subprocess.run(step1, shell=True) # Use CA's private key to sign dragonfly's CSR and get back the signed certificate - step2 = fr'openssl x509 -req -in {certificate_request_path} -days 1 -CA {ca_certificate_path} -CAkey {ca_key_path} -CAcreateserial -out {certificate_path}' + step2 = rf"openssl x509 -req -in {certificate_request_path} -days 1 -CA {ca_certificate_path} -CAkey {ca_key_path} -CAcreateserial -out {certificate_path}" subprocess.run(step2, shell=True) diff --git a/tests/integration/async.py b/tests/integration/async.py index f4a31c89358f..2c9926fc602b 100755 --- a/tests/integration/async.py +++ b/tests/integration/async.py @@ -13,12 +13,14 @@ import sys import random -connection_pool = aioredis.ConnectionPool(host="localhost", port=6379, - db=1, decode_responses=True, max_connections=16) +connection_pool = aioredis.ConnectionPool( + host="localhost", port=6379, db=1, decode_responses=True, max_connections=16 +) key_index = 1 + async def post_to_redis(sem, db_name, index): global key_index async with sem: @@ -26,10 +28,10 @@ async def post_to_redis(sem, db_name, index): try: redis_client = aioredis.Redis(connection_pool=connection_pool) async with redis_client.pipeline(transaction=True) as pipe: - for i in range(1, 15): + for i in range(1, 15): pipe.hsetnx(name=f"key_{key_index}", key="name", value="bla") key_index += 1 - #log.info(f"after first half {key_index}") + # log.info(f"after first half {key_index}") for i in range(1, 15): pipe.hsetnx(name=f"bla_{key_index}", key="name2", value="bla") key_index += 1 @@ -40,8 +42,8 @@ async def post_to_redis(sem, db_name, index): finally: # log.info(f"before close {index}") await redis_client.close() - #log.info(f"after close {index} {len(results)}") - + # log.info(f"after close {index} {len(results)}") + async def do_concurrent(db_name): tasks = [] @@ -49,10 +51,10 @@ async def do_concurrent(db_name): for i in range(1, 3000): tasks.append(post_to_redis(sem, db_name, i)) res = await asyncio.gather(*tasks) - -if __name__ == '__main__': + +if __name__ == "__main__": log.remove() - log.add(sys.stdout, enqueue=True, level='INFO') + log.add(sys.stdout, enqueue=True, level="INFO") loop = asyncio.get_event_loop() loop.run_until_complete(do_concurrent("my_db")) diff --git a/tests/integration/generate_sets.py b/tests/integration/generate_sets.py index 840ace78ddd2..f9d5d98f6362 100755 --- a/tests/integration/generate_sets.py +++ b/tests/integration/generate_sets.py @@ -12,40 +12,40 @@ def fill_set(args, redis: rclient.Redis): for j in range(args.num): token = uuid.uuid1().hex # print(token) - key = f'USER_OTP:{token}' + key = f"USER_OTP:{token}" arr = [] for i in range(30): - otp = ''.join(random.choices( - string.ascii_uppercase + string.digits, k=12)) + otp = "".join(random.choices(string.ascii_uppercase + string.digits, k=12)) arr.append(otp) - redis.execute_command('sadd', key, *arr) - + redis.execute_command("sadd", key, *arr) + + def fill_hset(args, redis): for j in range(args.num): token = uuid.uuid1().hex - key = f'USER_INFO:{token}' - phone = f'555-999-{j}' - user_id = 'user' * 5 + f'-{j}' - redis.hset(key, 'phone', phone) - redis.hset(key, 'user_id', user_id) - redis.hset(key, 'login_time', time.time()) + key = f"USER_INFO:{token}" + phone = f"555-999-{j}" + user_id = "user" * 5 + f"-{j}" + redis.hset(key, "phone", phone) + redis.hset(key, "user_id", user_id) + redis.hset(key, "login_time", time.time()) def main(): - parser = argparse.ArgumentParser(description='fill hset entities') - parser.add_argument( - '-p', type=int, help='redis port', dest='port', default=6380) + parser = argparse.ArgumentParser(description="fill hset entities") + parser.add_argument("-p", type=int, help="redis port", dest="port", default=6380) + parser.add_argument("-n", type=int, help="number of keys", dest="num", default=10000) parser.add_argument( - '-n', type=int, help='number of keys', dest='num', default=10000) - parser.add_argument( - '--type', type=str, choices=['hset', 'set'], help='set type', default='hset') + "--type", type=str, choices=["hset", "set"], help="set type", default="hset" + ) args = parser.parse_args() - redis = rclient.Redis(host='localhost', port=args.port, db=0) - if args.type == 'hset': + redis = rclient.Redis(host="localhost", port=args.port, db=0) + if args.type == "hset": fill_hset(args, redis) - elif args.type == 'set': + elif args.type == "set": fill_set(args, redis) + if __name__ == "__main__": main()