feat(core): Dynamically loading dbgpts (#1211)
fangyinc authored Feb 29, 2024
1 parent 673ddaa commit 1d90711
Showing 15 changed files with 504 additions and 33 deletions.
2 changes: 2 additions & 0 deletions dbgpt/app/component_configs.py
@@ -21,13 +21,15 @@ def initialize_components(
):
# Lazy import to avoid high time cost
from dbgpt.app.initialization.embedding_component import _initialize_embedding_model
from dbgpt.app.initialization.scheduler import DefaultScheduler
from dbgpt.app.initialization.serve_initialization import register_serve_apps
from dbgpt.model.cluster.controller.controller import controller

# Register global default executor factory first
system_app.register(
DefaultExecutorFactory, max_workers=param.default_thread_pool_size
)
system_app.register(DefaultScheduler)
system_app.register_instance(controller)

from dbgpt.serve.agent.hub.controller import module_plugin
43 changes: 43 additions & 0 deletions dbgpt/app/initialization/scheduler.py
@@ -0,0 +1,43 @@
import logging
import threading
import time

import schedule

from dbgpt.component import BaseComponent, SystemApp

logger = logging.getLogger(__name__)


class DefaultScheduler(BaseComponent):
"""The default scheduler"""

name = "dbgpt_default_scheduler"

def __init__(
self,
system_app: SystemApp,
scheduler_delay_ms: int = 5000,
scheduler_interval_ms: int = 1000,
):
super().__init__(system_app)
self.system_app = system_app
self._scheduler_interval_ms = scheduler_interval_ms
self._scheduler_delay_ms = scheduler_delay_ms

def init_app(self, system_app: SystemApp):
self.system_app = system_app

def after_start(self):
thread = threading.Thread(target=self._scheduler)
thread.start()

def _scheduler(self):
time.sleep(self._scheduler_delay_ms / 1000)
while True:
try:
schedule.run_pending()
except Exception as e:
logger.debug(f"Scheduler error: {e}")
finally:
time.sleep(self._scheduler_interval_ms / 1000)
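
The scheduler thread only drains schedule.run_pending(); jobs are registered elsewhere through the schedule library (the flow service below does exactly this). A minimal sketch of how another component could hook into this loop — the job body and the 5-second interval are illustrative, not part of this commit:

import schedule

def _sync_installed_dbgpts():
    # Illustrative job: rescan installed dbgpts packages for new flows.
    ...

# Register once (e.g. in a component's after_start); DefaultScheduler's
# background thread will then invoke it roughly every 5 seconds.
schedule.every(5).seconds.do(_sync_installed_dbgpts)
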
3 changes: 2 additions & 1 deletion dbgpt/cli/cli_scripts.py
@@ -163,7 +163,7 @@ def stop_all():
from dbgpt.util.dbgpts.cli import add_repo
from dbgpt.util.dbgpts.cli import install as app_install
from dbgpt.util.dbgpts.cli import list_all_apps as app_list
from dbgpt.util.dbgpts.cli import list_repos, remove_repo
from dbgpt.util.dbgpts.cli import list_repos, new_dbgpts, remove_repo
from dbgpt.util.dbgpts.cli import uninstall as app_uninstall
from dbgpt.util.dbgpts.cli import update_repo

@@ -174,6 +174,7 @@ def stop_all():
add_command_alias(app_install, name="install", parent_group=app)
add_command_alias(app_uninstall, name="uninstall", parent_group=app)
add_command_alias(app_list, name="list-remote", parent_group=app)
add_command_alias(new_dbgpts, name="app", parent_group=new)

except ImportError as e:
logging.warning(f"Integrating dbgpt dbgpts command line tool failed: {e}")
14 changes: 9 additions & 5 deletions dbgpt/core/awel/dag/loader.py
@@ -63,19 +63,23 @@ def _process_file(filepath) -> List[DAG]:
return results


def _load_modules_from_file(filepath: str):
def _load_modules_from_file(
filepath: str, mod_name: str | None = None, show_log: bool = True
):
import importlib
import importlib.machinery
import importlib.util

logger.info(f"Importing {filepath}")
if show_log:
logger.info(f"Importing {filepath}")

org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
path_hash = hashlib.sha1(filepath.encode("utf-8")).hexdigest()
mod_name = f"unusual_prefix_{path_hash}_{org_mod_name}"
if mod_name is None:
mod_name = f"unusual_prefix_{path_hash}_{org_mod_name}"

if mod_name in sys.modules:
del sys.modules[mod_name]
if mod_name in sys.modules:
del sys.modules[mod_name]

def parse(mod_name, filepath):
try:
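
With the new optional parameters, a caller can load a dbgpts module under a stable, predictable name and suppress the per-import log line on periodic rescans. A hypothetical call (the path and module name are illustrative; the function itself is internal to dbgpt.core.awel.dag.loader):

from dbgpt.core.awel.dag.loader import _load_modules_from_file

_load_modules_from_file(
    "/path/to/installed/flow_package/app.py",  # hypothetical file path
    mod_name="dbgpts.flow_package",  # stable name instead of the hashed "unusual_prefix_*" default
    show_log=False,  # keep repeated scheduled scans quiet
)
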
2 changes: 1 addition & 1 deletion dbgpt/core/awel/trigger/http_trigger.py
@@ -1046,7 +1046,7 @@ class RequestedParsedOperator(MapOperator[CommonLLMHttpRequestBody, str]):
"key",
str,
optional=True,
default="",
default="messages",
description="The key of the dict, link 'user_input'",
)
],
2 changes: 1 addition & 1 deletion dbgpt/model/base.py
@@ -59,7 +59,7 @@ def reduce(outs: List["WorkerApplyOutput"]) -> "WorkerApplyOutput":
return WorkerApplyOutput("Not outputs")
combined_success = all(out.success for out in outs)
max_timecost = max(out.timecost for out in outs)
combined_message = ", ".join(out.message for out in outs)
combined_message = "\n;".join(out.message for out in outs)
return WorkerApplyOutput(combined_message, combined_success, max_timecost)


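The only change here is the separator used when merging worker results, so each worker's message stays on its own line in the combined output. A small illustrative usage, assuming WorkerApplyOutput takes (message, success, timecost) positionally and reduce is called on the class:

ok = WorkerApplyOutput("worker-a started successfully", True, 1.2)
bad = WorkerApplyOutput("worker-b start failed", False, 3.4)
combined = WorkerApplyOutput.reduce([ok, bad])
# combined.success -> False (all workers must succeed)
# combined.timecost -> 3.4 (the slowest worker)
# combined.message  -> "worker-a started successfully\n;worker-b start failed"
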
31 changes: 26 additions & 5 deletions dbgpt/model/cluster/worker/manager.py
@@ -6,6 +6,7 @@
import random
import sys
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from dataclasses import asdict
from typing import Awaitable, Callable, Iterator
@@ -490,6 +491,8 @@ async def _apply_worker(
async def _start_all_worker(
self, apply_req: WorkerApplyRequest
) -> WorkerApplyOutput:
from httpx import TimeoutException, TransportError

# TODO avoid start twice
start_time = time.time()
logger.info(f"Begin start all worker, apply_req: {apply_req}")
@@ -520,9 +523,24 @@ async def _start_worker(worker_run_data: WorkerRunData):
)
)
out.message = f"{info} start successfully"
except Exception as e:
except TimeoutException as e:
out.success = False
out.message = (
f"{info} start failed for network timeout, please make "
f"sure your port is available, if you are using global network "
f"proxy, please close it"
)
except TransportError as e:
out.success = False
out.message = f"{info} start failed, {str(e)}"
out.message = (
f"{info} start failed for network error, please make "
f"sure your port is available, if you are using global network "
"proxy, please close it"
)
except Exception:
err_msg = traceback.format_exc()
out.success = False
out.message = f"{info} start failed, {err_msg}"
finally:
out.timecost = time.time() - _start_time
return out
@@ -837,10 +855,13 @@ async def start_worker_manager():
try:
await worker_manager.start()
except Exception as e:
logger.error(f"Error starting worker manager: {e}")
sys.exit(1)
import signal

logger.error(f"Error starting worker manager: {str(e)}")
os.kill(os.getpid(), signal.SIGINT)

# It cannot be blocked here because the startup of worker_manager depends on the fastapi app (registered to the controller)
# It cannot be blocked here because the startup of worker_manager depends on
# the fastapi app (registered to the controller)
asyncio.create_task(start_worker_manager())

@app.on_event("shutdown")
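
Two behavioural changes stand out here: httpx timeout and transport errors now get dedicated, user-facing messages (the full traceback is reserved for truly unexpected exceptions), and a failed startup no longer calls sys.exit(1) inside the async task but sends SIGINT to the process so uvicorn can run its shutdown hooks. A minimal sketch of that shutdown pattern, with illustrative names:

import asyncio
import os
import signal

from fastapi import FastAPI

app = FastAPI()

@app.on_event("startup")
async def _startup():
    async def _start_background_service():
        try:
            ...  # start the long-running service here
        except Exception:
            # Signal our own process instead of sys.exit(1): uvicorn's SIGINT
            # handler then runs the registered shutdown events cleanly.
            os.kill(os.getpid(), signal.SIGINT)

    # Must not block here; the service needs the FastAPI app to be up first.
    asyncio.create_task(_start_background_service())
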
3 changes: 3 additions & 0 deletions dbgpt/serve/flow/config.py
@@ -20,3 +20,6 @@ class ServeConfig(BaseServeConfig):
api_keys: Optional[str] = field(
default=None, metadata={"help": "API keys for the endpoint, if None, allow all"}
)
load_dbgpts_interval: int = field(
default=5, metadata={"help": "Interval to load dbgpts from installed packages"}
)
25 changes: 18 additions & 7 deletions dbgpt/serve/flow/service/service.py
@@ -3,6 +3,7 @@
import traceback
from typing import Any, List, Optional, cast

import schedule
from fastapi import HTTPException

from dbgpt.component import SystemApp
@@ -56,7 +57,10 @@ def init_app(self, system_app: SystemApp) -> None:
self._dao = self._dao or ServeDao(self._serve_config)
self._system_app = system_app
self._dbgpts_loader = system_app.get_component(
DBGPTsLoader.name, DBGPTsLoader, or_register_component=DBGPTsLoader
DBGPTsLoader.name,
DBGPTsLoader,
or_register_component=DBGPTsLoader,
load_dbgpts_interval=self._serve_config.load_dbgpts_interval,
)

def before_start(self):
@@ -68,7 +72,10 @@ def before_start(self):
def after_start(self):
"""Execute after the application starts"""
self.load_dag_from_db()
self.load_dag_from_dbgpts()
self.load_dag_from_dbgpts(is_first_load=True)
schedule.every(self._serve_config.load_dbgpts_interval).seconds.do(
self.load_dag_from_dbgpts
)

@property
def dao(self) -> BaseDao[ServeEntity, ServeRequest, ServerResponse]:
@@ -126,6 +133,7 @@ def create_and_save_dag(
if save_failed_flow:
request.state = State.LOAD_FAILED
request.error_message = str(e)
request.dag_id = ""
return self.dao.create(request)
else:
raise e
@@ -147,6 +155,7 @@ def create_and_save_dag(
if save_failed_flow:
request.state = State.LOAD_FAILED
request.error_message = f"Register DAG error: {str(e)}"
request.dag_id = ""
self.dao.update({"uid": request.uid}, request)
else:
# Rollback
@@ -198,7 +207,7 @@ def _pre_load_dag_from_dbgpts(self):
f"dbgpts error: {str(e)}"
)

def load_dag_from_dbgpts(self):
def load_dag_from_dbgpts(self, is_first_load: bool = False):
"""Load DAG from dbgpts"""
flows = self.dbgpts_loader.get_flows()
for flow in flows:
@@ -208,7 +217,7 @@ def load_dag_from_dbgpts(self):
exist_inst = self.get({"name": flow.name})
if not exist_inst:
self.create_and_save_dag(flow, save_failed_flow=True)
else:
elif is_first_load or exist_inst.state != State.RUNNING:
# TODO check version, must be greater than the exist one
flow.uid = exist_inst.uid
self.update_flow(flow, check_editable=False, save_failed_flow=True)
@@ -242,6 +251,7 @@ def update_flow(
if save_failed_flow:
request.state = State.LOAD_FAILED
request.error_message = str(e)
request.dag_id = ""
return self.dao.update({"uid": request.uid}, request)
else:
raise e
@@ -306,12 +316,13 @@ def delete(self, uid: str) -> Optional[ServerResponse]:
inst = self.get(query_request)
if inst is None:
raise HTTPException(status_code=404, detail=f"Flow {uid} not found")
if not inst.dag_id:
if inst.state == State.RUNNING and not inst.dag_id:
raise HTTPException(
status_code=404, detail=f"Flow {uid}'s dag id not found"
status_code=404, detail=f"Running flow {uid}'s dag id not found"
)
try:
self.dag_manager.unregister_dag(inst.dag_id)
if inst.dag_id:
self.dag_manager.unregister_dag(inst.dag_id)
except Exception as e:
logger.warning(f"Unregister DAG({inst.dag_id}) error: {str(e)}")
self.dao.delete(query_request)
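
The refresh rule encoded by the new is_first_load flag: on the initial load after startup every installed flow is synced into the database, while later scheduled reloads skip flows that are currently RUNNING, so a periodic rescan never clobbers a live DAG. A condensed restatement of that check (simplified from the diff; the helper itself is not part of the commit):

def _should_overwrite(exist_inst, is_first_load: bool) -> bool:
    # Overwrite an existing record only on the first sync after startup,
    # or when the stored flow is not actively running.
    return is_first_load or exist_inst.state != State.RUNNING
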
7 changes: 7 additions & 0 deletions dbgpt/util/dbgpts/base.py
@@ -23,6 +23,13 @@
INSTALL_METADATA_FILE = "install_metadata.toml"
DBGPTS_METADATA_FILE = "dbgpts.toml"

TYPE_TO_PACKAGE = {
"agent": "agents",
"app": "apps",
"operator": "operators",
"flow": "workflow",
}


def _get_env_sig() -> str:
"""Get a unique signature for the current Python environment."""
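
TYPE_TO_PACKAGE maps a dbgpts type name to the sub-directory that holds packages of that type inside a repository (note the irregular "flow" -> "workflow" entry). A hypothetical lookup helper to show how the mapping might be consumed — the function itself is not part of this commit:

from pathlib import Path

def _package_dir(repo_root: str, dbgpt_type: str) -> Path:
    # Fall back to the raw type name if an unknown type is requested.
    sub_dir = TYPE_TO_PACKAGE.get(dbgpt_type, dbgpt_type)
    return Path(repo_root) / sub_dir

# _package_dir("/path/to/dbgpts-repo", "flow") -> /path/to/dbgpts-repo/workflow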