Skip to content

Commit

Permalink
fix: on_xxx timing checkings (#556)
Browse files Browse the repository at this point in the history
Co-authored-by: sunxilin <[email protected]>
  • Loading branch information
halajohn and sunxilin authored Jan 14, 2025
1 parent 7ef33e0 commit 06b2297
Show file tree
Hide file tree
Showing 48 changed files with 652 additions and 221 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@
"name": "Python Debugger: Python File",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/tests/test_async_outer_thread.py",
"program": "${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/tests/test_basic.py",
"env": {
"PYTHONPATH": "${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/.ten/app/ten_packages/system/ten_runtime_python/lib:${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/.ten/app/ten_packages/system/ten_runtime_python/interface:${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/.ten/app",
"LD_PRELOAD": "${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/.ten/app/ten_packages/system/ten_runtime/lib/libasan.so"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "include_internal/ten_runtime/extension_group/extension_group.h"
#include "include_internal/ten_runtime/ten_env/ten_env.h"
#include "include_internal/ten_runtime/ten_env_proxy/ten_env_proxy.h"
#include "ten_runtime/addon/extension/extension.h"
#include "ten_runtime/binding/go/interface/ten/common.h"
#include "ten_runtime/binding/go/interface/ten/ten_env.h"
#include "ten_runtime/ten.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ TEN_RUNTIME_PRIVATE_API PyTypeObject *ten_py_ten_env_tester_type(void);
TEN_RUNTIME_PRIVATE_API PyObject *ten_py_ten_env_tester_on_start_done(
PyObject *self, PyObject *args);

TEN_RUNTIME_PRIVATE_API PyObject *ten_py_ten_env_tester_on_stop_done(
PyObject *self, PyObject *args);

TEN_RUNTIME_PRIVATE_API PyObject *ten_py_ten_env_tester_stop_test(
PyObject *self, PyObject *args);

Expand Down
9 changes: 9 additions & 0 deletions core/include_internal/ten_runtime/extension/extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,15 @@ typedef struct ten_timer_t ten_timer_t;
typedef enum TEN_EXTENSION_STATE {
TEN_EXTENSION_STATE_INIT,

// on_configure() is called.
TEN_EXTENSION_STATE_ON_CONFIGURE,

// on_configure_done() is completed.
TEN_EXTENSION_STATE_ON_CONFIGURE_DONE,

// on_init() is called.
TEN_EXTENSION_STATE_ON_INIT,

// on_init_done() is completed.
TEN_EXTENSION_STATE_ON_INIT_DONE,

Expand All @@ -77,6 +83,9 @@ typedef enum TEN_EXTENSION_STATE {
// on_start_done() is completed.
TEN_EXTENSION_STATE_ON_START_DONE,

// on_stop() is called.
TEN_EXTENSION_STATE_ON_STOP,

// on_stop_done() is completed.
TEN_EXTENSION_STATE_ON_STOP_DONE,

Expand Down
3 changes: 2 additions & 1 deletion core/include_internal/ten_runtime/test/env_tester.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ typedef struct ten_env_tester_t {
ten_env_tester_destroy_handler_in_target_lang_func_t destroy_handler;
} ten_env_tester_t;

TEN_RUNTIME_API bool ten_env_tester_check_integrity(ten_env_tester_t *self);
TEN_RUNTIME_API bool ten_env_tester_check_integrity(ten_env_tester_t *self,
bool check_thread);

TEN_RUNTIME_PRIVATE_API ten_env_tester_t *ten_env_tester_create(
ten_extension_tester_t *tester);
Expand Down
25 changes: 15 additions & 10 deletions core/src/ten_runtime/binding/go/native/test/extension_tester.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ static void proxy_on_start(ten_extension_tester_t *self,
ten_env_tester_t *ten_env_tester) {
TEN_ASSERT(self && ten_extension_tester_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(ten_env_tester && ten_env_tester_check_integrity(ten_env_tester),
"Should not happen.");
TEN_ASSERT(
ten_env_tester && ten_env_tester_check_integrity(ten_env_tester, true),
"Should not happen.");
TEN_ASSERT(ten_extension_tester_get_ten_env_tester(self) == ten_env_tester,
"Should not happen.");

Expand All @@ -114,8 +115,9 @@ static void proxy_on_cmd(ten_extension_tester_t *self,
ten_shared_ptr_t *cmd) {
TEN_ASSERT(self && ten_extension_tester_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(ten_env_tester && ten_env_tester_check_integrity(ten_env_tester),
"Should not happen.");
TEN_ASSERT(
ten_env_tester && ten_env_tester_check_integrity(ten_env_tester, true),
"Should not happen.");
TEN_ASSERT(ten_extension_tester_get_ten_env_tester(self) == ten_env_tester,
"Should not happen.");
TEN_ASSERT(cmd && ten_cmd_check_integrity(cmd), "Should not happen.");
Expand All @@ -141,8 +143,9 @@ static void proxy_on_data(ten_extension_tester_t *self,
ten_shared_ptr_t *data) {
TEN_ASSERT(self && ten_extension_tester_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(ten_env_tester && ten_env_tester_check_integrity(ten_env_tester),
"Should not happen.");
TEN_ASSERT(
ten_env_tester && ten_env_tester_check_integrity(ten_env_tester, true),
"Should not happen.");
TEN_ASSERT(ten_extension_tester_get_ten_env_tester(self) == ten_env_tester,
"Should not happen.");
TEN_ASSERT(data && ten_msg_check_integrity(data), "Should not happen.");
Expand All @@ -168,8 +171,9 @@ static void proxy_on_audio_frame(ten_extension_tester_t *self,
ten_shared_ptr_t *audio_frame) {
TEN_ASSERT(self && ten_extension_tester_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(ten_env_tester && ten_env_tester_check_integrity(ten_env_tester),
"Should not happen.");
TEN_ASSERT(
ten_env_tester && ten_env_tester_check_integrity(ten_env_tester, true),
"Should not happen.");
TEN_ASSERT(ten_extension_tester_get_ten_env_tester(self) == ten_env_tester,
"Should not happen.");
TEN_ASSERT(audio_frame && ten_msg_check_integrity(audio_frame),
Expand All @@ -196,8 +200,9 @@ static void proxy_on_video_frame(ten_extension_tester_t *self,
ten_shared_ptr_t *video_frame) {
TEN_ASSERT(self && ten_extension_tester_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(ten_env_tester && ten_env_tester_check_integrity(ten_env_tester),
"Should not happen.");
TEN_ASSERT(
ten_env_tester && ten_env_tester_check_integrity(ten_env_tester, true),
"Should not happen.");
TEN_ASSERT(ten_extension_tester_get_ten_env_tester(self) == ten_env_tester,
"Should not happen.");
TEN_ASSERT(video_frame && ten_msg_check_integrity(video_frame),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ def __init__(self, name: str) -> None:
self._ten_stop_event = asyncio.Event()

def __del__(self) -> None:
self._ten_stop_event.set()
if hasattr(self, "_ten_thread"):
self._ten_thread.join()
pass

async def _thread_routine(self, ten_env: TenEnv):
self._ten_loop = asyncio.get_running_loop()
Expand Down
46 changes: 43 additions & 3 deletions core/src/ten_runtime/binding/python/interface/ten/async_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,33 @@ def __init__(

self._ten_loop = loop
self._ten_thread = thread
ten_env_tester._set_release_handler(lambda: self._on_release())

def __del__(self) -> None:
pass

def _deinit_routine(self) -> None:
# Wait for the internal thread to finish.
self._ten_thread.join()

# Since the `_ten_thread` used by the asyncio task queue has already
# ended, we can be confident that no Python code will be using `ten_env`
# at this point. (Of course, if the user has created their own Python
# threads that are holding onto `ten_env`, they will need to handle the
# thread-safety issues themselves.) Therefore, it is safe to call
# `on_stop_done`, during which `ten_env` (_proxy) will be released.

self._internal.on_stop_done()

def _on_release(self) -> None:
if hasattr(self, "_deinit_thread"):
self._deinit_thread.join()

def _deinit(self) -> None:
# Start the deinit thread to avoid blocking the extension tester thread.
self._deinit_thread = threading.Thread(target=self._deinit_routine)
self._deinit_thread.start()

async def send_cmd(self, cmd: Cmd) -> CmdResultTuple:
q = asyncio.Queue(maxsize=1)
self._internal.send_cmd(
Expand Down Expand Up @@ -121,9 +144,7 @@ def __init__(self) -> None:
self._ten_stop_event = asyncio.Event()

def __del__(self) -> None:
self._ten_stop_event.set()
if hasattr(self, "_ten_thread"):
self._ten_thread.join()
pass

def _exit_on_exception(
self, async_ten_env_tester: AsyncTenEnvTester, e: Exception
Expand All @@ -146,6 +167,16 @@ async def _thread_routine(self, ten_env_tester: TenEnvTester) -> None:
# Suspend the thread until stopEvent is set.
await self._ten_stop_event.wait()

await self._wrapper_on_stop(self._async_ten_env_tester)

# We cannot directly call `on_stop_done` here as above, because after
# `on_stop_done`, `ten_env_proxy` will be released. Therefore, we need
# to wait until certain essential tasks are completed before calling
# `on_stop_done`. Otherwise, if anything needs to use `ten_env` (_proxy)
# after `on_stop_done`, it will cause issues.

self._async_ten_env_tester._deinit()

async def _stop_thread(self):
self._ten_stop_event.set()

Expand All @@ -164,6 +195,12 @@ async def _wrapper_on_start(
except Exception as e:
self._exit_on_exception(ten_env_tester, e)

async def _wrapper_on_stop(self, ten_env_tester: AsyncTenEnvTester) -> None:
try:
await self.on_stop(ten_env_tester)
except Exception as e:
self._exit_on_exception(ten_env_tester, e)

@final
def _proxy_on_stop(self, ten_env_tester: TenEnvTester) -> None:
asyncio.run_coroutine_threadsafe(self._stop_thread(), self._ten_loop)
Expand Down Expand Up @@ -251,6 +288,9 @@ def run(self) -> None:
async def on_start(self, ten_env_tester: AsyncTenEnvTester) -> None:
pass

async def on_stop(self, ten_env_tester: AsyncTenEnvTester) -> None:
pass

async def on_cmd(self, ten_env_tester: AsyncTenEnvTester, cmd: Cmd) -> None:
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class _Addon:

class _TenEnvTester:
def on_start_done(self) -> None: ...
def on_stop_done(self) -> None: ...
def send_cmd(
self, cmd: _Cmd, result_handler: TestResultHandler
) -> None: ...
Expand Down
15 changes: 14 additions & 1 deletion core/src/ten_runtime/binding/python/interface/ten/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,19 @@ def __init__(self, internal_obj: _TenEnvTester) -> None:
def __del__(self) -> None:
pass

def _set_release_handler(self, handler: Callable[[], None]) -> None:
self._release_handler = handler

def _on_release(self) -> None:
if hasattr(self, "_release_handler"):
self._release_handler()

def on_start_done(self) -> None:
return self._internal.on_start_done()

def on_stop_done(self) -> None:
return self._internal.on_stop_done()

def send_cmd(self, cmd: Cmd, result_handler: ResultHandler) -> None:
return self._internal.send_cmd(cmd, result_handler)

Expand Down Expand Up @@ -89,7 +99,10 @@ def on_start(self, ten_env_tester: TenEnvTester) -> None:

@final
def _proxy_on_stop(self, ten_env_tester: TenEnvTester) -> None:
pass
self.on_stop(ten_env_tester)

def on_stop(self, ten_env_tester: TenEnvTester) -> None:
ten_env_tester.on_stop_done()

@final
def _proxy_on_cmd(self, ten_env_tester: TenEnvTester, cmd: Cmd) -> None:
Expand Down
2 changes: 1 addition & 1 deletion core/src/ten_runtime/binding/python/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
{
"type": "addon_loader",
"name": "python_addon_loader",
"version": "0.6.0"
"version": "0.7.1"
}
],
"package": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ PyObject *ten_py_ten_env_get_property_to_json(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.get_property_to_json.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_to_json() failed because ten_env_proxy is "
"invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down Expand Up @@ -163,6 +169,11 @@ PyObject *ten_py_ten_env_get_property_int(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.get_property_int.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_int() failed because ten_env_proxy is invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down Expand Up @@ -202,6 +213,12 @@ PyObject *ten_py_ten_env_get_property_string(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.get_property_string.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_string() failed because ten_env_proxy is "
"invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down Expand Up @@ -240,6 +257,11 @@ PyObject *ten_py_ten_env_get_property_bool(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.get_property_bool.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_bool() failed because ten_env_proxy is invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down Expand Up @@ -279,6 +301,12 @@ PyObject *ten_py_ten_env_get_property_float(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.get_property_float.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_float() failed because ten_env_proxy is "
"invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down Expand Up @@ -318,6 +346,11 @@ PyObject *ten_py_ten_env_is_property_exist(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.is_property_exist.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.is_property_exist() failed because ten_env_proxy is invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,12 @@ static PyObject *ten_py_ten_env_get_property_async(PyObject *self,
"Invalid callback function when ten_env.get_property_to_json_async.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_to_json_async() failed because ten_env_proxy is "
"invalid.");
}

ten_error_t err;
ten_error_init(&err);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ PyObject *ten_py_ten_env_init_property_from_json(PyObject *self,
"ten_env.init_property_from_json.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.init_property_from_json() failed because ten_env_proxy is "
"invalid.");
}

ten_error_t err;
ten_error_init(&err);

Expand Down Expand Up @@ -213,6 +219,12 @@ PyObject *ten_py_ten_env_init_property_from_json_async(PyObject *self,
"ten_env.init_property_from_json_async.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.init_property_from_json_async() failed because ten_env_proxy "
"is invalid.");
}

ten_error_t err;
ten_error_init(&err);

Expand Down
Loading

0 comments on commit 06b2297

Please sign in to comment.