diff --git a/ex_app/lib/main.py b/ex_app/lib/main.py index 61c0e86..ec71cdc 100644 --- a/ex_app/lib/main.py +++ b/ex_app/lib/main.py @@ -14,18 +14,20 @@ import httpx from fastapi import BackgroundTasks, Depends, FastAPI, Request, responses, status -from nc_py_api import NextcloudApp +from nc_py_api import NextcloudApp, NextcloudException from nc_py_api.ex_app import LogLvl, nc_app, persistent_storage, run_app from nc_py_api.ex_app.integration_fastapi import AppAPIAuthMiddleware, fetch_models_task from starlette.responses import FileResponse, Response + # os.environ["NEXTCLOUD_URL"] = "http://nextcloud.local/index.php" # os.environ["APP_HOST"] = "0.0.0.0" # os.environ["APP_ID"] = "windmill_app" -# os.environ["APP_SECRET"] = "12345" -# os.environ["APP_PORT"] = "23000" +# os.environ["APP_SECRET"] = "5NREvpXuj8aOgzNaEVvy0lIpJABeACppCV9m9UZo29XqXAlvPt2wcpw0vZ87o73PgsrowG/qfPvpm8KmUIL/HUzMv+nxb+WNsxG5E2ccWsZ2qruEcXjguWGcDYzWQTy0" # noqa +# os.environ["APP_PORT"] = "23001" + -DEFAULT_USER_NAME = "admin@windmill.dev" +DEFAULT_USER_EMAIL = "admin@windmill.dev" USERS_STORAGE_PATH = Path(persistent_storage()).joinpath("windmill_users_config.json") USERS_STORAGE = {} print("[DEBUG]: USERS_STORAGE_PATH=", str(USERS_STORAGE_PATH), flush=True) @@ -34,20 +36,25 @@ USERS_STORAGE.update(json.load(__f)) -def add_user_to_storage(user_name: str, password: str, token: str = "") -> None: - USERS_STORAGE[user_name] = {"password": password, "token": token} +def get_user_email(user_name: str) -> str: + return f"{user_name}@windmill.dev" + + +def add_user_to_storage(user_email: str, password: str, token: str = "") -> None: + USERS_STORAGE[user_email] = {"password": password, "token": token} with open(USERS_STORAGE_PATH, "w", encoding="utf-8") as f: json.dump(USERS_STORAGE, f, indent=4) async def create_user(user_name: str) -> str: password = generate_random_string() + user_email = get_user_email(user_name) async with httpx.AsyncClient() as client: r = await client.request( method="POST", url="http://127.0.0.1:8000/api/users/create", json={ - "email": f"{user_name}@windmill.dev", + "email": user_email, "password": password, "super_admin": True, "name": user_name, @@ -56,29 +63,29 @@ async def create_user(user_name: str) -> str: ) r = await client.post( url="http://127.0.0.1:8000/api/auth/login", - json={"email": f"{user_name}@windmill.dev", "password": password}, + json={"email": user_email, "password": password}, ) - add_user_to_storage(user_name, password, r.text) + add_user_to_storage(user_email, password, r.text) return r.text -async def login_user(user_name: str, password: str) -> str: - print("login_user:DEBUG:", user_name, flush=True) +async def login_user(user_email: str, password: str) -> str: + print("login_user:DEBUG:", user_email, flush=True) async with httpx.AsyncClient() as client: r = await client.post( url="http://127.0.0.1:8000/api/auth/login", - json={"email": f"{user_name}@windmill.dev", "password": password}, + json={"email": user_email, "password": password}, ) if r.status_code >= 400: raise RuntimeError(f"login_user: {r.text}") return r.text -def login_user_sync(user_name: str, password: str) -> str: +def login_user_sync(user_email: str, password: str) -> str: with httpx.Client() as client: r = client.post( url="http://127.0.0.1:8000/api/auth/login", - json={"email": user_name, "password": password}, + json={"email": user_email, "password": password}, ) if r.status_code >= 400: raise RuntimeError(f"login_user: {r.text}") @@ -97,13 +104,13 @@ def check_token_sync(token: str) -> bool: return bool(r.status_code < 400) -def get_valid_user_token_sync(user_name: str) -> str: - token = USERS_STORAGE[user_name]["token"] +def get_valid_user_token_sync(user_email: str) -> str: + token = USERS_STORAGE[user_email]["token"] if check_token_sync(token): return token - user_password = USERS_STORAGE[user_name]["password"] - token = login_user_sync(user_name, user_password) - add_user_to_storage(user_name, user_password, token) + user_password = USERS_STORAGE[user_email]["password"] + token = login_user_sync(user_email, user_password) + add_user_to_storage(user_email, user_password, token) return token @@ -112,17 +119,21 @@ async def provision_user(request: Request) -> None: print(f"DEBUG: TOKEN IS PRESENT: {request.cookies['token']}", flush=True) if (await check_token(request.cookies["token"])) is True: return - print("DEBUG: TOKEN IS INVALID", flush=True) + print(f"DEBUG: TOKEN IS INVALID: {request.cookies['token']}", flush=True) user_name = get_windmill_username_from_request(request) - if user_name in USERS_STORAGE: - windmill_token_valid = await check_token(USERS_STORAGE[user_name]["token"]) - if not USERS_STORAGE[user_name]["token"] or windmill_token_valid is False: - user_password = USERS_STORAGE[user_name]["password"] - add_user_to_storage(user_name, user_password, await login_user(user_name, user_password)) + if not user_name: + print("WARNING: provision_user: `username` is missing in the request to ExApp.", flush=True) + return + user_email = get_user_email(user_name) + if user_email in USERS_STORAGE: + windmill_token_valid = await check_token(USERS_STORAGE[user_email]["token"]) + if not USERS_STORAGE[user_email]["token"] or windmill_token_valid is False: + user_password = USERS_STORAGE[user_email]["password"] + add_user_to_storage(user_email, user_password, await login_user(user_email, user_password)) else: await create_user(user_name) - request.cookies["token"] = USERS_STORAGE[user_name]["token"] + request.cookies["token"] = USERS_STORAGE[user_email]["token"] print(f"DEBUG: ADDING TOKEN({request.cookies['token']}) to request", flush=True) @@ -143,7 +154,7 @@ def get_windmill_username_from_request(request: Request) -> str: except ValueError: username = "" if not username: - raise RuntimeError("`username` should be always set.") + return "" return "wapp_" + username @@ -250,7 +261,7 @@ def initialize_windmill() -> None: ) if r.status_code >= 400: raise RuntimeError(f"initialize_windmill: can not change default credentials password, {r.text}") - add_user_to_storage(DEFAULT_USER_NAME, new_default_password, default_token) + add_user_to_storage(DEFAULT_USER_EMAIL, new_default_password, default_token) r = httpx.post( url="http://127.0.0.1:8000/api/workspaces/create", json={"id": "nextcloud", "name": "nextcloud"}, @@ -285,11 +296,11 @@ def webhooks_syncing(): sleep(5 * 60) continue print("Running workflow sync") - token = get_valid_user_token_sync(DEFAULT_USER_NAME) + token = get_valid_user_token_sync(DEFAULT_USER_EMAIL) flow_paths = get_flow_paths(workspace, token) - print(flow_paths) + print("webhooks_syncing(flow_paths):\n", flow_paths, flush=True) expected_listeners = get_expected_listeners(workspace, token, flow_paths) - print(json.dumps(expected_listeners, indent=4), flush=True) + print("webhooks_syncing(expected_listeners):\n", json.dumps(expected_listeners, indent=4), flush=True) registered_listeners = get_registered_listeners() for expected_listener in expected_listeners: registered_listeners_for_uri = get_registered_listeners_for_uri( @@ -298,8 +309,8 @@ def webhooks_syncing(): for event in expected_listener["events"]: listener = next(filter(lambda listener: listener["event"] == event, registered_listeners_for_uri), None) if listener is not None: - json.dumps(listener) - if json.dumps(listener["eventFilter"]) != json.dumps(expected_listener["filters"]): + if listener["eventFilter"] != expected_listener["filters"]: + print("webhooks_syncing: before update_listener:", json.dumps(listener)) update_listener(listener, expected_listener["filters"], token) else: register_listener(event, expected_listener["filters"], expected_listener["webhook"], token) @@ -329,7 +340,6 @@ def get_flow_paths(workspace: str, token: str) -> list[str]: with httpx.Client() as client: url = f"http://127.0.0.1:8000/api/{path}" headers = {"Authorization": f"Bearer {token}"} - # print(f"proxy_BACKEND_requests: method={request.method}, path={path}, status={response.status_code}") response = client.request( method=method, url=url, @@ -357,7 +367,7 @@ def get_expected_listeners(workspace: str, token: str, flow_paths: list[str]) -> path = f"w/{workspace}/flows/get/{flow_path}" url = f"http://127.0.0.1:8000/api/{path}" headers = {"Authorization": f"Bearer {token}"} - print(f"sync_API_request: {path} - {method}", flush=True) + print(f"get_expected_listeners: {path} - {method}", flush=True) response = client.request( method=method, url=url, @@ -365,21 +375,21 @@ def get_expected_listeners(workspace: str, token: str, flow_paths: list[str]) -> headers=headers, ) print( - f"sync_API_request: method={method}, path={path}, status={response.status_code}", + f"get_expected_listeners: method={method}, path={path}, status={response.status_code}", flush=True, ) try: response_data = json.loads(response.content) except json.JSONDecodeError as e: - print(f"Error parsing JSON: {e}") + print(f"get_expected_listeners: Error parsing JSON: {e}") return [] first_module = response_data["value"]["modules"][0] if ( - first_module["summary"] == "CORE:LISTEN_TO_EVENT" + first_module.get("summary", "") == "CORE:LISTEN_TO_EVENT" and first_module["value"]["input_transforms"]["events"]["type"] == "static" and first_module["value"]["input_transforms"]["filters"]["type"] == "static" ): - webhook = f"http://127.0.0.1:8000/api/w/{workspace}/jobs/run/f/{flow_path}" + webhook = f"/api/w/{workspace}/jobs/run/f/{flow_path}" # webhook = f"https://app.windmill.dev/api/w/{workspace}/jobs/run/f/{flow_path}" input_transforms = first_module["value"]["input_transforms"] flows.append( @@ -400,18 +410,22 @@ def get_registered_listeners_for_uri(webhook: str, registered_listeners: list) - return [listener for listener in registered_listeners if listener["uri"] == webhook] -async def register_listener(event, event_filter, webhook, token: str) -> dict: +def register_listener(event, event_filter, webhook, token: str) -> dict: auth_data = { "Authorization": f"Bearer {token}", } nc = NextcloudApp() print(f"register_listener: {webhook} - {event}", flush=True) print(json.dumps(event_filter, indent=4), flush=True) - r = nc.webhooks.register( - "POST", webhook, event, event_filter=event_filter, auth_method="header", auth_data=auth_data - ) - print(json.dumps(r, indent=4), flush=True) - return r + try: + r = nc.webhooks.register( + "POST", webhook, event, event_filter=event_filter, auth_method="header", auth_data=auth_data + ) + except NextcloudException as e: + print(f"Exception during registering webhook: {e}", flush=True) + return {} + print("register_listener:\n", json.dumps(r._raw_data, indent=4), flush=True) # noqa + return r._raw_data # noqa def update_listener(registered_listener: dict, event_filter, token: str) -> dict: @@ -421,29 +435,37 @@ def update_listener(registered_listener: dict, event_filter, token: str) -> dict nc = NextcloudApp() print(f"update_listener: {registered_listener['uri']} - {registered_listener['event']}", flush=True) print(json.dumps(event_filter, indent=4), flush=True) - r = nc.webhooks.update( - registered_listener["id"], - "POST", - registered_listener["uri"], - registered_listener["event"], - event_filter=event_filter, - auth_method="header", - auth_data=auth_data, - ) - print(json.dumps(r, indent=4), flush=True) - return r + try: + r = nc.webhooks.update( + registered_listener["id"], + "POST", + registered_listener["uri"], + registered_listener["event"], + event_filter=event_filter, + auth_method="header", + auth_data=auth_data, + ) + except NextcloudException as e: + print(f"Exception during updating webhook: {e}", flush=True) + return {} + print("update_listener:\n", json.dumps(r._raw_data, indent=4), flush=True) # noqa + return r._raw_data # noqa def get_registered_listeners(): nc = NextcloudApp() r = nc.ocs("GET", "/ocs/v1.php/apps/webhook_listeners/api/v1/webhooks") print("get_registered_listeners: ", json.dumps(r, indent=4), flush=True) + for i in r: # we need the same format as in `get_expected_listeners(workspace, token, flow_paths)` + if not i["eventFilter"]: + i["eventFilter"] = None # replace [] with None return r -async def delete_listener(registered_listener: dict): +def delete_listener(registered_listener: dict) -> bool: r = NextcloudApp().webhooks.unregister(registered_listener["id"]) - print("delete_listener: ", json.dumps(r, indent=4), flush=True) + if r: + print("delete_listener: removed registered listener with id=%d", registered_listener["id"], flush=True) return r