Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kernel Platform Worker Support #4605

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/core/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,10 @@ QuicWorkerInitialize(
Worker->ExecutionContext.NextTimeUs = UINT64_MAX;
Worker->ExecutionContext.Ready = TRUE;

#ifndef _KERNEL_MODE // Not supported on kernel mode
if (ExecProfile != QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT) {
Worker->IsExternal = TRUE;
CxPlatAddExecutionContext(&MsQuicLib.WorkerPool, &Worker->ExecutionContext, PartitionIndex);
} else
#endif // _KERNEL_MODE
{
} else {
uint16_t ThreadFlags;
switch (ExecProfile) {
default:
Expand Down
14 changes: 4 additions & 10 deletions src/inc/quic_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,20 +436,17 @@ typedef struct CXPLAT_WORKER_POOL {

} CXPLAT_WORKER_POOL;

#ifdef _KERNEL_MODE // Not supported on kernel mode
#define CxPlatWorkerPoolInit(WorkerPool) UNREFERENCED_PARAMETER(WorkerPool)
#define CxPlatWorkerPoolUninit(WorkerPool) UNREFERENCED_PARAMETER(WorkerPool)
#else
_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWorkerPoolInit(
_In_ CXPLAT_WORKER_POOL* WorkerPool
);

_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWorkerPoolUninit(
_In_ CXPLAT_WORKER_POOL* WorkerPool
);
#endif

//
// General purpose execution context abstraction layer. Used for driving worker
Expand Down Expand Up @@ -524,22 +521,19 @@ typedef struct CXPLAT_EXECUTION_CONTEXT {

} CXPLAT_EXECUTION_CONTEXT;

#ifdef _KERNEL_MODE // Not supported on kernel mode
#define CxPlatAddExecutionContext(WorkerPool, Context, IdealProcessor) CXPLAT_FRE_ASSERT(FALSE)
#define CxPlatWakeExecutionContext(Context) CXPLAT_FRE_ASSERT(FALSE)
#else
_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatAddExecutionContext(
_In_ CXPLAT_WORKER_POOL* WorkerPool,
_Inout_ CXPLAT_EXECUTION_CONTEXT* Context,
_In_ uint16_t Index // Into the execution config processor array
);

_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWakeExecutionContext(
_In_ CXPLAT_EXECUTION_CONTEXT* Context
);
#endif

//
// The "type" of the completion queue event is stored as the first uint32_t of
Expand Down
91 changes: 77 additions & 14 deletions src/inc/quic_platform_winkernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ typedef KEVENT CXPLAT_EVENT;
#define CxPlatEventUninitialize(Event) UNREFERENCED_PARAMETER(Event)
#define CxPlatEventSet(Event) KeSetEvent(&(Event), IO_NO_INCREMENT, FALSE)
#define CxPlatEventReset(Event) KeResetEvent(&(Event))
#define CxPlatEventClear(Event) KeClearEvent(&(Event))
#define CxPlatEventWaitForever(Event) \
KeWaitForSingleObject(&(Event), Executive, KernelMode, FALSE, NULL)
inline
Expand All @@ -473,56 +474,118 @@ _CxPlatEventWaitWithTimeout(
// Event Queue Interfaces
//

typedef KEVENT CXPLAT_EVENTQ; // Event queue
typedef void* CXPLAT_CQE;
typedef struct CXPLAT_EVENTQ {
CXPLAT_LOCK Lock;
LIST_ENTRY Events;
CXPLAT_EVENT EventsAvailable;
} CXPLAT_EVENTQ;

typedef struct CXPLAT_CQE {
void* UserData;
} CXPLAT_CQE;

#define CXPLAT_SQE CXPLAT_SQE
#define CXPLAT_SQE_DEFAULT {0}
typedef struct CXPLAT_SQE {
LIST_ENTRY Link;
void* UserData;
} CXPLAT_SQE;

_IRQL_requires_max_(PASSIVE_LEVEL)
inline
BOOLEAN
CxPlatEventQInitialize(
_Out_ CXPLAT_EVENTQ* queue
)
{
KeInitializeEvent(queue, SynchronizationEvent, FALSE);
CxPlatLockInitialize(&queue->Lock);
InitializeListHead(&queue->Events);
CxPlatEventInitialize(&queue->EventsAvailable, TRUE, FALSE);
return TRUE;
}

_IRQL_requires_max_(PASSIVE_LEVEL)
inline
void
CxPlatEventQCleanup(
_In_ CXPLAT_EVENTQ* queue
)
{
UNREFERENCED_PARAMETER(queue);
CxPlatEventUninitialize(queue->EventsAvailable);
CXPLAT_DBG_ASSERT(IsListEmpty(&queue->Events));
CxPlatLockUninitialize(&queue->Lock);
}

_IRQL_requires_max_(PASSIVE_LEVEL)
inline
BOOLEAN
_CxPlatEventQEnqueue(
CxPlatEventQEnqueue(
_In_ CXPLAT_EVENTQ* queue,
_In_ CXPLAT_SQE* sqe,
_In_opt_ void* user_data
)
{
UNREFERENCED_PARAMETER(user_data);
KeSetEvent(queue, IO_NO_INCREMENT, FALSE);
CxPlatLockAcquire(&queue->Lock);

if (sqe->Link.Flink != NULL) { // Already in a queue
CxPlatLockRelease(&queue->Lock);
return TRUE;
}

sqe->UserData = user_data;
BOOLEAN SignalEvent = IsListEmpty(&queue->Events);
InsertTailList(&queue->Events, &sqe->Link);

CxPlatLockRelease(&queue->Lock);

if (SignalEvent) {
CxPlatEventSet(queue->EventsAvailable);
}

return TRUE;
}

#define CxPlatEventQEnqueue(queue, sqe, user_data) _CxPlatEventQEnqueue(queue, user_data)

_IRQL_requires_max_(PASSIVE_LEVEL)
inline
uint32_t
CxPlatEventQDequeue(
_In_ CXPLAT_EVENTQ* queue,
_Out_ CXPLAT_CQE* events,
_Out_writes_to_(count, return) CXPLAT_CQE* events,
_In_ uint32_t count,
_In_ uint32_t wait_time // milliseconds
)
{
UNREFERENCED_PARAMETER(count);
*events = NULL;
return STATUS_SUCCESS == _CxPlatEventWaitWithTimeout(queue, wait_time) ? 1 : 0;
CxPlatLockAcquire(&queue->Lock);

if (IsListEmpty(&queue->Events)) {
CxPlatEventClear(queue->EventsAvailable);
CxPlatLockRelease(&queue->Lock);
if (wait_time == 0) {
return 0;
}
if (wait_time == UINT32_MAX) {
CxPlatEventWaitForever(queue->EventsAvailable);
} else {
CxPlatEventWaitWithTimeout(queue->EventsAvailable, wait_time);
}
CxPlatLockAcquire(&queue->Lock);
}

uint32_t EventsDequeued = 0;
while (EventsDequeued < count && !IsListEmpty(&queue->Events)) {
CXPLAT_SQE* Sqe =
CXPLAT_CONTAINING_RECORD(
RemoveHeadList(&queue->Events), CXPLAT_SQE, Link);
events[EventsDequeued++].UserData = Sqe->UserData;
Sqe->Link.Flink = NULL; // Indicates it's not in a queue
}

CxPlatLockRelease(&queue->Lock);

return EventsDequeued;
}

_IRQL_requires_max_(PASSIVE_LEVEL)
inline
void
CxPlatEventQReturn(
Expand All @@ -540,7 +603,7 @@ CxPlatCqeUserData(
_In_ const CXPLAT_CQE* cqe
)
{
return *cqe;
return cqe->UserData;
}

//
Expand Down
6 changes: 5 additions & 1 deletion src/platform/datapath_winkernel.c
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,6 @@ CxPlatDataPathInitialize(
_Out_ CXPLAT_DATAPATH* *NewDataPath
)
{
UNREFERENCED_PARAMETER(WorkerPool);
QUIC_STATUS Status;
WSK_CLIENT_NPI WskClientNpi = { NULL, &WskAppDispatch };
uint32_t DatapathLength;
Expand Down Expand Up @@ -847,6 +846,11 @@ CxPlatDataPathInitialize(
}
}

if (!CxPlatWorkerPoolLazyStart(WorkerPool, Config)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Launching threads here can be waste of resources? because pure winkernel doesn't use them.
Should be put in raw datapath initialization? but no need to fix in this PR I think.

Status = QUIC_STATUS_OUT_OF_MEMORY;
goto Exit;
}

DatapathLength =
sizeof(CXPLAT_DATAPATH) +
CxPlatProcCount() * sizeof(CXPLAT_DATAPATH_PROC_CONTEXT);
Expand Down
1 change: 1 addition & 0 deletions src/platform/platform.kernel.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<ClCompile Include="hashtable.c" />
<ClCompile Include="pcp.c" />
<ClCompile Include="platform_winkernel.c" />
<ClCompile Include="platform_worker.c" />
<ClCompile Include="storage_winkernel.c" />
<ClCompile Include="tls_schannel.c" />
<ClCompile Include="toeplitz.c" />
Expand Down
7 changes: 6 additions & 1 deletion src/platform/platform_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,9 @@ CxPlatCryptUninitialize(

//
// Platform Worker APIs
//
//

_IRQL_requires_max_(PASSIVE_LEVEL)
BOOLEAN
CxPlatWorkerPoolLazyStart(
_In_ CXPLAT_WORKER_POOL* WorkerPool,
Expand All @@ -662,10 +663,14 @@ CxPlatWorkerPoolGetEventQ(
_In_ uint16_t Index // Into the config processor array
);

#ifdef _KERNEL_MODE // Not supported on kernel mode (yet)
#define CxPlatDataPathProcessCqe(X) CXPLAT_FRE_ASSERT(FALSE)
ami-GS marked this conversation as resolved.
Show resolved Hide resolved
#else
void
CxPlatDataPathProcessCqe(
_In_ CXPLAT_CQE* Cqe
);
#endif

BOOLEAN // Returns FALSE no work was done.
CxPlatDataPathPoll(
Expand Down
16 changes: 15 additions & 1 deletion src/platform/platform_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {

CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context);

_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWorkerPoolInit(
_In_ CXPLAT_WORKER_POOL* WorkerPool
Expand All @@ -119,6 +120,7 @@ CxPlatWorkerPoolInit(
#pragma warning(push)
#pragma warning(disable:6385)
#pragma warning(disable:6386) // SAL is confused about the worker size
_IRQL_requires_max_(PASSIVE_LEVEL)
BOOLEAN
CxPlatWorkerPoolLazyStart(
_In_ CXPLAT_WORKER_POOL* WorkerPool,
Expand All @@ -143,7 +145,7 @@ CxPlatWorkerPoolLazyStart(
CXPLAT_DBG_ASSERT(WorkerPool->WorkerCount > 0 && WorkerPool->WorkerCount <= UINT16_MAX);

const size_t WorkersSize = sizeof(CXPLAT_WORKER) * WorkerPool->WorkerCount;
WorkerPool->Workers = (CXPLAT_WORKER*)CXPLAT_ALLOC_PAGED(WorkersSize, QUIC_POOL_PLATFORM_WORKER);
WorkerPool->Workers = (CXPLAT_WORKER*)CXPLAT_ALLOC_NONPAGED(WorkersSize, QUIC_POOL_PLATFORM_WORKER);
if (WorkerPool->Workers == NULL) {
QuicTraceEvent(
AllocFailure,
Expand Down Expand Up @@ -280,6 +282,7 @@ CxPlatWorkerPoolLazyStart(
}
#pragma warning(pop)

_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWorkerPoolUninit(
_In_ CXPLAT_WORKER_POOL* WorkerPool
Expand Down Expand Up @@ -321,6 +324,8 @@ CxPlatWorkerPoolUninit(
CxPlatLockUninitialize(&WorkerPool->WorkerLock);
}

#ifndef _KERNEL_MODE // Not supported on kernel mode

#define DYNAMIC_POOL_PROCESSING_PERIOD 1000000 // 1 second
#define DYNAMIC_POOL_PRUNE_COUNT 8

Expand Down Expand Up @@ -383,6 +388,8 @@ CxPlatProcessDynamicPoolAllocators(
CxPlatLockRelease(&Worker->ECLock);
}

#endif // _KERNEL_MODE

CXPLAT_EVENTQ*
CxPlatWorkerPoolGetEventQ(
_In_ const CXPLAT_WORKER_POOL* WorkerPool,
Expand All @@ -394,6 +401,7 @@ CxPlatWorkerPoolGetEventQ(
return &WorkerPool->Workers[Index].EventQ;
}

_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatAddExecutionContext(
_In_ CXPLAT_WORKER_POOL* WorkerPool,
Expand All @@ -420,6 +428,7 @@ CxPlatAddExecutionContext(
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatWakeExecutionContext(
_In_ CXPLAT_EXECUTION_CONTEXT* Context
Expand All @@ -431,6 +440,7 @@ CxPlatWakeExecutionContext(
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatUpdateExecutionContexts(
_In_ CXPLAT_WORKER* Worker
Expand All @@ -452,6 +462,7 @@ CxPlatUpdateExecutionContexts(
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatRunExecutionContexts(
_In_ CXPLAT_WORKER* Worker,
Expand Down Expand Up @@ -509,6 +520,7 @@ CxPlatRunExecutionContexts(
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
BOOLEAN
CxPlatProcessEvents(
_In_ CXPLAT_WORKER* Worker,
Expand Down Expand Up @@ -593,10 +605,12 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context)
State.NoWorkCount = 0;
}

#ifndef _KERNEL_MODE // Unnecessary on kernel mode
if (State.TimeNow - State.LastPoolProcessTime > DYNAMIC_POOL_PROCESSING_PERIOD) {
CxPlatProcessDynamicPoolAllocators(Worker);
State.LastPoolProcessTime = State.TimeNow;
}
#endif // _KERNEL_MODE
}

Shutdown:
Expand Down
Loading