Dynamic pagination in OCP tab #118

Open · wants to merge 14 commits into base: revamp
5 changes: 4 additions & 1 deletion .github/workflows/build-push.yaml
@@ -1,5 +1,8 @@
name: Build and Push Image
-on: [ push ]
+on:
+  push:
+    branches:
+      - main

jobs:
  build:
2 changes: 1 addition & 1 deletion .github/workflows/release-build-push.yaml
@@ -3,7 +3,7 @@ name: Release Build and Push Image
on:
  push:
    tags:
-      - "*"
+      - "*"

jobs:
  build:
223 changes: 184 additions & 39 deletions backend/app/api/v1/commons/ocp.py
@@ -2,51 +2,196 @@
import pandas as pd
import app.api.v1.commons.utils as utils
from app.services.search import ElasticService
import json

def buildFilterQuery(filter: str, query: dict):
    minimum_match = 0
    filter_dict = json.loads(filter)
    if bool(filter_dict):
        for key, val in filter_dict.items():
            if key == "workerNodesCount":
                query["query"]["bool"]["filter"].append({"terms": {"workerNodesCount": val}})
            elif key == "build":
                for item in filter_dict["build"]:
                    buildObj = getMatchPhrase("ocpVersion", item)
                    query["query"]["bool"]["should"].append(buildObj)
                    minimum_match += 1
            elif key == "jobType":
                for item in filter_dict["jobType"]:
                    obj = getMatchPhrase("upstreamJob", item)
                    query["query"]["bool"]["should"].append(obj)
                    minimum_match += 1
            elif key == "isRehearse":
                rehearseObj = {"match_phrase": {"upstreamJob": "rehearse"}}
                if True in filter_dict["isRehearse"]:
                    query["query"]["bool"]["should"].append(rehearseObj)
                    minimum_match += 1
                if False in filter_dict["isRehearse"]:
                    query["query"]["bool"]["must_not"].append(rehearseObj)
            else:
                for item in filter_dict[key]:
                    queryObj = getMatchPhrase(key, item)
                    query["query"]["bool"]["should"].append(queryObj)
                    minimum_match += 1
    if len(query["query"]["bool"]["should"]) >= 1:
        query["query"]["bool"].update({"minimum_should_match": minimum_match})

    return query
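
For illustration, a minimal usage sketch of this helper; the filter payload and base query shape are invented to match the code above, not taken from the PR:

# Hypothetical usage sketch: how an endpoint might call buildFilterQuery.
# The filter arrives as a JSON string, e.g. '{"ciSystem": ["prow"]}'.
base_query = {"query": {"bool": {"filter": [], "should": [], "must_not": []}}}
built = buildFilterQuery('{"ciSystem": ["prow"], "isRehearse": [false]}', base_query)
# built["query"]["bool"]["should"] now holds one match_phrase per filter value,
# built["query"]["bool"]["must_not"] excludes rehearse jobs, and
# built["query"]["bool"]["minimum_should_match"] counts the should clauses.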

def buildAggregateQuery():
    keysDict = {
        "ciSystem": "ciSystem.keyword", "platform": "platform.keyword", "benchmark": "benchmark.keyword",
        "releaseStream": "releaseStream.keyword", "networkType": "networkType.keyword",
        "workerNodesCount": "workerNodesCount", "jobStatus": "jobStatus.keyword",
        "controlPlaneArch": "controlPlaneArch.keyword", "publish": "publish.keyword",
        "fips": "fips.keyword", "encrypted": "encrypted.keyword", "ipsec": "ipsec.keyword",
        "ocpVersion": "ocpVersion.keyword", "build": "ocpVersion.keyword",
        "upstream": "upstreamJob.keyword",
    }
    aggregate = {}
    for x, y in keysDict.items():
        obj = {x: {"terms": {"field": y, "size": 10}}}
Collaborator:

Why do we have size hardcoded to 10 here?

Collaborator:

10 instances is actually the Elasticsearch default for aggregations, so there's no point in specifying a size unless you want it bigger or smaller.

And, especially if you want to frequently override the instance limit here, a global (or class) constant would be better than an inline literal, so it's easier to find and control.
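
A minimal sketch of that suggestion; the constant name is illustrative and the field map is abridged, neither is part of this PR:

# Hypothetical refactor per the review comment: hoist the bucket size into a
# module-level constant so it is easy to find and override in one place.
TERMS_AGG_SIZE = 10  # matches Elasticsearch's default for terms aggregations

# Abridged field map; the real function covers all filterable fields.
keysDict = {"ciSystem": "ciSystem.keyword", "platform": "platform.keyword"}

def buildAggregateQuery(size: int = TERMS_AGG_SIZE):
    aggregate = {}
    for field_key, es_field in keysDict.items():
        aggregate[field_key] = {"terms": {"field": es_field, "size": size}}
    return aggregate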

        aggregate.update(obj)
    return aggregate

-async def getData(start_datetime: date, end_datetime: date, configpath: str):
-    query = {
-        "query": {
-            "bool": {
-                "filter": {
-                    "range": {
-                        "timestamp": {
-                            "format": "yyyy-MM-dd"
-                        }
-                    }
-                }
-            }
-        }
-    }
+async def getData(start_datetime: date, end_datetime: date, size: int, offset: int, sort: str, filter: str, configpath: str):
+    try:
+        query = {
+            "query": {
+                "bool": {
+                    "filter": [{
+                        "range": {
+                            "timestamp": {
+                                "format": "yyyy-MM-dd"
+                            }
+                        }
+                    }],
+                    "should": [],
+                    "must_not": []
+                }
+            }
+        }

-    es = ElasticService(configpath=configpath)
-    response = await es.post(query=query, start_date=start_datetime, end_date=end_datetime, timestamp_field='timestamp')
-    await es.close()
-    tasks = [item['_source'] for item in response]
-    jobs = pd.json_normalize(tasks)
-    if len(jobs) == 0:
-        return jobs
-
-    jobs[['masterNodesCount', 'workerNodesCount',
-          'infraNodesCount', 'totalNodesCount']] = jobs[['masterNodesCount', 'workerNodesCount', 'infraNodesCount', 'totalNodesCount']].fillna(0)
-    jobs.fillna('', inplace=True)
-    jobs[['ipsec', 'fips', 'encrypted',
-          'publish', 'computeArch', 'controlPlaneArch']] = jobs[['ipsec', 'fips', 'encrypted',
-          'publish', 'computeArch', 'controlPlaneArch']].replace(r'^\s*$', "N/A", regex=True)
-    jobs['encryptionType'] = jobs.apply(fillEncryptionType, axis=1)
-    jobs['benchmark'] = jobs.apply(utils.updateBenchmark, axis=1)
-    jobs['platform'] = jobs.apply(utils.clasifyAWSJobs, axis=1)
-    jobs['jobType'] = jobs.apply(utils.jobType, axis=1)
-    jobs['isRehearse'] = jobs.apply(utils.isRehearse, axis=1)
-    jobs['jobStatus'] = jobs.apply(utils.updateStatus, axis=1)
-    jobs['build'] = jobs.apply(utils.getBuild, axis=1)
-
-    cleanJobs = jobs[jobs['platform'] != ""]
-
-    jbs = cleanJobs
-    jbs['shortVersion'] = jbs['ocpVersion'].str.slice(0, 4)
-
-    return jbs
+        es = ElasticService(configpath=configpath)
+
+        if sort:
+            query.update({"sort": json.loads(sort)})
+        if filter:
+            query = buildFilterQuery(filter, query)
+
+        response = await es.post(query=query, size=size, offset=offset, start_date=start_datetime, end_date=end_datetime, timestamp_field='timestamp')
+        await es.close()
+        tasks = [item['_source'] for item in response["data"]]
+        jobs = pd.json_normalize(tasks)
+        if len(jobs) == 0:
+            return jobs
+
+        jobs[['masterNodesCount', 'workerNodesCount',
+              'infraNodesCount', 'totalNodesCount']] = jobs[['masterNodesCount', 'workerNodesCount', 'infraNodesCount', 'totalNodesCount']].fillna(0)
+        jobs.fillna('', inplace=True)
+        jobs[['ipsec', 'fips', 'encrypted',
+              'publish', 'computeArch', 'controlPlaneArch']] = jobs[['ipsec', 'fips', 'encrypted',
+              'publish', 'computeArch', 'controlPlaneArch']].replace(r'^\s*$', "N/A", regex=True)
+        jobs['encryptionType'] = jobs.apply(fillEncryptionType, axis=1)
+        jobs['benchmark'] = jobs.apply(utils.updateBenchmark, axis=1)
+        jobs['platform'] = jobs.apply(utils.clasifyAWSJobs, axis=1)
+        jobs['jobType'] = jobs.apply(utils.jobType, axis=1)
+        jobs['isRehearse'] = jobs.apply(utils.isRehearse, axis=1)
+        jobs['jobStatus'] = jobs.apply(utils.updateStatus, axis=1)
+        jobs['build'] = jobs.apply(utils.getBuild, axis=1)
+
+        cleanJobs = jobs[jobs['platform'] != ""]
+
+        jbs = cleanJobs
+        jbs['shortVersion'] = jbs['ocpVersion'].str.slice(0, 4)
+
+        return ({"data": jobs, "total": response["total"]})
+
+    except Exception as err:
+        print(f"{type(err).__name__} was raised14: {err}")
Comment on lines +108 to +109

Collaborator:

I think relying on the default unhandled exception traceback is better for debugging -- and simply returning None after a print in production seems bad. (And this should work with Python 3.9 and the existing package dependencies.)
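
A minimal sketch of that suggestion, assuming the FastAPI endpoint layer handles user-facing errors; the endpoint name and error mapping are illustrative, not code from this PR:

from fastapi import HTTPException

# Hypothetical endpoint wrapper: map known bad input to a 4xx and let
# anything unexpected propagate, so the server log keeps the full traceback
# instead of a print followed by an implicit `return None`.
async def jobs_endpoint(start_date, end_date, size, offset, sort, filter):
    try:
        return await getData(start_date, end_date, size, offset, sort,
                             filter, 'ocp.elasticsearch')
    except ValueError as err:  # json.JSONDecodeError subclasses ValueError
        raise HTTPException(status_code=422, detail=str(err))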


def getMatchPhrase(key, item):
    buildObj = {"match_phrase": {key: item}}
    return buildObj

def getSummary(jobStatus, isFilterReset):
    if isFilterReset:
        new_dict = {item['key']: 0 for item in jobStatus}
    else:
        new_dict = {item['key']: item['doc_count'] for item in jobStatus}
    return new_dict
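
For example, given terms-aggregation buckets of the usual shape, getSummary flattens them into a counts dict; the bucket values here are made up for illustration:

buckets = [{"key": "success", "doc_count": 42}, {"key": "failure", "doc_count": 3}]
getSummary(buckets, isFilterReset=False)  # {'success': 42, 'failure': 3}
getSummary(buckets, isFilterReset=True)   # {'success': 0, 'failure': 0}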

async def getFilterData(start_datetime: date, end_datetime: date, size: int, offset: int, sort: str, filter: str, configpath: str):
    try:
        start_date = start_datetime.strftime('%Y-%m-%d') if start_datetime else (datetime.utcnow().date() - timedelta(days=5)).strftime('%Y-%m-%d')
        end_date = end_datetime.strftime('%Y-%m-%d') if end_datetime else datetime.utcnow().strftime('%Y-%m-%d')
        query = {
            "aggs": {
                "min_timestamp": {"min": {"field": start_date}},
                "max_timestamp": {"max": {"field": end_date}}
            },
            "query": {
                "bool": {
                    "filter": [{"range": {"timestamp": {"format": "yyyy-MM-dd", "lte": end_date, "gte": start_date}}}],
                    "should": [],
                    "must_not": []
                }
            }
        }

        es = ElasticService(configpath=configpath)
        if bool(sort):
            query.update({"sort": json.loads(sort)})

        if bool(filter):
            query = buildFilterQuery(filter, query)

        aggregate = buildAggregateQuery()
        query["aggs"].update(aggregate)

        isFilterReset = False
        response = await es.filterPost(query=query)
Collaborator:
@MVarshini Did you make sure that this query is working?

RequestError was raised19: RequestError(400, 'search_phase_execution_exception', 'No mapping found for [build.keyword] in order to sort on')
TypeError was raised14: 'NoneType' object is not subscriptable
TypeError was raised15: argument of type 'NoneType' is not iterable
INFO:     127.0.0.1:42960 - "GET /api/v1/ocp/jobs?pretty=true&start_date=2024-10-10&end_date=2024-10-15&size=75&offset=1&sort=%7B%22build.keyword%22:%7B%22order%22:%22asc%22%7D%7D&filter=%7B%22ciSystem%22:[%22prow%22]%7D HTTP/1.1" 200 OK
Error retrieving filter data': RequestError(400, 'search_phase_execution_exception', 'No mapping found for [build.keyword] in order to sort on')
TypeError was raised: 'NoneType' object is not subscriptable
TypeError was raised: 'NoneType' object is not subscriptable
INFO:     127.0.0.1:42966 - "GET /api/v1/ocp/filters?pretty=true&start_date=2024-10-10&end_date=2024-10-15&size=75&offset=1&sort=%7B%22build.keyword%22:%7B%22order%22:%22asc%22%7D%7D&filter=%7B%22ciSystem%22:[%22prow%22]%7D HTTP/1.1" 200 OK
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7feb9ce77be0>, 441476.621)]']
connector: <aiohttp.connector.TCPConnector object at 0x7feb8308b430>
{'query': {'query_string': {'query': 'uuid: "bc78c231-6d7f-4726-8afb-ef4a87d84331"'}}}
{'ciSystem': 'PROW', 'uuid': 'bc78c231-6d7f-4726-8afb-ef4a87d84331', 'releaseStream': '4.15.0-0.nightly', 'platform': 'AWS', 'clusterType': 'self-managed', 'benchmark': 'cluster-density-v2', 'masterNodesCount': 3, 'workerNodesCount': 6, 'infraNodesCount': 3, 'masterNodesType': 'm6a.xlarge', 'workerNodesType': 'm6a.xlarge', 'infraNodesType': 'r5.xlarge', 'totalNodesCount': 12, 'clusterName': 'ci-op-yfrkcx95-96149-rzgmn', 'ocpVersion': '4.15.0-0.nightly-2024-10-11-141906', 'networkType': 'OVNKubernetes', 'buildTag': '1844750966205714432', 'jobStatus': 'success', 'buildUrl': 'https://prow.ci.openshift.org/view/gs/origin-ci-test/logs/periodic-ci-openshift-qe-ocp-qe-perfscale-ci-main-aws-4.15-nightly-x86-payload-control-plane-6nodes/1844750966205714432', 'upstreamJob': 'periodic-ci-openshift-qe-ocp-qe-perfscale-ci-main-aws-4.15-nightly-x86-payload-control-plane-6nodes', 'upstreamJobBuild': '4.15.0-0.nightly-2024-10-11-141906-qe-perfscale', 'executionDate': '2024-10-11T16:08:02Z', 'jobDuration': '1634', 'startDate': '2024-10-11T16:08:02Z', 'endDate': '2024-10-11T16:35:16Z', 'startDateUnixTimestamp': '1728662882', 'endDateUnixTimestamp': '1728664516', 'timestamp': '2024-10-11T16:35:29Z', 'ipsec': 'false', 'ipsecMode': 'Disabled', 'fips': 'false', 'encrypted': 'false', 'encryptionType': '', 'publish': 'External', 'computeArch': 'amd64', 'controlPlaneArch': 'amd64'}
{'query': {'bool': {'must': [{'query_string': {'query': 'benchmark: "cluster-density-v2$" AND workerNodesType: "m6a.xlarge" AND masterNodesType: "m6a.xlarge" AND masterNodesCount: "3" AND workerNodesCount: "6" AND platform: "AWS" AND ocpVersion: 4.15* AND jobStatus: success'}}]}}}
{'query': {'query_string': {'query': '( uuid: "bc78c231-6d7f-4726-8afb-ef4a87d84331" ) AND metricName: "jobSummary"'}}}
{'query': {'query_string': {'query': '( uuid: "f1055fa3-8c5c-4cc9-b75d-39594c499a58" OR uuid: "4d59851e-b665-4c6e-914d-fa7fea1e2cb0" OR uuid: "ce07640a-a094-411a-84d2-f24d5cd6d064" OR uuid: "14a0d6ae-d4e4-435e-95ab-2a37b5633d3c" OR uuid: "c2c59959-7980-4d04-8495-c42d75957e28" OR uuid: "7335a811-95a3-453b-ab8d-182b6c1d1c3e" OR uuid: "44ee7997-9327-4dff-8d42-e64a40110cfe" OR uuid: "28e7d49f-535e-43ca-a383-feff6e669908" OR uuid: "d80cfd21-d005-4085-8f4f-57117a9a016f" OR uuid: "d3b26ef5-54a5-44df-b025-e6465e00a596" ) AND metricName: "jobSummary"'}}}
INFO:     127.0.0.1:51932 - "GET /api/v1/ocp/graph/bc78c231-6d7f-4726-8afb-ef4a87d84331 HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/anyio/streams/memory.py", line 98, in receive
    return self.receive_nowait()
  File "/usr/local/lib/python3.9/site-packages/anyio/streams/memory.py", line 93, in receive_nowait
    raise WouldBlock
anyio.WouldBlock

I see a bunch of errors in the dashboard backend pod. Also, the frontend UI doesn't render at all while filters are applied.

Member:

Just to minimize the search: this is only happening to me when looking at cluster-density-v2 jobs in the OCP tab; it seems to be related to the Graph for this type of job.
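
The "No mapping found for [build.keyword]" failure in the logs above happens because the frontend sorts on a derived column with no Elasticsearch mapping. One defensive option, an assumption rather than part of this PR, is Elasticsearch's unmapped_type sort option so the sort degrades gracefully:

import json

# Hypothetical hardening (not in this PR): tolerate sort fields that are
# missing from the index mapping instead of raising
# search_phase_execution_exception.
def safe_sort(sort_json: str) -> list:
    sort_spec = json.loads(sort_json)  # e.g. {"build.keyword": {"order": "asc"}}
    return [{field: {**opts, "unmapped_type": "keyword"}}
            for field, opts in sort_spec.items()]

# Usage inside getData/getFilterData instead of query.update({"sort": ...}):
# query.update({"sort": safe_sort(sort)})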

        metrics = {'total': response["total"]}

        if response["total"] == 0:
            query.pop('query', None)
            response = await es.filterPost(query=query)
            isFilterReset = True

        await es.close()
        filter_ = []

        if bool(response) and bool(response["filter_"]):
            for k, v in response["filter_"].items():
                if k != "max_timestamp" and k != "min_timestamp":
                    obj = {"key": k}
                    for x in v:
                        values = []
                        if x == "buckets":
                            buck = v[x]
                            if k == "jobStatus":
                                metrics.update(getSummary(buck, isFilterReset))
                            for m in buck:
                                if k == "ocpVersion":
                                    values.append(m["key"][0:4])
                                elif k == "build":
                                    values.append("-".join(m["key"].split("-")[-4:]))
                                else:
                                    values.append(str(m["key"]).lower())
                            obj.update({"value": values})
                    filter_.append(obj)

            if response["filter_"]["upstream"] and response["filter_"]["upstream"]["buckets"]:
                upstreamData = response["filter_"]["upstream"]["buckets"]
                jobType = []
                isRehearse = []
                if len(upstreamData) > 0:
                    for i in upstreamData:
                        # classify upstream jobs by name substring
                        if "periodic" in i["key"]:
                            jobType.append("periodic")
                        else:
                            jobType.append("pull request")
                        if "rehearse" in i["key"]:
                            isRehearse.append("true")
                        else:
                            isRehearse.append("false")
                filter_.append({"key": "jobType", "value": list(set(jobType))})
                filter_.append({"key": "isRehearse", "value": list(set(isRehearse))})

            filterResponse = [d for d in filter_ if d['key'] != "upstream"]

            return ({"filterData": filterResponse, "summary": metrics})
        return ({"filterData": [], "summary": []})

    except Exception as err:
        print(f"{type(err).__name__} was raised: {err}")


def fillEncryptionType(row):
    if row["encrypted"] == "N/A":
2 changes: 1 addition & 1 deletion backend/app/api/v1/commons/utils.py
@@ -13,7 +13,7 @@ async def getMetadata(uuid: str, configpath: str):
    es = ElasticService(configpath=configpath)
    response = await es.post(query=query)
    await es.close()
-    meta = [item['_source'] for item in response]
+    meta = [item['_source'] for item in response["data"]]
    return meta[0]

def updateStatus(job):
7 changes: 5 additions & 2 deletions backend/app/api/v1/endpoints/ocm/ocmJobs.py
@@ -31,8 +31,11 @@ async def jobs(start_date: date = Query(None, description="Start date for search

    if start_date > end_date:
        return Response(content=json.dumps({'error': "invalid date format, start_date must be less than end_date"}), status_code=422)

-    results = await getData(start_date, end_date, 'ocm.elasticsearch')
+    sort = {}
+    filter = {}
+    offset = 0
+    size = 75
+    results = await getData(start_date, end_date, sort, filter, offset, 'ocm.elasticsearch')

    if len(results) >= 1:
        response = {
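
The shared ElasticService.post signature is not part of this diff. Below is a hedged sketch of how the new size/offset arguments presumably map onto Elasticsearch's from/size pagination; the method body is an assumption inferred from the call sites and the response["data"]/response["total"] contract above:

# Hypothetical ElasticService.post, inferred from its call sites in this PR.
async def post(self, query, size=75, offset=0, **kwargs):
    query["size"] = size    # page length requested by the frontend
    query["from"] = offset  # first document of the requested page
    res = await self.es.search(index=self.index, body=query)
    return {"data": res["hits"]["hits"],
            "total": res["hits"]["total"]["value"]}
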
8 changes: 4 additions & 4 deletions backend/app/api/v1/endpoints/ocp/graph.py
@@ -220,7 +220,7 @@ async def jobSummary(uuids: list):
    es = ElasticService(configpath="ocp.elasticsearch", index=index)
    response = await es.post(query=query)
    await es.close()
-    runs = [item['_source'] for item in response]
+    runs = [item['_source'] for item in response["data"]]
    return runs

async def processBurner(data: dict):
@@ -346,7 +346,7 @@ async def getBurnerResults(uuid: str, uuids: list, index: str):
    es = ElasticService(configpath="ocp.elasticsearch", index=index)
    response = await es.post(query=query)
    await es.close()
-    runs = [item['_source'] for item in response]
+    runs = [item['_source'] for item in response["data"]]
    return runs

async def getResults(uuid: str, uuids: list, index: str):
@@ -366,7 +366,7 @@ async def getResults(uuid: str, uuids: list, index: str):
    es = ElasticService(configpath="ocp.elasticsearch", index=index)
    response = await es.post(query=query)
    await es.close()
-    runs = [item['_source'] for item in response]
+    runs = [item['_source'] for item in response["data"]]
    return runs

async def getMatchRuns(meta: dict, workerCount: False):
@@ -416,7 +416,7 @@ async def getMatchRuns(meta: dict, workerCount: False):
    es = ElasticService(configpath="ocp.elasticsearch")
    response = await es.post(query=query)
    await es.close()
-    runs = [item['_source'] for item in response]
+    runs = [item['_source'] for item in response["data"]]
    uuids = []
    for run in runs:
        uuids.append(run["uuid"])