Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement webhook sync #1

Merged
merged 3 commits into from
Jul 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 257 additions & 1 deletion ex_app/lib/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from base64 import b64decode
from contextlib import asynccontextmanager
from pathlib import Path
import asyncio
import base64

import httpx
from fastapi import BackgroundTasks, Depends, FastAPI, Request, responses, status
Expand All @@ -23,6 +25,7 @@
# os.environ["APP_SECRET"] = "12345"
# os.environ["APP_PORT"] = "23000"

DEFAULT_USER_NAME = "[email protected]"
USERS_STORAGE_PATH = Path(persistent_storage()).joinpath("windmill_users_config.json")
USERS_STORAGE = {}
print(str(USERS_STORAGE_PATH), flush=True)
Expand Down Expand Up @@ -98,6 +101,7 @@ async def provision_user(request: Request) -> None:

@asynccontextmanager
async def lifespan(app: FastAPI): # pylint: disable=unused-argument
await sync_timer()
yield


Expand Down Expand Up @@ -219,7 +223,7 @@ def initialize_windmill() -> None:
)
if r.status_code >= 400:
raise RuntimeError(f"initialize_windmill: can not change default credentials password, {r.text}")
add_user_to_storage("[email protected]", new_default_password, default_token)
add_user_to_storage(DEFAULT_USER_NAME, new_default_password, default_token)
r = httpx.post(
url="http://127.0.0.1:8000/api/workspaces/create",
json={"id": "nextcloud", "name": "nextcloud"},
Expand All @@ -241,6 +245,258 @@ def generate_random_string(length=10):
return "".join(random.choice(letters) for i in range(length)) # noqa


async def sync_timer():
while True:
nc = NextcloudApp()
enabled_flag = nc.ocs("GET", "/ocs/v1.php/apps/app_api/ex-app/state")
if enabled_flag:
print("Running workflow sync")
workspace = 'nextcloud'
token = USERS_STORAGE[DEFAULT_USER_NAME]['token']
flow_paths = await get_flow_paths(workspace, token)
expected_listeners = await get_expected_listeners(workspace, token, flow_paths)
print(json.dumps(expected_listeners, indent=4), flush=True)
registered_listeners = await get_registered_listeners()
for expected_listener in expected_listeners:
registered_listeners_for_uri = await get_registered_listeners_for_uri(expected_listener['webhook'], registered_listeners)
for event in expected_listener['events']:
listener = next(filter(lambda listener: listener['event'] == event, registered_listeners_for_uri), None)
if listener is not None:
json.dumps(listener)
if json.dumps(listener['eventFilter']) != json.dumps(expected_listener['filters']):
await update_listener(listener, expected_listener['filters'], token)
else:
await register_listener(event, expected_listener['filters'], expected_listener['webhook'], token)
registered_listeners = await get_registered_listeners()
marcelklehr marked this conversation as resolved.
Show resolved Hide resolved
for registered_listener in registered_listeners:
if registered_listener['appId'] == os.environ["APP_ID"]:
if next(filter(lambda expected_listener: registered_listener['uri'] == expected_listener['webhook'] and registered_listener['event'] in expected_listener['events'], expected_listeners), None) is None:
await delete_listener(registered_listener)
await asyncio.sleep(5 * 60)


async def get_flow_paths(workspace, token):
method = 'GET'
path = f"/w/{workspace}/flows/list"
print(f"sync_API_request: {path} - {method}", flush=True)
flow_paths = []
async with httpx.AsyncClient() as client:
url = f"http://127.0.0.1:8000/api/{path}"
headers = {'Authorization': f"Bearer {token}"}
# print(f"proxy_BACKEND_requests: method={request.method}, path={path}, status={response.status_code}")
response = await client.request(
method=method,
url=url,
params={'per_page': 100},
headers=headers,
)
print(
f"sync_API_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
for flow in response_data:
flow_paths.append(flow['path'])
return flow_paths


async def get_expected_listeners(workspace, token, flow_paths):
flows = []
for flow_path in flow_paths:
async with httpx.AsyncClient() as client:
method = 'GET'
path = f"/w/{workspace}/flows/get/{flow_path}"
url = f"http://127.0.0.1:8000/api/{path}"
headers = {'Authorization': f"Bearer {token}"}
print(f"sync_API_request: {path} - {method}", flush=True)
response = await client.request(
method=method,
url=url,
params={'per_page': 100},
headers=headers,
)
print(
f"sync_API_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
first_module = response_data['value']['modules'][0]
if first_module['summary'] == 'CORE:LISTEN_TO_EVENT' \
and first_module['value']['input_transforms']['events']['type'] == 'static' \
and first_module['value']['input_transforms']['filters']['type'] == 'static':
# webhook = f"http://127.0.0.1:8000/api/w/{workspace}/jobs/run/f/{flow_path}"
webhook = f"https://app.windmill.dev/api/w/{workspace}/jobs/run/f/{flow_path}"
flows.append({
'webhook': webhook,
'filters': first_module['value']['input_transforms']['filters']['value'],
# Remove backslashes from the beginning to yield canonical reference
'events': [event[1:] if event.startswith('\\') else event for event in first_module['value']['input_transforms']['events']['value']],
})
return flows


async def get_registered_listeners_for_uri(webhook, registered_listeners):
return [listener for listener in registered_listeners if listener['uri'] == webhook]


async def register_listener(event, filter, webhook, token):
async with httpx.AsyncClient() as client:
method = 'POST'
path = '/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks'
url = os.environ["NEXTCLOUD_URL"]+path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]

headers = {

'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
auth_data = {
'Authorization': f"Bearer {token}"
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
print(json.dumps(filter, indent=4), flush=True)
response = await client.request(
method=method,
url=url,
json={'httpMethod': 'POST', 'uri': webhook, 'event': event, 'eventFilter': filter, 'authMethod': 'header', 'authData': auth_data},
headers=headers,
params={'format': 'json'}
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']


async def update_listener(registered_listener, filter, token):
async with httpx.AsyncClient() as client:
method = 'POST'
path = f"/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{registered_listener['id']}"
url = os.environ["NEXTCLOUD_URL"] + path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]

headers = {
'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
auth_data = {
'Authorization': f"Bearer {token}"
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
print(json.dumps(filter, indent=4), flush=True)
response = await client.request(
method=method,
url=url,
json={'httpMethod': 'POST', 'uri': registered_listener['uri'], 'event': registered_listener['event'], 'eventFilter': filter, 'authMethod': 'header',
'authData': auth_data},
headers=headers,
params={'format': 'json'}
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']


async def get_registered_listeners():
async with httpx.AsyncClient() as client:
method = 'GET'
path = '/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks'
url = os.environ["NEXTCLOUD_URL"] + path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]
headers = {
'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
response = await client.request(
method=method,
url=url,
params={'format': 'json'},
headers=headers,
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']


async def delete_listener(registered_listener):
async with httpx.AsyncClient() as client:
method = 'DELETE'
path = f"/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{registered_listener['id']}"
url = os.environ["NEXTCLOUD_URL"] + path
userid = os.environ["NEXTCLOUD_USERID"]
secret = os.environ["NEXTCLOUD_SECRET"]

headers = {
'AUTHORIZATION-APP-API': base64.b64encode(bytes(f"{userid}:{secret}", 'utf-8')).decode('utf-8'),
'OCS-APIREQUEST': 'true',
'EX-APP-VERSION': 'v1.0.0',
'EX-APP-ID': os.environ['APP_ID'],
'AA-VERSION': 'v2.3.0'
}
print(f"sync_API_nextcloud_request: {url} - {method}", flush=True)
response = await client.request(
method=method,
url=url,
headers=headers,
params={'format': 'json'}
)
print(
f"sync_API_nextcloud_request: method={method}, path={path}, status={response.status_code}",
flush=True
)
try:
response_data = json.loads(response.content)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return
print(json.dumps(response_data, indent=4), flush=True)
return response_data['ocs']['data']

if __name__ == "__main__":
initialize_windmill()
# Current working dir is set for the Service we are wrapping, so change we first for ExApp default one
Expand Down
Loading