;
+ 204: void;
/**
* Validation Error
*/
diff --git a/frontend/src/components/executions/event-details.tsx b/frontend/src/components/executions/event-details.tsx
index 5bd323b35..7b3263986 100644
--- a/frontend/src/components/executions/event-details.tsx
+++ b/frontend/src/components/executions/event-details.tsx
@@ -1,9 +1,5 @@
import React from "react"
-import {
- DSLRunArgs,
- EventHistoryResponse,
- RunActionInput_Output,
-} from "@/client"
+import { DSLRunArgs, EventHistoryResponse, RunActionInput } from "@/client"
import JsonView from "react18-json-view"
import {
@@ -139,7 +135,7 @@ export function WorkflowExecutionEventDetailView({
-
+
@@ -412,7 +408,7 @@ function ActionEventGeneralInfo({
task: { depends_on, run_if, for_each },
},
}: {
- input: RunActionInput_Output
+ input: RunActionInput
}) {
return (
@@ -467,12 +463,12 @@ function ActionEventGeneralInfo({
function isRunActionInput_Output(
actionInput: unknown
-): actionInput is RunActionInput_Output {
+): actionInput is RunActionInput {
return (
typeof actionInput === "object" &&
actionInput !== null &&
"task" in actionInput &&
- typeof (actionInput as RunActionInput_Output).task === "object"
+ typeof (actionInput as RunActionInput).task === "object"
)
}
diff --git a/tests/conftest.py b/tests/conftest.py
index ccb4072c3..b0bcf711c 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -59,8 +59,9 @@ def env_sandbox(monkeysession: pytest.MonkeyPatch):
"TRACECAT__REMOTE_REPOSITORY_URL",
"git+ssh://git@github.com/TracecatHQ/udfs.git",
)
+ # Need this for local unit tests
monkeysession.setattr(
- config, "TRACECAT__REGISTRY_URL", "http://localhost/api/registry"
+ config, "TRACECAT__EXECUTOR_URL", "http://localhost/api/executor"
)
monkeysession.setenv(
@@ -69,9 +70,9 @@ def env_sandbox(monkeysession: pytest.MonkeyPatch):
)
# monkeysession.setenv("TRACECAT__DB_ENCRYPTION_KEY", Fernet.generate_key().decode())
monkeysession.setenv("TRACECAT__API_URL", "http://api:8000")
- monkeysession.setenv("TRACECAT__REGISTRY_URL", "http://registry:8000")
+ # Needed for local unit tests
+ monkeysession.setenv("TRACECAT__EXECUTOR_URL", "http://executor:8000")
monkeysession.setenv("TRACECAT__PUBLIC_API_URL", "http://localhost/api")
- monkeysession.setenv("TRACECAT__PUBLIC_RUNNER_URL", "http://localhost:8001")
monkeysession.setenv("TRACECAT__SERVICE_KEY", os.environ["TRACECAT__SERVICE_KEY"])
monkeysession.setenv("TRACECAT__SIGNING_SECRET", "test-signing-secret")
# When launching the worker directly in a test, use localhost
diff --git a/tests/unit/test_workflows.py b/tests/unit/test_workflows.py
index 9f7a9eb46..1f35e4ab9 100644
--- a/tests/unit/test_workflows.py
+++ b/tests/unit/test_workflows.py
@@ -1601,7 +1601,7 @@ async def test_pull_based_workflow_fetches_latest_version(temporal_client, test_
"------------------------------\n"
"File: /app/tracecat/registry/executor.py\n"
"Function: run_action_in_pool\n"
- "Line: 83"
+ "Line: 200"
),
"type": "RegistryActionError",
"expr_context": "ACTIONS",
diff --git a/tracecat/api/app.py b/tracecat/api/app.py
index 2832b06b7..0c678db62 100644
--- a/tracecat/api/app.py
+++ b/tracecat/api/app.py
@@ -12,6 +12,7 @@
from tracecat.api.common import (
custom_generate_unique_id,
generic_exception_handler,
+ setup_registry,
tracecat_exception_handler,
)
from tracecat.auth.constants import AuthType
@@ -29,6 +30,8 @@
from tracecat.logger import logger
from tracecat.middleware import RequestLoggingMiddleware
from tracecat.organization.router import router as org_router
+from tracecat.registry.actions.router import router as registry_actions_router
+from tracecat.registry.repositories.router import router as registry_repos_router
from tracecat.secrets.router import router as secrets_router
from tracecat.types.auth import AccessLevel, Role
from tracecat.types.exceptions import TracecatException
@@ -50,6 +53,7 @@ async def lifespan(app: FastAPI):
)
async with get_async_session_context_manager() as session:
await setup_defaults(session, admin_role)
+ await setup_registry(session, admin_role)
yield
@@ -139,6 +143,8 @@ def create_app(**kwargs) -> FastAPI:
app.include_router(users_router)
app.include_router(org_router)
app.include_router(editor_router)
+ app.include_router(registry_repos_router)
+ app.include_router(registry_actions_router)
app.include_router(
fastapi_users.get_users_router(UserRead, UserUpdate),
prefix="/users",
diff --git a/tracecat/api/registry.py b/tracecat/api/executor.py
similarity index 64%
rename from tracecat/api/registry.py
rename to tracecat/api/executor.py
index 796955803..8664d2b61 100644
--- a/tracecat/api/registry.py
+++ b/tracecat/api/executor.py
@@ -9,31 +9,19 @@
custom_generate_unique_id,
generic_exception_handler,
setup_oss_models,
- setup_registry,
tracecat_exception_handler,
)
-from tracecat.db.engine import get_async_session_context_manager
from tracecat.logger import logger
from tracecat.middleware import RequestLoggingMiddleware
-from tracecat.registry.actions.router import router as registry_actions_router
-from tracecat.registry.executor import get_executor
-from tracecat.registry.repositories.router import router as registry_repos_router
-from tracecat.types.auth import AccessLevel, Role
+from tracecat.registry.executor import get_executor, router
from tracecat.types.exceptions import TracecatException
@asynccontextmanager
async def lifespan(app: FastAPI):
- admin_role = Role(
- type="service",
- access_level=AccessLevel.ADMIN,
- service_id="tracecat-registry",
- )
- async with get_async_session_context_manager() as session:
- await setup_registry(session, admin_role)
await setup_oss_models()
+ executor = get_executor()
try:
- executor = get_executor()
yield
finally:
executor.shutdown()
@@ -45,20 +33,19 @@ def create_app(**kwargs) -> FastAPI:
else:
allow_origins = ["*"]
app = FastAPI(
- title="Tracecat Registry",
- description="Registry action executor.",
- summary="Tracecat Registry",
+ title="Tracecat Executor",
+ description="Action executor for Tracecat.",
+ summary="Tracecat Executor",
lifespan=lifespan,
default_response_class=ORJSONResponse,
generate_unique_id_function=custom_generate_unique_id,
- root_path="/api/registry",
+ root_path="/api/executor",
**kwargs,
)
app.logger = logger # type: ignore
# Routers
- app.include_router(registry_repos_router)
- app.include_router(registry_actions_router)
+ app.include_router(router)
# Exception handlers
app.add_exception_handler(Exception, generic_exception_handler)
@@ -75,7 +62,7 @@ def create_app(**kwargs) -> FastAPI:
)
logger.info(
- "Registry service started",
+ "Executor service started",
env=config.TRACECAT__APP_ENV,
origins=allow_origins,
auth_types=config.TRACECAT__AUTH_TYPES,
@@ -89,4 +76,4 @@ def create_app(**kwargs) -> FastAPI:
@app.get("/", include_in_schema=False)
def root() -> dict[str, str]:
- return {"message": "Hello world. I am the registry."}
+ return {"message": "Hello world. I am the executor."}
diff --git a/tracecat/config.py b/tracecat/config.py
index 07507ef63..c23a1f3b6 100644
--- a/tracecat/config.py
+++ b/tracecat/config.py
@@ -4,23 +4,12 @@
from tracecat.auth.constants import AuthType
-# === Actions Config === #
-HTTP_MAX_RETRIES = 10
-LLM_MAX_RETRIES = 3
-
# === Internal Services === #
-TRACECAT__SCHEDULE_INTERVAL_SECONDS = os.environ.get(
- "TRACECAT__SCHEDULE_INTERVAL_SECONDS", 60
-)
-TRACECAT__SCHEDULE_MAX_CONNECTIONS = 6
TRACECAT__APP_ENV: Literal["development", "staging", "production"] = os.environ.get(
"TRACECAT__APP_ENV", "development"
) # type: ignore
TRACECAT__API_URL = os.environ.get("TRACECAT__API_URL", "http://localhost:8000")
TRACECAT__API_ROOT_PATH = os.environ.get("TRACECAT__API_ROOT_PATH", "/api")
-TRACECAT__PUBLIC_RUNNER_URL = os.environ.get(
- "TRACECAT__PUBLIC_RUNNER_URL", "http://localhost/api"
-)
TRACECAT__PUBLIC_API_URL = os.environ.get(
"TRACECAT__PUBLIC_API_URL", "http://localhost/api"
)
@@ -32,8 +21,8 @@
"TRACECAT__DB_URI",
"postgresql+psycopg://postgres:postgres@postgres_db:5432/postgres",
)
-TRACECAT__REGISTRY_URL = os.environ.get(
- "TRACECAT__REGISTRY_URL", "http://registry:8000"
+TRACECAT__EXECUTOR_URL = os.environ.get(
+ "TRACECAT__EXECUTOR_URL", "http://executor:8000"
)
TRACECAT__DB_NAME = os.environ.get("TRACECAT__DB_NAME")
diff --git a/tracecat/db/schemas.py b/tracecat/db/schemas.py
index 721adff0f..806f64362 100644
--- a/tracecat/db/schemas.py
+++ b/tracecat/db/schemas.py
@@ -342,7 +342,7 @@ def secret(self) -> str:
@computed_field
@property
def url(self) -> str:
- return f"{config.TRACECAT__PUBLIC_RUNNER_URL}/webhooks/{self.workflow_id}/{self.secret}"
+ return f"{config.TRACECAT__PUBLIC_API_URL}/webhooks/{self.workflow_id}/{self.secret}"
class Schedule(Resource, table=True):
diff --git a/tracecat/identifiers/__init__.py b/tracecat/identifiers/__init__.py
index d883a4778..19e274f7f 100644
--- a/tracecat/identifiers/__init__.py
+++ b/tracecat/identifiers/__init__.py
@@ -71,7 +71,7 @@
"tracecat-cli",
"tracecat-schedule-runner",
"tracecat-service",
- "tracecat-registry",
+ "tracecat-executor",
]
__all__ = [
diff --git a/tracecat/registry/actions/router.py b/tracecat/registry/actions/router.py
index 8af7da1be..dc7f52484 100644
--- a/tracecat/registry/actions/router.py
+++ b/tracecat/registry/actions/router.py
@@ -1,29 +1,19 @@
-import traceback
-from typing import Any
-
from fastapi import APIRouter, HTTPException, status
from sqlalchemy.exc import IntegrityError
from tracecat.auth.credentials import RoleACL
from tracecat.concurrency import GatheringTaskGroup
-from tracecat.contexts import ctx_logger, ctx_role
from tracecat.db.dependencies import AsyncDBSession
-from tracecat.dsl.models import RunActionInput
from tracecat.logger import logger
-from tracecat.registry import executor
from tracecat.registry.actions.models import (
RegistryActionCreate,
- RegistryActionErrorInfo,
RegistryActionRead,
RegistryActionUpdate,
- RegistryActionValidate,
- RegistryActionValidateResponse,
)
from tracecat.registry.actions.service import RegistryActionsService
from tracecat.registry.constants import DEFAULT_REGISTRY_ORIGIN, REGISTRY_ACTIONS_PATH
from tracecat.types.auth import AccessLevel, Role
from tracecat.types.exceptions import RegistryError
-from tracecat.validation.service import validate_registry_action_args
router = APIRouter(prefix=REGISTRY_ACTIONS_PATH, tags=["registry-actions"])
@@ -154,82 +144,3 @@ async def delete_registry_action(
)
# Delete the action as it's not a base action
await service.delete_action(action)
-
-
-# Registry Action Controls
-
-
-@router.post("/{action_name}/execute")
-async def run_registry_action(
- *,
- role: Role = RoleACL(
- allow_user=False, # XXX(authz): Users cannot execute actions
- allow_service=True, # Only services can execute actions
- require_workspace="no",
- ),
- action_name: str,
- action_input: RunActionInput,
-) -> Any:
- """Execute a registry action."""
- ref = action_input.task.ref
- ctx_role.set(role)
- act_logger = logger.bind(role=role, action_name=action_name, ref=ref)
- ctx_logger.set(act_logger)
-
- act_logger.info("Starting action")
- try:
- return await executor.run_action_in_pool(input=action_input)
- except Exception as e:
- # Get the traceback info
- tb = traceback.extract_tb(e.__traceback__)[-1] # Get the last frame
- error_detail = RegistryActionErrorInfo(
- action_name=action_name,
- type=e.__class__.__name__,
- message=str(e),
- filename=tb.filename,
- function=tb.name,
- lineno=tb.lineno,
- )
- act_logger.error(
- "Error running action",
- action_name=action_name,
- type=error_detail.type,
- message=error_detail.message,
- filename=error_detail.filename,
- function=error_detail.function,
- lineno=error_detail.lineno,
- )
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=error_detail.model_dump(mode="json"),
- ) from e
-
-
-@router.post("/{action_name}/validate")
-async def validate_registry_action(
- *,
- role: Role = RoleACL(
- allow_user=False, # XXX(authz): Users cannot validate actions
- allow_service=True, # Only services can validate actions
- require_workspace="no",
- ),
- session: AsyncDBSession,
- action_name: str,
- params: RegistryActionValidate,
-) -> RegistryActionValidateResponse:
- """Validate a registry action."""
- try:
- result = await validate_registry_action_args(
- session=session, action_name=action_name, args=params.args
- )
-
- if result.status == "error":
- logger.warning(
- "Error validating UDF args", message=result.msg, details=result.detail
- )
- return RegistryActionValidateResponse.from_validation_result(result)
- except KeyError as e:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Action {action_name!r} not found in registry",
- ) from e
diff --git a/tracecat/registry/actions/service.py b/tracecat/registry/actions/service.py
index 07a7402c0..5648059cc 100644
--- a/tracecat/registry/actions/service.py
+++ b/tracecat/registry/actions/service.py
@@ -22,6 +22,7 @@
RegistryActionUpdate,
model_converters,
)
+from tracecat.registry.client import RegistryClient
from tracecat.registry.loaders import get_bound_action_impl
from tracecat.registry.repository import Repository
from tracecat.types.auth import Role
@@ -208,9 +209,11 @@ async def sync_actions_from_repository(self, db_repo: RegistryRepository) -> Non
- For each repository, we need to reimport the packages to run decorators. (for remote this involves pulling)
- Scan the repositories for implementation details/metadata and update the DB
"""
+ # (1) Update the API's view of the repository
repo = Repository(origin=db_repo.origin, role=self.role)
await repo.load_from_origin()
+ # (2) Handle DB bookkeeping for the API's view of the repository
# Perform diffing here. The expectation for this endpoint is to sync Tracecat's view of
# the repository with the remote repository -- meaning any creation/updates/deletions to
# actions should be propogated to the db.
@@ -275,6 +278,11 @@ async def sync_actions_from_repository(self, db_repo: RegistryRepository) -> Non
deleted=n_deleted,
)
+ # (3) Update the executor's view of the repository
+ self.logger.info("Syncing executor", origin=db_repo.origin)
+ client = RegistryClient(role=self.role)
+ await client.sync_executor(origin=db_repo.origin)
+
async def load_action_impl(self, action_name: str) -> BoundRegistryAction:
"""
Load the implementation for a registry action.
diff --git a/tracecat/registry/client.py b/tracecat/registry/client.py
index 43bc50749..c6fa04039 100644
--- a/tracecat/registry/client.py
+++ b/tracecat/registry/client.py
@@ -1,11 +1,18 @@
"""Use this in worker to execute actions."""
-from collections.abc import Mapping
+from collections.abc import AsyncIterator, Mapping
+from contextlib import asynccontextmanager
from json import JSONDecodeError
from typing import Any, cast
import httpx
import orjson
+from tenacity import (
+ retry,
+ retry_if_exception_type,
+ stop_after_attempt,
+ wait_exponential,
+)
from tracecat import config
from tracecat.clients import AuthenticatedServiceClient
@@ -26,9 +33,11 @@ class _RegistryHTTPClient(AuthenticatedServiceClient):
"""Async httpx client for the registry service."""
def __init__(self, role: Role | None = None, *args: Any, **kwargs: Any) -> None:
- self._registry_base_url = config.TRACECAT__REGISTRY_URL
+ self._registry_base_url = config.TRACECAT__EXECUTOR_URL
super().__init__(role, *args, base_url=self._registry_base_url, **kwargs)
- self.params = self.params.add("workspace_id", str(self.role.workspace_id))
+ self.params = self.params.add(
+ "workspace_id", str(self.role.workspace_id) if self.role else None
+ )
class RegistryClient:
@@ -42,6 +51,11 @@ def __init__(self, role: Role | None = None):
self.role = role or ctx_role.get()
self.logger = logger.bind(service="registry-client", role=self.role)
+ @asynccontextmanager
+ async def _client(self) -> AsyncIterator[_RegistryHTTPClient]:
+ async with _RegistryHTTPClient(self.role) as client:
+ yield client
+
"""Execution"""
async def call_action(self, input: RunActionInput) -> Any:
@@ -75,7 +89,6 @@ async def call_action(self, input: RunActionInput) -> Any:
action_type = input.task.action
content = input.model_dump_json()
- workspace_id = str(self.role.workspace_id) if self.role.workspace_id else None
logger.debug(
f"Calling action {action_type!r} with content",
content=content,
@@ -83,9 +96,9 @@ async def call_action(self, input: RunActionInput) -> Any:
timeout=self._timeout,
)
try:
- async with _RegistryHTTPClient(self.role) as client:
+ async with self._client() as client:
response = await client.post(
- f"{self._actions_endpoint}/{action_type}/execute",
+ f"/run/{action_type}",
# NOTE(perf): Maybe serialize with orjson.dumps instead
headers={
"Content-Type": "application/json",
@@ -93,7 +106,6 @@ async def call_action(self, input: RunActionInput) -> Any:
**self.role.to_headers(),
},
content=content,
- params={"workspace_id": workspace_id},
timeout=self._timeout,
)
response.raise_for_status()
@@ -144,10 +156,9 @@ async def validate_action(
"""Validate an action."""
try:
logger.warning("Validating action")
- async with _RegistryHTTPClient(self.role) as client:
+ async with self._client() as client:
response = await client.post(
- f"{self._actions_endpoint}/{action_name}/validate",
- json={"args": args},
+ f"/validate/{action_name}", json={"args": args}
)
response.raise_for_status()
return RegistryActionValidateResponse.model_validate_json(response.content)
@@ -164,6 +175,55 @@ async def validate_action(
f"Unexpected error while listing registries: {str(e)}"
) from e
+ """Executor"""
+
+ async def sync_executor(self, origin: str, *, max_attempts: int = 3) -> None:
+ """Sync the executor from the registry.
+
+ Args:
+ origin: The origin of the sync request
+
+ Raises:
+ RegistryError: If the sync fails after all retries
+ """
+
+ @retry(
+ stop=stop_after_attempt(max_attempts),
+ wait=wait_exponential(multiplier=1, min=4, max=10),
+ retry=retry_if_exception_type(
+ (
+ httpx.HTTPStatusError,
+ httpx.RequestError,
+ httpx.TimeoutException,
+ httpx.ConnectError,
+ )
+ ),
+ )
+ async def _sync_request() -> None:
+ try:
+ async with self._client() as client:
+ response = await client.post("/sync", json={"origin": origin})
+ response.raise_for_status()
+ except Exception as e:
+ logger.error("Error syncing executor", error=e)
+ raise
+
+ try:
+ logger.info("Syncing executor", origin=origin)
+ _ = await _sync_request()
+ except httpx.HTTPStatusError as e:
+ raise RegistryError(
+ f"Failed to sync executor: HTTP {e.response.status_code}"
+ ) from e
+ except httpx.RequestError as e:
+ raise RegistryError(
+ f"Network error while syncing executor: {str(e)}"
+ ) from e
+ except Exception as e:
+ raise RegistryError(
+ f"Unexpected error while syncing executor: {str(e)}"
+ ) from e
+
"""Registry management"""
async def list_repositories(self) -> list[str]:
diff --git a/tracecat/registry/constants.py b/tracecat/registry/constants.py
index 25167d959..311598999 100644
--- a/tracecat/registry/constants.py
+++ b/tracecat/registry/constants.py
@@ -3,8 +3,8 @@
CUSTOM_REPOSITORY_ORIGIN = "custom"
GITHUB_SSH_KEY_SECRET_NAME = "github-ssh-key"
-REGISTRY_REPOS_PATH: str = "/repos"
+REGISTRY_REPOS_PATH: str = "/registry/repos"
"""Base path for repository-related endpoints"""
-REGISTRY_ACTIONS_PATH: str = "/actions"
+REGISTRY_ACTIONS_PATH: str = "/registry/actions"
"""Base path for action-related endpoints"""
diff --git a/tracecat/registry/executor.py b/tracecat/registry/executor.py
index ffd7de38a..6eb4b8ec8 100644
--- a/tracecat/registry/executor.py
+++ b/tracecat/registry/executor.py
@@ -6,16 +6,21 @@
from __future__ import annotations
import asyncio
+import traceback
from collections.abc import Iterator, Mapping
from concurrent.futures import ProcessPoolExecutor
from typing import Any, cast
import uvloop
+from fastapi import APIRouter, HTTPException, status
+from pydantic import BaseModel
from tracecat import config
+from tracecat.auth.credentials import RoleACL
from tracecat.auth.sandbox import AuthSandbox
from tracecat.concurrency import GatheringTaskGroup
from tracecat.contexts import ctx_logger, ctx_role, ctx_run
+from tracecat.db.dependencies import AsyncDBSession
from tracecat.db.engine import get_async_engine
from tracecat.dsl.common import context_locator, create_default_dsl_context
from tracecat.dsl.models import (
@@ -33,20 +38,132 @@
from tracecat.expressions.shared import ExprContext
from tracecat.logger import logger
from tracecat.parse import traverse_leaves
-from tracecat.registry.actions.models import ArgsClsT, BoundRegistryAction
+from tracecat.registry.actions.models import (
+ ArgsClsT,
+ BoundRegistryAction,
+ RegistryActionErrorInfo,
+ RegistryActionValidate,
+ RegistryActionValidateResponse,
+)
from tracecat.registry.actions.service import RegistryActionsService
+from tracecat.registry.repository import Repository
from tracecat.secrets.common import apply_masks_object
from tracecat.secrets.constants import DEFAULT_SECRETS_ENVIRONMENT
from tracecat.secrets.secrets_manager import env_sandbox
from tracecat.types.auth import Role
-from tracecat.types.exceptions import TracecatException
+from tracecat.types.exceptions import RegistryError, TracecatException
+from tracecat.validation.service import validate_registry_action_args
"""All these methods are used in the registry executor, not on the worker"""
-type ArgsT = Mapping[str, Any]
+# Registry Action Controls
+type ArgsT = Mapping[str, Any]
_executor: ProcessPoolExecutor | None = None
+router = APIRouter(tags=["executor"])
+
+
+class ExecutorSyncInput(BaseModel):
+ origin: str
+
+
+@router.post("/sync")
+async def sync_executor(
+ *,
+ role: Role = RoleACL(
+ allow_user=False, # XXX(authz): Users cannot sync the executor
+ allow_service=True, # Only services can sync the executor
+ require_workspace="no",
+ ),
+ input: ExecutorSyncInput,
+) -> None:
+ """Sync the executor from the registry."""
+ repo = Repository(origin=input.origin, role=role)
+ try:
+ await repo.load_from_origin()
+ except RegistryError as e:
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
+ ) from e
+
+
+@router.post("/run/{action_name}")
+async def run_action(
+ *,
+ role: Role = RoleACL(
+ allow_user=False, # XXX(authz): Users cannot execute actions
+ allow_service=True, # Only services can execute actions
+ require_workspace="no",
+ ),
+ action_name: str,
+ action_input: RunActionInput,
+) -> Any:
+ """Execute a registry action."""
+ ref = action_input.task.ref
+ ctx_role.set(role)
+ act_logger = logger.bind(role=role, action_name=action_name, ref=ref)
+ ctx_logger.set(act_logger)
+
+ act_logger.info("Starting action")
+ try:
+ return await run_action_in_pool(input=action_input)
+ except Exception as e:
+ # Get the traceback info
+ tb = traceback.extract_tb(e.__traceback__)[-1] # Get the last frame
+ error_detail = RegistryActionErrorInfo(
+ action_name=action_name,
+ type=e.__class__.__name__,
+ message=str(e),
+ filename=tb.filename,
+ function=tb.name,
+ lineno=tb.lineno,
+ )
+ act_logger.error(
+ "Error running action",
+ action_name=action_name,
+ type=error_detail.type,
+ message=error_detail.message,
+ filename=error_detail.filename,
+ function=error_detail.function,
+ lineno=error_detail.lineno,
+ )
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=error_detail.model_dump(mode="json"),
+ ) from e
+
+
+@router.post("/validate/{action_name}")
+async def validate_action(
+ *,
+ role: Role = RoleACL(
+ allow_user=False, # XXX(authz): Users cannot validate actions
+ allow_service=True, # Only services can validate actions
+ require_workspace="no",
+ ),
+ session: AsyncDBSession,
+ action_name: str,
+ params: RegistryActionValidate,
+) -> RegistryActionValidateResponse:
+ """Validate a registry action."""
+ try:
+ result = await validate_registry_action_args(
+ session=session, action_name=action_name, args=params.args
+ )
+
+ if result.status == "error":
+ logger.warning(
+ "Error validating UDF args", message=result.msg, details=result.detail
+ )
+ return RegistryActionValidateResponse.from_validation_result(result)
+ except KeyError as e:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=f"Action {action_name!r} not found in registry",
+ ) from e
+
+
# We want to be able to serve a looped action
# Before we send out tasks to the executor we should inspect the size of the loop
# and set the right chunk size for each worker
diff --git a/tracecat/workflow/executions/service.py b/tracecat/workflow/executions/service.py
index 2cbe025ad..6a71fbb75 100644
--- a/tracecat/workflow/executions/service.py
+++ b/tracecat/workflow/executions/service.py
@@ -19,6 +19,7 @@
WorkflowHandle,
WorkflowHistoryEventFilterType,
)
+from temporalio.service import RPCError
from tracecat import config
from tracecat.contexts import ctx_role
@@ -435,6 +436,15 @@ async def _dispatch_workflow(
except WorkflowFailureError as e:
self.logger.error(str(e), role=self.role, wf_exec_id=wf_exec_id, e=e)
raise e
+ except RPCError as e:
+ self.logger.error(
+ f"Temporal service RPC error occurred while executing the workflow: {e}",
+ role=self.role,
+ wf_exec_id=wf_exec_id,
+ e=e,
+ )
+ raise e
+
except Exception as e:
self.logger.exception(
"Unexpected workflow error", role=self.role, wf_exec_id=wf_exec_id, e=e
From 3315fccff9dc13cce55bd396300e401fd122471b Mon Sep 17 00:00:00 2001
From: Chris Lo <46541035+topher-lo@users.noreply.github.com>
Date: Mon, 9 Dec 2024 19:20:26 -0800
Subject: [PATCH 12/13] ci(build): Run ARM docker builds on ARM runners (#594)
---
.github/workflows/build-push-images.yml | 22 ++++++++++++++++++----
1 file changed, 18 insertions(+), 4 deletions(-)
diff --git a/.github/workflows/build-push-images.yml b/.github/workflows/build-push-images.yml
index 0f346e42e..55e2bc088 100644
--- a/.github/workflows/build-push-images.yml
+++ b/.github/workflows/build-push-images.yml
@@ -12,7 +12,14 @@ permissions:
jobs:
push-api-to-ghcr:
- runs-on: ubuntu-latest
+ runs-on: ${{ matrix.runner }}
+ strategy:
+ matrix:
+ include:
+ - platform: linux/amd64
+ runner: ubuntu-latest
+ - platform: linux/arm64
+ runner: ubuntu-arm64-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
@@ -48,7 +55,7 @@ jobs:
with:
context: .
push: true
- platforms: linux/amd64,linux/arm64
+ platforms: ${{ matrix.platform }}
tags: |
${{ steps.meta.outputs.tags }}
${{ github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v') && 'ghcr.io/tracecathq/tracecat:latest' || '' }}
@@ -57,7 +64,14 @@ jobs:
cache-to: type=gha,mode=max
push-ui-to-ghcr:
- runs-on: ubuntu-latest
+ runs-on: ${{ matrix.runner }}
+ strategy:
+ matrix:
+ include:
+ - platform: linux/amd64
+ runner: ubuntu-latest
+ - platform: linux/arm64
+ runner: ubuntu-arm64-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
@@ -106,7 +120,7 @@ jobs:
NEXT_SERVER_API_URL=${{ env.NEXT_SERVER_API_URL }}
NODE_ENV=${{ env.NODE_ENV }}
push: true
- platforms: linux/amd64,linux/arm64
+ platforms: ${{ matrix.platform }}
tags: ${{ steps.meta.outputs.tags }}
${{ github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v') && 'ghcr.io/tracecathq/tracecat-ui:latest' || '' }}
labels: ${{ steps.meta.outputs.labels }}
From fe5350e7f6f5426b123525a7db2535977fc9b810 Mon Sep 17 00:00:00 2001
From: topher-lo <46541035+topher-lo@users.noreply.github.com>
Date: Mon, 9 Dec 2024 19:39:51 -0800
Subject: [PATCH 13/13] ci(infra): Executor port command 8002 not 8000
---
deployments/aws/ecs/ecs-executor.tf | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/deployments/aws/ecs/ecs-executor.tf b/deployments/aws/ecs/ecs-executor.tf
index 8677a98d7..bc004b973 100644
--- a/deployments/aws/ecs/ecs-executor.tf
+++ b/deployments/aws/ecs/ecs-executor.tf
@@ -20,7 +20,7 @@ resource "aws_ecs_task_definition" "executor_task_definition" {
"--host",
"0.0.0.0",
"--port",
- "8000"
+ "8002"
]
portMappings = [
{