Update azure-pipelines-github-telemetry.yml #4

Draft · wants to merge 1 commit into base: main
178 changes: 2 additions & 176 deletions azure-pipelines/azure-pipelines-github-telemetry.yml
@@ -4,7 +4,6 @@
# https://aka.ms/yaml

trigger: none
pr: none

schedules:
- cron: "0 */2 * * *"
@@ -24,179 +23,6 @@ stages:
- job: Build
  timeoutInMinutes: 120
  steps:
  - task: UsePythonVersion@0
    inputs:
      versionSpec: '3.x'
      addToPath: true
      architecture: 'x64'
  - script: |
      pip install azure-storage-queue azure-storage-blob pytz python-dateutil
    displayName: Install build tools
  - task: PythonScript@0
    displayName: Publish SONiC telemetry
    env:
      AZURE_STORAGE_CONNECTION_STRING: '$(AZURE_STORAGE_CONNECTION_STRING)'
      GITHUB_TOKEN: '$(GITHUB_TOKEN)'
    inputs:
      scriptSource: 'inline'
      script: |
        import datetime, base64, json, time, os, re, pytz, math
        from urllib import request
        from urllib.error import HTTPError
        from http.client import IncompleteRead
        from azure.core.exceptions import ResourceNotFoundError
        from dateutil import parser
        import http.client
        from azure.storage.blob import BlobServiceClient

        CONTAINER = 'build'
        INFO_PULLREQUESTS_FILE = "info/pullrequests.json"
        GITHUB_TOKEN = '$(GITHUB_TOKEN)'
        AZURE_STORAGE_CONNECTION_STRING = '$(AZURE_STORAGE_CONNECTION_STRING)'
        blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING)

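        # Query window: ends 5 minutes before now, or at END_TIMESTAMP when provided,
        # and is scanned in slices of TIMEDELTA_IN_MINUTES (default 60, minimum 30) minutes.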
        url="https://api.github.com/graphql"
        timestamp = datetime.datetime.utcnow()
        timeoffset = datetime.timedelta(minutes=5)
        until = (timestamp - timeoffset).replace(tzinfo=pytz.UTC)
        if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']:
            until = parser.isoparse(os.environ['END_TIMESTAMP']).replace(tzinfo=pytz.UTC)
        delta = datetime.timedelta(minutes=60)
        if 'TIMEDELTA_IN_MINUTES' in os.environ and os.environ['TIMEDELTA_IN_MINUTES']:
            timedelta_in_minutes = max(int(os.environ['TIMEDELTA_IN_MINUTES']), 30)
            delta = datetime.timedelta(minutes=timedelta_in_minutes)
        max_timedelta_in_days = 35

        # Upload a list of lines to blob
        def upload_to_blob(lines, blob_prefix, file_prefix=""):
            now = datetime.datetime.now()
            if not lines:
                print("no lines to upload, skipped")
                return
            local_file_name = file_prefix + now.strftime("_%Y%m%d-%H%M%S-%f") + '.json'
            with open(local_file_name, "w") as file:
                count = file.write('\n'.join(lines))
            blob_file_name = blob_prefix + now.strftime("/%Y/%m/%d/") + local_file_name
            blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=blob_file_name)
            with open(local_file_name, "rb") as data:
                blob_client.upload_blob(data)
            os.remove(local_file_name)

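        # Resume point: START_TIMESTAMP if set (unless force), else the checkpoint stored in
        # the info/pullrequests.json blob, else max_timedelta_in_days days ago.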
        def get_start_timestamp(force=False):
            if not force and 'START_TIMESTAMP' in os.environ and os.environ['START_TIMESTAMP']:
                return parser.isoparse(os.environ['START_TIMESTAMP']).replace(tzinfo=pytz.UTC)
            blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE)
            try:
                download_stream = blob_client.download_blob()
                info = json.loads(download_stream.readall())
                return parser.isoparse(info['timestamp']).replace(tzinfo=pytz.UTC)
            except ResourceNotFoundError:
                pass
            start_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=max_timedelta_in_days)
            return start_timestamp.replace(tzinfo=pytz.UTC)

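        # Persist `until` as the new checkpoint so the next run resumes from it; skipped when an
        # explicit END_TIMESTAMP is older than the checkpoint already stored.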
        def update_start_timestamp():
            if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']:
                last = get_start_timestamp(True)
                if last > until:
                    print('skipped updating the start timestamp, until:{0} < last:{1}'.format(until.isoformat(), last.isoformat()))
                    return
            blob_file_name="info/pullrequests.json"
            blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE)
            info = {}
            info['timestamp'] = until.isoformat()
            data = json.dumps(info)
            blob_client.upload_blob(data, overwrite=True)

        # The GitHub GraphQL search API returns at most 100 items per page and at most 10 pages (1000 items) per query.
        # To work around this, split the query into time ranges of size "delta", each of which must contain fewer than 1000 items.
        def get_pullrequests():
            results = []
            start_timestamp = get_start_timestamp()
            print('start: {0}, until: {1}'.format(start_timestamp.isoformat(), until.isoformat()), flush=True)
            query_pattern = '''
            {
              search(query: "org:azure is:pr updated:%s..%s sort:updated", %s type: ISSUE, first: 100) {
                issueCount
                pageInfo {
                  hasNextPage
                  endCursor
                }
                edges {
                  cursor
                  node {
                    ... on PullRequest {
                      url
                      number
                      assignees (first: 10) {
                        nodes {
                          login
                        }
                      }
                      title
                      createdAt
                      closedAt
                      merged
                      mergedAt
                      updatedAt
                      mergedBy {login}
                      author {login}
                      baseRefName
                      baseRepository {name, url, owner{login}}
                      repository {name, url, owner{login}}
                      mergeCommit {id, oid, committedDate}
                      commits (first: 3) {nodes{commit{oid, message}}}
                      state
                    }
                  }
                }
              }
            }
            '''
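            # Walk [start, until) in slices of `delta`; within each slice, page through the results
            # 100 at a time using the `after:` cursor until hasNextPage is false.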
            start = start_timestamp
            count = math.ceil((until - start) / delta)
            for index in range(count):
                end = min(start+delta, until)
                condition = ""
                while True: # pagination: at most 1000 results in total, 100 per page
                    print("Query: index:%s, count:%s, start:%s, end:%s, page:%s" % (index, count, start.isoformat(), end.isoformat(), condition), flush=True)
                    query = query_pattern %(start.isoformat(), end.isoformat(), condition)
                    req = request.Request(url, method="POST")
                    req.add_header('Content-Type', 'application/json')
                    req.add_header('Authorization', "Bearer {0}".format(GITHUB_TOKEN))
                    body = {}
                    body['query'] = query
                    data = bytes(json.dumps(body), encoding="utf-8")
                    content = {}
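                    # Retry the request up to 10 times on HTTP errors or truncated reads, 3 seconds apart.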
                    for i in range(10):
                        try:
                            r = request.urlopen(req, data=data)
                            content = json.loads(r.read())
                            break
                        except HTTPError as e:
                            print('Try count: {0}, error code: {1}, reason: {2}'.format(i, e.code, e.reason))
                            time.sleep(3)
                        except IncompleteRead as e:
                            print("IncompleteRead", e)
                            time.sleep(3)
                    if 'data' not in content:
                        print(content)
                        break
                    edges = content['data']['search']['edges']
                    for edge in edges:
                        node = edge['node']
                        node['dumpedAt'] = timestamp.isoformat()
                        results.append(json.dumps(node))
                    print("Read edge count: {0}, total count: {1}".format(len(results), content['data']['search']['issueCount']), flush=True)
                    hasNextPage = content['data']['search']['pageInfo']['hasNextPage']
                    print(content['data']['search']['pageInfo'])
                    if not hasNextPage:
                        break
                    condition = 'after: "{0}",'.format(edges[-1]['cursor'])
                    print(condition)
                start = end
            return results

        results = get_pullrequests()
        upload_to_blob(results, 'pullrequests')
        update_start_timestamp()
echo hello
displayName: 'Test'