Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PBM-1223 load test #125

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pbm-functional/pytest/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ def make_restore(self, name, **kwargs):
assert False, "Cannot start restore, another operation running"
time.sleep(1)
Cluster.log("Restore started")
result = n.run('timeout 240 pbm restore ' + name + ' --wait')
timeout=kwargs.get('timeout', 240)
result = n.run('timeout ' + str(timeout) + ' pbm restore ' + name + ' --wait')
if result.rc == 124:
# try to catch possible failures if timeout exceeded
for host in self.mongod_hosts:
Expand Down
85 changes: 84 additions & 1 deletion pbm-functional/pytest/test_PBM-1223.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import time
import os
import docker
import threading
import concurrent.futures
import random
import json

from datetime import datetime
from cluster import Cluster
Expand Down Expand Up @@ -45,6 +46,8 @@
client=pymongo.MongoClient(cluster.connection)
client.admin.command("enableSharding", "test")
client.admin.command("shardCollection", "test.test", key={"_id": "hashed"})
client.admin.command("shardCollection", "test.test1", key={"_id": "hashed"})
client.admin.command("shardCollection", "test.test2", key={"_id": "hashed"})
yield True

finally:
Expand Down Expand Up @@ -76,7 +79,87 @@
Cluster.log("Transaction commited\n")
backup=background_backup.result()
assert pymongo.MongoClient(cluster.connection)["test"]["test"].count_documents({}) == 8
cluster.make_restore(backup,check_pbm_status=True)

Check failure on line 82 in pbm-functional/pytest/test_PBM-1223.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_PBM-1223.test_logical

AssertionError: Starting restore 2024-02-29T10:23:10.661426885Z from '2024-02-29T10:22:53Z'...Started logical restore. Waiting to finish....Error: operation failed with: waiting for dumpDone: cluster failed: waiting for dumpDone: cluster failed: reply oplog: replay chunk 1709202174.1709202179: apply oplog for chunk: applying a transaction entry: apply txn: { "Timestamp": { "T": 1709202174, "I": 13 }, "Term": 1, "Hash": null, "Version": 2, "Operation": "c", "Namespace": "admin.$cmd", "Object": [ { "Key": "commitTransaction", "Value": 1 }, { "Key": "commitTimestamp", "Value": { "T": 1709202174, "I": 10 } } ], "Query": null, "UI": null, "LSID": "SAAAAAVpZAAQAAAABDi28rJGRUhMsjVm12fCmukFdWlkACAAAAAAY5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fgA", "TxnNumber": 1, "PrevOpTime": "HAAAABF0cwAKAAAA/lrgZRJ0AAEAAAAAAAAAAA==" }: unknown transaction id SAAAAAVpZAAQAAAABDi28rJGRUhMsjVm12fCmukFdWlkACAAAAAAY5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fgA-1
Raw output
start_cluster = True, cluster = <cluster.Cluster object at 0x7fd96ada4790>

    @pytest.mark.testcase(test_case_key="T249", test_step_key=1)
    @pytest.mark.timeout(300,func_only=True)
    def test_logical(start_cluster,cluster):
        cluster.check_pbm_status()
        client = pymongo.MongoClient(cluster.connection)
        db = client.test
        collection = db.test
        with client.start_session() as session:
            with session.start_transaction():
                Cluster.log("Transaction started\n")
                collection.insert_one({"e": 5}, session=session)
                collection.insert_one({"f": 6}, session=session)
                collection.insert_one({"g": 7}, session=session)
                collection.insert_one({"h": 8}, session=session)
                collection.insert_one({"i": 9}, session=session)
                background_backup=concurrent.futures.ThreadPoolExecutor().submit(cluster.make_backup, 'logical')
                time.sleep(1)
                collection.insert_one({"j": 10}, session=session)
                collection.insert_one({"k": 11}, session=session)
                collection.insert_one({"l": 12}, session=session)
                session.commit_transaction()
                Cluster.log("Transaction commited\n")
        backup=background_backup.result()
        assert pymongo.MongoClient(cluster.connection)["test"]["test"].count_documents({}) == 8
>       cluster.make_restore(backup,check_pbm_status=True)

test_PBM-1223.py:82: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <cluster.Cluster object at 0x7fd96ada4790>, name = '2024-02-29T10:22:53Z'
kwargs = {'check_pbm_status': True}
client = MongoClient(host=['mongos:27017'], document_class=dict, tz_aware=False, connect=True)
result = CommandResult(backend=<testinfra.backend.docker.DockerBackend object at 0x7fd9536d0090>, exit_status=1, command=b'time... id SAAAAAVpZAAQAAAABDi28rJGRUhMsjVm12fCmukFdWlkACAAAAAAY5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fgA-1\n', _stderr=b'')
n = <testinfra.host.Host docker://rscfg01>, timeout = 240

    def make_restore(self, name, **kwargs):
        if self.layout == "sharded":
            client = pymongo.MongoClient(self.connection)
            result = client.admin.command("balancerStop")
            client.close()
            Cluster.log("Stopping balancer: " + str(result))
            self.stop_mongos()
        self.stop_arbiters()
        n = testinfra.get_host("docker://" + self.pbm_cli)
        timeout = time.time() + 60
    
        while True:
            if not self.get_status()['running']:
                break
            if time.time() > timeout:
                assert False, "Cannot start restore, another operation running"
            time.sleep(1)
        Cluster.log("Restore started")
        timeout=kwargs.get('timeout', 240)
        result = n.run('timeout ' + str(timeout) + ' pbm restore ' + name + ' --wait')
        if result.rc == 124:
            # try to catch possible failures if timeout exceeded
            for host in self.mongod_hosts:
                try:
                    container = docker.from_env().containers.get(host)
                    get_logs = container.exec_run(
                        'cat /var/lib/mongo/pbm.restore.log', stderr=False)
                    if get_logs.exit_code == 0:
                        Cluster.log(
                            "!!!!Possible failure on {}, file pbm.restore.log was found:".format(host))
                        Cluster.log(get_logs.output.decode('utf-8'))
                except docker.errors.APIError:
                    pass
            assert False, "Timeout for restore exceeded"
        elif result.rc == 0:
            Cluster.log(result.stdout)
        else:
>           assert False, result.stdout + result.stderr
E           AssertionError: Starting restore 2024-02-29T10:23:10.661426885Z from '2024-02-29T10:22:53Z'...Started logical restore.
E           Waiting to finish....Error: operation failed with: waiting for dumpDone: cluster failed: waiting for dumpDone: cluster failed: reply oplog: replay chunk 1709202174.1709202179: apply oplog for chunk: applying a transaction entry: apply txn: {
E            "Timestamp": {
E             "T": 1709202174,
E             "I": 13
E            },
E            "Term": 1,
E            "Hash": null,
E            "Version": 2,
E            "Operation": "c",
E            "Namespace": "admin.$cmd",
E            "Object": [
E             {
E              "Key": "commitTransaction",
E              "Value": 1
E             },
E             {
E              "Key": "commitTimestamp",
E              "Value": {
E               "T": 1709202174,
E               "I": 10
E              }
E             }
E            ],
E            "Query": null,
E            "UI": null,
E            "LSID": "SAAAAAVpZAAQAAAABDi28rJGRUhMsjVm12fCmukFdWlkACAAAAAAY5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fgA",
E            "TxnNumber": 1,
E            "PrevOpTime": "HAAAABF0cwAKAAAA/lrgZRJ0AAEAAAAAAAAAAA=="
E           }: unknown transaction id SAAAAAVpZAAQAAAABDi28rJGRUhMsjVm12fCmukFdWlkACAAAAAAY5mrDaxi8gv8RmdTsQ+1j7fmkr7JUsabhNmXAheU0fgA-1

cluster.py:451: AssertionError
assert pymongo.MongoClient(cluster.connection)["test"]["test"].count_documents({}) == 8
assert pymongo.MongoClient(cluster.connection)["test"].command("collstats", "test").get("sharded", False)
Cluster.log("Finished successfully\n")

@pytest.mark.timeout(3600, func_only=True)
def test_load(start_cluster, cluster):
    """Load test (PBM-1223): run concurrent transactional inserts into two
    sharded collections, take a logical backup under load, restore it, and
    verify the restored document counts equal the documents committed up to
    the backup's ``last_write_ts`` oplog timestamp.
    """

    # Runs committed multi-document transactions until the enclosing-scope
    # `upsert` flag is cleared. Returns a list of tuples:
    # ((oplog seconds, oplog increment), cumulative inserted document count).
    def background_transaction(db, collection):
        Cluster.log("Starting background insert to " + collection)
        j = 0
        result = []
        while upsert:
            data = random.randbytes(1024 * 1024)
            client = pymongo.MongoClient(cluster.connection)
            with client.start_session() as session:
                try:
                    with session.start_transaction():
                        for i in range(20):
                            client[db][collection].insert_one({str(i): data}, session=session)
                            time.sleep(random.uniform(0.4, 0.6))
                        session.commit_transaction()
                    j = j + 20
                    cluster_ts = session.cluster_time['clusterTime']
                    Cluster.log(collection + ": " + str(cluster_ts.time) + "." + str(cluster_ts.inc) + " " + str(j))
                    # Keep the oplog timestamp as a (seconds, increment) tuple:
                    # tuples compare lexicographically, whereas the previous
                    # float("T.I") form mis-ordered increments with different
                    # digit counts (e.g. T.9 compared greater than T.10).
                    result.append(((cluster_ts.time, cluster_ts.inc), j))
                except Exception as e:
                    Cluster.log(e)
                    continue
                finally:
                    client.close()
        Cluster.log("Stopping background insert to " + collection)
        return result

    # Returns the cumulative inserted count of the newest commit whose oplog
    # timestamp is <= the backup's last_write_ts. E.g. for
    # [((1709134884, 23), 20), ((1709134891, 5), 100), ((1709134955, 4), 120)]
    # and last_write_ts (1709134954, 11) the answer is 100.
    def find_inserted(array_tuples, timestamp):
        print(array_tuples)
        print(timestamp)
        resulted_tuples = [t for t in array_tuples if t[0] <= timestamp]
        assert resulted_tuples, "no transaction committed before last_write_ts " + str(timestamp)
        result = max(resulted_tuples, key=lambda t: t[0])
        print(result)
        return result[1]

    cluster.check_pbm_status()
    upsert = True
    background_transaction1 = concurrent.futures.ThreadPoolExecutor().submit(background_transaction, 'test', 'test1')
    background_transaction2 = concurrent.futures.ThreadPoolExecutor().submit(background_transaction, 'test', 'test2')

    # Let the writers run for a while, then take the backup under load.
    time.sleep(300)
    backup = cluster.make_backup('logical')

    # Stop the writers and collect their commit histories.
    upsert = False
    upsert1_result = background_transaction1.result()
    upsert2_result = background_transaction2.result()
    Cluster.log("test1 documents count: " + str(pymongo.MongoClient(cluster.connection)["test"]["test1"].count_documents({})))
    Cluster.log("test2 documents count: " + str(pymongo.MongoClient(cluster.connection)["test"]["test2"].count_documents({})))

    # pbm describe-backup doesn't return the exact oplog timestamp, so read
    # the backup metadata directly from the storage.
    backup_meta = json.loads(testinfra.get_host("docker://rscfg01").check_output('cat /backups/' + backup + '.pbm.json'))
    Cluster.log(json.dumps(backup_meta, indent=4))
    last_write_ts = (backup_meta["last_write_ts"]["T"], backup_meta["last_write_ts"]["I"])

    # The real count of documents that must survive the restore is the count
    # committed at or before last_write_ts.
    inserted_test1 = find_inserted(upsert1_result, last_write_ts)
    inserted_test2 = find_inserted(upsert2_result, last_write_ts)
    Cluster.log("test1 inserted count: " + str(inserted_test1))
    Cluster.log("test2 inserted count: " + str(inserted_test2))

    # Restores of load-test data take longer than the default 240s pbm timeout.
    cluster.make_restore(backup, check_pbm_status=True, timeout=600)

    count_test1 = pymongo.MongoClient(cluster.connection)["test"]["test1"].count_documents({})
    count_test2 = pymongo.MongoClient(cluster.connection)["test"]["test2"].count_documents({})
    Cluster.log("test1 documents count: " + str(count_test1))
    Cluster.log("test2 documents count: " + str(count_test2))

    assert inserted_test1 == count_test1
    assert inserted_test2 == count_test2
    Cluster.log("Finished successfully\n")
Loading