Skip to content

Commit

Permalink
feat: Implement webhook sync
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelklehr committed Jul 5, 2024
1 parent 7b61b84 commit 7a068c0
Showing 1 changed file with 256 additions and 0 deletions.
256 changes: 256 additions & 0 deletions ex_app/lib/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from base64 import b64decode
from contextlib import asynccontextmanager
from pathlib import Path
import asyncio
import json
import base64

import httpx
from fastapi import BackgroundTasks, Depends, FastAPI, Request, responses, status
Expand Down Expand Up @@ -98,6 +101,7 @@ async def provision_user(request: Request) -> None:

@asynccontextmanager
async def lifespan(app: FastAPI): # pylint: disable=unused-argument
await sync_timer()
yield


Expand Down Expand Up @@ -140,8 +144,10 @@ async def init_callback(b_tasks: BackgroundTasks, nc: typing.Annotated[Nextcloud
return responses.JSONResponse(content={})


app_enabled = False
@APP.put("/enabled")
def enabled_callback(enabled: bool, nc: typing.Annotated[NextcloudApp, Depends(nc_app)]):
app_enabled = enabled
return responses.JSONResponse(content={"error": enabled_handler(enabled, nc)})


Expand Down Expand Up @@ -241,6 +247,256 @@ def generate_random_string(length=10):
return "".join(random.choice(letters) for i in range(length)) # noqa


async def sync_timer():
while True:
if app_enabled:
print("Running workflow sync")
workspace = 'nextcloud'
token = 'YOUR_TOKEN_HERE'
flow_paths = await get_flow_paths(workspace, token)
expected_listeners = await get_expected_listeners(workspace, token, flow_paths)
print(json.dumps(expected_listeners, indent=4), flush=True)
registered_listeners = await get_registered_listeners()
for expected_listener in expected_listeners:
registered_listeners_for_uri = await get_registered_listeners_for_uri(expected_listener['webhook'], registered_listeners)
for event in expected_listener['events']:
listener = next(filter(lambda listener: listener['event'] == event, registered_listeners_for_uri), None)
if listener is not None:
json.dumps(listener)
if json.dumps(listener['eventFilter']) != json.dumps(expected_listener['filters']):
await update_listener(listener, expected_listener['filters'], token)
else:
await register_listener(event, expected_listener['filters'], expected_listener['webhook'], token)
registered_listeners = await get_registered_listeners()
for registered_listener in registered_listeners:
if registered_listener['appId'] == os.environ["APP_ID"]:
if next(filter(lambda expected_listener: registered_listener['uri'] == expected_listener['webhook'] and registered_listener['event'] in expected_listener['events'], expected_listeners), None) is None:
await delete_listener(registered_listener)
await asyncio.sleep(5 * 60)


async def get_flow_paths(workspace, token):
method = 'GET'
path = f"/w/{workspace}/flows/list"
print(f"sync_API_request: {path} - {method}", flush=True)
flow_paths = []
async with httpx.AsyncClient() as client:
url = f"http://127.0.0.1:8000/api/{path}"
headers = {'Authorization': f"Bearer {token}"}
# print(f"proxy_BACKEND_requests: method={request.method}, path={path}, status={response.status_code}")
response = await client.request(
method=method,
url=url,
params={'per_page': 100},
headers=headers,
)
print(
f"sync_API_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
for flow in response_data:
flow_paths.append(flow['path'])
return flow_paths


async def get_expected_listeners(workspace, token, flow_paths):
flows = []
for flow_path in flow_paths:
async with httpx.AsyncClient() as client:
method = 'GET'
path = f"/w/{workspace}/flows/get/{flow_path}"
url = f"http://127.0.0.1:8000/api/{path}"
headers = {'Authorization': f"Bearer {token}"}
print(f"sync_API_request: {path} - {method}", flush=True)
response = await client.request(
method=method,
url=url,
params={'per_page': 100},
headers=headers,
)
print(
f"sync_API_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
first_module = response_data['value']['modules'][0]
if first_module['summary'] == 'CORE:LISTEN_TO_EVENT' \
and first_module['value']['input_transforms']['events']['type'] == 'static' \
and first_module['value']['input_transforms']['filters']['type'] == 'static':
# webhook = f"http://127.0.0.1:8000/api/w/{workspace}/jobs/run/f/{flow_path}"
webhook = f"https://app.windmill.dev/api/w/{workspace}/jobs/run/f/{flow_path}"
flows.append({
'webhook': webhook,
'filters': first_module['value']['input_transforms']['filters']['value'],
# Remove backslashes from the beginning to yield canonical reference
'events': [event[1:] if event.startswith('\\') else event for event in first_module['value']['input_transforms']['events']['value']],
})
return flows


async def get_registered_listeners_for_uri(webhook, registered_listeners):
return [listener for listener in registered_listeners if listener['uri'] == webhook]


async def register_listener(event, filter, webhook, token):
async with httpx.AsyncClient() as client:
method = 'POST'
path = '/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks'
url = os.environ["NEXTCLOUD_URL"]+path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]

headers = {

'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
auth_data = {
'Authorization': f"Bearer {token}"
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
print(json.dumps(filter, indent=4), flush=True)
response = await client.request(
method=method,
url=url,
json={'httpMethod': 'POST', 'uri': webhook, 'event': event, 'eventFilter': filter, 'authMethod': 'header', 'authData': auth_data},
headers=headers,
params={'format': 'json'}
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']


async def update_listener(registered_listener, filter, token):
async with httpx.AsyncClient() as client:
method = 'POST'
path = f"/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{registered_listener['id']}"
url = os.environ["NEXTCLOUD_URL"] + path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]

headers = {
'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
auth_data = {
'Authorization': f"Bearer {token}"
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
print(json.dumps(filter, indent=4), flush=True)
response = await client.request(
method=method,
url=url,
json={'httpMethod': 'POST', 'uri': registered_listener['uri'], 'event': registered_listener['event'], 'eventFilter': filter, 'authMethod': 'header',
'authData': auth_data},
headers=headers,
params={'format': 'json'}
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']


async def get_registered_listeners():
async with httpx.AsyncClient() as client:
method = 'GET'
path = '/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks'
url = os.environ["NEXTCLOUD_URL"] + path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]
headers = {
'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
response = await client.request(
method=method,
url=url,
params={'format': 'json'},
headers=headers,
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']


async def delete_listener(registered_listener):
async with httpx.AsyncClient() as client:
method = 'DELETE'
path = f"/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{registered_listener['id']}"
url = os.environ["NEXTCLOUD_URL"] + path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]

headers = {
'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
response = await client.request(
method=method,
url=url,
headers=headers,
params={'format': 'json'}
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']

if __name__ == "__main__":
initialize_windmill()
# Current working dir is set for the Service we are wrapping, so change we first for ExApp default one
Expand Down

0 comments on commit 7a068c0

Please sign in to comment.