diff --git a/src/flow-hash.c b/src/flow-hash.c index ddab01cd5b6..19a58d109e6 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -763,10 +763,9 @@ static Flow *TcpReuseReplace(ThreadVars *tv, FlowLookupStruct *fls, FlowBucket * #ifdef UNITTESTS } #endif - /* time out immediately */ - old_f->timeout_at = 0; /* get some settings that we move over to the new flow */ FlowThreadId thread_id[2] = { old_f->thread_id[0], old_f->thread_id[1] }; + old_f->flow_end_flags |= FLOW_END_FLAG_TCPREUSE; /* flow is unlocked by caller */ @@ -831,19 +830,52 @@ static inline void MoveToWorkQueue(ThreadVars *tv, FlowLookupStruct *fls, } } -static inline bool FlowIsTimedOut(const Flow *f, const uint32_t sec, const bool emerg) +static inline bool FlowIsTimedOut( + const FlowThreadId ftid, const Flow *f, const SCTime_t pktts, const bool emerg) { - if (unlikely(f->timeout_at < sec)) { - return true; - } else if (unlikely(emerg)) { - extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]; + SCTime_t timesout_at; + if (emerg) { + extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]; + timesout_at = SCTIME_ADD_SECS(f->lastts, + FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap)); + } else { + timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy); + } + /* if time is live, we just use the pktts */ + if (TimeModeIsLive()) { + if (SCTIME_CMP_LT(pktts, timesout_at)) { + return false; + } + } else { + if (ftid == f->thread_id[0] || f->thread_id[0] == 0) { + /* do the timeout check */ + if (SCTIME_CMP_LT(pktts, timesout_at)) { + return false; + } + } else { + SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]); + /* do the timeout check */ + if (SCTIME_CMP_LT(checkts, timesout_at)) { + return false; + } + } + } + return true; +} - int64_t timeout_at = f->timeout_at - - FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap); - if ((int64_t)sec >= timeout_at) - return true; +static inline uint16_t GetTvId(const ThreadVars *tv) +{ + uint16_t tv_id; +#ifdef UNITTESTS + if (RunmodeIsUnittests()) { + tv_id = 0; + } else { + tv_id = (uint16_t)tv->id; } - return false; +#else + tv_id = (uint16_t)tv->id; +#endif + return tv_id; } /** \brief Get Flow for packet @@ -897,40 +929,45 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow return f; } + const uint16_t tv_id = GetTvId(tv); const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0; const uint32_t fb_nextts = !emerg ? SC_ATOMIC_GET(fb->next_ts) : 0; + const bool timeout_check = (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts)); /* ok, we have a flow in the bucket. Let's find out if it is our flow */ Flow *prev_f = NULL; /* previous flow */ f = fb->head; do { Flow *next_f = NULL; - const bool timedout = (fb_nextts < (uint32_t)SCTIME_SECS(p->ts) && - FlowIsTimedOut(f, (uint32_t)SCTIME_SECS(p->ts), emerg)); - if (timedout) { - FLOWLOCK_WRLOCK(f); - next_f = f->next; - MoveToWorkQueue(tv, fls, fb, f, prev_f); - FLOWLOCK_UNLOCK(f); - goto flow_removed; - } else if (FlowCompare(f, p) != 0) { + const bool our_flow = FlowCompare(f, p) != 0; + if (our_flow || timeout_check) { FLOWLOCK_WRLOCK(f); - /* found a matching flow that is not timed out */ - if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) { - Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p); - if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */ - prev_f = new_f; - MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */ - FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */ - - if (new_f == NULL) { - FBLOCK_UNLOCK(fb); - return NULL; + const bool timedout = (timeout_check && FlowIsTimedOut(tv_id, f, p->ts, emerg)); + if (timedout) { + next_f = f->next; + MoveToWorkQueue(tv, fls, fb, f, prev_f); + FLOWLOCK_UNLOCK(f); + goto flow_removed; + } else if (our_flow) { + /* found a matching flow that is not timed out */ + if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) { + Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p); + if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */ + prev_f = new_f; + MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */ + FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */ + + if (new_f == NULL) { + FBLOCK_UNLOCK(fb); + return NULL; + } + f = new_f; } - f = new_f; + FlowReference(dest, f); + FBLOCK_UNLOCK(fb); + return f; /* return w/o releasing flow lock */ + } else { + FLOWLOCK_UNLOCK(f); } - FlowReference(dest, f); - FBLOCK_UNLOCK(fb); - return f; /* return w/o releasing flow lock */ } /* unless we removed 'f', prev_f needs to point to * current 'f' when adding a new flow below. */ diff --git a/src/flow-manager.c b/src/flow-manager.c index 05b791ee612..60e2eb2b137 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -173,6 +173,9 @@ void FlowDisableFlowManagerThread(void) /** \internal * \brief check if a flow is timed out + * Takes lastts, adds the timeout policy to it, compared to current time `ts`. + * In case of emergency mode, timeout_policy is ignored and the emerg table + * is used. * * \param f flow * \param ts timestamp @@ -182,17 +185,30 @@ void FlowDisableFlowManagerThread(void) */ static bool FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg) { - uint32_t flow_times_out_at = f->timeout_at; + SCTime_t timesout_at; // = f->lastts; + if (emerg) { - extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]; - flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap); + extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]; + timesout_at = SCTIME_ADD_SECS(f->lastts, + FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap)); + } else { + timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy); } - if (*next_ts == 0 || flow_times_out_at < *next_ts) - *next_ts = flow_times_out_at; + if (*next_ts == 0 || (uint32_t)SCTIME_SECS(timesout_at) < *next_ts) + *next_ts = (uint32_t)SCTIME_SECS(timesout_at); - /* do the timeout check */ - if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) { - return false; + /* if time is live, we just use the tts */ + if (TimeModeIsLive() || f->thread_id[0] == 0) { + /* do the timeout check */ + if (SCTIME_CMP_LT(ts, timesout_at)) { + return false; + } + } else { + SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]); + /* do the timeout check */ + if (SCTIME_CMP_LT(checkts, timesout_at)) { + return false; + } } return true; @@ -326,6 +342,8 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT do { checked++; + FLOWLOCK_WRLOCK(f); + /* check flow timeout based on lastts and state. Both can be * accessed w/o Flow lock as we do have the hash row lock (so flow * can't disappear) and flow_state is atomic. lastts can only @@ -333,6 +351,7 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT /* timeout logic goes here */ if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == false) { + FLOWLOCK_UNLOCK(f); counters->flows_notimeout++; prev_f = f; @@ -340,8 +359,6 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT continue; } - FLOWLOCK_WRLOCK(f); - Flow *next_flow = f->next; #ifdef CAPTURE_OFFLOAD diff --git a/src/flow-util.c b/src/flow-util.c index 7e11da41f52..1d2fe3371d7 100644 --- a/src/flow-util.c +++ b/src/flow-util.c @@ -194,8 +194,6 @@ void FlowInit(Flow *f, const Packet *p) f->protomap = FlowGetProtoMapping(f->proto); f->timeout_policy = FlowGetTimeoutPolicy(f); - const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->startts) + f->timeout_policy; - f->timeout_at = timeout_at; if (MacSetFlowStorageEnabled()) { DEBUG_VALIDATE_BUG_ON(FlowGetStorageById(f, MacSetGetFlowStorageID()) != NULL); diff --git a/src/flow-util.h b/src/flow-util.h index 2d813bd9ee4..e2c4f56be94 100644 --- a/src/flow-util.h +++ b/src/flow-util.h @@ -41,7 +41,6 @@ (f)->dp = 0; \ (f)->proto = 0; \ (f)->livedev = NULL; \ - (f)->timeout_at = 0; \ (f)->timeout_policy = 0; \ (f)->vlan_idx = 0; \ (f)->next = NULL; \ @@ -88,7 +87,6 @@ (f)->vlan_idx = 0; \ (f)->ffr = 0; \ (f)->next = NULL; \ - (f)->timeout_at = 0; \ (f)->timeout_policy = 0; \ (f)->flow_state = 0; \ (f)->tenant_id = 0; \ diff --git a/src/flow-worker.c b/src/flow-worker.c index 63de42a2665..3c95fe8789a 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -554,11 +554,6 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) SCLogDebug("packet %"PRIu64, p->pcap_cnt); - /* update time */ - if (!(PKT_IS_PSEUDOPKT(p))) { - TimeSetByThread(tv->id, p->ts); - } - /* handle Flow */ if (p->flags & PKT_WANTS_FLOW) { FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW); @@ -567,6 +562,10 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) if (likely(p->flow != NULL)) { DEBUG_ASSERT_FLOW_LOCKED(p->flow); if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) { + /* update time */ + if (!(PKT_IS_PSEUDOPKT(p))) { + TimeSetByThread(tv->id, p->ts); + } goto housekeeping; } } @@ -581,6 +580,11 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR); } + /* update time */ + if (!(PKT_IS_PSEUDOPKT(p))) { + TimeSetByThread(tv->id, p->ts); + } + SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no"); /* handle TCP and app layer */ diff --git a/src/flow.c b/src/flow.c index 7bfa80ea0a9..ef8cbb5807b 100644 --- a/src/flow.c +++ b/src/flow.c @@ -396,10 +396,6 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars /* update the last seen timestamp of this flow */ if (SCTIME_CMP_GT(p->ts, f->lastts)) { f->lastts = p->ts; - const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + f->timeout_policy; - if (timeout_at != f->timeout_at) { - f->timeout_at = timeout_at; - } } #ifdef CAPTURE_OFFLOAD } else { @@ -1166,9 +1162,6 @@ void FlowUpdateState(Flow *f, const enum FlowState s) const uint32_t timeout_policy = FlowGetTimeoutPolicy(f); if (timeout_policy != f->timeout_policy) { f->timeout_policy = timeout_policy; - const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + timeout_policy; - if (timeout_at != f->timeout_at) - f->timeout_at = timeout_at; } } #ifdef UNITTESTS diff --git a/src/flow.h b/src/flow.h index 554f9fca4a3..3c7bdda0ef8 100644 --- a/src/flow.h +++ b/src/flow.h @@ -244,6 +244,7 @@ typedef struct AppLayerParserState_ AppLayerParserState; #define FLOW_END_FLAG_TIMEOUT 0x02 #define FLOW_END_FLAG_FORCED 0x04 #define FLOW_END_FLAG_SHUTDOWN 0x08 +#define FLOW_END_FLAG_TCPREUSE 0x10 /** Mutex or RWLocks for the flow. */ //#define FLOWLOCK_RWLOCK @@ -390,11 +391,6 @@ typedef struct Flow_ uint8_t ffr; }; - /** timestamp in seconds of the moment this flow will timeout - * according to the timeout policy. Does *not* take emergency - * mode into account. */ - uint32_t timeout_at; - /** Thread ID for the stream/detect portion of this flow */ FlowThreadId thread_id[2]; @@ -407,6 +403,8 @@ typedef struct Flow_ /** timeout policy value in seconds to add to the lastts.tv_sec * when a packet has been received. */ + /** timeout in seconds by policy, add to lastts to get actual time this times out. Ignored in + * emergency mode. */ uint32_t timeout_policy; /* time stamp of last update (last packet). Set/updated under the diff --git a/src/output-eve-stream.c b/src/output-eve-stream.c index fcdf0c2e5c0..9aec690741c 100644 --- a/src/output-eve-stream.c +++ b/src/output-eve-stream.c @@ -157,6 +157,7 @@ static OutputInitResult EveStreamLogInitCtxSub(ConfNode *conf, OutputCtx *parent ctx->trigger_flags |= SetFlag(conf, "state-update", STREAM_PKT_FLAG_STATE_UPDATE); ctx->trigger_flags |= SetFlag(conf, "spurious-retransmission", STREAM_PKT_FLAG_SPURIOUS_RETRANSMISSION); + ctx->trigger_flags |= SetFlag(conf, "tcp-port-reuse", STREAM_PKT_FLAG_TCP_PORT_REUSE); ctx->trigger_flags |= SetFlag(conf, "all", 0xFFFF); SCLogDebug("trigger_flags %04x", ctx->trigger_flags); diff --git a/src/output-json-flow.c b/src/output-json-flow.c index f7826734f0c..510862d8b39 100644 --- a/src/output-json-flow.c +++ b/src/output-json-flow.c @@ -258,7 +258,9 @@ static void EveFlowLogJSON(OutputJsonThreadCtx *aft, JsonBuilder *jb, Flow *f) } const char *reason = NULL; - if (f->flow_end_flags & FLOW_END_FLAG_FORCED) + if (f->flow_end_flags & FLOW_END_FLAG_TCPREUSE) + reason = "tcp_reuse"; + else if (f->flow_end_flags & FLOW_END_FLAG_FORCED) reason = "forced"; else if (f->flow_end_flags & FLOW_END_FLAG_SHUTDOWN) reason = "shutdown"; diff --git a/src/runmodes.c b/src/runmodes.c index b326a96e3a6..e3e7a93311e 100644 --- a/src/runmodes.c +++ b/src/runmodes.c @@ -436,6 +436,7 @@ void RunModeDispatch(int runmode, const char *custom_mode, const char *capture_p BypassedFlowManagerThreadSpawn(); } StatsSpawnThreads(); + TmThreadsSealThreads(); } } diff --git a/src/suricata.c b/src/suricata.c index 0de8039be1f..e3977504b17 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -2208,7 +2208,6 @@ static int InitSignalHandler(SCInstance *suri) * Will be run once per pcap in unix-socket mode */ void PreRunInit(const int runmode) { - HttpRangeContainersInit(); if (runmode == RUNMODE_UNIX_SOCKET) return; @@ -2231,37 +2230,34 @@ void PreRunInit(const int runmode) AppLayerParserPostStreamSetup(); AppLayerRegisterGlobalCounters(); OutputFilestoreRegisterGlobalCounters(); + HttpRangeContainersInit(); } /* tasks we need to run before packets start flowing, * but after we dropped privs */ void PreRunPostPrivsDropInit(const int runmode) { - StatsSetupPostConfigPreOutput(); - RunModeInitializeOutputs(); - DatasetsInit(); - if (runmode == RUNMODE_UNIX_SOCKET) { - /* As the above did some necessary startup initialization, it - * also setup some outputs where only one is allowed, so - * deinitialize to the state that unix-mode does after every - * pcap. */ - PostRunDeinit(RUNMODE_PCAP_FILE, NULL); return; } + StatsSetupPostConfigPreOutput(); + RunModeInitializeOutputs(); + DatasetsInit(); StatsSetupPostConfigPostOutput(); } -/* clean up / shutdown code for both the main modes and for - * unix socket mode. +/** \brief clean up / shutdown code for packet modes * - * Will be run once per pcap in unix-socket mode */ + * Shuts down packet modes, so regular packet runmodes and the + * per pcap mode in the unix socket. */ void PostRunDeinit(const int runmode, struct timeval *start_time) { if (runmode == RUNMODE_UNIX_SOCKET) return; + TmThreadsUnsealThreads(); + /* needed by FlowWorkToDoCleanup */ PacketPoolInit(); @@ -2964,12 +2960,11 @@ void SuricataInit(void) prerun_snap = SystemHugepageSnapshotCreate(); SCSetStartTime(&suricata); - RunModeDispatch(suricata.run_mode, suricata.runmode_custom_mode, - suricata.capture_plugin_name, suricata.capture_plugin_args); if (suricata.run_mode != RUNMODE_UNIX_SOCKET) { UnixManagerThreadSpawnNonRunmode(suricata.unix_socket_enabled); } - + RunModeDispatch(suricata.run_mode, suricata.runmode_custom_mode, suricata.capture_plugin_name, + suricata.capture_plugin_args); return; out: diff --git a/src/tm-threads.c b/src/tm-threads.c index b0d0f8686ba..a02081f3bc6 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2022 Open Information Security Foundation +/* Copyright (C) 2007-2024 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -1677,7 +1677,7 @@ TmEcode TmThreadSpawn(ThreadVars *tv) int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv); if (rc) { - FatalError("Unable to create thread with pthread_create(): retval %d: %s", rc, + FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc, strerror(errno)); } @@ -2058,17 +2058,19 @@ static void TmThreadDumpThreads(void) } #endif +/* Aligned to CLS to avoid false sharing between atomic ops. */ typedef struct Thread_ { ThreadVars *tv; /**< threadvars structure */ const char *name; int type; int in_use; /**< bool to indicate this is in use */ - SCTime_t pktts; /**< current packet time of this thread - * (offline mode) */ + SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread + * (offline mode) */ SCTime_t sys_sec_stamp; /**< timestamp in real system * time when the pktts was last updated. */ -} Thread; + SCSpinlock spin; +} __attribute__((aligned(CLS))) Thread; typedef struct Threads_ { Thread *threads; @@ -2076,9 +2078,26 @@ typedef struct Threads_ { int threads_cnt; } Threads; +static bool thread_store_sealed = false; static Threads thread_store = { NULL, 0, 0 }; static SCMutex thread_store_lock = SCMUTEX_INITIALIZER; +void TmThreadsSealThreads(void) +{ + SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(thread_store_sealed); + thread_store_sealed = true; + SCMutexUnlock(&thread_store_lock); +} + +void TmThreadsUnsealThreads(void) +{ + SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(!thread_store_sealed); + thread_store_sealed = false; + SCMutexUnlock(&thread_store_lock); +} + void TmThreadsListThreads(void) { SCMutexLock(&thread_store_lock); @@ -2106,6 +2125,7 @@ void TmThreadsListThreads(void) int TmThreadsRegisterThread(ThreadVars *tv, const int type) { SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(thread_store_sealed); if (thread_store.threads == NULL) { thread_store.threads = SCCalloc(STEP, sizeof(Thread)); BUG_ON(thread_store.threads == NULL); @@ -2116,10 +2136,13 @@ int TmThreadsRegisterThread(ThreadVars *tv, const int type) for (s = 0; s < thread_store.threads_size; s++) { if (thread_store.threads[s].in_use == 0) { Thread *t = &thread_store.threads[s]; + SCSpinInit(&t->spin, 0); + SCSpinLock(&t->spin); t->name = tv->name; t->type = type; t->tv = tv; t->in_use = 1; + SCSpinUnlock(&t->spin); SCMutexUnlock(&thread_store_lock); return (int)(s+1); @@ -2133,10 +2156,13 @@ int TmThreadsRegisterThread(ThreadVars *tv, const int type) memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread)); Thread *t = &thread_store.threads[thread_store.threads_size]; + SCSpinInit(&t->spin, 0); + SCSpinLock(&t->spin); t->name = tv->name; t->type = type; t->tv = tv; t->in_use = 1; + SCSpinUnlock(&t->spin); s = thread_store.threads_size; thread_store.threads_size += STEP; @@ -2149,6 +2175,7 @@ int TmThreadsRegisterThread(ThreadVars *tv, const int type) void TmThreadsUnregisterThread(const int id) { SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(thread_store_sealed); if (id <= 0 || id > (int)thread_store.threads_size) { SCMutexUnlock(&thread_store_lock); return; @@ -2181,16 +2208,11 @@ void TmThreadsUnregisterThread(const int id) void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts) { - SCMutexLock(&thread_store_lock); - if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) { - SCMutexUnlock(&thread_store_lock); - return; - } - + SCTime_t now = SCTimeGetTime(); int idx = id - 1; Thread *t = &thread_store.threads[idx]; - t->pktts = ts; - SCTime_t now = SCTimeGetTime(); + SCSpinLock(&t->spin); + SC_ATOMIC_SET(t->pktts, ts); #ifdef DEBUG if (t->sys_sec_stamp.secs != 0) { @@ -2202,43 +2224,58 @@ void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts) #endif t->sys_sec_stamp = now; - SCMutexUnlock(&thread_store_lock); + SCSpinUnlock(&t->spin); } bool TmThreadsTimeSubsysIsReady(void) { static SCTime_t nullts = SCTIME_INITIALIZER; bool ready = true; - SCMutexLock(&thread_store_lock); for (size_t s = 0; s < thread_store.threads_size; s++) { Thread *t = &thread_store.threads[s]; - if (!t->in_use) + if (!t->in_use) { break; - if (t->type != TVT_PPT) + } + SCSpinLock(&t->spin); + if (t->type != TVT_PPT) { + SCSpinUnlock(&t->spin); continue; + } if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) { ready = false; + SCSpinUnlock(&t->spin); break; } + SCSpinUnlock(&t->spin); } - SCMutexUnlock(&thread_store_lock); return ready; } void TmThreadsInitThreadsTimestamp(const SCTime_t ts) { SCTime_t now = SCTimeGetTime(); - SCMutexLock(&thread_store_lock); for (size_t s = 0; s < thread_store.threads_size; s++) { Thread *t = &thread_store.threads[s]; - if (!t->in_use) + if (!t->in_use) { break; - if (t->type != TVT_PPT) + } + SCSpinLock(&t->spin); + if (t->type != TVT_PPT) { + SCSpinUnlock(&t->spin); continue; - t->pktts = ts; + } + SC_ATOMIC_SET(t->pktts, ts); t->sys_sec_stamp = now; + SCSpinUnlock(&t->spin); } - SCMutexUnlock(&thread_store_lock); +} + +SCTime_t TmThreadsGetThreadTime(const int idx) +{ + BUG_ON(idx == 0); + const int i = idx - 1; + Thread *t = &thread_store.threads[i]; + return SC_ATOMIC_GET(t->pktts); } void TmThreadsGetMinimalTimestamp(struct timeval *ts) @@ -2246,34 +2283,38 @@ void TmThreadsGetMinimalTimestamp(struct timeval *ts) struct timeval local = { 0 }; static SCTime_t nullts = SCTIME_INITIALIZER; bool set = false; - size_t s; SCTime_t now = SCTimeGetTime(); - SCMutexLock(&thread_store_lock); - for (s = 0; s < thread_store.threads_size; s++) { + for (size_t s = 0; s < thread_store.threads_size; s++) { Thread *t = &thread_store.threads[s]; - if (t->in_use == 0) + if (t->in_use == 0) { break; + } + SCSpinLock(&t->spin); /* only packet threads set timestamps based on packets */ - if (t->type != TVT_PPT) + if (t->type != TVT_PPT) { + SCSpinUnlock(&t->spin); continue; - if (SCTIME_CMP_NEQ(t->pktts, nullts)) { - SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 1); + } + SCTime_t pktts = SC_ATOMIC_GET(t->pktts); + if (SCTIME_CMP_NEQ(pktts, nullts)) { + SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5); /* ignore sleeping threads */ - if (SCTIME_CMP_LT(sys_sec_stamp, now)) + if (SCTIME_CMP_LT(sys_sec_stamp, now)) { + SCSpinUnlock(&t->spin); continue; - + } if (!set) { - SCTIME_TO_TIMEVAL(&local, t->pktts); + SCTIME_TO_TIMEVAL(&local, pktts); set = true; } else { - if (SCTIME_CMP_LT(t->pktts, SCTIME_FROM_TIMEVAL(&local))) { - SCTIME_TO_TIMEVAL(&local, t->pktts); + if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) { + SCTIME_TO_TIMEVAL(&local, pktts); } } } + SCSpinUnlock(&t->spin); } - SCMutexUnlock(&thread_store_lock); *ts = local; SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec); } diff --git a/src/tm-threads.h b/src/tm-threads.h index 63fbef85b0a..dde0f2029ee 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2011 Open Information Security Foundation +/* Copyright (C) 2007-2024 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -276,6 +276,8 @@ static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv) } } +void TmThreadsSealThreads(void); +void TmThreadsUnsealThreads(void); void TmThreadsListThreads(void); int TmThreadsRegisterThread(ThreadVars *tv, const int type); void TmThreadsUnregisterThread(const int id); @@ -284,6 +286,7 @@ void TmThreadsInjectFlowById(Flow *f, const int id); void TmThreadsInitThreadsTimestamp(const SCTime_t ts); void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts); void TmThreadsGetMinimalTimestamp(struct timeval *ts); +SCTime_t TmThreadsGetThreadTime(const int idx); uint16_t TmThreadsGetWorkerThreadMax(void); bool TmThreadsTimeSubsysIsReady(void);