Skip to content

Commit

Permalink
feat: Implement webhook sync
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelklehr committed Jun 27, 2024
1 parent 786fa0e commit 7b77232
Showing 1 changed file with 285 additions and 0 deletions.
285 changes: 285 additions & 0 deletions ex_app/lib/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import typing
from contextlib import asynccontextmanager
from pathlib import Path
import asyncio
import json
import base64

import httpx
from fastapi import BackgroundTasks, Depends, FastAPI, Request, responses
Expand All @@ -21,6 +24,7 @@

@asynccontextmanager
async def lifespan(app: FastAPI): # pylint: disable=unused-argument
sync_timer()
yield


Expand Down Expand Up @@ -52,8 +56,10 @@ async def init_callback(b_tasks: BackgroundTasks, nc: typing.Annotated[Nextcloud
return responses.JSONResponse(content={})


app_enabled = false
@APP.put("/enabled")
def enabled_callback(enabled: bool, nc: typing.Annotated[NextcloudApp, Depends(nc_app)]):
app_enabled = enabled
return responses.JSONResponse(content={"error": enabled_handler(enabled, nc)})


Expand Down Expand Up @@ -148,6 +154,285 @@ async def proxy_frontend_requests(request: Request, path: str):
return response_to_nc


async def sync_timer():
while True:
if app_enabled:
print("Running workflow sync")
workspace = 'YOUR_WORKSPACE'
token = 'YOUR_TOKEN_HERE'
flow_paths = await get_flow_paths(workspace, token)
expected_listeners = await get_expected_listeners(workspace, token, flow_paths)
print(json.dumps(expected_listeners, indent=4), flush=True)
for expected_listener in expected_listeners:
registered_listeners = await get_registered_listeners_for_uri(expected_listener['webhook'])
for event in expected_listener['events']:
listener = next(filter(lambda listener: listener['event'] == event, registered_listeners), None)
if listener is not None:
json.dumps(listener)
if json.dumps(listener['eventFilter']) != json.dumps(expected_listener['filters']):
await update_listener(listener, expected_listener['filters'], token)
else:
await register_listener(event, expected_listener['filters'], expected_listener['webhook'], token)
registered_listeners = await get_registered_listeners()
for registered_listener in registered_listeners:
if registered_listener['appId'] == os.environ["APP_ID"]:
if next(filter(lambda expected_listener: registered_listener['uri'] == expected_listener['webhook'] and registered_listener['event'] in expected_listener['events'], expected_listeners), None) is None:
await delete_listener(registered_listener)
await asyncio.sleep(5 * 60)


async def get_flow_paths(workspace, token):
method = 'GET'
path = f"/w/{workspace}/flows/list"
print(f"sync_API_request: {path} - {method}", flush=True)
flow_paths = []
async with httpx.AsyncClient() as client:
url = f"http://127.0.0.1:8000/api/{path}"
headers = {'Authorization': f"Bearer {token}"}
# print(f"proxy_BACKEND_requests: method={request.method}, path={path}, status={response.status_code}")
response = await client.request(
method=method,
url=url,
params={'per_page': 100},
headers=headers,
)
print(
f"sync_API_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
for flow in response_data:
flow_paths.append(flow['path'])
return flow_paths


async def get_expected_listeners(workspace, token, flow_paths):
flows = []
for flow_path in flow_paths:
async with httpx.AsyncClient() as client:
method = 'GET'
path = f"/w/{workspace}/flows/get/{flow_path}"
url = f"http://127.0.0.1:8000/api/{path}"
headers = {'Authorization': f"Bearer {token}"}
print(f"sync_API_request: {path} - {method}", flush=True)
response = await client.request(
method=method,
url=url,
params={'per_page': 100},
headers=headers,
)
print(
f"sync_API_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
first_module = response_data['value']['modules'][0]
if first_module['summary'] == 'CORE:LISTEN_TO_EVENT' \
and first_module['value']['input_transforms']['events']['type'] == 'static' \
and first_module['value']['input_transforms']['filters']['type'] == 'static':
# webhook = f"http://127.0.0.1:8000/api/w/{workspace}/jobs/run/f/{flow_path}"
webhook = f"https://app.windmill.dev/api/w/{workspace}/jobs/run/f/{flow_path}"
flows.append({
'webhook': webhook,
'filters': first_module['value']['input_transforms']['filters']['value'],
# Remove backslashes from the beginning to yield canonical reference
'events': [event[1:] if event.startswith('\\') else event for event in first_module['value']['input_transforms']['events']['value']],
})
return flows


async def get_registered_listeners_for_uri(webhook):
async with httpx.AsyncClient() as client:
method = 'GET'
path = '/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks'
url = os.environ["NEXTCLOUD_URL"]+path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]
headers = {
'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
response = await client.request(
method=method,
url=url,
params={'uri': webhook, 'format': 'json'},
headers=headers,
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']


async def register_listener(event, filter, webhook, token):
async with httpx.AsyncClient() as client:
method = 'POST'
path = '/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks'
url = os.environ["NEXTCLOUD_URL"]+path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]

headers = {

'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
auth_data = {
'Authorization': f"Bearer {token}"
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
print(json.dumps(filter, indent=4), flush=True)
response = await client.request(
method=method,
url=url,
json={'httpMethod': 'POST', 'uri': webhook, 'event': event, 'eventFilter': filter, 'authMethod': 'header', 'authData': auth_data},
headers=headers,
params={'format': 'json'}
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']


async def update_listener(registered_listener, filter, token):
async with httpx.AsyncClient() as client:
method = 'POST'
path = f"/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{registered_listener['id']}"
url = os.environ["NEXTCLOUD_URL"] + path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]

headers = {
'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
auth_data = {
'Authorization': f"Bearer {token}"
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
print(json.dumps(filter, indent=4), flush=True)
response = await client.request(
method=method,
url=url,
json={'httpMethod': 'POST', 'uri': registered_listener['uri'], 'event': registered_listener['event'], 'eventFilter': filter, 'authMethod': 'header',
'authData': auth_data},
headers=headers,
params={'format': 'json'}
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']


async def get_registered_listeners():
async with httpx.AsyncClient() as client:
method = 'GET'
path = '/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks'
url = os.environ["NEXTCLOUD_URL"] + path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]
headers = {
'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
response = await client.request(
method=method,
url=url,
params={'format': 'json'},
headers=headers,
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']


async def delete_listener(registered_listener):
async with httpx.AsyncClient() as client:
method = 'DELETE'
path = f"/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{registered_listener['id']}"
url = os.environ["NEXTCLOUD_URL"] + path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]

headers = {
'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
response = await client.request(
method=method,
url=url,
headers=headers,
params={'format': 'json'}
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']

if __name__ == "__main__":
# Current working dir is set for the Service we are wrapping, so change we first for ExApp default one
os.chdir(Path(__file__).parent)
Expand Down

0 comments on commit 7b77232

Please sign in to comment.