Skip to content

Commit

Permalink
almost working, missing only persistent base token
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Piskun <[email protected]>
  • Loading branch information
bigcat88 committed Jul 22, 2024
1 parent e569738 commit de80bbf
Showing 1 changed file with 80 additions and 58 deletions.
138 changes: 80 additions & 58 deletions ex_app/lib/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@

import httpx
from fastapi import BackgroundTasks, Depends, FastAPI, Request, responses, status
from nc_py_api import NextcloudApp
from nc_py_api import NextcloudApp, NextcloudException
from nc_py_api.ex_app import LogLvl, nc_app, persistent_storage, run_app
from nc_py_api.ex_app.integration_fastapi import AppAPIAuthMiddleware, fetch_models_task
from starlette.responses import FileResponse, Response


# os.environ["NEXTCLOUD_URL"] = "http://nextcloud.local/index.php"
# os.environ["APP_HOST"] = "0.0.0.0"
# os.environ["APP_ID"] = "windmill_app"
# os.environ["APP_SECRET"] = "12345"
# os.environ["APP_PORT"] = "23000"
# os.environ["APP_SECRET"] = "5NREvpXuj8aOgzNaEVvy0lIpJABeACppCV9m9UZo29XqXAlvPt2wcpw0vZ87o73PgsrowG/qfPvpm8KmUIL/HUzMv+nxb+WNsxG5E2ccWsZ2qruEcXjguWGcDYzWQTy0" # noqa
# os.environ["APP_PORT"] = "23001"


DEFAULT_USER_NAME = "[email protected]"
DEFAULT_USER_EMAIL = "[email protected]"
USERS_STORAGE_PATH = Path(persistent_storage()).joinpath("windmill_users_config.json")
USERS_STORAGE = {}
print("[DEBUG]: USERS_STORAGE_PATH=", str(USERS_STORAGE_PATH), flush=True)
Expand All @@ -34,20 +36,25 @@
USERS_STORAGE.update(json.load(__f))


def add_user_to_storage(user_name: str, password: str, token: str = "") -> None:
USERS_STORAGE[user_name] = {"password": password, "token": token}
def get_user_email(user_name: str) -> str:
return f"{user_name}@windmill.dev"


def add_user_to_storage(user_email: str, password: str, token: str = "") -> None:
USERS_STORAGE[user_email] = {"password": password, "token": token}
with open(USERS_STORAGE_PATH, "w", encoding="utf-8") as f:
json.dump(USERS_STORAGE, f, indent=4)


async def create_user(user_name: str) -> str:
password = generate_random_string()
user_email = get_user_email(user_name)
async with httpx.AsyncClient() as client:
r = await client.request(
method="POST",
url="http://127.0.0.1:8000/api/users/create",
json={
"email": f"{user_name}@windmill.dev",
"email": user_email,
"password": password,
"super_admin": True,
"name": user_name,
Expand All @@ -56,29 +63,29 @@ async def create_user(user_name: str) -> str:
)
r = await client.post(
url="http://127.0.0.1:8000/api/auth/login",
json={"email": f"{user_name}@windmill.dev", "password": password},
json={"email": user_email, "password": password},
)
add_user_to_storage(user_name, password, r.text)
add_user_to_storage(user_email, password, r.text)
return r.text


async def login_user(user_name: str, password: str) -> str:
print("login_user:DEBUG:", user_name, flush=True)
async def login_user(user_email: str, password: str) -> str:
print("login_user:DEBUG:", user_email, flush=True)
async with httpx.AsyncClient() as client:
r = await client.post(
url="http://127.0.0.1:8000/api/auth/login",
json={"email": f"{user_name}@windmill.dev", "password": password},
json={"email": user_email, "password": password},
)
if r.status_code >= 400:
raise RuntimeError(f"login_user: {r.text}")
return r.text


def login_user_sync(user_name: str, password: str) -> str:
def login_user_sync(user_email: str, password: str) -> str:
with httpx.Client() as client:
r = client.post(
url="http://127.0.0.1:8000/api/auth/login",
json={"email": user_name, "password": password},
json={"email": user_email, "password": password},
)
if r.status_code >= 400:
raise RuntimeError(f"login_user: {r.text}")
Expand All @@ -97,13 +104,13 @@ def check_token_sync(token: str) -> bool:
return bool(r.status_code < 400)


def get_valid_user_token_sync(user_name: str) -> str:
token = USERS_STORAGE[user_name]["token"]
def get_valid_user_token_sync(user_email: str) -> str:
token = USERS_STORAGE[user_email]["token"]
if check_token_sync(token):
return token
user_password = USERS_STORAGE[user_name]["password"]
token = login_user_sync(user_name, user_password)
add_user_to_storage(user_name, user_password, token)
user_password = USERS_STORAGE[user_email]["password"]
token = login_user_sync(user_email, user_password)
add_user_to_storage(user_email, user_password, token)
return token


Expand All @@ -112,17 +119,21 @@ async def provision_user(request: Request) -> None:
print(f"DEBUG: TOKEN IS PRESENT: {request.cookies['token']}", flush=True)
if (await check_token(request.cookies["token"])) is True:
return
print("DEBUG: TOKEN IS INVALID", flush=True)
print(f"DEBUG: TOKEN IS INVALID: {request.cookies['token']}", flush=True)

user_name = get_windmill_username_from_request(request)
if user_name in USERS_STORAGE:
windmill_token_valid = await check_token(USERS_STORAGE[user_name]["token"])
if not USERS_STORAGE[user_name]["token"] or windmill_token_valid is False:
user_password = USERS_STORAGE[user_name]["password"]
add_user_to_storage(user_name, user_password, await login_user(user_name, user_password))
if not user_name:
print("WARNING: provision_user: `username` is missing in the request to ExApp.", flush=True)
return
user_email = get_user_email(user_name)
if user_email in USERS_STORAGE:
windmill_token_valid = await check_token(USERS_STORAGE[user_email]["token"])
if not USERS_STORAGE[user_email]["token"] or windmill_token_valid is False:
user_password = USERS_STORAGE[user_email]["password"]
add_user_to_storage(user_email, user_password, await login_user(user_email, user_password))
else:
await create_user(user_name)
request.cookies["token"] = USERS_STORAGE[user_name]["token"]
request.cookies["token"] = USERS_STORAGE[user_email]["token"]
print(f"DEBUG: ADDING TOKEN({request.cookies['token']}) to request", flush=True)


Expand All @@ -143,7 +154,7 @@ def get_windmill_username_from_request(request: Request) -> str:
except ValueError:
username = ""
if not username:
raise RuntimeError("`username` should be always set.")
return ""
return "wapp_" + username


Expand Down Expand Up @@ -250,7 +261,7 @@ def initialize_windmill() -> None:
)
if r.status_code >= 400:
raise RuntimeError(f"initialize_windmill: can not change default credentials password, {r.text}")
add_user_to_storage(DEFAULT_USER_NAME, new_default_password, default_token)
add_user_to_storage(DEFAULT_USER_EMAIL, new_default_password, default_token)
r = httpx.post(
url="http://127.0.0.1:8000/api/workspaces/create",
json={"id": "nextcloud", "name": "nextcloud"},
Expand Down Expand Up @@ -285,11 +296,11 @@ def webhooks_syncing():
sleep(5 * 60)
continue
print("Running workflow sync")
token = get_valid_user_token_sync(DEFAULT_USER_NAME)
token = get_valid_user_token_sync(DEFAULT_USER_EMAIL)
flow_paths = get_flow_paths(workspace, token)
print(flow_paths)
print("webhooks_syncing(flow_paths):\n", flow_paths, flush=True)
expected_listeners = get_expected_listeners(workspace, token, flow_paths)
print(json.dumps(expected_listeners, indent=4), flush=True)
print("webhooks_syncing(expected_listeners):\n", json.dumps(expected_listeners, indent=4), flush=True)
registered_listeners = get_registered_listeners()
for expected_listener in expected_listeners:
registered_listeners_for_uri = get_registered_listeners_for_uri(
Expand All @@ -298,8 +309,8 @@ def webhooks_syncing():
for event in expected_listener["events"]:
listener = next(filter(lambda listener: listener["event"] == event, registered_listeners_for_uri), None)
if listener is not None:
json.dumps(listener)
if json.dumps(listener["eventFilter"]) != json.dumps(expected_listener["filters"]):
if listener["eventFilter"] != expected_listener["filters"]:
print("webhooks_syncing: before update_listener:", json.dumps(listener))
update_listener(listener, expected_listener["filters"], token)
else:
register_listener(event, expected_listener["filters"], expected_listener["webhook"], token)
Expand Down Expand Up @@ -329,7 +340,6 @@ def get_flow_paths(workspace: str, token: str) -> list[str]:
with httpx.Client() as client:
url = f"http://127.0.0.1:8000/api/{path}"
headers = {"Authorization": f"Bearer {token}"}
# print(f"proxy_BACKEND_requests: method={request.method}, path={path}, status={response.status_code}")
response = client.request(
method=method,
url=url,
Expand Down Expand Up @@ -357,29 +367,29 @@ def get_expected_listeners(workspace: str, token: str, flow_paths: list[str]) ->
path = f"w/{workspace}/flows/get/{flow_path}"
url = f"http://127.0.0.1:8000/api/{path}"
headers = {"Authorization": f"Bearer {token}"}
print(f"sync_API_request: {path} - {method}", flush=True)
print(f"get_expected_listeners: {path} - {method}", flush=True)
response = client.request(
method=method,
url=url,
params={"per_page": 100},
headers=headers,
)
print(
f"sync_API_request: method={method}, path={path}, status={response.status_code}",
f"get_expected_listeners: method={method}, path={path}, status={response.status_code}",
flush=True,
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
print(f"get_expected_listeners: Error parsing JSON: {e}")
return []
first_module = response_data["value"]["modules"][0]
if (
first_module["summary"] == "CORE:LISTEN_TO_EVENT"
first_module.get("summary", "") == "CORE:LISTEN_TO_EVENT"
and first_module["value"]["input_transforms"]["events"]["type"] == "static"
and first_module["value"]["input_transforms"]["filters"]["type"] == "static"
):
webhook = f"http://127.0.0.1:8000/api/w/{workspace}/jobs/run/f/{flow_path}"
webhook = f"/api/w/{workspace}/jobs/run/f/{flow_path}"
# webhook = f"https://app.windmill.dev/api/w/{workspace}/jobs/run/f/{flow_path}"
input_transforms = first_module["value"]["input_transforms"]
flows.append(
Expand All @@ -400,18 +410,22 @@ def get_registered_listeners_for_uri(webhook: str, registered_listeners: list) -
return [listener for listener in registered_listeners if listener["uri"] == webhook]


async def register_listener(event, event_filter, webhook, token: str) -> dict:
def register_listener(event, event_filter, webhook, token: str) -> dict:
auth_data = {
"Authorization": f"Bearer {token}",
}
nc = NextcloudApp()
print(f"register_listener: {webhook} - {event}", flush=True)
print(json.dumps(event_filter, indent=4), flush=True)
r = nc.webhooks.register(
"POST", webhook, event, event_filter=event_filter, auth_method="header", auth_data=auth_data
)
print(json.dumps(r, indent=4), flush=True)
return r
try:
r = nc.webhooks.register(
"POST", webhook, event, event_filter=event_filter, auth_method="header", auth_data=auth_data
)
except NextcloudException as e:
print(f"Exception during registering webhook: {e}", flush=True)
return {}
print("register_listener:\n", json.dumps(r._raw_data, indent=4), flush=True) # noqa
return r._raw_data # noqa


def update_listener(registered_listener: dict, event_filter, token: str) -> dict:
Expand All @@ -421,29 +435,37 @@ def update_listener(registered_listener: dict, event_filter, token: str) -> dict
nc = NextcloudApp()
print(f"update_listener: {registered_listener['uri']} - {registered_listener['event']}", flush=True)
print(json.dumps(event_filter, indent=4), flush=True)
r = nc.webhooks.update(
registered_listener["id"],
"POST",
registered_listener["uri"],
registered_listener["event"],
event_filter=event_filter,
auth_method="header",
auth_data=auth_data,
)
print(json.dumps(r, indent=4), flush=True)
return r
try:
r = nc.webhooks.update(
registered_listener["id"],
"POST",
registered_listener["uri"],
registered_listener["event"],
event_filter=event_filter,
auth_method="header",
auth_data=auth_data,
)
except NextcloudException as e:
print(f"Exception during updating webhook: {e}", flush=True)
return {}
print("update_listener:\n", json.dumps(r._raw_data, indent=4), flush=True) # noqa
return r._raw_data # noqa


def get_registered_listeners():
nc = NextcloudApp()
r = nc.ocs("GET", "/ocs/v1.php/apps/webhook_listeners/api/v1/webhooks")
print("get_registered_listeners: ", json.dumps(r, indent=4), flush=True)
for i in r: # we need the same format as in `get_expected_listeners(workspace, token, flow_paths)`
if not i["eventFilter"]:
i["eventFilter"] = None # replace [] with None
return r


async def delete_listener(registered_listener: dict):
def delete_listener(registered_listener: dict) -> bool:
r = NextcloudApp().webhooks.unregister(registered_listener["id"])
print("delete_listener: ", json.dumps(r, indent=4), flush=True)
if r:
print("delete_listener: removed registered listener with id=%d", registered_listener["id"], flush=True)
return r


Expand Down

0 comments on commit de80bbf

Please sign in to comment.