diff --git a/ex_app/lib/main.py b/ex_app/lib/main.py index f4eaede..fda37f1 100644 --- a/ex_app/lib/main.py +++ b/ex_app/lib/main.py @@ -9,6 +9,8 @@ from base64 import b64decode from contextlib import asynccontextmanager from pathlib import Path +import asyncio +import base64 import httpx from fastapi import BackgroundTasks, Depends, FastAPI, Request, responses, status @@ -23,6 +25,7 @@ # os.environ["APP_SECRET"] = "12345" # os.environ["APP_PORT"] = "23000" +DEFAULT_USER_NAME = "admin@windmill.dev" USERS_STORAGE_PATH = Path(persistent_storage()).joinpath("windmill_users_config.json") USERS_STORAGE = {} print(str(USERS_STORAGE_PATH), flush=True) @@ -98,6 +101,7 @@ async def provision_user(request: Request) -> None: @asynccontextmanager async def lifespan(app: FastAPI): # pylint: disable=unused-argument + await sync_timer() yield @@ -219,7 +223,7 @@ def initialize_windmill() -> None: ) if r.status_code >= 400: raise RuntimeError(f"initialize_windmill: can not change default credentials password, {r.text}") - add_user_to_storage("admin@windmill.dev", new_default_password, default_token) + add_user_to_storage(DEFAULT_USER_NAME, new_default_password, default_token) r = httpx.post( url="http://127.0.0.1:8000/api/workspaces/create", json={"id": "nextcloud", "name": "nextcloud"}, @@ -241,6 +245,258 @@ def generate_random_string(length=10): return "".join(random.choice(letters) for i in range(length)) # noqa +async def sync_timer(): + while True: + nc = NextcloudApp() + enabled_flag = nc.ocs("GET", "/ocs/v1.php/apps/app_api/ex-app/state") + if enabled_flag: + print("Running workflow sync") + workspace = 'nextcloud' + token = USERS_STORAGE[DEFAULT_USER_NAME]['token'] + flow_paths = await get_flow_paths(workspace, token) + expected_listeners = await get_expected_listeners(workspace, token, flow_paths) + print(json.dumps(expected_listeners, indent=4), flush=True) + registered_listeners = await get_registered_listeners() + for expected_listener in expected_listeners: + registered_listeners_for_uri = await get_registered_listeners_for_uri(expected_listener['webhook'], registered_listeners) + for event in expected_listener['events']: + listener = next(filter(lambda listener: listener['event'] == event, registered_listeners_for_uri), None) + if listener is not None: + json.dumps(listener) + if json.dumps(listener['eventFilter']) != json.dumps(expected_listener['filters']): + await update_listener(listener, expected_listener['filters'], token) + else: + await register_listener(event, expected_listener['filters'], expected_listener['webhook'], token) + registered_listeners = await get_registered_listeners() + for registered_listener in registered_listeners: + if registered_listener['appId'] == os.environ["APP_ID"]: + if next(filter(lambda expected_listener: registered_listener['uri'] == expected_listener['webhook'] and registered_listener['event'] in expected_listener['events'], expected_listeners), None) is None: + await delete_listener(registered_listener) + await asyncio.sleep(5 * 60) + + +async def get_flow_paths(workspace, token): + method = 'GET' + path = f"/w/{workspace}/flows/list" + print(f"sync_API_request: {path} - {method}", flush=True) + flow_paths = [] + async with httpx.AsyncClient() as client: + url = f"http://127.0.0.1:8000/api/{path}" + headers = {'Authorization': f"Bearer {token}"} + # print(f"proxy_BACKEND_requests: method={request.method}, path={path}, status={response.status_code}") + response = await client.request( + method=method, + url=url, + params={'per_page': 100}, + headers=headers, + ) + print( + f"sync_API_request: method={method}, path={path}, status={response.status_code}", + flush=True + ) + try: + response_data = json.loads(response.content) + except json.JSONDecodeError as e: + print(f"Error parsing JSON: {e}") + return + for flow in response_data: + flow_paths.append(flow['path']) + return flow_paths + + +async def get_expected_listeners(workspace, token, flow_paths): + flows = [] + for flow_path in flow_paths: + async with httpx.AsyncClient() as client: + method = 'GET' + path = f"/w/{workspace}/flows/get/{flow_path}" + url = f"http://127.0.0.1:8000/api/{path}" + headers = {'Authorization': f"Bearer {token}"} + print(f"sync_API_request: {path} - {method}", flush=True) + response = await client.request( + method=method, + url=url, + params={'per_page': 100}, + headers=headers, + ) + print( + f"sync_API_request: method={method}, path={path}, status={response.status_code}", + flush=True + ) + try: + response_data = json.loads(response.content) + except json.JSONDecodeError as e: + print(f"Error parsing JSON: {e}") + return + first_module = response_data['value']['modules'][0] + if first_module['summary'] == 'CORE:LISTEN_TO_EVENT' \ + and first_module['value']['input_transforms']['events']['type'] == 'static' \ + and first_module['value']['input_transforms']['filters']['type'] == 'static': + # webhook = f"http://127.0.0.1:8000/api/w/{workspace}/jobs/run/f/{flow_path}" + webhook = f"https://app.windmill.dev/api/w/{workspace}/jobs/run/f/{flow_path}" + flows.append({ + 'webhook': webhook, + 'filters': first_module['value']['input_transforms']['filters']['value'], + # Remove backslashes from the beginning to yield canonical reference + 'events': [event[1:] if event.startswith('\\') else event for event in first_module['value']['input_transforms']['events']['value']], + }) + return flows + + +async def get_registered_listeners_for_uri(webhook, registered_listeners): + return [listener for listener in registered_listeners if listener['uri'] == webhook] + + +async def register_listener(event, filter, webhook, token): + async with httpx.AsyncClient() as client: + method = 'POST' + path = '/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks' + url = os.environ["NEXTCLOUD_URL"]+path + userid = os.environ["NEXTCLOUD_USERID"] + secret = os.environ["NEXTCLOUD_SECRET"] + + headers = { + + 'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'), + 'OCS-APIREQUEST': 'true', + 'EX-APP-VERSION': 'v1.0.0', + 'EX-APP-ID': os.environ['APP_ID'], + 'AA-VERSION': 'v2.3.0' + } + auth_data = { + 'Authorization': f"Bearer {token}" + } + print(f"sync_API_nextcloud_request: {url} - {method}", flush=True) + print(json.dumps(filter, indent=4), flush=True) + response = await client.request( + method=method, + url=url, + json={'httpMethod': 'POST', 'uri': webhook, 'event': event, 'eventFilter': filter, 'authMethod': 'header', 'authData': auth_data}, + headers=headers, + params={'format': 'json'} + ) + print( + f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}", + flush=True + ) + try: + response_data = json.loads(response.content) + except json.JSONDecodeError as e: + print(f"Error parsing JSON: {e}") + return + print(json.dumps(response_data, indent=4), flush=True) + return response_data['ocs']['data'] + + +async def update_listener(registered_listener, filter, token): + async with httpx.AsyncClient() as client: + method = 'POST' + path = f"/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{registered_listener['id']}" + url = os.environ["NEXTCLOUD_URL"] + path + userid = os.environ["NEXTCLOUD_USERID"] + secret = os.environ["NEXTCLOUD_SECRET"] + + headers = { + 'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'), + 'OCS-APIREQUEST': 'true', + 'EX-APP-VERSION': 'v1.0.0', + 'EX-APP-ID': os.environ['APP_ID'], + 'AA-VERSION': 'v2.3.0' + } + auth_data = { + 'Authorization': f"Bearer {token}" + } + print(f"sync_API_nextcloud_request: {url} - {method}", flush=True) + print(json.dumps(filter, indent=4), flush=True) + response = await client.request( + method=method, + url=url, + json={'httpMethod': 'POST', 'uri': registered_listener['uri'], 'event': registered_listener['event'], 'eventFilter': filter, 'authMethod': 'header', + 'authData': auth_data}, + headers=headers, + params={'format': 'json'} + ) + print( + f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}", + flush=True + ) + try: + response_data = json.loads(response.content) + except json.JSONDecodeError as e: + print(f"Error parsing JSON: {e}") + return + print(json.dumps(response_data, indent=4), flush=True) + return response_data['ocs']['data'] + + +async def get_registered_listeners(): + async with httpx.AsyncClient() as client: + method = 'GET' + path = '/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks' + url = os.environ["NEXTCLOUD_URL"] + path + userid = os.environ["NEXTCLOUD_USERID"] + secret = os.environ["NEXTCLOUD_SECRET"] + headers = { + 'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'), + 'OCS-APIREQUEST': 'true', + 'EX-APP-VERSION': 'v1.0.0', + 'EX-APP-ID': os.environ['APP_ID'], + 'AA-VERSION': 'v2.3.0' + } + print(f"sync_API_nextcloud_request: {url} - {method}", flush=True) + response = await client.request( + method=method, + url=url, + params={'format': 'json'}, + headers=headers, + ) + print( + f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}", + flush=True + ) + try: + response_data = json.loads(response.content) + except json.JSONDecodeError as e: + print(f"Error parsing JSON: {e}") + return + print(json.dumps(response_data, indent=4), flush=True) + return response_data['ocs']['data'] + + +async def delete_listener(registered_listener): + async with httpx.AsyncClient() as client: + method = 'DELETE' + path = f"/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{registered_listener['id']}" + url = os.environ["NEXTCLOUD_URL"] + path + userid = os.environ["NEXTCLOUD_USERID"] + secret = os.environ["NEXTCLOUD_SECRET"] + + headers = { + 'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'), + 'OCS-APIREQUEST': 'true', + 'EX-APP-VERSION': 'v1.0.0', + 'EX-APP-ID': os.environ['APP_ID'], + 'AA-VERSION': 'v2.3.0' + } + print(f"sync_API_nextcloud_request: {url} - {method}", flush=True) + response = await client.request( + method=method, + url=url, + headers=headers, + params={'format': 'json'} + ) + print( + f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}", + flush=True + ) + try: + response_data = json.loads(response.content) + except json.JSONDecodeError as e: + print(f"Error parsing JSON: {e}") + return + print(json.dumps(response_data, indent=4), flush=True) + return response_data['ocs']['data'] + if __name__ == "__main__": initialize_windmill() # Current working dir is set for the Service we are wrapping, so change we first for ExApp default one