From c1a4ba725f6667d4a50c108068e50c9f45975fa9 Mon Sep 17 00:00:00 2001 From: Kamil Paradowski Date: Wed, 13 Aug 2025 11:45:38 +0200 Subject: [PATCH 01/11] feat: Added ThreadsafeFunction class and runtime methods --- packages/host/android/CMakeLists.txt | 2 + packages/host/cpp/RuntimeNodeApiAsync.cpp | 83 ++++++ packages/host/cpp/RuntimeNodeApiAsync.hpp | 30 ++ packages/host/cpp/ThreadsafeFunction.cpp | 260 ++++++++++++++++++ packages/host/cpp/ThreadsafeFunction.hpp | 78 ++++++ .../generate-weak-node-api-injector.ts | 7 + 6 files changed, 460 insertions(+) create mode 100644 packages/host/cpp/ThreadsafeFunction.cpp create mode 100644 packages/host/cpp/ThreadsafeFunction.hpp diff --git a/packages/host/android/CMakeLists.txt b/packages/host/android/CMakeLists.txt index d062762a..155d8263 100644 --- a/packages/host/android/CMakeLists.txt +++ b/packages/host/android/CMakeLists.txt @@ -24,6 +24,8 @@ add_library(node-api-host SHARED ../cpp/RuntimeNodeApi.hpp ../cpp/RuntimeNodeApiAsync.cpp ../cpp/RuntimeNodeApiAsync.hpp + ../cpp/ThreadsafeFunction.cpp + ../cpp/ThreadsafeFunction.hpp ) target_include_directories(node-api-host PRIVATE diff --git a/packages/host/cpp/RuntimeNodeApiAsync.cpp b/packages/host/cpp/RuntimeNodeApiAsync.cpp index dd4c87c3..405cf58b 100644 --- a/packages/host/cpp/RuntimeNodeApiAsync.cpp +++ b/packages/host/cpp/RuntimeNodeApiAsync.cpp @@ -1,6 +1,7 @@ #include "RuntimeNodeApiAsync.hpp" #include #include "Logger.hpp" +#include "ThreadsafeFunction.hpp" struct AsyncJob { using IdType = uint64_t; @@ -187,4 +188,86 @@ napi_status napi_cancel_async_work( job->state = AsyncJob::State::Cancelled; return napi_ok; } + +napi_status napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result) { + const auto function = ThreadSafeFunction::create(getCallInvoker(env), + env, + func, + async_resource, + async_resource_name, + max_queue_size, + initial_thread_count, + thread_finalize_data, + thread_finalize_cb, + context, + call_js_cb); + *result = reinterpret_cast(function.get()); + return napi_ok; +} + +napi_status napi_get_threadsafe_function_context( + napi_threadsafe_function func, void** result) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + return function->getContext(result); +} + +napi_status napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + return function->call(data, is_blocking); +} + +napi_status napi_acquire_threadsafe_function(napi_threadsafe_function func) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + return function->acquire(); +} + +napi_status napi_release_threadsafe_function( + napi_threadsafe_function func, napi_threadsafe_function_release_mode mode) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + return function->release(mode); +} + +napi_status napi_unref_threadsafe_function( + node_api_basic_env env, napi_threadsafe_function func) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + // RN has no libuv loop to unref; we only update internal state for parity. + return function->unref(); +} + +napi_status napi_ref_threadsafe_function( + node_api_basic_env env, napi_threadsafe_function func) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + // RN has no libuv loop to ref; we only update internal state for parity. + return function->ref(); +} } // namespace callstack::nodeapihost diff --git a/packages/host/cpp/RuntimeNodeApiAsync.hpp b/packages/host/cpp/RuntimeNodeApiAsync.hpp index f0108e6d..9f9cd14c 100644 --- a/packages/host/cpp/RuntimeNodeApiAsync.hpp +++ b/packages/host/cpp/RuntimeNodeApiAsync.hpp @@ -23,4 +23,34 @@ napi_status napi_delete_async_work( napi_status napi_cancel_async_work( node_api_basic_env env, napi_async_work work); + +napi_status napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result); + +napi_status napi_get_threadsafe_function_context( + napi_threadsafe_function func, void** result); + +napi_status napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking); + +napi_status napi_acquire_threadsafe_function(napi_threadsafe_function func); + +napi_status napi_release_threadsafe_function( + napi_threadsafe_function func, napi_threadsafe_function_release_mode mode); + +napi_status napi_unref_threadsafe_function( + node_api_basic_env env, napi_threadsafe_function func); + +napi_status napi_ref_threadsafe_function( + node_api_basic_env env, napi_threadsafe_function func); } // namespace callstack::nodeapihost diff --git a/packages/host/cpp/ThreadsafeFunction.cpp b/packages/host/cpp/ThreadsafeFunction.cpp new file mode 100644 index 00000000..6cb9f4d1 --- /dev/null +++ b/packages/host/cpp/ThreadsafeFunction.cpp @@ -0,0 +1,260 @@ +#include "ThreadsafeFunction.hpp" +#include "Logger.hpp" + +// This file provides a React Native-friendly implementation of Node-API's +// thread-safe function primitive. In RN we don't own/libuv, so we: +// - Use CallInvoker to hop onto the JS thread instead of uv_async. +// - Track a registry mapping native handles to shared_ptrs for lookup/lifetime. +// - Emulate ref/unref semantics without affecting any event loop. + +static std::unordered_map> + registry; +static std::mutex registryMutex; + +namespace callstack::nodeapihost { + +ThreadSafeFunction::ThreadSafeFunction( + std::weak_ptr callInvoker, + napi_env env, + napi_value jsFunc, + napi_value asyncResource, + napi_value asyncResourceName, + size_t maxQueueSize, + size_t initialThreadCount, + void* threadFinalizeData, + napi_finalize threadFinalizeCb, + void* context, + napi_threadsafe_function_call_js callJsCb) + : callInvoker_{std::move(callInvoker)}, + env_{env}, + jsFunc_{jsFunc}, + asyncResource_{asyncResource}, + asyncResourceName_{asyncResourceName}, + maxQueueSize_{maxQueueSize}, + threadCount_{initialThreadCount}, + threadFinalizeData_{threadFinalizeData}, + threadFinalizeCb_{threadFinalizeCb}, + context_{context}, + callJsCb_{callJsCb}, + refCount_{initialThreadCount} { + if (jsFunc) { + // Keep JS function alive across async hops; fatal here mirrors Node-API's + // behavior when environment is irrecoverable. + const auto status = napi_create_reference(env, jsFunc, 1, &jsFuncRef_); + if (status != napi_ok) { + napi_fatal_error(nullptr, + 0, + "Failed to create JS function reference", + NAPI_AUTO_LENGTH); + } + } +} + +ThreadSafeFunction::~ThreadSafeFunction() { + if (jsFuncRef_) { + napi_delete_reference(env_, jsFuncRef_); + } +} + +std::shared_ptr ThreadSafeFunction::create( + std::weak_ptr callInvoker, + napi_env env, + napi_value jsFunc, + napi_value asyncResource, + napi_value asyncResourceName, + size_t maxQueueSize, + size_t initialThreadCount, + void* threadFinalizeData, + napi_finalize threadFinalizeCb, + void* context, + napi_threadsafe_function_call_js callJsCb) { + const auto function = + std::make_shared(std::move(callInvoker), + env, + jsFunc, + asyncResource, + asyncResourceName, + maxQueueSize, + initialThreadCount, + threadFinalizeData, + threadFinalizeCb, + context, + callJsCb); + + { + auto handle = reinterpret_cast(function.get()); + std::lock_guard lock{registryMutex}; + registry[handle] = function; + } + + return std::move(function); +} + +std::shared_ptr ThreadSafeFunction::get( + napi_threadsafe_function func) { + std::lock_guard lock{registryMutex}; + return registry.contains(func) ? registry[func] : nullptr; +} + +napi_status ThreadSafeFunction::getContext(void** result) { + if (!result) { + return napi_invalid_arg; + } + + *result = context_; + return napi_ok; +} + +napi_status ThreadSafeFunction::call( + void* data, napi_threadsafe_function_call_mode isBlocking) { + if (aborted_ || closing_) { + return napi_closing; + } + + { + std::unique_lock lock{queueMutex_}; + // Backpressure: enforce maxQueueSize_. If nonblocking, fail fast; if + // blocking, wait until space is available or closing/aborted. + if (maxQueueSize_ && queue_.size() >= maxQueueSize_) { + if (isBlocking == napi_tsfn_nonblocking) { + return napi_queue_full; + } + queueCv_.wait(lock, [&] { + return queue_.size() < maxQueueSize_ || aborted_ || closing_; + }); + if (aborted_ || closing_) return napi_closing; + } + queue_.push(data); + } + + const auto invoker = callInvoker_.lock(); + if (!invoker) { + log_debug("Error: No CallInvoker available for ThreadSafeFunction"); + return napi_generic_failure; + } + // Hop to JS thread; we drain one item per hop to keep latency predictable + // and avoid long monopolization of the JS queue. + invoker->invokeAsync([this] { + void* queuedData{nullptr}; + auto empty{false}; + { + std::lock_guard lock{queueMutex_}; + if (!queue_.empty()) { + queuedData = queue_.front(); + const auto size = queue_.size(); + queue_.pop(); + empty = queue_.empty(); + if (size == maxQueueSize_ && maxQueueSize_) { + queueCv_.notify_one(); + } + } + } + if (queuedData && !aborted_) { + // Prefer the user-provided callJsCb_ (Node-API compatible). If absent + // but we have a JS function ref, call it directly with no args. + if (callJsCb_) { + napi_value fn{nullptr}; + if (jsFuncRef_) { + napi_get_reference_value(env_, jsFuncRef_, &fn); + } + callJsCb_(env_, fn, context_, queuedData); + } else if (jsFuncRef_) { + napi_value fn; + napi_get_reference_value(env_, jsFuncRef_, &fn); + napi_value recv; + napi_get_undefined(env_, &recv); + napi_value result; + napi_call_function(env_, recv, fn, 0, nullptr, &result); + } + } + + // Auto-finalize when: no remaining threads (acquire/release balance), + // queue drained, and not already closing. + if (!threadCount_ && empty && !closing_) { + if (maxQueueSize_) { + std::lock_guard lock{queueMutex_}; + queueCv_.notify_all(); + } + finalize(); + } + }); + return napi_ok; +} + +napi_status ThreadSafeFunction::acquire() { + if (closing_) { + return napi_closing; + } + refCount_++; + threadCount_++; + return napi_ok; +} + +napi_status ThreadSafeFunction::release( + napi_threadsafe_function_release_mode mode) { + // Node-API semantics: abort prevents further JS calls and wakes any waiters. + if (mode == napi_tsfn_abort) { + aborted_ = true; + closing_ = true; + } + if (refCount_) { + refCount_--; + } + if (threadCount_) { + threadCount_--; + } + // When the last ref is gone (or we're closing), queue is drained, notify and + // finalize. + std::lock_guard lock{queueMutex_}; + if (!refCount_ && !threadCount_ && queue_.empty() || closing_) { + closing_ = true; + if (maxQueueSize_) { + queueCv_.notify_all(); + } + finalize(); + } + return napi_ok; +} + +napi_status ThreadSafeFunction::ref() { + // In libuv, this would keep the loop alive. In RN we don't own or expose a + // libuv loop. We just track the state for API parity. + referenced_.store(true, std::memory_order_relaxed); + return napi_ok; +} + +napi_status ThreadSafeFunction::unref() { + // In libuv, this allows the loop to exit if nothing else is keeping it + // alive. In RN this is a no-op beyond state tracking. + referenced_.store(false, std::memory_order_relaxed); + return napi_ok; +} + +void ThreadSafeFunction::finalize() { + std::lock_guard lock{finalizeMutex_}; + if (handlesClosing_) { + return; + } + handlesClosing_ = true; + closing_ = true; + + const auto onFinalize = [this] { + // Invoke user finalizer and unregister the handle from the global map. + if (threadFinalizeCb_) { + threadFinalizeCb_(env_, threadFinalizeData_, context_); + } + std::lock_guard lock{registryMutex}; + registry.erase(reinterpret_cast(this)); + }; + + // Prefer running the finalizer on the JS thread to match expectations; + // if CallInvoker is gone, run synchronously. + if (const auto invoker = callInvoker_.lock()) { + invoker->invokeAsync([=]() { onFinalize(); }); + } else { + onFinalize(); + } +} + +} // namespace callstack::nodeapihost diff --git a/packages/host/cpp/ThreadsafeFunction.hpp b/packages/host/cpp/ThreadsafeFunction.hpp new file mode 100644 index 00000000..b4fdf706 --- /dev/null +++ b/packages/host/cpp/ThreadsafeFunction.hpp @@ -0,0 +1,78 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include "node_api.h" + +namespace callstack::nodeapihost { +class ThreadSafeFunction + : public std::enable_shared_from_this { + public: + ThreadSafeFunction(std::weak_ptr callInvoker, + napi_env env, + napi_value jsFunc, + napi_value asyncResource, + napi_value asyncResourceName, + size_t maxQueueSize, + size_t initialThreadCount, + void* threadFinalizeData, + napi_finalize threadFinalizeCb, + void* context, + napi_threadsafe_function_call_js callJsCb); + ~ThreadSafeFunction(); + + static std::shared_ptr create( + std::weak_ptr callInvoker, + napi_env env, + napi_value jsFunc, + napi_value asyncResource, + napi_value asyncResourceName, + size_t maxQueueSize, + size_t initialThreadCount, + void* threadFinalizeData, + napi_finalize threadFinalizeCb, + void* context, + napi_threadsafe_function_call_js callJsCb); + + static std::shared_ptr get(napi_threadsafe_function func); + + napi_status getContext(void** result); + napi_status call(void* data, napi_threadsafe_function_call_mode isBlocking); + napi_status acquire(); + napi_status release(napi_threadsafe_function_release_mode mode); + // Node-API compatibility: These do not affect RN's lifecycle. We only track + // the state for diagnostics and API parity with libuv's ref/unref. + napi_status ref(); + napi_status unref(); + + private: + void finalize(); + + std::weak_ptr callInvoker_; + napi_env env_; + napi_value jsFunc_; + napi_ref jsFuncRef_{nullptr}; + napi_value asyncResource_; + napi_value asyncResourceName_; + size_t maxQueueSize_; + std::atomic threadCount_; + std::atomic aborted_{false}; + void* threadFinalizeData_; + napi_finalize threadFinalizeCb_; + void* context_; + napi_threadsafe_function_call_js callJsCb_; + std::mutex queueMutex_; + std::condition_variable queueCv_; + std::queue queue_; + std::atomic closing_{false}; + std::atomic referenced_{true}; + std::atomic refCount_; + std::mutex finalizeMutex_; + bool handlesClosing_{false}; +}; + +} // namespace callstack::nodeapihost diff --git a/packages/host/scripts/generate-weak-node-api-injector.ts b/packages/host/scripts/generate-weak-node-api-injector.ts index d5adfd83..e2ddc578 100644 --- a/packages/host/scripts/generate-weak-node-api-injector.ts +++ b/packages/host/scripts/generate-weak-node-api-injector.ts @@ -20,6 +20,13 @@ const IMPLEMENTED_RUNTIME_FUNCTIONS = [ "napi_fatal_error", "napi_get_node_version", "napi_get_version", + "napi_create_threadsafe_function", + "napi_get_threadsafe_function_context", + "napi_call_threadsafe_function", + "napi_acquire_threadsafe_function", + "napi_release_threadsafe_function", + "napi_unref_threadsafe_function", + "napi_ref_threadsafe_function", ]; /** From 2c7b5a93c8600e42793873b4a2b08e49b60abc61 Mon Sep 17 00:00:00 2001 From: Kamil Paradowski Date: Wed, 13 Aug 2025 11:45:55 +0200 Subject: [PATCH 02/11] feat: Adopted threadsafe functions tests --- packages/node-addon-examples/src/index.ts | 2 + .../tests/threadsafe_function/CMakeLists.txt | 16 + .../tests/threadsafe_function/addon.cpp | 391 ++++++++++++++++++ .../tests/threadsafe_function/addon.js | 223 ++++++++++ .../tests/threadsafe_function/binding.gyp | 14 + .../tests/threadsafe_function/package.json | 14 + 6 files changed, 660 insertions(+) create mode 100644 packages/node-addon-examples/tests/threadsafe_function/CMakeLists.txt create mode 100644 packages/node-addon-examples/tests/threadsafe_function/addon.cpp create mode 100644 packages/node-addon-examples/tests/threadsafe_function/addon.js create mode 100644 packages/node-addon-examples/tests/threadsafe_function/binding.gyp create mode 100644 packages/node-addon-examples/tests/threadsafe_function/package.json diff --git a/packages/node-addon-examples/src/index.ts b/packages/node-addon-examples/src/index.ts index 869d4825..c4d74d23 100644 --- a/packages/node-addon-examples/src/index.ts +++ b/packages/node-addon-examples/src/index.ts @@ -85,5 +85,7 @@ export const suites: Record< require("../tests/buffers/addon.js"); }, async: () => require("../tests/async/addon.js") as () => Promise, + threadsafe_function: () => + require("../tests/threadsafe_function/addon.js") as () => Promise, }, }; diff --git a/packages/node-addon-examples/tests/threadsafe_function/CMakeLists.txt b/packages/node-addon-examples/tests/threadsafe_function/CMakeLists.txt new file mode 100644 index 00000000..67146934 --- /dev/null +++ b/packages/node-addon-examples/tests/threadsafe_function/CMakeLists.txt @@ -0,0 +1,16 @@ +cmake_minimum_required(VERSION 3.15) +project(tests-threadsafe_function) + +add_compile_definitions(NAPI_VERSION=8) + +add_library(addon SHARED addon.cpp ${CMAKE_JS_SRC}) +set_target_properties(addon PROPERTIES PREFIX "" SUFFIX ".node") +target_include_directories(addon PRIVATE "/Users/kamil.paradowski/projects/napi/node_modules/node-addon-api" ${CMAKE_JS_INC}) +target_link_libraries(addon PRIVATE ${CMAKE_JS_LIB}) +target_compile_features(addon PRIVATE cxx_std_17) +target_compile_definitions(addon PRIVATE NAPI_DISABLE_CPP_EXCEPTIONS) + +if(MSVC AND CMAKE_JS_NODELIB_DEF AND CMAKE_JS_NODELIB_TARGET) + # Generate node.lib + execute_process(COMMAND ${CMAKE_AR} /def:${CMAKE_JS_NODELIB_DEF} /out:${CMAKE_JS_NODELIB_TARGET} ${CMAKE_STATIC_LINKER_FLAGS}) +endif() \ No newline at end of file diff --git a/packages/node-addon-examples/tests/threadsafe_function/addon.cpp b/packages/node-addon-examples/tests/threadsafe_function/addon.cpp new file mode 100644 index 00000000..d1a9224a --- /dev/null +++ b/packages/node-addon-examples/tests/threadsafe_function/addon.cpp @@ -0,0 +1,391 @@ +// For the purpose of this test we use libuv's threading library. When deciding +// on a threading library for a new project it bears remembering that in the +// future libuv may introduce API changes which may render it non-ABI-stable, +// which, in turn, may affect the ABI stability of the project despite its use +// of N-API. +#include + +#include +#include +#include + +#include "../RuntimeNodeApiTestsCommon.h" + +#define ARRAY_LENGTH 1000 +#define MAX_QUEUE_SIZE 1 + +// Use modern C++ threads +struct uv_thread_t { + std::unique_ptr t; +}; +typedef void (*uv_thread_cb)(void*); + +static int uv_thread_create(uv_thread_t* tid, uv_thread_cb entry, void* arg) { + try { + tid->t = std::make_unique([entry, arg]() { entry(arg); }); + return 0; + } catch (...) { + return -1; + } +} +static void uv_thread_join(uv_thread_t* tid) { + // printf("Thread is joinable: %d %d\n", tid->t->joinable(), + // std::this_thread::get_id()); + if (tid->t && tid->t->joinable()) { + // printf("Joining thread: %d\n", std::this_thread::get_id()); + tid->t->join(); + } +} +static size_t uv_hrtime() { + auto now = std::chrono::steady_clock::now().time_since_epoch(); + return std::chrono::duration_cast(now).count(); +} + +static uv_thread_t uv_threads[2]; +static napi_threadsafe_function ts_fn; + +typedef struct { + napi_threadsafe_function_call_mode block_on_full; + napi_threadsafe_function_release_mode abort; + bool start_secondary; + napi_ref js_finalize_cb; + uint32_t max_queue_size; +} ts_fn_hint; + +static ts_fn_hint ts_info; + +// Thread data to transmit to JS +static int ints[ARRAY_LENGTH]; + +static void secondary_thread(void* data) { + // printf("Secondary thread started: %d\n", std::this_thread::get_id()); + + napi_threadsafe_function ts_fn = (napi_threadsafe_function)data; + + if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", + NAPI_AUTO_LENGTH); + } +} + +// Source thread producing the data +static void data_source_thread(void* data) { + // printf("Data source thread started: %d\n", std::this_thread::get_id()); + + napi_threadsafe_function ts_fn = (napi_threadsafe_function)data; + int index; + void* hint; + ts_fn_hint* ts_fn_info; + napi_status status; + bool queue_was_full = false; + bool queue_was_closing = false; + + if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_get_threadsafe_function_context failed", + NAPI_AUTO_LENGTH); + } + + ts_fn_info = (ts_fn_hint*)hint; + + if (ts_fn_info != &ts_info) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "thread-safe function hint is not as expected", + NAPI_AUTO_LENGTH); + } + + if (ts_fn_info->start_secondary) { + if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_acquire_threadsafe_function failed", + NAPI_AUTO_LENGTH); + } + + // printf("Creating secondary thread: %d\n", std::this_thread::get_id()); + if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "failed to start secondary thread", NAPI_AUTO_LENGTH); + } + } + + for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) { + status = napi_call_threadsafe_function(ts_fn, &ints[index], + ts_fn_info->block_on_full); + if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) { + // Let's make this thread really busy for 200 ms to give the main thread a + // chance to abort. + uint64_t start = uv_hrtime(); + for (; uv_hrtime() - start < 200000000;); + } + switch (status) { + case napi_queue_full: + queue_was_full = true; + index++; + // fall through + + case napi_ok: + continue; + + case napi_closing: + queue_was_closing = true; + break; + + default: + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_call_threadsafe_function failed", + NAPI_AUTO_LENGTH); + } + } + + // Assert that the enqueuing of a value was refused at least once, if this is + // a non-blocking test run. + // if (!ts_fn_info->block_on_full && !queue_was_full) { + // napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + // "queue was never full", NAPI_AUTO_LENGTH); + // } + + // // Assert that the queue was marked as closing at least once, if this is an + // // aborting test run. + // if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) { + // napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + // "queue was never closing", NAPI_AUTO_LENGTH); + // } + + if (!queue_was_closing && + napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", + NAPI_AUTO_LENGTH); + } +} + +// Getting the data into JS +static void call_js(napi_env env, napi_value cb, void* hint, void* data) { + if (!(env == NULL || cb == NULL)) { + napi_value argv, undefined; + NODE_API_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv)); + NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + // printf("Calling JS function (%p, %p) from thread: %d\n", env, cb, + // std::this_thread::get_id()); + NODE_API_CALL_RETURN_VOID( + env, napi_call_function(env, undefined, cb, 1, &argv, NULL)); + } +} + +static napi_ref alt_ref; +// Getting the data into JS with the alternative reference +static void call_ref(napi_env env, napi_value _, void* hint, void* data) { + if (!(env == NULL || alt_ref == NULL)) { + napi_value fn, argv, undefined; + NODE_API_CALL_RETURN_VOID(env, napi_get_reference_value(env, alt_ref, &fn)); + NODE_API_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv)); + NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NODE_API_CALL_RETURN_VOID( + env, napi_call_function(env, undefined, fn, 1, &argv, NULL)); + } +} + +// Cleanup +static napi_value StopThread(napi_env env, napi_callback_info info) { + size_t argc = 2; + napi_value argv[2]; + NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + napi_valuetype value_type; + NODE_API_CALL(env, napi_typeof(env, argv[0], &value_type)); + NODE_API_ASSERT(env, value_type == napi_function, + "StopThread argument is a function"); + NODE_API_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function"); + NODE_API_CALL( + env, napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb))); + bool abort; + NODE_API_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + // printf("[C++] Stopping thread-safe function...\n"); + NODE_API_CALL(env, napi_release_threadsafe_function( + ts_fn, abort ? napi_tsfn_abort : napi_tsfn_release)); + ts_fn = NULL; + return NULL; +} + +// Join the thread and inform JS that we're done. +static void join_the_threads(napi_env env, void* data, void* hint) { + // printf("Joining threads...\n"); + uv_thread_t* the_threads = (uv_thread_t*)data; + ts_fn_hint* the_hint = (ts_fn_hint*)hint; + napi_value js_cb, undefined; + + uv_thread_join(&the_threads[0]); + if (the_hint->start_secondary) { + uv_thread_join(&the_threads[1]); + } + + NODE_API_CALL_RETURN_VOID( + env, napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb)); + NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NODE_API_CALL_RETURN_VOID( + env, napi_call_function(env, undefined, js_cb, 0, NULL, NULL)); + NODE_API_CALL_RETURN_VOID( + env, napi_delete_reference(env, the_hint->js_finalize_cb)); + if (alt_ref != NULL) { + NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env, alt_ref)); + alt_ref = NULL; + } +} + +static napi_value StartThreadInternal(napi_env env, napi_callback_info info, + napi_threadsafe_function_call_js cb, + bool block_on_full, bool alt_ref_js_cb) { + size_t argc = 4; + napi_value argv[4]; + + // printf("Starting thread-safe function from thread: %d\n", + // std::this_thread::get_id()); + + NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + if (alt_ref_js_cb) { + NODE_API_CALL(env, napi_create_reference(env, argv[0], 1, &alt_ref)); + argv[0] = NULL; + } + + ts_info.block_on_full = + (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking); + + NODE_API_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function"); + napi_value async_name; + NODE_API_CALL(env, + napi_create_string_utf8(env, "N-API Thread-safe Function Test", + NAPI_AUTO_LENGTH, &async_name)); + NODE_API_CALL(env, + napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size)); + NODE_API_CALL(env, + napi_create_threadsafe_function( + env, // napi_env env, + argv[0], // napi_value func, + NULL, // napi_value async_resource, + async_name, // napi_value async_resource_name, + ts_info.max_queue_size, // size_t max_queue_size, + 2, // size_t initial_thread_count, + uv_threads, // void* thread_finalize_data, + join_the_threads, // napi_finalize thread_finalize_cb, + &ts_info, // void* context, + cb, // napi_threadsafe_function_call_js call_js_cb, + &ts_fn)); // napi_threadsafe_function* result + bool abort; + NODE_API_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release; + NODE_API_CALL(env, + napi_get_value_bool(env, argv[2], &(ts_info.start_secondary))); + + NODE_API_ASSERT( + env, (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0), + "Thread creation"); + + return NULL; +} + +static napi_value Unref(napi_env env, napi_callback_info info) { + NODE_API_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NODE_API_CALL(env, napi_unref_threadsafe_function(env, ts_fn)); + return NULL; +} + +static napi_value Release(napi_env env, napi_callback_info info) { + NODE_API_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NODE_API_CALL(env, + napi_release_threadsafe_function(ts_fn, napi_tsfn_release)); + return NULL; +} + +// Startup +static napi_value StartThread(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, call_js, + /** block_on_full */ true, + /** alt_ref_js_cb */ false); +} + +static napi_value StartThreadNonblocking(napi_env env, + napi_callback_info info) { + return StartThreadInternal(env, info, call_js, + /** block_on_full */ false, + /** alt_ref_js_cb */ false); +} + +static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, NULL, + /** block_on_full */ true, + /** alt_ref_js_cb */ false); +} + +static napi_value StartThreadNoJsFunc(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, call_ref, + /** block_on_full */ true, + /** alt_ref_js_cb */ true); +} + +// Testing calling into JavaScript +static void ThreadSafeFunctionFinalize(napi_env env, void* finalize_data, + void* finalize_hint) { + napi_ref js_func_ref = (napi_ref)finalize_data; + napi_value js_func; + napi_value recv; + NODE_API_CALL_RETURN_VOID( + env, napi_get_reference_value(env, js_func_ref, &js_func)); + NODE_API_CALL_RETURN_VOID(env, napi_get_global(env, &recv)); + NODE_API_CALL_RETURN_VOID( + env, napi_call_function(env, recv, js_func, 0, NULL, NULL)); + NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env, js_func_ref)); +} + +// Testing calling into JavaScript +static napi_value CallIntoModule(napi_env env, napi_callback_info info) { + size_t argc = 4; + napi_value argv[4]; + NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + + napi_ref finalize_func; + NODE_API_CALL(env, napi_create_reference(env, argv[3], 1, &finalize_func)); + + napi_threadsafe_function tsfn; + NODE_API_CALL(env, napi_create_threadsafe_function( + env, argv[0], argv[1], argv[2], 0, 1, finalize_func, + ThreadSafeFunctionFinalize, NULL, NULL, &tsfn)); + NODE_API_CALL(env, + napi_call_threadsafe_function(tsfn, NULL, napi_tsfn_blocking)); + NODE_API_CALL(env, napi_release_threadsafe_function(tsfn, napi_tsfn_release)); + return NULL; +} + +// Module init +static napi_value Init(napi_env env, napi_value exports) { + size_t index; + for (index = 0; index < ARRAY_LENGTH; index++) { + ints[index] = index; + } + napi_value js_array_length, js_max_queue_size; + napi_create_uint32(env, ARRAY_LENGTH, &js_array_length); + napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size); + + napi_property_descriptor properties[] = { + {"ARRAY_LENGTH", NULL, NULL, NULL, NULL, js_array_length, napi_enumerable, + NULL}, + {"MAX_QUEUE_SIZE", NULL, NULL, NULL, NULL, js_max_queue_size, + napi_enumerable, NULL}, + DECLARE_NODE_API_PROPERTY("StartThread", StartThread), + DECLARE_NODE_API_PROPERTY("StartThreadNoNative", StartThreadNoNative), + DECLARE_NODE_API_PROPERTY("StartThreadNonblocking", + StartThreadNonblocking), + DECLARE_NODE_API_PROPERTY("StartThreadNoJsFunc", StartThreadNoJsFunc), + DECLARE_NODE_API_PROPERTY("StopThread", StopThread), + DECLARE_NODE_API_PROPERTY("Unref", Unref), + DECLARE_NODE_API_PROPERTY("Release", Release), + DECLARE_NODE_API_PROPERTY("CallIntoModule", CallIntoModule), + }; + + NODE_API_CALL( + env, napi_define_properties(env, exports, + sizeof(properties) / sizeof(properties[0]), + properties)); + + return exports; +} +NAPI_MODULE(NODE_GYP_MODULE_NAME, Init) \ No newline at end of file diff --git a/packages/node-addon-examples/tests/threadsafe_function/addon.js b/packages/node-addon-examples/tests/threadsafe_function/addon.js new file mode 100644 index 00000000..8a329f52 --- /dev/null +++ b/packages/node-addon-examples/tests/threadsafe_function/addon.js @@ -0,0 +1,223 @@ +/* eslint-disable @typescript-eslint/no-require-imports */ +/* eslint-disable no-undef */ +const assert = require("assert"); +const binding = require("bindings")("addon.node"); +const expectedArray = (function (arrayLength) { + const result = []; + for (let index = 0; index < arrayLength; index++) { + result.push(arrayLength - 1 - index); + } + return result; +})(binding.ARRAY_LENGTH); + +let cnt = 0; +function testWithJSMarshaller({ + threadStarter, + quitAfter, + abort, + maxQueueSize, + launchSecondary, +}) { + console.log( + `[JS {${cnt++}}] Starting thread: ${threadStarter}, quitAfter: ${quitAfter}, abort: ${abort}, maxQueueSize: ${maxQueueSize}, launchSecondary: ${launchSecondary}` + ); + return new Promise((resolve) => { + const array = []; + binding[threadStarter]( + function testCallback(value) { + array.push(value); + if (array.length === quitAfter) { + // console.log("[JS] Reached quitAfter, calling StopThread"); + setImmediate(() => { + binding.StopThread(() => { + // console.log("[JS] StopThread callback called, resolving promise"); + resolve(array); + }, !!abort); + }); + } + }, + !!abort, + !!launchSecondary, + maxQueueSize + ); + }); +} + +module.exports = () => { + // console.log("[JS] Test entry"); + // return testWithJSMarshaller({ + // threadStarter: "StartThread", + // maxQueueSize: binding.MAX_QUEUE_SIZE, + // quitAfter: binding.ARRAY_LENGTH, + // }).then((result) => { + // console.log("Test completed successfully"); + // assert.deepStrictEqual(result, expectedArray); + // }); + return ( + new Promise(function testWithoutJSMarshaller(resolve) { + let callCount = 0; + binding.StartThreadNoNative( + function testCallback() { + callCount++; + + // console.log("Callback called with arguments:", arguments); + // The default call-into-JS implementation passes no arguments. + // assert.strictEqual(arguments.length, 0); + if (callCount === binding.ARRAY_LENGTH) { + setImmediate(() => { + binding.StopThread(() => { + resolve(); + }, false); + }); + } + }, + false /* abort */, + false /* launchSecondary */, + binding.MAX_QUEUE_SIZE + ); + }) + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: binding.ARRAY_LENGTH, + }) + ) + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNoJsFunc", + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: binding.ARRAY_LENGTH, + }) + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // Start the thread in blocking mode with an infinite queue, and assert that all + // values are passed. Quit after it's done. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + maxQueueSize: 0, + quitAfter: binding.ARRAY_LENGTH, + }) + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // Start the thread in non-blocking mode, and assert that all values are passed. + // Quit after it's done. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNonblocking", + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: binding.ARRAY_LENGTH, + }) + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in blocking mode, and assert that all values are passed. + // // Quit early, but let the thread finish. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: 1, + }) + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in blocking mode with an infinite queue, and assert that all + // // values are passed. Quit early, but let the thread finish. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + maxQueueSize: 0, + quitAfter: 1, + }) + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in non-blocking mode, and assert that all values are passed. + // // Quit early, but let the thread finish. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNonblocking", + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: 1, + }) + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in blocking mode, and assert that all values are passed. + // // Quit early, but let the thread finish. Launch a secondary thread to test the + // // reference counter incrementing functionality. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + launchSecondary: true, + }) + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in non-blocking mode, and assert that all values are passed. + // // Quit early, but let the thread finish. Launch a secondary thread to test the + // // reference counter incrementing functionality. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNonblocking", + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + launchSecondary: true, + }) + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in blocking mode, and assert that it could not finish. + // // Quit early by aborting. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + abort: true, + }) + ) + .then((result) => assert.strictEqual(result.indexOf(0), -1)) + + // Start the thread in blocking mode with an infinite queue, and assert that it + // could not finish. Quit early by aborting. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + quitAfter: 1, + maxQueueSize: 0, + abort: true, + }) + ) + .then((result) => assert.strictEqual(result.indexOf(0), -1)) + + // // Start the thread in non-blocking mode, and assert that it could not finish. + // // Quit early and aborting. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNonblocking", + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + abort: true, + }) + ) + .then((result) => assert.strictEqual(result.indexOf(0), -1)) + + // // Make sure that threadsafe function isn't stalled when we hit + // // `kMaxIterationCount` in `src/node_api.cc` + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNonblocking", + maxQueueSize: binding.ARRAY_LENGTH >>> 1, + quitAfter: binding.ARRAY_LENGTH, + }) + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + ); +}; diff --git a/packages/node-addon-examples/tests/threadsafe_function/binding.gyp b/packages/node-addon-examples/tests/threadsafe_function/binding.gyp new file mode 100644 index 00000000..26cbd2c9 --- /dev/null +++ b/packages/node-addon-examples/tests/threadsafe_function/binding.gyp @@ -0,0 +1,14 @@ +{ + "targets": [ + { + "target_name": "addon", + "cflags!": [ "-fno-exceptions" ], + "cflags_cc!": [ "-fno-exceptions" ], + "sources": [ "addon.cpp" ], + "include_dirs": [ + " Date: Wed, 13 Aug 2025 13:04:35 +0200 Subject: [PATCH 03/11] chore: improved release/abort behaviour --- packages/host/cpp/ThreadsafeFunction.cpp | 80 +++++++++++------------- packages/host/cpp/ThreadsafeFunction.hpp | 4 +- 2 files changed, 39 insertions(+), 45 deletions(-) diff --git a/packages/host/cpp/ThreadsafeFunction.cpp b/packages/host/cpp/ThreadsafeFunction.cpp index 6cb9f4d1..cdcf592c 100644 --- a/packages/host/cpp/ThreadsafeFunction.cpp +++ b/packages/host/cpp/ThreadsafeFunction.cpp @@ -36,8 +36,7 @@ ThreadSafeFunction::ThreadSafeFunction( threadFinalizeData_{threadFinalizeData}, threadFinalizeCb_{threadFinalizeCb}, context_{context}, - callJsCb_{callJsCb}, - refCount_{initialThreadCount} { + callJsCb_{callJsCb} { if (jsFunc) { // Keep JS function alive across async hops; fatal here mirrors Node-API's // behavior when environment is irrecoverable. @@ -135,48 +134,48 @@ napi_status ThreadSafeFunction::call( } // Hop to JS thread; we drain one item per hop to keep latency predictable // and avoid long monopolization of the JS queue. - invoker->invokeAsync([this] { + invoker->invokeAsync([self = shared_from_this()] { void* queuedData{nullptr}; auto empty{false}; { - std::lock_guard lock{queueMutex_}; - if (!queue_.empty()) { - queuedData = queue_.front(); - const auto size = queue_.size(); - queue_.pop(); - empty = queue_.empty(); - if (size == maxQueueSize_ && maxQueueSize_) { - queueCv_.notify_one(); + std::lock_guard lock{self->queueMutex_}; + if (!self->queue_.empty()) { + queuedData = self->queue_.front(); + const auto size = self->queue_.size(); + self->queue_.pop(); + empty = self->queue_.empty(); + if (size == self->maxQueueSize_ && self->maxQueueSize_) { + self->queueCv_.notify_one(); } } } - if (queuedData && !aborted_) { + if (queuedData && !self->aborted_) { // Prefer the user-provided callJsCb_ (Node-API compatible). If absent // but we have a JS function ref, call it directly with no args. - if (callJsCb_) { + if (self->callJsCb_) { napi_value fn{nullptr}; - if (jsFuncRef_) { - napi_get_reference_value(env_, jsFuncRef_, &fn); + if (self->jsFuncRef_) { + napi_get_reference_value(self->env_, self->jsFuncRef_, &fn); } - callJsCb_(env_, fn, context_, queuedData); - } else if (jsFuncRef_) { + self->callJsCb_(self->env_, fn, self->context_, queuedData); + } else if (self->jsFuncRef_) { napi_value fn; - napi_get_reference_value(env_, jsFuncRef_, &fn); + napi_get_reference_value(self->env_, self->jsFuncRef_, &fn); napi_value recv; - napi_get_undefined(env_, &recv); + napi_get_undefined(self->env_, &recv); napi_value result; - napi_call_function(env_, recv, fn, 0, nullptr, &result); + napi_call_function(self->env_, recv, fn, 0, nullptr, &result); } } // Auto-finalize when: no remaining threads (acquire/release balance), // queue drained, and not already closing. - if (!threadCount_ && empty && !closing_) { - if (maxQueueSize_) { - std::lock_guard lock{queueMutex_}; - queueCv_.notify_all(); - } - finalize(); + if (!self->threadCount_ && empty) { + // if (self->maxQueueSize_) { + // std::lock_guard lock{self->queueMutex_}; + // self->queueCv_.notify_all(); + // } + self->finalize(); } }); return napi_ok; @@ -186,7 +185,6 @@ napi_status ThreadSafeFunction::acquire() { if (closing_) { return napi_closing; } - refCount_++; threadCount_++; return napi_ok; } @@ -198,21 +196,19 @@ napi_status ThreadSafeFunction::release( aborted_ = true; closing_ = true; } - if (refCount_) { - refCount_--; - } if (threadCount_) { threadCount_--; } - // When the last ref is gone (or we're closing), queue is drained, notify and - // finalize. - std::lock_guard lock{queueMutex_}; - if (!refCount_ && !threadCount_ && queue_.empty() || closing_) { - closing_ = true; + // When the last thread is gone (or we're closing), notify and finalize. + if (!threadCount_ || closing_) { + std::lock_guard lock{queueMutex_}; + auto emptyQueue{queue_.empty()}; if (maxQueueSize_) { queueCv_.notify_all(); } - finalize(); + if (aborted_ || emptyQueue) { + finalize(); + } } return napi_ok; } @@ -232,26 +228,26 @@ napi_status ThreadSafeFunction::unref() { } void ThreadSafeFunction::finalize() { - std::lock_guard lock{finalizeMutex_}; if (handlesClosing_) { return; } handlesClosing_ = true; closing_ = true; - const auto onFinalize = [this] { + const auto onFinalize = [self = shared_from_this()] { // Invoke user finalizer and unregister the handle from the global map. - if (threadFinalizeCb_) { - threadFinalizeCb_(env_, threadFinalizeData_, context_); + if (self->threadFinalizeCb_) { + self->threadFinalizeCb_( + self->env_, self->threadFinalizeData_, self->context_); } std::lock_guard lock{registryMutex}; - registry.erase(reinterpret_cast(this)); + registry.erase(reinterpret_cast(self.get())); }; // Prefer running the finalizer on the JS thread to match expectations; // if CallInvoker is gone, run synchronously. if (const auto invoker = callInvoker_.lock()) { - invoker->invokeAsync([=]() { onFinalize(); }); + invoker->invokeAsync(onFinalize); } else { onFinalize(); } diff --git a/packages/host/cpp/ThreadsafeFunction.hpp b/packages/host/cpp/ThreadsafeFunction.hpp index b4fdf706..37338702 100644 --- a/packages/host/cpp/ThreadsafeFunction.hpp +++ b/packages/host/cpp/ThreadsafeFunction.hpp @@ -70,9 +70,7 @@ class ThreadSafeFunction std::queue queue_; std::atomic closing_{false}; std::atomic referenced_{true}; - std::atomic refCount_; - std::mutex finalizeMutex_; - bool handlesClosing_{false}; + std::atomic handlesClosing_{false}; }; } // namespace callstack::nodeapihost From 55541ae8453ce03923b22123eaa1ab063a37e4a3 Mon Sep 17 00:00:00 2001 From: Kamil Paradowski Date: Wed, 13 Aug 2025 13:08:34 +0200 Subject: [PATCH 04/11] chore: changed name + added nodiscard --- packages/host/cpp/ThreadsafeFunction.hpp | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/host/cpp/ThreadsafeFunction.hpp b/packages/host/cpp/ThreadsafeFunction.hpp index 37338702..d896bcce 100644 --- a/packages/host/cpp/ThreadsafeFunction.hpp +++ b/packages/host/cpp/ThreadsafeFunction.hpp @@ -40,14 +40,15 @@ class ThreadSafeFunction static std::shared_ptr get(napi_threadsafe_function func); - napi_status getContext(void** result); - napi_status call(void* data, napi_threadsafe_function_call_mode isBlocking); - napi_status acquire(); - napi_status release(napi_threadsafe_function_release_mode mode); + [[nodiscard]] napi_status getContext(void** result); + [[nodiscard]] napi_status call( + void* data, napi_threadsafe_function_call_mode isBlocking); + [[nodiscard]] napi_status acquire(); + [[nodiscard]] napi_status release(napi_threadsafe_function_release_mode mode); // Node-API compatibility: These do not affect RN's lifecycle. We only track // the state for diagnostics and API parity with libuv's ref/unref. - napi_status ref(); - napi_status unref(); + [[nodiscard]] napi_status ref(); + [[nodiscard]] napi_status unref(); private: void finalize(); @@ -70,7 +71,7 @@ class ThreadSafeFunction std::queue queue_; std::atomic closing_{false}; std::atomic referenced_{true}; - std::atomic handlesClosing_{false}; + std::atomic finalizeScheduled_{false}; }; } // namespace callstack::nodeapihost From 08997eefd90b1c9f552e6b7e629e9db26ef34c8a Mon Sep 17 00:00:00 2001 From: Kamil Paradowski Date: Wed, 13 Aug 2025 13:08:54 +0200 Subject: [PATCH 05/11] chore: no need to drain queue in callback --- packages/host/cpp/ThreadsafeFunction.cpp | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/packages/host/cpp/ThreadsafeFunction.cpp b/packages/host/cpp/ThreadsafeFunction.cpp index cdcf592c..ff3f38e3 100644 --- a/packages/host/cpp/ThreadsafeFunction.cpp +++ b/packages/host/cpp/ThreadsafeFunction.cpp @@ -171,10 +171,6 @@ napi_status ThreadSafeFunction::call( // Auto-finalize when: no remaining threads (acquire/release balance), // queue drained, and not already closing. if (!self->threadCount_ && empty) { - // if (self->maxQueueSize_) { - // std::lock_guard lock{self->queueMutex_}; - // self->queueCv_.notify_all(); - // } self->finalize(); } }); @@ -228,10 +224,10 @@ napi_status ThreadSafeFunction::unref() { } void ThreadSafeFunction::finalize() { - if (handlesClosing_) { + if (finalizeScheduled_) { return; } - handlesClosing_ = true; + finalizeScheduled_ = true; closing_ = true; const auto onFinalize = [self = shared_from_this()] { From 41e56de4e852359f936ce857555aba92f3521707 Mon Sep 17 00:00:00 2001 From: Kamil Paradowski Date: Wed, 13 Aug 2025 13:37:04 +0200 Subject: [PATCH 06/11] chore: minor improvements, improved memory layout --- packages/host/cpp/RuntimeNodeApiAsync.cpp | 2 +- packages/host/cpp/ThreadsafeFunction.cpp | 168 +++++++++++++--------- packages/host/cpp/ThreadsafeFunction.hpp | 45 +++--- 3 files changed, 129 insertions(+), 86 deletions(-) diff --git a/packages/host/cpp/RuntimeNodeApiAsync.cpp b/packages/host/cpp/RuntimeNodeApiAsync.cpp index 405cf58b..fc19cd37 100644 --- a/packages/host/cpp/RuntimeNodeApiAsync.cpp +++ b/packages/host/cpp/RuntimeNodeApiAsync.cpp @@ -211,7 +211,7 @@ napi_status napi_create_threadsafe_function(napi_env env, thread_finalize_cb, context, call_js_cb); - *result = reinterpret_cast(function.get()); + *result = function->getHandle(); return napi_ok; } diff --git a/packages/host/cpp/ThreadsafeFunction.cpp b/packages/host/cpp/ThreadsafeFunction.cpp index ff3f38e3..a428481e 100644 --- a/packages/host/cpp/ThreadsafeFunction.cpp +++ b/packages/host/cpp/ThreadsafeFunction.cpp @@ -1,16 +1,21 @@ #include "ThreadsafeFunction.hpp" +#include #include "Logger.hpp" // This file provides a React Native-friendly implementation of Node-API's // thread-safe function primitive. In RN we don't own/libuv, so we: // - Use CallInvoker to hop onto the JS thread instead of uv_async. -// - Track a registry mapping native handles to shared_ptrs for lookup/lifetime. +// - Track a registry mapping unique IDs to shared_ptrs for lookup/lifetime. // - Emulate ref/unref semantics without affecting any event loop. -static std::unordered_map> registry; static std::mutex registryMutex; +static std::atomic nextId{1}; + +// Constants for better readability +static constexpr size_t INITIAL_REF_COUNT = 1; namespace callstack::nodeapihost { @@ -26,7 +31,8 @@ ThreadSafeFunction::ThreadSafeFunction( napi_finalize threadFinalizeCb, void* context, napi_threadsafe_function_call_js callJsCb) - : callInvoker_{std::move(callInvoker)}, + : id_{nextId.fetch_add(1, std::memory_order_relaxed)}, + callInvoker_{std::move(callInvoker)}, env_{env}, jsFunc_{jsFunc}, asyncResource_{asyncResource}, @@ -40,8 +46,11 @@ ThreadSafeFunction::ThreadSafeFunction( if (jsFunc) { // Keep JS function alive across async hops; fatal here mirrors Node-API's // behavior when environment is irrecoverable. - const auto status = napi_create_reference(env, jsFunc, 1, &jsFuncRef_); + const auto status = + napi_create_reference(env, jsFunc, INITIAL_REF_COUNT, &jsFuncRef_); if (status != napi_ok) { + // Consider throwing an exception instead of fatal error in future + // versions napi_fatal_error(nullptr, 0, "Failed to create JS function reference", @@ -82,21 +91,26 @@ std::shared_ptr ThreadSafeFunction::create( callJsCb); { - auto handle = reinterpret_cast(function.get()); std::lock_guard lock{registryMutex}; - registry[handle] = function; + registry[function->id_] = function; } - return std::move(function); + return function; } std::shared_ptr ThreadSafeFunction::get( napi_threadsafe_function func) { std::lock_guard lock{registryMutex}; - return registry.contains(func) ? registry[func] : nullptr; + const auto id = reinterpret_cast(func); + const auto it = registry.find(id); + return it != registry.end() ? it->second : nullptr; +} + +napi_threadsafe_function ThreadSafeFunction::getHandle() const noexcept { + return reinterpret_cast(id_); } -napi_status ThreadSafeFunction::getContext(void** result) { +napi_status ThreadSafeFunction::getContext(void** result) noexcept { if (!result) { return napi_invalid_arg; } @@ -107,7 +121,7 @@ napi_status ThreadSafeFunction::getContext(void** result) { napi_status ThreadSafeFunction::call( void* data, napi_threadsafe_function_call_mode isBlocking) { - if (aborted_ || closing_) { + if (isClosingOrAborted()) { return napi_closing; } @@ -120,9 +134,9 @@ napi_status ThreadSafeFunction::call( return napi_queue_full; } queueCv_.wait(lock, [&] { - return queue_.size() < maxQueueSize_ || aborted_ || closing_; + return queue_.size() < maxQueueSize_ || isClosingOrAborted(); }); - if (aborted_ || closing_) return napi_closing; + if (isClosingOrAborted()) return napi_closing; } queue_.push(data); } @@ -134,54 +148,15 @@ napi_status ThreadSafeFunction::call( } // Hop to JS thread; we drain one item per hop to keep latency predictable // and avoid long monopolization of the JS queue. - invoker->invokeAsync([self = shared_from_this()] { - void* queuedData{nullptr}; - auto empty{false}; - { - std::lock_guard lock{self->queueMutex_}; - if (!self->queue_.empty()) { - queuedData = self->queue_.front(); - const auto size = self->queue_.size(); - self->queue_.pop(); - empty = self->queue_.empty(); - if (size == self->maxQueueSize_ && self->maxQueueSize_) { - self->queueCv_.notify_one(); - } - } - } - if (queuedData && !self->aborted_) { - // Prefer the user-provided callJsCb_ (Node-API compatible). If absent - // but we have a JS function ref, call it directly with no args. - if (self->callJsCb_) { - napi_value fn{nullptr}; - if (self->jsFuncRef_) { - napi_get_reference_value(self->env_, self->jsFuncRef_, &fn); - } - self->callJsCb_(self->env_, fn, self->context_, queuedData); - } else if (self->jsFuncRef_) { - napi_value fn; - napi_get_reference_value(self->env_, self->jsFuncRef_, &fn); - napi_value recv; - napi_get_undefined(self->env_, &recv); - napi_value result; - napi_call_function(self->env_, recv, fn, 0, nullptr, &result); - } - } - - // Auto-finalize when: no remaining threads (acquire/release balance), - // queue drained, and not already closing. - if (!self->threadCount_ && empty) { - self->finalize(); - } - }); + invoker->invokeAsync([self = shared_from_this()] { self->processQueue(); }); return napi_ok; } napi_status ThreadSafeFunction::acquire() { - if (closing_) { + if (closing_.load(std::memory_order_acquire)) { return napi_closing; } - threadCount_++; + threadCount_.fetch_add(1, std::memory_order_acq_rel); return napi_ok; } @@ -189,34 +164,34 @@ napi_status ThreadSafeFunction::release( napi_threadsafe_function_release_mode mode) { // Node-API semantics: abort prevents further JS calls and wakes any waiters. if (mode == napi_tsfn_abort) { - aborted_ = true; - closing_ = true; - } - if (threadCount_) { - threadCount_--; + aborted_.store(true, std::memory_order_relaxed); + closing_.store(true, std::memory_order_release); } + + const auto remaining = threadCount_.fetch_sub(1, std::memory_order_acq_rel); + // When the last thread is gone (or we're closing), notify and finalize. - if (!threadCount_ || closing_) { + if (remaining <= 1 || closing_.load(std::memory_order_acquire)) { std::lock_guard lock{queueMutex_}; - auto emptyQueue{queue_.empty()}; + const bool emptyQueue = queue_.empty(); if (maxQueueSize_) { queueCv_.notify_all(); } - if (aborted_ || emptyQueue) { + if (aborted_.load(std::memory_order_acquire) || emptyQueue) { finalize(); } } return napi_ok; } -napi_status ThreadSafeFunction::ref() { +napi_status ThreadSafeFunction::ref() noexcept { // In libuv, this would keep the loop alive. In RN we don't own or expose a // libuv loop. We just track the state for API parity. referenced_.store(true, std::memory_order_relaxed); return napi_ok; } -napi_status ThreadSafeFunction::unref() { +napi_status ThreadSafeFunction::unref() noexcept { // In libuv, this allows the loop to exit if nothing else is keeping it // alive. In RN this is a no-op beyond state tracking. referenced_.store(false, std::memory_order_relaxed); @@ -224,11 +199,13 @@ napi_status ThreadSafeFunction::unref() { } void ThreadSafeFunction::finalize() { - if (finalizeScheduled_) { + bool expected = false; + if (!finalizeScheduled_.compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) { return; } - finalizeScheduled_ = true; - closing_ = true; + + closing_.store(true, std::memory_order_release); const auto onFinalize = [self = shared_from_this()] { // Invoke user finalizer and unregister the handle from the global map. @@ -237,7 +214,7 @@ void ThreadSafeFunction::finalize() { self->env_, self->threadFinalizeData_, self->context_); } std::lock_guard lock{registryMutex}; - registry.erase(reinterpret_cast(self.get())); + registry.erase(self->id_); }; // Prefer running the finalizer on the JS thread to match expectations; @@ -249,4 +226,59 @@ void ThreadSafeFunction::finalize() { } } +void ThreadSafeFunction::processQueue() { + void* queuedData{nullptr}; + bool empty{false}; + + // Extract data from queue + { + std::lock_guard lock{queueMutex_}; + if (!queue_.empty()) { + queuedData = queue_.front(); + const bool wasAtMaxCapacity = (queue_.size() == maxQueueSize_); + queue_.pop(); + empty = queue_.empty(); + + // Notify waiting threads if queue was at max capacity + if (wasAtMaxCapacity && maxQueueSize_) { + queueCv_.notify_one(); + } + } + } + + // Execute JS callback if we have data and aren't aborted + if (queuedData && !aborted_.load(std::memory_order_relaxed)) { + if (callJsCb_) { + napi_value fn{nullptr}; + if (jsFuncRef_) { + napi_get_reference_value(env_, jsFuncRef_, &fn); + } + callJsCb_(env_, fn, context_, queuedData); + } else if (jsFuncRef_) { + napi_value fn{nullptr}; + if (napi_get_reference_value(env_, jsFuncRef_, &fn) == napi_ok) { + napi_value recv{nullptr}; + napi_get_undefined(env_, &recv); + napi_value result{nullptr}; + napi_call_function(env_, recv, fn, 0, nullptr, &result); + } + } + } + + // Auto-finalize when: no remaining threads, queue drained, and not closing + if (shouldFinalize() && empty) { + finalize(); + } +} + +[[nodiscard]] bool ThreadSafeFunction::isClosingOrAborted() const noexcept { + return aborted_.load(std::memory_order_relaxed) || + closing_.load(std::memory_order_relaxed); +} + +[[nodiscard]] bool ThreadSafeFunction::shouldFinalize() const noexcept { + return threadCount_.load(std::memory_order_acquire) == 0 && + !closing_.load(std::memory_order_acquire); +} + } // namespace callstack::nodeapihost diff --git a/packages/host/cpp/ThreadsafeFunction.hpp b/packages/host/cpp/ThreadsafeFunction.hpp index d896bcce..d3469b03 100644 --- a/packages/host/cpp/ThreadsafeFunction.hpp +++ b/packages/host/cpp/ThreadsafeFunction.hpp @@ -40,38 +40,49 @@ class ThreadSafeFunction static std::shared_ptr get(napi_threadsafe_function func); - [[nodiscard]] napi_status getContext(void** result); + [[nodiscard]] napi_threadsafe_function getHandle() const noexcept; + [[nodiscard]] napi_status getContext(void** result) noexcept; [[nodiscard]] napi_status call( void* data, napi_threadsafe_function_call_mode isBlocking); [[nodiscard]] napi_status acquire(); [[nodiscard]] napi_status release(napi_threadsafe_function_release_mode mode); // Node-API compatibility: These do not affect RN's lifecycle. We only track // the state for diagnostics and API parity with libuv's ref/unref. - [[nodiscard]] napi_status ref(); - [[nodiscard]] napi_status unref(); + [[nodiscard]] napi_status ref() noexcept; + [[nodiscard]] napi_status unref() noexcept; private: void finalize(); + void processQueue(); + + [[nodiscard]] bool isClosingOrAborted() const noexcept; + [[nodiscard]] bool shouldFinalize() const noexcept; + + const std::uintptr_t id_; + const size_t maxQueueSize_; - std::weak_ptr callInvoker_; - napi_env env_; - napi_value jsFunc_; - napi_ref jsFuncRef_{nullptr}; - napi_value asyncResource_; - napi_value asyncResourceName_; - size_t maxQueueSize_; std::atomic threadCount_; std::atomic aborted_{false}; - void* threadFinalizeData_; - napi_finalize threadFinalizeCb_; - void* context_; - napi_threadsafe_function_call_js callJsCb_; - std::mutex queueMutex_; - std::condition_variable queueCv_; - std::queue queue_; std::atomic closing_{false}; std::atomic referenced_{true}; std::atomic finalizeScheduled_{false}; + + mutable std::mutex queueMutex_; + std::condition_variable queueCv_; + std::queue queue_; + + napi_env env_; + napi_value jsFunc_; + napi_ref jsFuncRef_{nullptr}; + napi_value asyncResource_; + napi_value asyncResourceName_; + + void* const threadFinalizeData_; + napi_finalize const threadFinalizeCb_; + void* const context_; + napi_threadsafe_function_call_js const callJsCb_; + + std::weak_ptr callInvoker_; }; } // namespace callstack::nodeapihost From f1de6b49d4bbed997010ae60a63979d1bdb6bb7d Mon Sep 17 00:00:00 2001 From: Kamil Paradowski Date: Thu, 14 Aug 2025 13:12:25 +0200 Subject: [PATCH 07/11] chore: remove debug lines --- .../tests/threadsafe_function/addon.js | 48 ++++++------------- 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/packages/node-addon-examples/tests/threadsafe_function/addon.js b/packages/node-addon-examples/tests/threadsafe_function/addon.js index 8a329f52..dea3e064 100644 --- a/packages/node-addon-examples/tests/threadsafe_function/addon.js +++ b/packages/node-addon-examples/tests/threadsafe_function/addon.js @@ -18,19 +18,14 @@ function testWithJSMarshaller({ maxQueueSize, launchSecondary, }) { - console.log( - `[JS {${cnt++}}] Starting thread: ${threadStarter}, quitAfter: ${quitAfter}, abort: ${abort}, maxQueueSize: ${maxQueueSize}, launchSecondary: ${launchSecondary}` - ); return new Promise((resolve) => { const array = []; binding[threadStarter]( function testCallback(value) { array.push(value); if (array.length === quitAfter) { - // console.log("[JS] Reached quitAfter, calling StopThread"); setImmediate(() => { binding.StopThread(() => { - // console.log("[JS] StopThread callback called, resolving promise"); resolve(array); }, !!abort); }); @@ -38,31 +33,18 @@ function testWithJSMarshaller({ }, !!abort, !!launchSecondary, - maxQueueSize + maxQueueSize, ); }); } module.exports = () => { - // console.log("[JS] Test entry"); - // return testWithJSMarshaller({ - // threadStarter: "StartThread", - // maxQueueSize: binding.MAX_QUEUE_SIZE, - // quitAfter: binding.ARRAY_LENGTH, - // }).then((result) => { - // console.log("Test completed successfully"); - // assert.deepStrictEqual(result, expectedArray); - // }); return ( new Promise(function testWithoutJSMarshaller(resolve) { let callCount = 0; binding.StartThreadNoNative( function testCallback() { callCount++; - - // console.log("Callback called with arguments:", arguments); - // The default call-into-JS implementation passes no arguments. - // assert.strictEqual(arguments.length, 0); if (callCount === binding.ARRAY_LENGTH) { setImmediate(() => { binding.StopThread(() => { @@ -73,7 +55,7 @@ module.exports = () => { }, false /* abort */, false /* launchSecondary */, - binding.MAX_QUEUE_SIZE + binding.MAX_QUEUE_SIZE, ); }) .then(() => @@ -81,14 +63,14 @@ module.exports = () => { threadStarter: "StartThread", maxQueueSize: binding.MAX_QUEUE_SIZE, quitAfter: binding.ARRAY_LENGTH, - }) + }), ) .then(() => testWithJSMarshaller({ threadStarter: "StartThreadNoJsFunc", maxQueueSize: binding.MAX_QUEUE_SIZE, quitAfter: binding.ARRAY_LENGTH, - }) + }), ) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -99,7 +81,7 @@ module.exports = () => { threadStarter: "StartThread", maxQueueSize: 0, quitAfter: binding.ARRAY_LENGTH, - }) + }), ) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -110,7 +92,7 @@ module.exports = () => { threadStarter: "StartThreadNonblocking", maxQueueSize: binding.MAX_QUEUE_SIZE, quitAfter: binding.ARRAY_LENGTH, - }) + }), ) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -121,7 +103,7 @@ module.exports = () => { threadStarter: "StartThread", maxQueueSize: binding.MAX_QUEUE_SIZE, quitAfter: 1, - }) + }), ) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -132,7 +114,7 @@ module.exports = () => { threadStarter: "StartThread", maxQueueSize: 0, quitAfter: 1, - }) + }), ) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -143,7 +125,7 @@ module.exports = () => { threadStarter: "StartThreadNonblocking", maxQueueSize: binding.MAX_QUEUE_SIZE, quitAfter: 1, - }) + }), ) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -156,7 +138,7 @@ module.exports = () => { quitAfter: 1, maxQueueSize: binding.MAX_QUEUE_SIZE, launchSecondary: true, - }) + }), ) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -169,7 +151,7 @@ module.exports = () => { quitAfter: 1, maxQueueSize: binding.MAX_QUEUE_SIZE, launchSecondary: true, - }) + }), ) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -181,7 +163,7 @@ module.exports = () => { quitAfter: 1, maxQueueSize: binding.MAX_QUEUE_SIZE, abort: true, - }) + }), ) .then((result) => assert.strictEqual(result.indexOf(0), -1)) @@ -193,7 +175,7 @@ module.exports = () => { quitAfter: 1, maxQueueSize: 0, abort: true, - }) + }), ) .then((result) => assert.strictEqual(result.indexOf(0), -1)) @@ -205,7 +187,7 @@ module.exports = () => { quitAfter: 1, maxQueueSize: binding.MAX_QUEUE_SIZE, abort: true, - }) + }), ) .then((result) => assert.strictEqual(result.indexOf(0), -1)) @@ -216,7 +198,7 @@ module.exports = () => { threadStarter: "StartThreadNonblocking", maxQueueSize: binding.ARRAY_LENGTH >>> 1, quitAfter: binding.ARRAY_LENGTH, - }) + }), ) .then((result) => assert.deepStrictEqual(result, expectedArray)) ); From 0cdc8c408c68aef72b603fea0edabc32dbea0b6b Mon Sep 17 00:00:00 2001 From: Kamil Paradowski Date: Thu, 14 Aug 2025 13:12:45 +0200 Subject: [PATCH 08/11] chore: improved comments + cleanup --- packages/host/cpp/ThreadsafeFunction.cpp | 36 ++++++++++-------------- packages/host/cpp/ThreadsafeFunction.hpp | 9 +++--- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/packages/host/cpp/ThreadsafeFunction.cpp b/packages/host/cpp/ThreadsafeFunction.cpp index a428481e..e94a1f34 100644 --- a/packages/host/cpp/ThreadsafeFunction.cpp +++ b/packages/host/cpp/ThreadsafeFunction.cpp @@ -2,19 +2,14 @@ #include #include "Logger.hpp" -// This file provides a React Native-friendly implementation of Node-API's -// thread-safe function primitive. In RN we don't own/libuv, so we: -// - Use CallInvoker to hop onto the JS thread instead of uv_async. -// - Track a registry mapping unique IDs to shared_ptrs for lookup/lifetime. -// - Emulate ref/unref semantics without affecting any event loop. - +// Global registry to map unique IDs to ThreadSafeFunction instances. +// We use IDs instead of raw pointers to avoid any use-after-free issues. static std::unordered_map> registry; static std::mutex registryMutex; static std::atomic nextId{1}; -// Constants for better readability static constexpr size_t INITIAL_REF_COUNT = 1; namespace callstack::nodeapihost { @@ -44,13 +39,10 @@ ThreadSafeFunction::ThreadSafeFunction( context_{context}, callJsCb_{callJsCb} { if (jsFunc) { - // Keep JS function alive across async hops; fatal here mirrors Node-API's - // behavior when environment is irrecoverable. + // Keep JS function alive across async hops const auto status = napi_create_reference(env, jsFunc, INITIAL_REF_COUNT, &jsFuncRef_); if (status != napi_ok) { - // Consider throwing an exception instead of fatal error in future - // versions napi_fatal_error(nullptr, 0, "Failed to create JS function reference", @@ -101,6 +93,7 @@ std::shared_ptr ThreadSafeFunction::create( std::shared_ptr ThreadSafeFunction::get( napi_threadsafe_function func) { std::lock_guard lock{registryMutex}; + // Cast the handle back to ID for registry lookup const auto id = reinterpret_cast(func); const auto it = registry.find(id); return it != registry.end() ? it->second : nullptr; @@ -120,7 +113,7 @@ napi_status ThreadSafeFunction::getContext(void** result) noexcept { } napi_status ThreadSafeFunction::call( - void* data, napi_threadsafe_function_call_mode isBlocking) { + void* data, napi_threadsafe_function_call_mode isBlocking) noexcept { if (isClosingOrAborted()) { return napi_closing; } @@ -152,7 +145,7 @@ napi_status ThreadSafeFunction::call( return napi_ok; } -napi_status ThreadSafeFunction::acquire() { +napi_status ThreadSafeFunction::acquire() noexcept { if (closing_.load(std::memory_order_acquire)) { return napi_closing; } @@ -161,7 +154,7 @@ napi_status ThreadSafeFunction::acquire() { } napi_status ThreadSafeFunction::release( - napi_threadsafe_function_release_mode mode) { + napi_threadsafe_function_release_mode mode) noexcept { // Node-API semantics: abort prevents further JS calls and wakes any waiters. if (mode == napi_tsfn_abort) { aborted_.store(true, std::memory_order_relaxed); @@ -185,8 +178,8 @@ napi_status ThreadSafeFunction::release( } napi_status ThreadSafeFunction::ref() noexcept { - // In libuv, this would keep the loop alive. In RN we don't own or expose a - // libuv loop. We just track the state for API parity. + // In libuv, this allows the loop to exit if nothing else is keeping it + // alive. In RN this is a no-op beyond state tracking. referenced_.store(true, std::memory_order_relaxed); return napi_ok; } @@ -199,6 +192,7 @@ napi_status ThreadSafeFunction::unref() noexcept { } void ThreadSafeFunction::finalize() { + // Ensure finalization happens exactly once bool expected = false; if (!finalizeScheduled_.compare_exchange_strong( expected, true, std::memory_order_acq_rel)) { @@ -208,7 +202,6 @@ void ThreadSafeFunction::finalize() { closing_.store(true, std::memory_order_release); const auto onFinalize = [self = shared_from_this()] { - // Invoke user finalizer and unregister the handle from the global map. if (self->threadFinalizeCb_) { self->threadFinalizeCb_( self->env_, self->threadFinalizeData_, self->context_); @@ -249,12 +242,14 @@ void ThreadSafeFunction::processQueue() { // Execute JS callback if we have data and aren't aborted if (queuedData && !aborted_.load(std::memory_order_relaxed)) { if (callJsCb_) { + // Prefer the user-provided callJsCb_ (Node-API compatible) napi_value fn{nullptr}; if (jsFuncRef_) { napi_get_reference_value(env_, jsFuncRef_, &fn); } callJsCb_(env_, fn, context_, queuedData); } else if (jsFuncRef_) { + // Fallback: call JS function directly with no args napi_value fn{nullptr}; if (napi_get_reference_value(env_, jsFuncRef_, &fn) == napi_ok) { napi_value recv{nullptr}; @@ -271,14 +266,13 @@ void ThreadSafeFunction::processQueue() { } } -[[nodiscard]] bool ThreadSafeFunction::isClosingOrAborted() const noexcept { +bool ThreadSafeFunction::isClosingOrAborted() const noexcept { return aborted_.load(std::memory_order_relaxed) || closing_.load(std::memory_order_relaxed); } -[[nodiscard]] bool ThreadSafeFunction::shouldFinalize() const noexcept { +bool ThreadSafeFunction::shouldFinalize() const noexcept { return threadCount_.load(std::memory_order_acquire) == 0 && !closing_.load(std::memory_order_acquire); } - -} // namespace callstack::nodeapihost +} // namespace callstack::nodeapihost \ No newline at end of file diff --git a/packages/host/cpp/ThreadsafeFunction.hpp b/packages/host/cpp/ThreadsafeFunction.hpp index d3469b03..90ae467e 100644 --- a/packages/host/cpp/ThreadsafeFunction.hpp +++ b/packages/host/cpp/ThreadsafeFunction.hpp @@ -43,11 +43,10 @@ class ThreadSafeFunction [[nodiscard]] napi_threadsafe_function getHandle() const noexcept; [[nodiscard]] napi_status getContext(void** result) noexcept; [[nodiscard]] napi_status call( - void* data, napi_threadsafe_function_call_mode isBlocking); - [[nodiscard]] napi_status acquire(); - [[nodiscard]] napi_status release(napi_threadsafe_function_release_mode mode); - // Node-API compatibility: These do not affect RN's lifecycle. We only track - // the state for diagnostics and API parity with libuv's ref/unref. + void* data, napi_threadsafe_function_call_mode isBlocking) noexcept; + [[nodiscard]] napi_status acquire() noexcept; + [[nodiscard]] napi_status release( + napi_threadsafe_function_release_mode mode) noexcept; [[nodiscard]] napi_status ref() noexcept; [[nodiscard]] napi_status unref() noexcept; From 932f99d22b3ed6c8724385239d5765a7cb8acde9 Mon Sep 17 00:00:00 2001 From: Kamil Paradowski Date: Thu, 14 Aug 2025 13:23:02 +0200 Subject: [PATCH 09/11] chore: add changeset --- .changeset/loose-pans-smile.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/loose-pans-smile.md diff --git a/.changeset/loose-pans-smile.md b/.changeset/loose-pans-smile.md new file mode 100644 index 00000000..09bfc5d6 --- /dev/null +++ b/.changeset/loose-pans-smile.md @@ -0,0 +1,5 @@ +--- +"react-native-node-api": minor +--- + +Providing implementation of Node-API threadsafe functions From c7aa8fe58570899f0acc1f31bfee76e4b3f85721 Mon Sep 17 00:00:00 2001 From: Kamil Paradowski Date: Thu, 14 Aug 2025 13:30:08 +0200 Subject: [PATCH 10/11] chore: deleting not needed cmake file --- .../tests/threadsafe_function/CMakeLists.txt | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 packages/node-addon-examples/tests/threadsafe_function/CMakeLists.txt diff --git a/packages/node-addon-examples/tests/threadsafe_function/CMakeLists.txt b/packages/node-addon-examples/tests/threadsafe_function/CMakeLists.txt deleted file mode 100644 index 67146934..00000000 --- a/packages/node-addon-examples/tests/threadsafe_function/CMakeLists.txt +++ /dev/null @@ -1,16 +0,0 @@ -cmake_minimum_required(VERSION 3.15) -project(tests-threadsafe_function) - -add_compile_definitions(NAPI_VERSION=8) - -add_library(addon SHARED addon.cpp ${CMAKE_JS_SRC}) -set_target_properties(addon PROPERTIES PREFIX "" SUFFIX ".node") -target_include_directories(addon PRIVATE "/Users/kamil.paradowski/projects/napi/node_modules/node-addon-api" ${CMAKE_JS_INC}) -target_link_libraries(addon PRIVATE ${CMAKE_JS_LIB}) -target_compile_features(addon PRIVATE cxx_std_17) -target_compile_definitions(addon PRIVATE NAPI_DISABLE_CPP_EXCEPTIONS) - -if(MSVC AND CMAKE_JS_NODELIB_DEF AND CMAKE_JS_NODELIB_TARGET) - # Generate node.lib - execute_process(COMMAND ${CMAKE_AR} /def:${CMAKE_JS_NODELIB_DEF} /out:${CMAKE_JS_NODELIB_TARGET} ${CMAKE_STATIC_LINKER_FLAGS}) -endif() \ No newline at end of file From 3239f2b92d6e4f6d1370ced6df799f75dd21930d Mon Sep 17 00:00:00 2001 From: Kamil Paradowski Date: Thu, 14 Aug 2025 13:32:42 +0200 Subject: [PATCH 11/11] chore: add error location --- packages/host/cpp/ThreadsafeFunction.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/host/cpp/ThreadsafeFunction.cpp b/packages/host/cpp/ThreadsafeFunction.cpp index e94a1f34..7cc86067 100644 --- a/packages/host/cpp/ThreadsafeFunction.cpp +++ b/packages/host/cpp/ThreadsafeFunction.cpp @@ -43,8 +43,8 @@ ThreadSafeFunction::ThreadSafeFunction( const auto status = napi_create_reference(env, jsFunc, INITIAL_REF_COUNT, &jsFuncRef_); if (status != napi_ok) { - napi_fatal_error(nullptr, - 0, + napi_fatal_error("ThreadSafeFunction::ThreadSafeFunction", + NAPI_AUTO_LENGTH, "Failed to create JS function reference", NAPI_AUTO_LENGTH); }