Skip to content

Commit

Permalink
Merge pull request ClickHouse#58007 from ClickHouse/chesema-stateless…
Browse files Browse the repository at this point in the history
…-run-timeout

more messages in ci
  • Loading branch information
CheSema authored Dec 20, 2023
2 parents ae42704 + b4fec61 commit 291567a
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 40 deletions.
4 changes: 2 additions & 2 deletions docker/test/stateless/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,11 @@ export -f run_tests
if [ "$NUM_TRIES" -gt "1" ]; then
# We don't run tests with Ordinary database in PRs, only in master.
# So run new/changed tests with Ordinary at least once in flaky check.
timeout "$MAX_RUN_TIME" bash -c 'NUM_TRIES=1; USE_DATABASE_ORDINARY=1; run_tests' \
timeout_with_logging "$MAX_RUN_TIME" bash -c 'NUM_TRIES=1; USE_DATABASE_ORDINARY=1; run_tests' \
| sed 's/All tests have finished//' | sed 's/No tests were run//' ||:
fi

timeout "$MAX_RUN_TIME" bash -c run_tests ||:
timeout_with_logging "$MAX_RUN_TIME" bash -c run_tests ||:

echo "Files in current directory"
ls -la ./
Expand Down
13 changes: 13 additions & 0 deletions docker/test/stateless/utils.lib
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,17 @@ function fn_exists() {
declare -F "$1" > /dev/null;
}

# Run `timeout` with the given arguments (duration first, then the command
# and its arguments) and report on stdout when the run ended with status 124.
#
# Arguments:
#   $@ - passed to `timeout` unchanged.
# Returns:
#   The exit status of the `timeout` invocation, unmodified.
function timeout_with_logging() {
    local status

    # Capture the status through `if` so a non-zero result neither aborts the
    # caller under `set -e` nor gets lost before it is inspected.
    if timeout "$@"
    then
        status=0
    else
        status=$?
    fi

    # 124 is the status this wrapper treats as "the time limit was hit".
    if [[ "${status}" -eq 124 ]]
    then
        echo "The command 'timeout ${*}' has been killed by timeout"
    fi

    return $status
}

# vi: ft=bash
134 changes: 96 additions & 38 deletions tests/clickhouse-test
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ from typing import Tuple, Union, Optional, Dict, Set, List
import subprocess
from subprocess import Popen
from subprocess import PIPE
from datetime import datetime
from datetime import datetime, timedelta
from time import time, sleep
from errno import ESRCH

Expand Down Expand Up @@ -279,36 +279,42 @@ def need_retry(args, stdout, stderr, total_time):
)


def get_processlist_with_stacktraces(args):
try:
if args.replicated_database:
return clickhouse_execute(
def get_processlist_size(args):
if args.replicated_database:
return int(
clickhouse_execute(
args,
"""
SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
-- NOTE: view() here to do JOIN on shards, instead of initiator
FROM clusterAllReplicas('test_cluster_database_replicated', view(
SELECT
p.*,
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
s.trace), '\n') AS stacktrace
)) AS stacktraces
FROM system.processes p
JOIN system.stack_trace s USING (query_id)
count()
FROM
FROM system.processes
WHERE query NOT LIKE '%system.processes%'
GROUP BY p.*
))
ORDER BY elapsed DESC FORMAT Vertical
""",
settings={
"allow_introspection_functions": 1,
},
)
else:
return clickhouse_execute(
""",
).strip()
)
else:
return int(
clickhouse_execute(
args,
"""
SELECT
count()
FROM system.processes
WHERE query NOT LIKE '%system.processes%'
""",
).strip()
)


def get_processlist_with_stacktraces(args):
if args.replicated_database:
return clickhouse_execute(
args,
"""
SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
-- NOTE: view() here to do JOIN on shards, instead of initiator
FROM clusterAllReplicas('test_cluster_database_replicated', view(
SELECT
p.*,
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
Expand All @@ -319,14 +325,35 @@ def get_processlist_with_stacktraces(args):
JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%'
GROUP BY p.*
ORDER BY elapsed DESC FORMAT Vertical
""",
settings={
"allow_introspection_functions": 1,
},
)
except Exception as e:
return "Failed to get processlist: " + str(e)
))
ORDER BY elapsed DESC FORMAT Vertical
""",
settings={
"allow_introspection_functions": 1,
},
timeout=120,
)
else:
return clickhouse_execute(
args,
"""
SELECT
p.*,
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
s.trace), '\n') AS stacktrace
)) AS stacktraces
FROM system.processes p
JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%'
GROUP BY p.*
ORDER BY elapsed DESC FORMAT Vertical
""",
settings={
"allow_introspection_functions": 1,
},
timeout=120,
)


def get_transactions_list(args):
Expand Down Expand Up @@ -2427,11 +2454,42 @@ def main(args):

if args.hung_check:
# Some queries may keep executing in the background for some time after a test has finished. This is normal.
for _ in range(1, 60):
processlist = get_processlist_with_stacktraces(args)
if not processlist:
break
sleep(1)
print("Checking the hung queries: ", end="")
hung_count = 0
try:
deadline = datetime.now() + timedelta(seconds=90)
while datetime.now() < deadline:
hung_count = get_processlist_size(args)
if hung_count == 0:
print(" done")
break
print(". ", end="")
except Exception as e:
print(
colored(
"\nHung check failed. Failed to get processlist size: " + str(e),
args,
"red",
attrs=["bold"],
)
)
exit_code.value = 1

processlist = ""
if hung_count > 0:
try:
processlist = get_processlist_with_stacktraces(args)
except Exception as e:
print(
colored(
"\nHung check failed. Failed to get processlist with stacktraces: "
+ str(e),
args,
"red",
attrs=["bold"],
)
)
exit_code.value = 1

if processlist:
print(
Expand Down

0 comments on commit 291567a

Please sign in to comment.