diff --git a/.changeset/loose-pans-smile.md b/.changeset/loose-pans-smile.md new file mode 100644 index 00000000..09bfc5d6 --- /dev/null +++ b/.changeset/loose-pans-smile.md @@ -0,0 +1,5 @@ +--- +"react-native-node-api": minor +--- + +Providing implementation of Node-API threadsafe functions diff --git a/packages/host/android/CMakeLists.txt b/packages/host/android/CMakeLists.txt index d062762a..155d8263 100644 --- a/packages/host/android/CMakeLists.txt +++ b/packages/host/android/CMakeLists.txt @@ -24,6 +24,8 @@ add_library(node-api-host SHARED ../cpp/RuntimeNodeApi.hpp ../cpp/RuntimeNodeApiAsync.cpp ../cpp/RuntimeNodeApiAsync.hpp + ../cpp/ThreadsafeFunction.cpp + ../cpp/ThreadsafeFunction.hpp ) target_include_directories(node-api-host PRIVATE diff --git a/packages/host/cpp/RuntimeNodeApiAsync.cpp b/packages/host/cpp/RuntimeNodeApiAsync.cpp index dd4c87c3..fc19cd37 100644 --- a/packages/host/cpp/RuntimeNodeApiAsync.cpp +++ b/packages/host/cpp/RuntimeNodeApiAsync.cpp @@ -1,6 +1,7 @@ #include "RuntimeNodeApiAsync.hpp" #include #include "Logger.hpp" +#include "ThreadsafeFunction.hpp" struct AsyncJob { using IdType = uint64_t; @@ -187,4 +188,86 @@ napi_status napi_cancel_async_work( job->state = AsyncJob::State::Cancelled; return napi_ok; } + +napi_status napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result) { + const auto function = ThreadSafeFunction::create(getCallInvoker(env), + env, + func, + async_resource, + async_resource_name, + max_queue_size, + initial_thread_count, + thread_finalize_data, + thread_finalize_cb, + context, + call_js_cb); + *result = function->getHandle(); + return napi_ok; +} + +napi_status napi_get_threadsafe_function_context( + napi_threadsafe_function func, void** result) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + return function->getContext(result); +} + +napi_status napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + return function->call(data, is_blocking); +} + +napi_status napi_acquire_threadsafe_function(napi_threadsafe_function func) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + return function->acquire(); +} + +napi_status napi_release_threadsafe_function( + napi_threadsafe_function func, napi_threadsafe_function_release_mode mode) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + return function->release(mode); +} + +napi_status napi_unref_threadsafe_function( + node_api_basic_env env, napi_threadsafe_function func) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + // RN has no libuv loop to unref; we only update internal state for parity. + return function->unref(); +} + +napi_status napi_ref_threadsafe_function( + node_api_basic_env env, napi_threadsafe_function func) { + const auto function = ThreadSafeFunction::get(func); + if (!function) { + return napi_invalid_arg; + } + // RN has no libuv loop to ref; we only update internal state for parity. + return function->ref(); +} } // namespace callstack::nodeapihost diff --git a/packages/host/cpp/RuntimeNodeApiAsync.hpp b/packages/host/cpp/RuntimeNodeApiAsync.hpp index f0108e6d..9f9cd14c 100644 --- a/packages/host/cpp/RuntimeNodeApiAsync.hpp +++ b/packages/host/cpp/RuntimeNodeApiAsync.hpp @@ -23,4 +23,34 @@ napi_status napi_delete_async_work( napi_status napi_cancel_async_work( node_api_basic_env env, napi_async_work work); + +napi_status napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result); + +napi_status napi_get_threadsafe_function_context( + napi_threadsafe_function func, void** result); + +napi_status napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking); + +napi_status napi_acquire_threadsafe_function(napi_threadsafe_function func); + +napi_status napi_release_threadsafe_function( + napi_threadsafe_function func, napi_threadsafe_function_release_mode mode); + +napi_status napi_unref_threadsafe_function( + node_api_basic_env env, napi_threadsafe_function func); + +napi_status napi_ref_threadsafe_function( + node_api_basic_env env, napi_threadsafe_function func); } // namespace callstack::nodeapihost diff --git a/packages/host/cpp/ThreadsafeFunction.cpp b/packages/host/cpp/ThreadsafeFunction.cpp new file mode 100644 index 00000000..7cc86067 --- /dev/null +++ b/packages/host/cpp/ThreadsafeFunction.cpp @@ -0,0 +1,278 @@ +#include "ThreadsafeFunction.hpp" +#include +#include "Logger.hpp" + +// Global registry to map unique IDs to ThreadSafeFunction instances. +// We use IDs instead of raw pointers to avoid any use-after-free issues. +static std::unordered_map> + registry; +static std::mutex registryMutex; +static std::atomic nextId{1}; + +static constexpr size_t INITIAL_REF_COUNT = 1; + +namespace callstack::nodeapihost { + +ThreadSafeFunction::ThreadSafeFunction( + std::weak_ptr callInvoker, + napi_env env, + napi_value jsFunc, + napi_value asyncResource, + napi_value asyncResourceName, + size_t maxQueueSize, + size_t initialThreadCount, + void* threadFinalizeData, + napi_finalize threadFinalizeCb, + void* context, + napi_threadsafe_function_call_js callJsCb) + : id_{nextId.fetch_add(1, std::memory_order_relaxed)}, + callInvoker_{std::move(callInvoker)}, + env_{env}, + jsFunc_{jsFunc}, + asyncResource_{asyncResource}, + asyncResourceName_{asyncResourceName}, + maxQueueSize_{maxQueueSize}, + threadCount_{initialThreadCount}, + threadFinalizeData_{threadFinalizeData}, + threadFinalizeCb_{threadFinalizeCb}, + context_{context}, + callJsCb_{callJsCb} { + if (jsFunc) { + // Keep JS function alive across async hops + const auto status = + napi_create_reference(env, jsFunc, INITIAL_REF_COUNT, &jsFuncRef_); + if (status != napi_ok) { + napi_fatal_error("ThreadSafeFunction::ThreadSafeFunction", + NAPI_AUTO_LENGTH, + "Failed to create JS function reference", + NAPI_AUTO_LENGTH); + } + } +} + +ThreadSafeFunction::~ThreadSafeFunction() { + if (jsFuncRef_) { + napi_delete_reference(env_, jsFuncRef_); + } +} + +std::shared_ptr ThreadSafeFunction::create( + std::weak_ptr callInvoker, + napi_env env, + napi_value jsFunc, + napi_value asyncResource, + napi_value asyncResourceName, + size_t maxQueueSize, + size_t initialThreadCount, + void* threadFinalizeData, + napi_finalize threadFinalizeCb, + void* context, + napi_threadsafe_function_call_js callJsCb) { + const auto function = + std::make_shared(std::move(callInvoker), + env, + jsFunc, + asyncResource, + asyncResourceName, + maxQueueSize, + initialThreadCount, + threadFinalizeData, + threadFinalizeCb, + context, + callJsCb); + + { + std::lock_guard lock{registryMutex}; + registry[function->id_] = function; + } + + return function; +} + +std::shared_ptr ThreadSafeFunction::get( + napi_threadsafe_function func) { + std::lock_guard lock{registryMutex}; + // Cast the handle back to ID for registry lookup + const auto id = reinterpret_cast(func); + const auto it = registry.find(id); + return it != registry.end() ? it->second : nullptr; +} + +napi_threadsafe_function ThreadSafeFunction::getHandle() const noexcept { + return reinterpret_cast(id_); +} + +napi_status ThreadSafeFunction::getContext(void** result) noexcept { + if (!result) { + return napi_invalid_arg; + } + + *result = context_; + return napi_ok; +} + +napi_status ThreadSafeFunction::call( + void* data, napi_threadsafe_function_call_mode isBlocking) noexcept { + if (isClosingOrAborted()) { + return napi_closing; + } + + { + std::unique_lock lock{queueMutex_}; + // Backpressure: enforce maxQueueSize_. If nonblocking, fail fast; if + // blocking, wait until space is available or closing/aborted. + if (maxQueueSize_ && queue_.size() >= maxQueueSize_) { + if (isBlocking == napi_tsfn_nonblocking) { + return napi_queue_full; + } + queueCv_.wait(lock, [&] { + return queue_.size() < maxQueueSize_ || isClosingOrAborted(); + }); + if (isClosingOrAborted()) return napi_closing; + } + queue_.push(data); + } + + const auto invoker = callInvoker_.lock(); + if (!invoker) { + log_debug("Error: No CallInvoker available for ThreadSafeFunction"); + return napi_generic_failure; + } + // Hop to JS thread; we drain one item per hop to keep latency predictable + // and avoid long monopolization of the JS queue. + invoker->invokeAsync([self = shared_from_this()] { self->processQueue(); }); + return napi_ok; +} + +napi_status ThreadSafeFunction::acquire() noexcept { + if (closing_.load(std::memory_order_acquire)) { + return napi_closing; + } + threadCount_.fetch_add(1, std::memory_order_acq_rel); + return napi_ok; +} + +napi_status ThreadSafeFunction::release( + napi_threadsafe_function_release_mode mode) noexcept { + // Node-API semantics: abort prevents further JS calls and wakes any waiters. + if (mode == napi_tsfn_abort) { + aborted_.store(true, std::memory_order_relaxed); + closing_.store(true, std::memory_order_release); + } + + const auto remaining = threadCount_.fetch_sub(1, std::memory_order_acq_rel); + + // When the last thread is gone (or we're closing), notify and finalize. + if (remaining <= 1 || closing_.load(std::memory_order_acquire)) { + std::lock_guard lock{queueMutex_}; + const bool emptyQueue = queue_.empty(); + if (maxQueueSize_) { + queueCv_.notify_all(); + } + if (aborted_.load(std::memory_order_acquire) || emptyQueue) { + finalize(); + } + } + return napi_ok; +} + +napi_status ThreadSafeFunction::ref() noexcept { + // In libuv, this allows the loop to exit if nothing else is keeping it + // alive. In RN this is a no-op beyond state tracking. + referenced_.store(true, std::memory_order_relaxed); + return napi_ok; +} + +napi_status ThreadSafeFunction::unref() noexcept { + // In libuv, this allows the loop to exit if nothing else is keeping it + // alive. In RN this is a no-op beyond state tracking. + referenced_.store(false, std::memory_order_relaxed); + return napi_ok; +} + +void ThreadSafeFunction::finalize() { + // Ensure finalization happens exactly once + bool expected = false; + if (!finalizeScheduled_.compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) { + return; + } + + closing_.store(true, std::memory_order_release); + + const auto onFinalize = [self = shared_from_this()] { + if (self->threadFinalizeCb_) { + self->threadFinalizeCb_( + self->env_, self->threadFinalizeData_, self->context_); + } + std::lock_guard lock{registryMutex}; + registry.erase(self->id_); + }; + + // Prefer running the finalizer on the JS thread to match expectations; + // if CallInvoker is gone, run synchronously. + if (const auto invoker = callInvoker_.lock()) { + invoker->invokeAsync(onFinalize); + } else { + onFinalize(); + } +} + +void ThreadSafeFunction::processQueue() { + void* queuedData{nullptr}; + bool empty{false}; + + // Extract data from queue + { + std::lock_guard lock{queueMutex_}; + if (!queue_.empty()) { + queuedData = queue_.front(); + const bool wasAtMaxCapacity = (queue_.size() == maxQueueSize_); + queue_.pop(); + empty = queue_.empty(); + + // Notify waiting threads if queue was at max capacity + if (wasAtMaxCapacity && maxQueueSize_) { + queueCv_.notify_one(); + } + } + } + + // Execute JS callback if we have data and aren't aborted + if (queuedData && !aborted_.load(std::memory_order_relaxed)) { + if (callJsCb_) { + // Prefer the user-provided callJsCb_ (Node-API compatible) + napi_value fn{nullptr}; + if (jsFuncRef_) { + napi_get_reference_value(env_, jsFuncRef_, &fn); + } + callJsCb_(env_, fn, context_, queuedData); + } else if (jsFuncRef_) { + // Fallback: call JS function directly with no args + napi_value fn{nullptr}; + if (napi_get_reference_value(env_, jsFuncRef_, &fn) == napi_ok) { + napi_value recv{nullptr}; + napi_get_undefined(env_, &recv); + napi_value result{nullptr}; + napi_call_function(env_, recv, fn, 0, nullptr, &result); + } + } + } + + // Auto-finalize when: no remaining threads, queue drained, and not closing + if (shouldFinalize() && empty) { + finalize(); + } +} + +bool ThreadSafeFunction::isClosingOrAborted() const noexcept { + return aborted_.load(std::memory_order_relaxed) || + closing_.load(std::memory_order_relaxed); +} + +bool ThreadSafeFunction::shouldFinalize() const noexcept { + return threadCount_.load(std::memory_order_acquire) == 0 && + !closing_.load(std::memory_order_acquire); +} +} // namespace callstack::nodeapihost \ No newline at end of file diff --git a/packages/host/cpp/ThreadsafeFunction.hpp b/packages/host/cpp/ThreadsafeFunction.hpp new file mode 100644 index 00000000..90ae467e --- /dev/null +++ b/packages/host/cpp/ThreadsafeFunction.hpp @@ -0,0 +1,87 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include "node_api.h" + +namespace callstack::nodeapihost { +class ThreadSafeFunction + : public std::enable_shared_from_this { + public: + ThreadSafeFunction(std::weak_ptr callInvoker, + napi_env env, + napi_value jsFunc, + napi_value asyncResource, + napi_value asyncResourceName, + size_t maxQueueSize, + size_t initialThreadCount, + void* threadFinalizeData, + napi_finalize threadFinalizeCb, + void* context, + napi_threadsafe_function_call_js callJsCb); + ~ThreadSafeFunction(); + + static std::shared_ptr create( + std::weak_ptr callInvoker, + napi_env env, + napi_value jsFunc, + napi_value asyncResource, + napi_value asyncResourceName, + size_t maxQueueSize, + size_t initialThreadCount, + void* threadFinalizeData, + napi_finalize threadFinalizeCb, + void* context, + napi_threadsafe_function_call_js callJsCb); + + static std::shared_ptr get(napi_threadsafe_function func); + + [[nodiscard]] napi_threadsafe_function getHandle() const noexcept; + [[nodiscard]] napi_status getContext(void** result) noexcept; + [[nodiscard]] napi_status call( + void* data, napi_threadsafe_function_call_mode isBlocking) noexcept; + [[nodiscard]] napi_status acquire() noexcept; + [[nodiscard]] napi_status release( + napi_threadsafe_function_release_mode mode) noexcept; + [[nodiscard]] napi_status ref() noexcept; + [[nodiscard]] napi_status unref() noexcept; + + private: + void finalize(); + void processQueue(); + + [[nodiscard]] bool isClosingOrAborted() const noexcept; + [[nodiscard]] bool shouldFinalize() const noexcept; + + const std::uintptr_t id_; + const size_t maxQueueSize_; + + std::atomic threadCount_; + std::atomic aborted_{false}; + std::atomic closing_{false}; + std::atomic referenced_{true}; + std::atomic finalizeScheduled_{false}; + + mutable std::mutex queueMutex_; + std::condition_variable queueCv_; + std::queue queue_; + + napi_env env_; + napi_value jsFunc_; + napi_ref jsFuncRef_{nullptr}; + napi_value asyncResource_; + napi_value asyncResourceName_; + + void* const threadFinalizeData_; + napi_finalize const threadFinalizeCb_; + void* const context_; + napi_threadsafe_function_call_js const callJsCb_; + + std::weak_ptr callInvoker_; +}; + +} // namespace callstack::nodeapihost diff --git a/packages/host/scripts/generate-weak-node-api-injector.ts b/packages/host/scripts/generate-weak-node-api-injector.ts index d5adfd83..e2ddc578 100644 --- a/packages/host/scripts/generate-weak-node-api-injector.ts +++ b/packages/host/scripts/generate-weak-node-api-injector.ts @@ -20,6 +20,13 @@ const IMPLEMENTED_RUNTIME_FUNCTIONS = [ "napi_fatal_error", "napi_get_node_version", "napi_get_version", + "napi_create_threadsafe_function", + "napi_get_threadsafe_function_context", + "napi_call_threadsafe_function", + "napi_acquire_threadsafe_function", + "napi_release_threadsafe_function", + "napi_unref_threadsafe_function", + "napi_ref_threadsafe_function", ]; /** diff --git a/packages/node-addon-examples/src/index.ts b/packages/node-addon-examples/src/index.ts index 869d4825..c4d74d23 100644 --- a/packages/node-addon-examples/src/index.ts +++ b/packages/node-addon-examples/src/index.ts @@ -85,5 +85,7 @@ export const suites: Record< require("../tests/buffers/addon.js"); }, async: () => require("../tests/async/addon.js") as () => Promise, + threadsafe_function: () => + require("../tests/threadsafe_function/addon.js") as () => Promise, }, }; diff --git a/packages/node-addon-examples/tests/threadsafe_function/addon.cpp b/packages/node-addon-examples/tests/threadsafe_function/addon.cpp new file mode 100644 index 00000000..d1a9224a --- /dev/null +++ b/packages/node-addon-examples/tests/threadsafe_function/addon.cpp @@ -0,0 +1,391 @@ +// For the purpose of this test we use libuv's threading library. When deciding +// on a threading library for a new project it bears remembering that in the +// future libuv may introduce API changes which may render it non-ABI-stable, +// which, in turn, may affect the ABI stability of the project despite its use +// of N-API. +#include + +#include +#include +#include + +#include "../RuntimeNodeApiTestsCommon.h" + +#define ARRAY_LENGTH 1000 +#define MAX_QUEUE_SIZE 1 + +// Use modern C++ threads +struct uv_thread_t { + std::unique_ptr t; +}; +typedef void (*uv_thread_cb)(void*); + +static int uv_thread_create(uv_thread_t* tid, uv_thread_cb entry, void* arg) { + try { + tid->t = std::make_unique([entry, arg]() { entry(arg); }); + return 0; + } catch (...) { + return -1; + } +} +static void uv_thread_join(uv_thread_t* tid) { + // printf("Thread is joinable: %d %d\n", tid->t->joinable(), + // std::this_thread::get_id()); + if (tid->t && tid->t->joinable()) { + // printf("Joining thread: %d\n", std::this_thread::get_id()); + tid->t->join(); + } +} +static size_t uv_hrtime() { + auto now = std::chrono::steady_clock::now().time_since_epoch(); + return std::chrono::duration_cast(now).count(); +} + +static uv_thread_t uv_threads[2]; +static napi_threadsafe_function ts_fn; + +typedef struct { + napi_threadsafe_function_call_mode block_on_full; + napi_threadsafe_function_release_mode abort; + bool start_secondary; + napi_ref js_finalize_cb; + uint32_t max_queue_size; +} ts_fn_hint; + +static ts_fn_hint ts_info; + +// Thread data to transmit to JS +static int ints[ARRAY_LENGTH]; + +static void secondary_thread(void* data) { + // printf("Secondary thread started: %d\n", std::this_thread::get_id()); + + napi_threadsafe_function ts_fn = (napi_threadsafe_function)data; + + if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", + NAPI_AUTO_LENGTH); + } +} + +// Source thread producing the data +static void data_source_thread(void* data) { + // printf("Data source thread started: %d\n", std::this_thread::get_id()); + + napi_threadsafe_function ts_fn = (napi_threadsafe_function)data; + int index; + void* hint; + ts_fn_hint* ts_fn_info; + napi_status status; + bool queue_was_full = false; + bool queue_was_closing = false; + + if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_get_threadsafe_function_context failed", + NAPI_AUTO_LENGTH); + } + + ts_fn_info = (ts_fn_hint*)hint; + + if (ts_fn_info != &ts_info) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "thread-safe function hint is not as expected", + NAPI_AUTO_LENGTH); + } + + if (ts_fn_info->start_secondary) { + if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_acquire_threadsafe_function failed", + NAPI_AUTO_LENGTH); + } + + // printf("Creating secondary thread: %d\n", std::this_thread::get_id()); + if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "failed to start secondary thread", NAPI_AUTO_LENGTH); + } + } + + for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) { + status = napi_call_threadsafe_function(ts_fn, &ints[index], + ts_fn_info->block_on_full); + if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) { + // Let's make this thread really busy for 200 ms to give the main thread a + // chance to abort. + uint64_t start = uv_hrtime(); + for (; uv_hrtime() - start < 200000000;); + } + switch (status) { + case napi_queue_full: + queue_was_full = true; + index++; + // fall through + + case napi_ok: + continue; + + case napi_closing: + queue_was_closing = true; + break; + + default: + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_call_threadsafe_function failed", + NAPI_AUTO_LENGTH); + } + } + + // Assert that the enqueuing of a value was refused at least once, if this is + // a non-blocking test run. + // if (!ts_fn_info->block_on_full && !queue_was_full) { + // napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + // "queue was never full", NAPI_AUTO_LENGTH); + // } + + // // Assert that the queue was marked as closing at least once, if this is an + // // aborting test run. + // if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) { + // napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + // "queue was never closing", NAPI_AUTO_LENGTH); + // } + + if (!queue_was_closing && + napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", + NAPI_AUTO_LENGTH); + } +} + +// Getting the data into JS +static void call_js(napi_env env, napi_value cb, void* hint, void* data) { + if (!(env == NULL || cb == NULL)) { + napi_value argv, undefined; + NODE_API_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv)); + NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + // printf("Calling JS function (%p, %p) from thread: %d\n", env, cb, + // std::this_thread::get_id()); + NODE_API_CALL_RETURN_VOID( + env, napi_call_function(env, undefined, cb, 1, &argv, NULL)); + } +} + +static napi_ref alt_ref; +// Getting the data into JS with the alternative reference +static void call_ref(napi_env env, napi_value _, void* hint, void* data) { + if (!(env == NULL || alt_ref == NULL)) { + napi_value fn, argv, undefined; + NODE_API_CALL_RETURN_VOID(env, napi_get_reference_value(env, alt_ref, &fn)); + NODE_API_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv)); + NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NODE_API_CALL_RETURN_VOID( + env, napi_call_function(env, undefined, fn, 1, &argv, NULL)); + } +} + +// Cleanup +static napi_value StopThread(napi_env env, napi_callback_info info) { + size_t argc = 2; + napi_value argv[2]; + NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + napi_valuetype value_type; + NODE_API_CALL(env, napi_typeof(env, argv[0], &value_type)); + NODE_API_ASSERT(env, value_type == napi_function, + "StopThread argument is a function"); + NODE_API_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function"); + NODE_API_CALL( + env, napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb))); + bool abort; + NODE_API_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + // printf("[C++] Stopping thread-safe function...\n"); + NODE_API_CALL(env, napi_release_threadsafe_function( + ts_fn, abort ? napi_tsfn_abort : napi_tsfn_release)); + ts_fn = NULL; + return NULL; +} + +// Join the thread and inform JS that we're done. +static void join_the_threads(napi_env env, void* data, void* hint) { + // printf("Joining threads...\n"); + uv_thread_t* the_threads = (uv_thread_t*)data; + ts_fn_hint* the_hint = (ts_fn_hint*)hint; + napi_value js_cb, undefined; + + uv_thread_join(&the_threads[0]); + if (the_hint->start_secondary) { + uv_thread_join(&the_threads[1]); + } + + NODE_API_CALL_RETURN_VOID( + env, napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb)); + NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NODE_API_CALL_RETURN_VOID( + env, napi_call_function(env, undefined, js_cb, 0, NULL, NULL)); + NODE_API_CALL_RETURN_VOID( + env, napi_delete_reference(env, the_hint->js_finalize_cb)); + if (alt_ref != NULL) { + NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env, alt_ref)); + alt_ref = NULL; + } +} + +static napi_value StartThreadInternal(napi_env env, napi_callback_info info, + napi_threadsafe_function_call_js cb, + bool block_on_full, bool alt_ref_js_cb) { + size_t argc = 4; + napi_value argv[4]; + + // printf("Starting thread-safe function from thread: %d\n", + // std::this_thread::get_id()); + + NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + if (alt_ref_js_cb) { + NODE_API_CALL(env, napi_create_reference(env, argv[0], 1, &alt_ref)); + argv[0] = NULL; + } + + ts_info.block_on_full = + (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking); + + NODE_API_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function"); + napi_value async_name; + NODE_API_CALL(env, + napi_create_string_utf8(env, "N-API Thread-safe Function Test", + NAPI_AUTO_LENGTH, &async_name)); + NODE_API_CALL(env, + napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size)); + NODE_API_CALL(env, + napi_create_threadsafe_function( + env, // napi_env env, + argv[0], // napi_value func, + NULL, // napi_value async_resource, + async_name, // napi_value async_resource_name, + ts_info.max_queue_size, // size_t max_queue_size, + 2, // size_t initial_thread_count, + uv_threads, // void* thread_finalize_data, + join_the_threads, // napi_finalize thread_finalize_cb, + &ts_info, // void* context, + cb, // napi_threadsafe_function_call_js call_js_cb, + &ts_fn)); // napi_threadsafe_function* result + bool abort; + NODE_API_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release; + NODE_API_CALL(env, + napi_get_value_bool(env, argv[2], &(ts_info.start_secondary))); + + NODE_API_ASSERT( + env, (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0), + "Thread creation"); + + return NULL; +} + +static napi_value Unref(napi_env env, napi_callback_info info) { + NODE_API_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NODE_API_CALL(env, napi_unref_threadsafe_function(env, ts_fn)); + return NULL; +} + +static napi_value Release(napi_env env, napi_callback_info info) { + NODE_API_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NODE_API_CALL(env, + napi_release_threadsafe_function(ts_fn, napi_tsfn_release)); + return NULL; +} + +// Startup +static napi_value StartThread(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, call_js, + /** block_on_full */ true, + /** alt_ref_js_cb */ false); +} + +static napi_value StartThreadNonblocking(napi_env env, + napi_callback_info info) { + return StartThreadInternal(env, info, call_js, + /** block_on_full */ false, + /** alt_ref_js_cb */ false); +} + +static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, NULL, + /** block_on_full */ true, + /** alt_ref_js_cb */ false); +} + +static napi_value StartThreadNoJsFunc(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, call_ref, + /** block_on_full */ true, + /** alt_ref_js_cb */ true); +} + +// Testing calling into JavaScript +static void ThreadSafeFunctionFinalize(napi_env env, void* finalize_data, + void* finalize_hint) { + napi_ref js_func_ref = (napi_ref)finalize_data; + napi_value js_func; + napi_value recv; + NODE_API_CALL_RETURN_VOID( + env, napi_get_reference_value(env, js_func_ref, &js_func)); + NODE_API_CALL_RETURN_VOID(env, napi_get_global(env, &recv)); + NODE_API_CALL_RETURN_VOID( + env, napi_call_function(env, recv, js_func, 0, NULL, NULL)); + NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env, js_func_ref)); +} + +// Testing calling into JavaScript +static napi_value CallIntoModule(napi_env env, napi_callback_info info) { + size_t argc = 4; + napi_value argv[4]; + NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + + napi_ref finalize_func; + NODE_API_CALL(env, napi_create_reference(env, argv[3], 1, &finalize_func)); + + napi_threadsafe_function tsfn; + NODE_API_CALL(env, napi_create_threadsafe_function( + env, argv[0], argv[1], argv[2], 0, 1, finalize_func, + ThreadSafeFunctionFinalize, NULL, NULL, &tsfn)); + NODE_API_CALL(env, + napi_call_threadsafe_function(tsfn, NULL, napi_tsfn_blocking)); + NODE_API_CALL(env, napi_release_threadsafe_function(tsfn, napi_tsfn_release)); + return NULL; +} + +// Module init +static napi_value Init(napi_env env, napi_value exports) { + size_t index; + for (index = 0; index < ARRAY_LENGTH; index++) { + ints[index] = index; + } + napi_value js_array_length, js_max_queue_size; + napi_create_uint32(env, ARRAY_LENGTH, &js_array_length); + napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size); + + napi_property_descriptor properties[] = { + {"ARRAY_LENGTH", NULL, NULL, NULL, NULL, js_array_length, napi_enumerable, + NULL}, + {"MAX_QUEUE_SIZE", NULL, NULL, NULL, NULL, js_max_queue_size, + napi_enumerable, NULL}, + DECLARE_NODE_API_PROPERTY("StartThread", StartThread), + DECLARE_NODE_API_PROPERTY("StartThreadNoNative", StartThreadNoNative), + DECLARE_NODE_API_PROPERTY("StartThreadNonblocking", + StartThreadNonblocking), + DECLARE_NODE_API_PROPERTY("StartThreadNoJsFunc", StartThreadNoJsFunc), + DECLARE_NODE_API_PROPERTY("StopThread", StopThread), + DECLARE_NODE_API_PROPERTY("Unref", Unref), + DECLARE_NODE_API_PROPERTY("Release", Release), + DECLARE_NODE_API_PROPERTY("CallIntoModule", CallIntoModule), + }; + + NODE_API_CALL( + env, napi_define_properties(env, exports, + sizeof(properties) / sizeof(properties[0]), + properties)); + + return exports; +} +NAPI_MODULE(NODE_GYP_MODULE_NAME, Init) \ No newline at end of file diff --git a/packages/node-addon-examples/tests/threadsafe_function/addon.js b/packages/node-addon-examples/tests/threadsafe_function/addon.js new file mode 100644 index 00000000..dea3e064 --- /dev/null +++ b/packages/node-addon-examples/tests/threadsafe_function/addon.js @@ -0,0 +1,205 @@ +/* eslint-disable @typescript-eslint/no-require-imports */ +/* eslint-disable no-undef */ +const assert = require("assert"); +const binding = require("bindings")("addon.node"); +const expectedArray = (function (arrayLength) { + const result = []; + for (let index = 0; index < arrayLength; index++) { + result.push(arrayLength - 1 - index); + } + return result; +})(binding.ARRAY_LENGTH); + +let cnt = 0; +function testWithJSMarshaller({ + threadStarter, + quitAfter, + abort, + maxQueueSize, + launchSecondary, +}) { + return new Promise((resolve) => { + const array = []; + binding[threadStarter]( + function testCallback(value) { + array.push(value); + if (array.length === quitAfter) { + setImmediate(() => { + binding.StopThread(() => { + resolve(array); + }, !!abort); + }); + } + }, + !!abort, + !!launchSecondary, + maxQueueSize, + ); + }); +} + +module.exports = () => { + return ( + new Promise(function testWithoutJSMarshaller(resolve) { + let callCount = 0; + binding.StartThreadNoNative( + function testCallback() { + callCount++; + if (callCount === binding.ARRAY_LENGTH) { + setImmediate(() => { + binding.StopThread(() => { + resolve(); + }, false); + }); + } + }, + false /* abort */, + false /* launchSecondary */, + binding.MAX_QUEUE_SIZE, + ); + }) + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: binding.ARRAY_LENGTH, + }), + ) + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNoJsFunc", + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: binding.ARRAY_LENGTH, + }), + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // Start the thread in blocking mode with an infinite queue, and assert that all + // values are passed. Quit after it's done. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + maxQueueSize: 0, + quitAfter: binding.ARRAY_LENGTH, + }), + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // Start the thread in non-blocking mode, and assert that all values are passed. + // Quit after it's done. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNonblocking", + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: binding.ARRAY_LENGTH, + }), + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in blocking mode, and assert that all values are passed. + // // Quit early, but let the thread finish. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: 1, + }), + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in blocking mode with an infinite queue, and assert that all + // // values are passed. Quit early, but let the thread finish. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + maxQueueSize: 0, + quitAfter: 1, + }), + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in non-blocking mode, and assert that all values are passed. + // // Quit early, but let the thread finish. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNonblocking", + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: 1, + }), + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in blocking mode, and assert that all values are passed. + // // Quit early, but let the thread finish. Launch a secondary thread to test the + // // reference counter incrementing functionality. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + launchSecondary: true, + }), + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in non-blocking mode, and assert that all values are passed. + // // Quit early, but let the thread finish. Launch a secondary thread to test the + // // reference counter incrementing functionality. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNonblocking", + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + launchSecondary: true, + }), + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // // Start the thread in blocking mode, and assert that it could not finish. + // // Quit early by aborting. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + abort: true, + }), + ) + .then((result) => assert.strictEqual(result.indexOf(0), -1)) + + // Start the thread in blocking mode with an infinite queue, and assert that it + // could not finish. Quit early by aborting. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThread", + quitAfter: 1, + maxQueueSize: 0, + abort: true, + }), + ) + .then((result) => assert.strictEqual(result.indexOf(0), -1)) + + // // Start the thread in non-blocking mode, and assert that it could not finish. + // // Quit early and aborting. + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNonblocking", + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + abort: true, + }), + ) + .then((result) => assert.strictEqual(result.indexOf(0), -1)) + + // // Make sure that threadsafe function isn't stalled when we hit + // // `kMaxIterationCount` in `src/node_api.cc` + .then(() => + testWithJSMarshaller({ + threadStarter: "StartThreadNonblocking", + maxQueueSize: binding.ARRAY_LENGTH >>> 1, + quitAfter: binding.ARRAY_LENGTH, + }), + ) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + ); +}; diff --git a/packages/node-addon-examples/tests/threadsafe_function/binding.gyp b/packages/node-addon-examples/tests/threadsafe_function/binding.gyp new file mode 100644 index 00000000..26cbd2c9 --- /dev/null +++ b/packages/node-addon-examples/tests/threadsafe_function/binding.gyp @@ -0,0 +1,14 @@ +{ + "targets": [ + { + "target_name": "addon", + "cflags!": [ "-fno-exceptions" ], + "cflags_cc!": [ "-fno-exceptions" ], + "sources": [ "addon.cpp" ], + "include_dirs": [ + "