diff --git a/src/core/worker.c b/src/core/worker.c
index 6338834413..db9977a91f 100644
--- a/src/core/worker.c
+++ b/src/core/worker.c
@@ -96,13 +96,10 @@ QuicWorkerInitialize(
Worker->ExecutionContext.NextTimeUs = UINT64_MAX;
Worker->ExecutionContext.Ready = TRUE;
-#ifndef _KERNEL_MODE // Not supported on kernel mode
if (ExecProfile != QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT) {
Worker->IsExternal = TRUE;
CxPlatAddExecutionContext(&MsQuicLib.WorkerPool, &Worker->ExecutionContext, PartitionIndex);
- } else
-#endif // _KERNEL_MODE
- {
+ } else {
uint16_t ThreadFlags;
switch (ExecProfile) {
default:
diff --git a/src/inc/quic_platform.h b/src/inc/quic_platform.h
index 4e56a87fd5..90938dbc20 100644
--- a/src/inc/quic_platform.h
+++ b/src/inc/quic_platform.h
@@ -436,20 +436,17 @@ typedef struct CXPLAT_WORKER_POOL {
} CXPLAT_WORKER_POOL;
-#ifdef _KERNEL_MODE // Not supported on kernel mode
-#define CxPlatWorkerPoolInit(WorkerPool) UNREFERENCED_PARAMETER(WorkerPool)
-#define CxPlatWorkerPoolUninit(WorkerPool) UNREFERENCED_PARAMETER(WorkerPool)
-#else
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWorkerPoolInit(
_In_ CXPLAT_WORKER_POOL* WorkerPool
);
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWorkerPoolUninit(
_In_ CXPLAT_WORKER_POOL* WorkerPool
);
-#endif
//
// General purpose execution context abstraction layer. Used for driving worker
@@ -524,10 +521,7 @@ typedef struct CXPLAT_EXECUTION_CONTEXT {
} CXPLAT_EXECUTION_CONTEXT;
-#ifdef _KERNEL_MODE // Not supported on kernel mode
-#define CxPlatAddExecutionContext(WorkerPool, Context, IdealProcessor) CXPLAT_FRE_ASSERT(FALSE)
-#define CxPlatWakeExecutionContext(Context) CXPLAT_FRE_ASSERT(FALSE)
-#else
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatAddExecutionContext(
_In_ CXPLAT_WORKER_POOL* WorkerPool,
@@ -535,11 +529,11 @@ CxPlatAddExecutionContext(
_In_ uint16_t Index // Into the execution config processor array
);
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWakeExecutionContext(
_In_ CXPLAT_EXECUTION_CONTEXT* Context
);
-#endif
//
// Test Interface for loading a self-signed certificate.
diff --git a/src/inc/quic_platform_winkernel.h b/src/inc/quic_platform_winkernel.h
index 065f5e192a..0745a00325 100644
--- a/src/inc/quic_platform_winkernel.h
+++ b/src/inc/quic_platform_winkernel.h
@@ -469,58 +469,142 @@ _CxPlatEventWaitWithTimeout(
#define CxPlatEventWaitWithTimeout(Event, TimeoutMs) \
(STATUS_SUCCESS == _CxPlatEventWaitWithTimeout(&Event, TimeoutMs))
+//
+// TODO: Publish and document the following APIs
+//
+
+// ACCESS_MASK
+#define IO_COMPLETION_QUERY_STATE 0x0001
+#define IO_COMPLETION_MODIFY_STATE 0x0002 // winnt
+#define IO_COMPLETION_ALL_ACCESS (STANDARD_RIGHTS_REQUIRED|SYNCHRONIZE|0x3) // winnt
+
+//
+// One completion packet, in the layout ZwRemoveIoCompletionEx (declared below)
+// writes into its output array. The event queue code in this header posts
+// packets with a NULL KeyContext and the CXPLAT_SQE pointer in ApcContext.
+//
+typedef struct _FILE_IO_COMPLETION_INFORMATION {
+ PVOID KeyContext;
+ PVOID ApcContext;
+ IO_STATUS_BLOCK IoStatusBlock;
+} FILE_IO_COMPLETION_INFORMATION, *PFILE_IO_COMPLETION_INFORMATION;
+
+// copied from zwapi.h
+NTSYSAPI
+NTSTATUS
+NTAPI
+ZwCreateIoCompletion (
+ OUT PHANDLE IoCompletionHandle,
+ IN ACCESS_MASK DesiredAccess,
+ IN OPTIONAL POBJECT_ATTRIBUTES ObjectAttributes,
+ IN OPTIONAL ULONG Count
+ );
+
+NTSYSAPI
+NTSTATUS
+NTAPI
+ZwSetIoCompletion (
+ IN HANDLE IoCompletionHandle,
+ IN PVOID KeyContext,
+ IN PVOID ApcContext,
+ IN NTSTATUS IoStatus,
+ IN ULONG_PTR IoStatusInformation
+ );
+
+NTSYSAPI
+NTSTATUS
+NTAPI
+ZwRemoveIoCompletionEx (
+ IN HANDLE IoCompletionHandle,
+ OUT PFILE_IO_COMPLETION_INFORMATION IoCompletionInformation,
+ IN ULONG Count,
+ OUT PULONG NumEntriesRemoved,
+ IN PLARGE_INTEGER Timeout OPTIONAL,
+ IN BOOLEAN Alertable
+ );
+
//
// Event Queue Interfaces
//
-typedef KEVENT CXPLAT_EVENTQ; // Event queue
-typedef void* CXPLAT_CQE;
+typedef HANDLE CXPLAT_EVENTQ;
+typedef FILE_IO_COMPLETION_INFORMATION CXPLAT_CQE;
+
+//
+// Signature of the callback stored in a CXPLAT_SQE. It takes a pointer to a
+// completion queue entry (CXPLAT_CQE) and must run at PASSIVE_LEVEL.
+//
+typedef
+_IRQL_requires_max_(PASSIVE_LEVEL)
+void
+(CXPLAT_EVENT_COMPLETION)(
+ _In_ CXPLAT_CQE* Cqe
+ );
+typedef CXPLAT_EVENT_COMPLETION *CXPLAT_EVENT_COMPLETION_HANDLER;
+//
+// Submission queue entry. In kernel mode it only carries the completion
+// handler; the entry's address is what gets posted to the event queue.
+//
+typedef struct CXPLAT_SQE {
+ CXPLAT_EVENT_COMPLETION_HANDLER Completion;
+} CXPLAT_SQE;
+
+//
+// Creates the I/O completion port that backs the event queue. Returns TRUE on
+// success, in which case *queue holds the handle and must eventually be
+// released with CxPlatEventQCleanup.
+//
+_IRQL_requires_max_(PASSIVE_LEVEL)
inline
BOOLEAN
CxPlatEventQInitialize(
_Out_ CXPLAT_EVENTQ* queue
)
{
- KeInitializeEvent(queue, SynchronizationEvent, FALSE);
- return TRUE;
+    //
+    // Request a kernel handle so the queue stays valid (and is not reachable
+    // from user mode) no matter which process context creates, uses or closes
+    // it.
+    //
+    OBJECT_ATTRIBUTES attributes;
+    InitializeObjectAttributes(&attributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL);
+    return NT_SUCCESS(ZwCreateIoCompletion(queue, IO_COMPLETION_ALL_ACCESS, &attributes, 0));
}
+//
+// Closes the completion port handle created by CxPlatEventQInitialize.
+//
+_IRQL_requires_max_(PASSIVE_LEVEL)
inline
void
CxPlatEventQCleanup(
_In_ CXPLAT_EVENTQ* queue
)
{
- UNREFERENCED_PARAMETER(queue);
+    //
+    // Use the Zw variant, like every other call made on this handle, so the
+    // close is processed with kernel previous mode regardless of the caller.
+    //
+    ZwClose(*queue);
}
+//
+// Posts 'sqe' to the completion port behind 'queue'. The SQE pointer travels
+// in the packet's ApcContext (the key is left NULL), which is the field
+// CxPlatCqeGetSqe reads back. Returns TRUE if ZwSetIoCompletion succeeded.
+//
+_IRQL_requires_max_(PASSIVE_LEVEL)
inline
BOOLEAN
-_CxPlatEventQEnqueue(
- _In_ CXPLAT_EVENTQ* queue
+CxPlatEventQEnqueue(
+ _In_ CXPLAT_EVENTQ* queue,
+ _In_ CXPLAT_SQE* sqe
)
{
- KeSetEvent(queue, IO_NO_INCREMENT, FALSE);
- return TRUE;
+ return NT_SUCCESS(ZwSetIoCompletion(*queue, NULL, sqe, STATUS_SUCCESS, 0));
}
-#define CxPlatEventQEnqueue(queue, sqe) _CxPlatEventQEnqueue(queue)
-
+//
+// Removes up to 'count' completion packets from the queue into 'events',
+// waiting at most 'wait_time' milliseconds (UINT32_MAX waits forever).
+// Returns the number of entries written, or 0 on timeout or failure.
+//
+_IRQL_requires_max_(PASSIVE_LEVEL)
inline
uint32_t
CxPlatEventQDequeue(
_In_ CXPLAT_EVENTQ* queue,
- _Out_ CXPLAT_CQE* events,
+ _Out_writes_to_(count, return) CXPLAT_CQE* events,
_In_ uint32_t count,
_In_ uint32_t wait_time // milliseconds
)
{
- UNREFERENCED_PARAMETER(count);
- *events = NULL;
- return STATUS_SUCCESS == _CxPlatEventWaitWithTimeout(queue, wait_time) ? 1 : 0;
+    NTSTATUS status;
+    ULONG entriesRemoved = 0;
+    LARGE_INTEGER timeout;
+    PLARGE_INTEGER timeoutPtr = NULL; // NULL means wait without a timeout
+    if (wait_time != UINT32_MAX) {
+        //
+        // Negative value = relative interval, expressed in 100ns units.
+        //
+        timeout.QuadPart = -10000LL * wait_time;
+        timeoutPtr = &timeout;
+    }
+    status =
+        ZwRemoveIoCompletionEx(
+            *queue,
+            events,
+            count,
+            &entriesRemoved,
+            timeoutPtr,
+            FALSE);
+    //
+    // STATUS_TIMEOUT is a success-class code and would pass NT_SUCCESS, so
+    // only an exact STATUS_SUCCESS means 'events' holds dequeued entries.
+    //
+    return status == STATUS_SUCCESS ? entriesRemoved : 0;
}
+_IRQL_requires_max_(PASSIVE_LEVEL)
inline
void
CxPlatEventQReturn(
@@ -533,12 +617,46 @@ CxPlatEventQReturn(
}
+//
+// Stores the completion handler in the SQE. The queue argument is unused in
+// this implementation and the function always returns TRUE.
+//
inline
-void*
-CxPlatCqeUserData(
+BOOLEAN
+CxPlatSqeInitialize(
+ _In_ CXPLAT_EVENTQ* queue,
+ _In_ CXPLAT_EVENT_COMPLETION_HANDLER completion,
+ _Out_ CXPLAT_SQE* sqe
+ )
+{
+ UNREFERENCED_PARAMETER(queue);
+ sqe->Completion = completion;
+ return TRUE;
+}
+
+//
+// Queue-less variant of CxPlatSqeInitialize: only records the completion
+// handler in the SQE and cannot fail.
+//
+inline
+void
+CxPlatSqeInitializeEx(
+ _In_ CXPLAT_EVENT_COMPLETION_HANDLER completion,
+ _Out_ CXPLAT_SQE* sqe
+ )
+{
+ sqe->Completion = completion;
+}
+
+//
+// No-op in this implementation: the SQE owns no resources, so both arguments
+// are ignored.
+//
+inline
+void
+CxPlatSqeCleanup(
+ _In_ CXPLAT_EVENTQ* queue,
+ _In_ CXPLAT_SQE* sqe
+ )
+{
+ UNREFERENCED_PARAMETER(queue);
+ UNREFERENCED_PARAMETER(sqe);
+}
+
+//
+// Returns the CXPLAT_SQE pointer carried in the completion entry's ApcContext
+// field, which is where CxPlatEventQEnqueue places it when posting.
+//
+inline
+CXPLAT_SQE*
+CxPlatCqeGetSqe(
_In_ const CXPLAT_CQE* cqe
)
{
- return *cqe;
+ return (CXPLAT_SQE*)cqe->ApcContext;
}
//
diff --git a/src/platform/datapath_winkernel.c b/src/platform/datapath_winkernel.c
index 57d7862ea3..5db933e6b3 100644
--- a/src/platform/datapath_winkernel.c
+++ b/src/platform/datapath_winkernel.c
@@ -640,7 +640,6 @@ DataPathInitialize(
_Out_ CXPLAT_DATAPATH* *NewDataPath
)
{
- UNREFERENCED_PARAMETER(WorkerPool);
QUIC_STATUS Status;
WSK_CLIENT_NPI WskClientNpi = { NULL, &WskAppDispatch };
uint32_t DatapathLength;
@@ -669,6 +668,11 @@ DataPathInitialize(
}
}
+ if (!CxPlatWorkerPoolLazyStart(WorkerPool, Config)) {
+ Status = QUIC_STATUS_OUT_OF_MEMORY;
+ goto Exit;
+ }
+
DatapathLength =
sizeof(CXPLAT_DATAPATH) +
CxPlatProcCount() * sizeof(CXPLAT_DATAPATH_PROC_CONTEXT);
diff --git a/src/platform/platform.kernel.vcxproj b/src/platform/platform.kernel.vcxproj
index 6e1f4506de..db94c8b944 100644
--- a/src/platform/platform.kernel.vcxproj
+++ b/src/platform/platform.kernel.vcxproj
@@ -28,6 +28,7 @@
+
diff --git a/src/platform/platform_internal.h b/src/platform/platform_internal.h
index 4930ceb614..3458b18b06 100644
--- a/src/platform/platform_internal.h
+++ b/src/platform/platform_internal.h
@@ -759,6 +759,7 @@ CxPlatCryptUninitialize(
// Platform Worker APIs
//
+_IRQL_requires_max_(PASSIVE_LEVEL)
BOOLEAN
CxPlatWorkerPoolLazyStart(
_In_ CXPLAT_WORKER_POOL* WorkerPool,
diff --git a/src/platform/platform_worker.c b/src/platform/platform_worker.c
index 78a139c098..48bcb810cf 100644
--- a/src/platform/platform_worker.c
+++ b/src/platform/platform_worker.c
@@ -121,6 +121,7 @@ WakeCompletion(
UNREFERENCED_PARAMETER(Cqe);
}
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatUpdateExecutionContexts(
_In_ CXPLAT_WORKER* Worker
@@ -136,6 +137,7 @@ UpdatePollCompletion(
CxPlatUpdateExecutionContexts(Worker);
}
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWorkerPoolInit(
_In_ CXPLAT_WORKER_POOL* WorkerPool
@@ -149,6 +151,7 @@ CxPlatWorkerPoolInit(
#pragma warning(push)
#pragma warning(disable:6385)
#pragma warning(disable:6386) // SAL is confused about the worker size
+_IRQL_requires_max_(PASSIVE_LEVEL)
BOOLEAN
CxPlatWorkerPoolLazyStart(
_In_ CXPLAT_WORKER_POOL* WorkerPool,
@@ -173,7 +176,7 @@ CxPlatWorkerPoolLazyStart(
CXPLAT_DBG_ASSERT(WorkerPool->WorkerCount > 0 && WorkerPool->WorkerCount <= UINT16_MAX);
const size_t WorkersSize = sizeof(CXPLAT_WORKER) * WorkerPool->WorkerCount;
- WorkerPool->Workers = (CXPLAT_WORKER*)CXPLAT_ALLOC_PAGED(WorkersSize, QUIC_POOL_PLATFORM_WORKER);
+ WorkerPool->Workers = (CXPLAT_WORKER*)CXPLAT_ALLOC_NONPAGED(WorkersSize, QUIC_POOL_PLATFORM_WORKER);
if (WorkerPool->Workers == NULL) {
QuicTraceEvent(
AllocFailure,
@@ -302,6 +305,7 @@ CxPlatWorkerPoolLazyStart(
}
#pragma warning(pop)
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWorkerPoolUninit(
_In_ CXPLAT_WORKER_POOL* WorkerPool
@@ -339,6 +343,8 @@ CxPlatWorkerPoolUninit(
CxPlatLockUninitialize(&WorkerPool->WorkerLock);
}
+#ifndef _KERNEL_MODE // Not supported on kernel mode
+
#define DYNAMIC_POOL_PROCESSING_PERIOD 1000000 // 1 second
#define DYNAMIC_POOL_PRUNE_COUNT 8
@@ -401,6 +407,8 @@ CxPlatProcessDynamicPoolAllocators(
CxPlatLockRelease(&Worker->ECLock);
}
+#endif // _KERNEL_MODE
+
CXPLAT_EVENTQ*
CxPlatWorkerPoolGetEventQ(
_In_ const CXPLAT_WORKER_POOL* WorkerPool,
@@ -412,6 +420,7 @@ CxPlatWorkerPoolGetEventQ(
return &WorkerPool->Workers[Index].EventQ;
}
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatAddExecutionContext(
_In_ CXPLAT_WORKER_POOL* WorkerPool,
@@ -435,6 +444,7 @@ CxPlatAddExecutionContext(
}
}
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWakeExecutionContext(
_In_ CXPLAT_EXECUTION_CONTEXT* Context
@@ -446,6 +456,7 @@ CxPlatWakeExecutionContext(
}
}
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatUpdateExecutionContexts(
_In_ CXPLAT_WORKER* Worker
@@ -467,6 +478,7 @@ CxPlatUpdateExecutionContexts(
}
}
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatRunExecutionContexts(
_In_ CXPLAT_WORKER* Worker,
@@ -524,6 +536,7 @@ CxPlatRunExecutionContexts(
}
}
+_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatProcessEvents(
_In_ CXPLAT_WORKER* Worker,
@@ -591,10 +604,12 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context)
State.NoWorkCount = 0;
}
+#ifndef _KERNEL_MODE // Unnecessary on kernel mode
if (State.TimeNow - State.LastPoolProcessTime > DYNAMIC_POOL_PROCESSING_PERIOD) {
CxPlatProcessDynamicPoolAllocators(Worker);
State.LastPoolProcessTime = State.TimeNow;
}
+#endif // _KERNEL_MODE
}
Worker->Running = FALSE;