Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: on_xxx timing checkings #556

Merged
merged 12 commits into from
Jan 14, 2025
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@
"name": "Python Debugger: Python File",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/tests/test_async_outer_thread.py",
"program": "${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/tests/test_basic.py",
"env": {
"PYTHONPATH": "${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/.ten/app/ten_packages/system/ten_runtime_python/lib:${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/.ten/app/ten_packages/system/ten_runtime_python/interface:${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/.ten/app",
"LD_PRELOAD": "${workspaceFolder}/out/linux/x64/tests/ten_runtime/integration/python/standalone_test_python/default_extension_python/.ten/app/ten_packages/system/ten_runtime/lib/libasan.so"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "include_internal/ten_runtime/extension_group/extension_group.h"
#include "include_internal/ten_runtime/ten_env/ten_env.h"
#include "include_internal/ten_runtime/ten_env_proxy/ten_env_proxy.h"
#include "ten_runtime/addon/extension/extension.h"
#include "ten_runtime/binding/go/interface/ten/common.h"
#include "ten_runtime/binding/go/interface/ten/ten_env.h"
#include "ten_runtime/ten.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ TEN_RUNTIME_PRIVATE_API PyTypeObject *ten_py_ten_env_tester_type(void);
TEN_RUNTIME_PRIVATE_API PyObject *ten_py_ten_env_tester_on_start_done(
PyObject *self, PyObject *args);

TEN_RUNTIME_PRIVATE_API PyObject *ten_py_ten_env_tester_on_stop_done(
PyObject *self, PyObject *args);

TEN_RUNTIME_PRIVATE_API PyObject *ten_py_ten_env_tester_stop_test(
PyObject *self, PyObject *args);

Expand Down
9 changes: 9 additions & 0 deletions core/include_internal/ten_runtime/extension/extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,15 @@ typedef struct ten_timer_t ten_timer_t;
typedef enum TEN_EXTENSION_STATE {
TEN_EXTENSION_STATE_INIT,

// on_configure() is called.
TEN_EXTENSION_STATE_ON_CONFIGURE,

// on_configure_done() is completed.
TEN_EXTENSION_STATE_ON_CONFIGURE_DONE,

// on_init() is called.
TEN_EXTENSION_STATE_ON_INIT,

// on_init_done() is completed.
TEN_EXTENSION_STATE_ON_INIT_DONE,

Expand All @@ -77,6 +83,9 @@ typedef enum TEN_EXTENSION_STATE {
// on_start_done() is completed.
TEN_EXTENSION_STATE_ON_START_DONE,

// on_stop() is called.
TEN_EXTENSION_STATE_ON_STOP,

// on_stop_done() is completed.
TEN_EXTENSION_STATE_ON_STOP_DONE,

Expand Down
3 changes: 2 additions & 1 deletion core/include_internal/ten_runtime/test/env_tester.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ typedef struct ten_env_tester_t {
ten_env_tester_destroy_handler_in_target_lang_func_t destroy_handler;
} ten_env_tester_t;

TEN_RUNTIME_API bool ten_env_tester_check_integrity(ten_env_tester_t *self);
TEN_RUNTIME_API bool ten_env_tester_check_integrity(ten_env_tester_t *self,
bool check_thread);

TEN_RUNTIME_PRIVATE_API ten_env_tester_t *ten_env_tester_create(
ten_extension_tester_t *tester);
Expand Down
25 changes: 15 additions & 10 deletions core/src/ten_runtime/binding/go/native/test/extension_tester.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ static void proxy_on_start(ten_extension_tester_t *self,
ten_env_tester_t *ten_env_tester) {
TEN_ASSERT(self && ten_extension_tester_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(ten_env_tester && ten_env_tester_check_integrity(ten_env_tester),
"Should not happen.");
TEN_ASSERT(
ten_env_tester && ten_env_tester_check_integrity(ten_env_tester, true),
"Should not happen.");
TEN_ASSERT(ten_extension_tester_get_ten_env_tester(self) == ten_env_tester,
"Should not happen.");

Expand All @@ -114,8 +115,9 @@ static void proxy_on_cmd(ten_extension_tester_t *self,
ten_shared_ptr_t *cmd) {
TEN_ASSERT(self && ten_extension_tester_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(ten_env_tester && ten_env_tester_check_integrity(ten_env_tester),
"Should not happen.");
TEN_ASSERT(
ten_env_tester && ten_env_tester_check_integrity(ten_env_tester, true),
"Should not happen.");
TEN_ASSERT(ten_extension_tester_get_ten_env_tester(self) == ten_env_tester,
"Should not happen.");
TEN_ASSERT(cmd && ten_cmd_check_integrity(cmd), "Should not happen.");
Expand All @@ -141,8 +143,9 @@ static void proxy_on_data(ten_extension_tester_t *self,
ten_shared_ptr_t *data) {
TEN_ASSERT(self && ten_extension_tester_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(ten_env_tester && ten_env_tester_check_integrity(ten_env_tester),
"Should not happen.");
TEN_ASSERT(
ten_env_tester && ten_env_tester_check_integrity(ten_env_tester, true),
"Should not happen.");
TEN_ASSERT(ten_extension_tester_get_ten_env_tester(self) == ten_env_tester,
"Should not happen.");
TEN_ASSERT(data && ten_msg_check_integrity(data), "Should not happen.");
Expand All @@ -168,8 +171,9 @@ static void proxy_on_audio_frame(ten_extension_tester_t *self,
ten_shared_ptr_t *audio_frame) {
TEN_ASSERT(self && ten_extension_tester_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(ten_env_tester && ten_env_tester_check_integrity(ten_env_tester),
"Should not happen.");
TEN_ASSERT(
ten_env_tester && ten_env_tester_check_integrity(ten_env_tester, true),
"Should not happen.");
TEN_ASSERT(ten_extension_tester_get_ten_env_tester(self) == ten_env_tester,
"Should not happen.");
TEN_ASSERT(audio_frame && ten_msg_check_integrity(audio_frame),
Expand All @@ -196,8 +200,9 @@ static void proxy_on_video_frame(ten_extension_tester_t *self,
ten_shared_ptr_t *video_frame) {
TEN_ASSERT(self && ten_extension_tester_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(ten_env_tester && ten_env_tester_check_integrity(ten_env_tester),
"Should not happen.");
TEN_ASSERT(
ten_env_tester && ten_env_tester_check_integrity(ten_env_tester, true),
"Should not happen.");
TEN_ASSERT(ten_extension_tester_get_ten_env_tester(self) == ten_env_tester,
"Should not happen.");
TEN_ASSERT(video_frame && ten_msg_check_integrity(video_frame),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ def __init__(self, name: str) -> None:
self._ten_stop_event = asyncio.Event()

def __del__(self) -> None:
self._ten_stop_event.set()
if hasattr(self, "_ten_thread"):
self._ten_thread.join()
pass

async def _thread_routine(self, ten_env: TenEnv):
self._ten_loop = asyncio.get_running_loop()
Expand Down
46 changes: 43 additions & 3 deletions core/src/ten_runtime/binding/python/interface/ten/async_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,33 @@ def __init__(

self._ten_loop = loop
self._ten_thread = thread
ten_env_tester._set_release_handler(lambda: self._on_release())

def __del__(self) -> None:
pass

def _deinit_routine(self) -> None:
# Wait for the internal thread to finish.
self._ten_thread.join()

# Since the `_ten_thread` used by the asyncio task queue has already
# ended, we can be confident that no Python code will be using `ten_env`
# at this point. (Of course, if the user has created their own Python
# threads that are holding onto `ten_env`, they will need to handle the
# thread-safety issues themselves.) Therefore, it is safe to call
# `on_stop_done`, during which `ten_env` (_proxy) will be released.

self._internal.on_stop_done()

def _on_release(self) -> None:
if hasattr(self, "_deinit_thread"):
self._deinit_thread.join()

def _deinit(self) -> None:
# Start the deinit thread to avoid blocking the extension tester thread.
self._deinit_thread = threading.Thread(target=self._deinit_routine)
self._deinit_thread.start()

async def send_cmd(self, cmd: Cmd) -> CmdResultTuple:
q = asyncio.Queue(maxsize=1)
self._internal.send_cmd(
Expand Down Expand Up @@ -121,9 +144,7 @@ def __init__(self) -> None:
self._ten_stop_event = asyncio.Event()

def __del__(self) -> None:
self._ten_stop_event.set()
if hasattr(self, "_ten_thread"):
self._ten_thread.join()
pass

def _exit_on_exception(
self, async_ten_env_tester: AsyncTenEnvTester, e: Exception
Expand All @@ -146,6 +167,16 @@ async def _thread_routine(self, ten_env_tester: TenEnvTester) -> None:
# Suspend the thread until stopEvent is set.
await self._ten_stop_event.wait()

await self._wrapper_on_stop(self._async_ten_env_tester)

# We cannot directly call `on_stop_done` here as above, because after
# `on_stop_done`, `ten_env_proxy` will be released. Therefore, we need
# to wait until certain essential tasks are completed before calling
# `on_stop_done`. Otherwise, if anything needs to use `ten_env` (_proxy)
# after `on_stop_done`, it will cause issues.

self._async_ten_env_tester._deinit()

async def _stop_thread(self):
self._ten_stop_event.set()

Expand All @@ -164,6 +195,12 @@ async def _wrapper_on_start(
except Exception as e:
self._exit_on_exception(ten_env_tester, e)

async def _wrapper_on_stop(self, ten_env_tester: AsyncTenEnvTester) -> None:
try:
await self.on_stop(ten_env_tester)
except Exception as e:
self._exit_on_exception(ten_env_tester, e)

@final
def _proxy_on_stop(self, ten_env_tester: TenEnvTester) -> None:
asyncio.run_coroutine_threadsafe(self._stop_thread(), self._ten_loop)
Expand Down Expand Up @@ -251,6 +288,9 @@ def run(self) -> None:
async def on_start(self, ten_env_tester: AsyncTenEnvTester) -> None:
pass

async def on_stop(self, ten_env_tester: AsyncTenEnvTester) -> None:
pass

async def on_cmd(self, ten_env_tester: AsyncTenEnvTester, cmd: Cmd) -> None:
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class _Addon:

class _TenEnvTester:
def on_start_done(self) -> None: ...
def on_stop_done(self) -> None: ...
def send_cmd(
self, cmd: _Cmd, result_handler: TestResultHandler
) -> None: ...
Expand Down
15 changes: 14 additions & 1 deletion core/src/ten_runtime/binding/python/interface/ten/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,19 @@ def __init__(self, internal_obj: _TenEnvTester) -> None:
def __del__(self) -> None:
pass

def _set_release_handler(self, handler: Callable[[], None]) -> None:
self._release_handler = handler

def _on_release(self) -> None:
if hasattr(self, "_release_handler"):
self._release_handler()

def on_start_done(self) -> None:
return self._internal.on_start_done()

def on_stop_done(self) -> None:
return self._internal.on_stop_done()

def send_cmd(self, cmd: Cmd, result_handler: ResultHandler) -> None:
return self._internal.send_cmd(cmd, result_handler)

Expand Down Expand Up @@ -89,7 +99,10 @@ def on_start(self, ten_env_tester: TenEnvTester) -> None:

@final
def _proxy_on_stop(self, ten_env_tester: TenEnvTester) -> None:
pass
self.on_stop(ten_env_tester)

def on_stop(self, ten_env_tester: TenEnvTester) -> None:
ten_env_tester.on_stop_done()

@final
def _proxy_on_cmd(self, ten_env_tester: TenEnvTester, cmd: Cmd) -> None:
Expand Down
2 changes: 1 addition & 1 deletion core/src/ten_runtime/binding/python/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
{
"type": "addon_loader",
"name": "python_addon_loader",
"version": "0.6.0"
"version": "0.7.1"
}
],
"package": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ PyObject *ten_py_ten_env_get_property_to_json(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.get_property_to_json.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_to_json() failed because ten_env_proxy is "
"invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down Expand Up @@ -163,6 +169,11 @@ PyObject *ten_py_ten_env_get_property_int(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.get_property_int.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_int() failed because ten_env_proxy is invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down Expand Up @@ -202,6 +213,12 @@ PyObject *ten_py_ten_env_get_property_string(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.get_property_string.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_string() failed because ten_env_proxy is "
"invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down Expand Up @@ -240,6 +257,11 @@ PyObject *ten_py_ten_env_get_property_bool(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.get_property_bool.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_bool() failed because ten_env_proxy is invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down Expand Up @@ -279,6 +301,12 @@ PyObject *ten_py_ten_env_get_property_float(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.get_property_float.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_float() failed because ten_env_proxy is "
"invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down Expand Up @@ -318,6 +346,11 @@ PyObject *ten_py_ten_env_is_property_exist(PyObject *self, PyObject *args) {
"Failed to parse argument when ten_env.is_property_exist.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.is_property_exist() failed because ten_env_proxy is invalid.");
}

ten_value_t *value =
ten_py_ten_property_get_and_check_if_exists(py_ten_env, path);
if (!value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,12 @@ static PyObject *ten_py_ten_env_get_property_async(PyObject *self,
"Invalid callback function when ten_env.get_property_to_json_async.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.get_property_to_json_async() failed because ten_env_proxy is "
"invalid.");
}

ten_error_t err;
ten_error_init(&err);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ PyObject *ten_py_ten_env_init_property_from_json(PyObject *self,
"ten_env.init_property_from_json.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.init_property_from_json() failed because ten_env_proxy is "
"invalid.");
}

ten_error_t err;
ten_error_init(&err);

Expand Down Expand Up @@ -213,6 +219,12 @@ PyObject *ten_py_ten_env_init_property_from_json_async(PyObject *self,
"ten_env.init_property_from_json_async.");
}

if (!py_ten_env->c_ten_env_proxy) {
return ten_py_raise_py_value_error_exception(
"ten_env.init_property_from_json_async() failed because ten_env_proxy "
"is invalid.");
}

ten_error_t err;
ten_error_init(&err);

Expand Down
Loading
Loading