From cf4a3cfd63aa93dc297ffb1c75b74fa0d2b3fb26 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Thu, 31 Oct 2024 20:26:08 +0100 Subject: [PATCH 01/16] threads: include name in error message When a thread fails to spawn, include the thread name in the error message. --- src/tm-threads.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tm-threads.c b/src/tm-threads.c index b0d0f8686ba..ad8fb3c2ef3 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1677,7 +1677,7 @@ TmEcode TmThreadSpawn(ThreadVars *tv) int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv); if (rc) { - FatalError("Unable to create thread with pthread_create(): retval %d: %s", rc, + FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc, strerror(errno)); } From 8f3c6bd664621fec9fe6d4704f9132dfc4a3b39a Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Thu, 31 Oct 2024 17:41:26 +0100 Subject: [PATCH 02/16] unix/socket: cleanup start up logic No longer init then deinit part of the engine at startup of the unix socket mode. 
--- src/suricata.c | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/suricata.c b/src/suricata.c index 0de8039be1f..4319c6f35d8 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -2208,7 +2208,6 @@ static int InitSignalHandler(SCInstance *suri) * Will be run once per pcap in unix-socket mode */ void PreRunInit(const int runmode) { - HttpRangeContainersInit(); if (runmode == RUNMODE_UNIX_SOCKET) return; @@ -2231,32 +2230,27 @@ void PreRunInit(const int runmode) AppLayerParserPostStreamSetup(); AppLayerRegisterGlobalCounters(); OutputFilestoreRegisterGlobalCounters(); + HttpRangeContainersInit(); } /* tasks we need to run before packets start flowing, * but after we dropped privs */ void PreRunPostPrivsDropInit(const int runmode) { - StatsSetupPostConfigPreOutput(); - RunModeInitializeOutputs(); - DatasetsInit(); - if (runmode == RUNMODE_UNIX_SOCKET) { - /* As the above did some necessary startup initialization, it - * also setup some outputs where only one is allowed, so - * deinitialize to the state that unix-mode does after every - * pcap. */ - PostRunDeinit(RUNMODE_PCAP_FILE, NULL); return; } + StatsSetupPostConfigPreOutput(); + RunModeInitializeOutputs(); + DatasetsInit(); StatsSetupPostConfigPostOutput(); } -/* clean up / shutdown code for both the main modes and for - * unix socket mode. +/** \brief clean up / shutdown code for packet modes * - * Will be run once per pcap in unix-socket mode */ + * Shuts down packet modes, so regular packet runmodes and the + * per pcap mode in the unix socket. 
*/ void PostRunDeinit(const int runmode, struct timeval *start_time) { if (runmode == RUNMODE_UNIX_SOCKET) @@ -2964,12 +2958,11 @@ void SuricataInit(void) prerun_snap = SystemHugepageSnapshotCreate(); SCSetStartTime(&suricata); - RunModeDispatch(suricata.run_mode, suricata.runmode_custom_mode, - suricata.capture_plugin_name, suricata.capture_plugin_args); if (suricata.run_mode != RUNMODE_UNIX_SOCKET) { UnixManagerThreadSpawnNonRunmode(suricata.unix_socket_enabled); } - + RunModeDispatch(suricata.run_mode, suricata.runmode_custom_mode, suricata.capture_plugin_name, + suricata.capture_plugin_args); return; out: From 858453d529e3906a77dd4d1e9c3e320aeb789624 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Fri, 13 Sep 2024 20:26:53 +0200 Subject: [PATCH 03/16] eve/flow: log tcp reuse as 'reason' --- src/flow-hash.c | 1 + src/flow.h | 1 + src/output-json-flow.c | 4 +++- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/flow-hash.c b/src/flow-hash.c index ddab01cd5b6..903303320f5 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -767,6 +767,7 @@ static Flow *TcpReuseReplace(ThreadVars *tv, FlowLookupStruct *fls, FlowBucket * old_f->timeout_at = 0; /* get some settings that we move over to the new flow */ FlowThreadId thread_id[2] = { old_f->thread_id[0], old_f->thread_id[1] }; + old_f->flow_end_flags |= FLOW_END_FLAG_TCPREUSE; /* flow is unlocked by caller */ diff --git a/src/flow.h b/src/flow.h index 554f9fca4a3..dc3b09afd47 100644 --- a/src/flow.h +++ b/src/flow.h @@ -244,6 +244,7 @@ typedef struct AppLayerParserState_ AppLayerParserState; #define FLOW_END_FLAG_TIMEOUT 0x02 #define FLOW_END_FLAG_FORCED 0x04 #define FLOW_END_FLAG_SHUTDOWN 0x08 +#define FLOW_END_FLAG_TCPREUSE 0x10 /** Mutex or RWLocks for the flow. 
*/ //#define FLOWLOCK_RWLOCK diff --git a/src/output-json-flow.c b/src/output-json-flow.c index f7826734f0c..510862d8b39 100644 --- a/src/output-json-flow.c +++ b/src/output-json-flow.c @@ -258,7 +258,9 @@ static void EveFlowLogJSON(OutputJsonThreadCtx *aft, JsonBuilder *jb, Flow *f) } const char *reason = NULL; - if (f->flow_end_flags & FLOW_END_FLAG_FORCED) + if (f->flow_end_flags & FLOW_END_FLAG_TCPREUSE) + reason = "tcp_reuse"; + else if (f->flow_end_flags & FLOW_END_FLAG_FORCED) reason = "forced"; else if (f->flow_end_flags & FLOW_END_FLAG_SHUTDOWN) reason = "shutdown"; From 44592ece433339511d69e6dc4d97b17eef54cf29 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sat, 14 Sep 2024 21:26:45 +0200 Subject: [PATCH 04/16] flow: improve thread safety during timeout checks Timeout checks would access certain fields w/o locking, which could lead to thread safety issues. --- src/flow-hash.c | 5 +++-- src/flow-manager.c | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/flow-hash.c b/src/flow-hash.c index 903303320f5..46b33645d4c 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -905,16 +905,15 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow f = fb->head; do { Flow *next_f = NULL; + FLOWLOCK_WRLOCK(f); const bool timedout = (fb_nextts < (uint32_t)SCTIME_SECS(p->ts) && FlowIsTimedOut(f, (uint32_t)SCTIME_SECS(p->ts), emerg)); if (timedout) { - FLOWLOCK_WRLOCK(f); next_f = f->next; MoveToWorkQueue(tv, fls, fb, f, prev_f); FLOWLOCK_UNLOCK(f); goto flow_removed; } else if (FlowCompare(f, p) != 0) { - FLOWLOCK_WRLOCK(f); /* found a matching flow that is not timed out */ if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) { Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p); @@ -932,6 +931,8 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow FlowReference(dest, f); FBLOCK_UNLOCK(fb); return f; /* return w/o releasing flow lock */ + } else { + FLOWLOCK_UNLOCK(f); } /* 
unless we removed 'f', prev_f needs to point to * current 'f' when adding a new flow below. */ diff --git a/src/flow-manager.c b/src/flow-manager.c index 05b791ee612..4711f289f9f 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -326,6 +326,8 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT do { checked++; + FLOWLOCK_WRLOCK(f); + /* check flow timeout based on lastts and state. Both can be * accessed w/o Flow lock as we do have the hash row lock (so flow * can't disappear) and flow_state is atomic. lastts can only @@ -333,6 +335,7 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT /* timeout logic goes here */ if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == false) { + FLOWLOCK_UNLOCK(f); counters->flows_notimeout++; prev_f = f; @@ -340,8 +343,6 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT continue; } - FLOWLOCK_WRLOCK(f); - Flow *next_flow = f->next; #ifdef CAPTURE_OFFLOAD From b1140eab9e03d442e3a6fe46003509f3ce8bee1a Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Wed, 11 Sep 2024 21:11:09 +0200 Subject: [PATCH 05/16] eve/stream: add tcp-port-reuse trigger --- src/output-eve-stream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/output-eve-stream.c b/src/output-eve-stream.c index fcdf0c2e5c0..9aec690741c 100644 --- a/src/output-eve-stream.c +++ b/src/output-eve-stream.c @@ -157,6 +157,7 @@ static OutputInitResult EveStreamLogInitCtxSub(ConfNode *conf, OutputCtx *parent ctx->trigger_flags |= SetFlag(conf, "state-update", STREAM_PKT_FLAG_STATE_UPDATE); ctx->trigger_flags |= SetFlag(conf, "spurious-retransmission", STREAM_PKT_FLAG_SPURIOUS_RETRANSMISSION); + ctx->trigger_flags |= SetFlag(conf, "tcp-port-reuse", STREAM_PKT_FLAG_TCP_PORT_REUSE); ctx->trigger_flags |= SetFlag(conf, "all", 0xFFFF); SCLogDebug("trigger_flags %04x", ctx->trigger_flags); From c80afe748fc93f112a612e63f206450f1d38d9a1 Mon Sep 17 00:00:00 2001 From: Victor 
Julien Date: Wed, 18 Sep 2024 11:15:00 +0200 Subject: [PATCH 06/16] time: getter for SCTime_t timestamp of a thread --- src/tm-threads.c | 8 ++++++++ src/tm-threads.h | 1 + 2 files changed, 9 insertions(+) diff --git a/src/tm-threads.c b/src/tm-threads.c index ad8fb3c2ef3..ead2472c4db 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -2241,6 +2241,14 @@ void TmThreadsInitThreadsTimestamp(const SCTime_t ts) SCMutexUnlock(&thread_store_lock); } +SCTime_t TmThreadsGetThreadTime(const int idx) +{ + BUG_ON(idx == 0); + const int i = idx - 1; + Thread *t = &thread_store.threads[i]; + return t->pktts; +} + void TmThreadsGetMinimalTimestamp(struct timeval *ts) { struct timeval local = { 0 }; diff --git a/src/tm-threads.h b/src/tm-threads.h index 63fbef85b0a..1e9d43c2187 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -284,6 +284,7 @@ void TmThreadsInjectFlowById(Flow *f, const int id); void TmThreadsInitThreadsTimestamp(const SCTime_t ts); void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts); void TmThreadsGetMinimalTimestamp(struct timeval *ts); +SCTime_t TmThreadsGetThreadTime(const int idx); uint16_t TmThreadsGetWorkerThreadMax(void); bool TmThreadsTimeSubsysIsReady(void); From 78b88982177cfeba7bf00bcf2d0cceb8f47b56e3 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sun, 15 Sep 2024 12:20:21 +0200 Subject: [PATCH 07/16] flow: exact flow timeout Use a more precise calculation for timing out flows, using both the seconds and the micro seconds. 
--- src/flow-hash.c | 30 +++++++++++++++--------------- src/flow-manager.c | 19 +++++++++++++------ src/flow-util.c | 2 -- src/flow-util.h | 2 -- src/flow.c | 7 ------- src/flow.h | 7 ++----- 6 files changed, 30 insertions(+), 37 deletions(-) diff --git a/src/flow-hash.c b/src/flow-hash.c index 46b33645d4c..a580e03b492 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -763,8 +763,6 @@ static Flow *TcpReuseReplace(ThreadVars *tv, FlowLookupStruct *fls, FlowBucket * #ifdef UNITTESTS } #endif - /* time out immediately */ - old_f->timeout_at = 0; /* get some settings that we move over to the new flow */ FlowThreadId thread_id[2] = { old_f->thread_id[0], old_f->thread_id[1] }; old_f->flow_end_flags |= FLOW_END_FLAG_TCPREUSE; @@ -832,19 +830,21 @@ static inline void MoveToWorkQueue(ThreadVars *tv, FlowLookupStruct *fls, } } -static inline bool FlowIsTimedOut(const Flow *f, const uint32_t sec, const bool emerg) +static inline bool FlowIsTimedOut(const Flow *f, const SCTime_t ts, const bool emerg) { - if (unlikely(f->timeout_at < sec)) { - return true; - } else if (unlikely(emerg)) { - extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]; - - int64_t timeout_at = f->timeout_at - - FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap); - if ((int64_t)sec >= timeout_at) - return true; + SCTime_t timesout_at; + if (emerg) { + extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]; + timesout_at = SCTIME_ADD_SECS(f->lastts, + FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap)); + } else { + timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy); } - return false; + /* do the timeout check */ + if (SCTIME_CMP_LT(ts, timesout_at)) { + return false; + } + return true; } /** \brief Get Flow for packet @@ -906,8 +906,8 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow do { Flow *next_f = NULL; FLOWLOCK_WRLOCK(f); - const bool timedout = (fb_nextts < (uint32_t)SCTIME_SECS(p->ts) && - 
FlowIsTimedOut(f, (uint32_t)SCTIME_SECS(p->ts), emerg)); + const bool timedout = + (fb_nextts < (uint32_t)SCTIME_SECS(p->ts) && FlowIsTimedOut(f, p->ts, emerg)); if (timedout) { next_f = f->next; MoveToWorkQueue(tv, fls, fb, f, prev_f); diff --git a/src/flow-manager.c b/src/flow-manager.c index 4711f289f9f..a93281f305b 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -173,6 +173,9 @@ void FlowDisableFlowManagerThread(void) /** \internal * \brief check if a flow is timed out + * Takes lastts, adds the timeout policy to it, compared to current time `ts`. + * In case of emergency mode, timeout_policy is ignored and the emerg table + * is used. * * \param f flow * \param ts timestamp @@ -182,16 +185,20 @@ void FlowDisableFlowManagerThread(void) */ static bool FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg) { - uint32_t flow_times_out_at = f->timeout_at; + SCTime_t timesout_at; // = f->lastts; + if (emerg) { - extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]; - flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap); + extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]; + timesout_at = SCTIME_ADD_SECS(f->lastts, + FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap)); + } else { + timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy); } - if (*next_ts == 0 || flow_times_out_at < *next_ts) - *next_ts = flow_times_out_at; + if (*next_ts == 0 || (uint32_t)SCTIME_SECS(timesout_at) < *next_ts) + *next_ts = (uint32_t)SCTIME_SECS(timesout_at); /* do the timeout check */ - if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) { + if (SCTIME_CMP_LT(ts, timesout_at)) { return false; } diff --git a/src/flow-util.c b/src/flow-util.c index 7e11da41f52..1d2fe3371d7 100644 --- a/src/flow-util.c +++ b/src/flow-util.c @@ -194,8 +194,6 @@ void FlowInit(Flow *f, const Packet *p) f->protomap = FlowGetProtoMapping(f->proto); f->timeout_policy = 
FlowGetTimeoutPolicy(f); - const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->startts) + f->timeout_policy; - f->timeout_at = timeout_at; if (MacSetFlowStorageEnabled()) { DEBUG_VALIDATE_BUG_ON(FlowGetStorageById(f, MacSetGetFlowStorageID()) != NULL); diff --git a/src/flow-util.h b/src/flow-util.h index 2d813bd9ee4..e2c4f56be94 100644 --- a/src/flow-util.h +++ b/src/flow-util.h @@ -41,7 +41,6 @@ (f)->dp = 0; \ (f)->proto = 0; \ (f)->livedev = NULL; \ - (f)->timeout_at = 0; \ (f)->timeout_policy = 0; \ (f)->vlan_idx = 0; \ (f)->next = NULL; \ @@ -88,7 +87,6 @@ (f)->vlan_idx = 0; \ (f)->ffr = 0; \ (f)->next = NULL; \ - (f)->timeout_at = 0; \ (f)->timeout_policy = 0; \ (f)->flow_state = 0; \ (f)->tenant_id = 0; \ diff --git a/src/flow.c b/src/flow.c index 7bfa80ea0a9..ef8cbb5807b 100644 --- a/src/flow.c +++ b/src/flow.c @@ -396,10 +396,6 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars /* update the last seen timestamp of this flow */ if (SCTIME_CMP_GT(p->ts, f->lastts)) { f->lastts = p->ts; - const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + f->timeout_policy; - if (timeout_at != f->timeout_at) { - f->timeout_at = timeout_at; - } } #ifdef CAPTURE_OFFLOAD } else { @@ -1166,9 +1162,6 @@ void FlowUpdateState(Flow *f, const enum FlowState s) const uint32_t timeout_policy = FlowGetTimeoutPolicy(f); if (timeout_policy != f->timeout_policy) { f->timeout_policy = timeout_policy; - const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + timeout_policy; - if (timeout_at != f->timeout_at) - f->timeout_at = timeout_at; } } #ifdef UNITTESTS diff --git a/src/flow.h b/src/flow.h index dc3b09afd47..3c7bdda0ef8 100644 --- a/src/flow.h +++ b/src/flow.h @@ -391,11 +391,6 @@ typedef struct Flow_ uint8_t ffr; }; - /** timestamp in seconds of the moment this flow will timeout - * according to the timeout policy. Does *not* take emergency - * mode into account. 
*/ - uint32_t timeout_at; - /** Thread ID for the stream/detect portion of this flow */ FlowThreadId thread_id[2]; @@ -408,6 +403,8 @@ typedef struct Flow_ /** timeout policy value in seconds to add to the lastts.tv_sec * when a packet has been received. */ + /** timeout in seconds by policy, add to lastts to get actual time this times out. Ignored in + * emergency mode. */ uint32_t timeout_policy; /* time stamp of last update (last packet). Set/updated under the From 9bc1c59ec432e78b4afa103a2f63adfc6feefbf7 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sun, 15 Sep 2024 19:15:56 +0200 Subject: [PATCH 08/16] time: thread time update after flow update The flow worker needs to get the opportunity to run the flow update before globally making its current timestamp available. This is to avoid another thread using the time to evict the flow that is about to get a legitimate update. --- src/flow-worker.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/flow-worker.c b/src/flow-worker.c index 63de42a2665..3c95fe8789a 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -554,11 +554,6 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) SCLogDebug("packet %"PRIu64, p->pcap_cnt); - /* update time */ - if (!(PKT_IS_PSEUDOPKT(p))) { - TimeSetByThread(tv->id, p->ts); - } - /* handle Flow */ if (p->flags & PKT_WANTS_FLOW) { FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW); @@ -567,6 +562,10 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) if (likely(p->flow != NULL)) { DEBUG_ASSERT_FLOW_LOCKED(p->flow); if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) { + /* update time */ + if (!(PKT_IS_PSEUDOPKT(p))) { + TimeSetByThread(tv->id, p->ts); + } goto housekeeping; } } @@ -581,6 +580,11 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR); } + /* update time */ + if (!(PKT_IS_PSEUDOPKT(p))) { + TimeSetByThread(tv->id, p->ts); + } + 
SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no"); /* handle TCP and app layer */ From a516c42c9f0c2ce18c33b37608c4bd298e1837c0 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Wed, 18 Sep 2024 12:21:42 +0200 Subject: [PATCH 09/16] threads: use sleeping threads for minimum time a bit longer --- src/tm-threads.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tm-threads.c b/src/tm-threads.c index ead2472c4db..61c6761a3b6 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -2266,7 +2266,7 @@ void TmThreadsGetMinimalTimestamp(struct timeval *ts) if (t->type != TVT_PPT) continue; if (SCTIME_CMP_NEQ(t->pktts, nullts)) { - SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 1); + SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5); /* ignore sleeping threads */ if (SCTIME_CMP_LT(sys_sec_stamp, now)) continue; From 22683e0bf997e8e459caba14e6aec192027eeede Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Mon, 16 Sep 2024 08:54:43 +0200 Subject: [PATCH 10/16] flow: fix fb ts optimization If seconds match a flow can still be timing out. --- src/flow-hash.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow-hash.c b/src/flow-hash.c index a580e03b492..53966f3babc 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -907,7 +907,7 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow Flow *next_f = NULL; FLOWLOCK_WRLOCK(f); const bool timedout = - (fb_nextts < (uint32_t)SCTIME_SECS(p->ts) && FlowIsTimedOut(f, p->ts, emerg)); + (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts) && FlowIsTimedOut(f, p->ts, emerg)); if (timedout) { next_f = f->next; MoveToWorkQueue(tv, fls, fb, f, prev_f); From 54113847501e4855b65ad60f96b939f6f2454474 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Wed, 18 Sep 2024 12:03:46 +0200 Subject: [PATCH 11/16] flow/worker: improve flow timeout time accuracy When timing out flows, use the timestamp from the "owning" thread. 
This avoids problems with threads being out of sync with each other. --- src/flow-hash.c | 44 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/src/flow-hash.c b/src/flow-hash.c index 53966f3babc..e84579071f7 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -830,7 +830,8 @@ static inline void MoveToWorkQueue(ThreadVars *tv, FlowLookupStruct *fls, } } -static inline bool FlowIsTimedOut(const Flow *f, const SCTime_t ts, const bool emerg) +static inline bool FlowIsTimedOut( + const FlowThreadId ftid, const Flow *f, const SCTime_t pktts, const bool emerg) { SCTime_t timesout_at; if (emerg) { @@ -840,13 +841,43 @@ static inline bool FlowIsTimedOut(const Flow *f, const SCTime_t ts, const bool e } else { timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy); } - /* do the timeout check */ - if (SCTIME_CMP_LT(ts, timesout_at)) { - return false; + /* if time is live, we just use the pktts */ + if (TimeModeIsLive()) { + if (SCTIME_CMP_LT(pktts, timesout_at)) { + return false; + } + } else { + if (ftid == f->thread_id[0] || f->thread_id[0] == 0) { + /* do the timeout check */ + if (SCTIME_CMP_LT(pktts, timesout_at)) { + return false; + } + } else { + SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]); + /* do the timeout check */ + if (SCTIME_CMP_LT(checkts, timesout_at)) { + return false; + } + } } return true; } +static inline uint16_t GetTvId(const ThreadVars *tv) +{ + uint16_t tv_id; +#ifdef UNITTESTS + if (RunmodeIsUnittests()) { + tv_id = 0; + } else { + tv_id = (uint16_t)tv->id; + } +#else + tv_id = (uint16_t)tv->id; +#endif + return tv_id; +} + /** \brief Get Flow for packet * * Hash retrieval function for flows. 
Looks up the hash bucket containing the @@ -898,6 +929,7 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow return f; } + const uint16_t tv_id = GetTvId(tv); const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0; const uint32_t fb_nextts = !emerg ? SC_ATOMIC_GET(fb->next_ts) : 0; /* ok, we have a flow in the bucket. Let's find out if it is our flow */ @@ -906,8 +938,8 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow do { Flow *next_f = NULL; FLOWLOCK_WRLOCK(f); - const bool timedout = - (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts) && FlowIsTimedOut(f, p->ts, emerg)); + const bool timedout = (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts) && + FlowIsTimedOut(tv_id, f, p->ts, emerg)); if (timedout) { next_f = f->next; MoveToWorkQueue(tv, fls, fb, f, prev_f); From 663eefc6afe1f6c4eaa06a2ba2e7fc5650829dda Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Tue, 17 Sep 2024 20:52:14 +0200 Subject: [PATCH 12/16] flow/manager: in offline mode, use owning threads time As this may mean that a thread's ts is a bit ahead of the minimum time the flow manager normally uses, it can evict flows a bit faster. 
--- src/flow-manager.c | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/flow-manager.c b/src/flow-manager.c index a93281f305b..60e2eb2b137 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -197,9 +197,18 @@ static bool FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, cons if (*next_ts == 0 || (uint32_t)SCTIME_SECS(timesout_at) < *next_ts) *next_ts = (uint32_t)SCTIME_SECS(timesout_at); - /* do the timeout check */ - if (SCTIME_CMP_LT(ts, timesout_at)) { - return false; + /* if time is live, we just use the tts */ + if (TimeModeIsLive() || f->thread_id[0] == 0) { + /* do the timeout check */ + if (SCTIME_CMP_LT(ts, timesout_at)) { + return false; + } + } else { + SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]); + /* do the timeout check */ + if (SCTIME_CMP_LT(checkts, timesout_at)) { + return false; + } } return true; From ef672b30ab4fa15ed74fc6dac9eb4b6884c81cd8 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Tue, 17 Sep 2024 11:02:00 +0200 Subject: [PATCH 13/16] threads: atomic packet timestamp --- src/tm-threads.c | 79 ++++++++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 33 deletions(-) diff --git a/src/tm-threads.c b/src/tm-threads.c index 61c6761a3b6..23a070e7a88 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -2064,10 +2064,11 @@ typedef struct Thread_ { int type; int in_use; /**< bool to indicate this is in use */ - SCTime_t pktts; /**< current packet time of this thread - * (offline mode) */ + SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread + * (offline mode) */ SCTime_t sys_sec_stamp; /**< timestamp in real system * time when the pktts was last updated. 
*/ + SCSpinlock spin; } Thread; typedef struct Threads_ { @@ -2116,10 +2117,13 @@ int TmThreadsRegisterThread(ThreadVars *tv, const int type) for (s = 0; s < thread_store.threads_size; s++) { if (thread_store.threads[s].in_use == 0) { Thread *t = &thread_store.threads[s]; + SCSpinInit(&t->spin, 0); + SCSpinLock(&t->spin); t->name = tv->name; t->type = type; t->tv = tv; t->in_use = 1; + SCSpinUnlock(&t->spin); SCMutexUnlock(&thread_store_lock); return (int)(s+1); @@ -2133,10 +2137,13 @@ int TmThreadsRegisterThread(ThreadVars *tv, const int type) memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread)); Thread *t = &thread_store.threads[thread_store.threads_size]; + SCSpinInit(&t->spin, 0); + SCSpinLock(&t->spin); t->name = tv->name; t->type = type; t->tv = tv; t->in_use = 1; + SCSpinUnlock(&t->spin); s = thread_store.threads_size; thread_store.threads_size += STEP; @@ -2181,16 +2188,11 @@ void TmThreadsUnregisterThread(const int id) void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts) { - SCMutexLock(&thread_store_lock); - if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) { - SCMutexUnlock(&thread_store_lock); - return; - } - + SCTime_t now = SCTimeGetTime(); int idx = id - 1; Thread *t = &thread_store.threads[idx]; - t->pktts = ts; - SCTime_t now = SCTimeGetTime(); + SCSpinLock(&t->spin); + SC_ATOMIC_SET(t->pktts, ts); #ifdef DEBUG if (t->sys_sec_stamp.secs != 0) { @@ -2202,43 +2204,50 @@ void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts) #endif t->sys_sec_stamp = now; - SCMutexUnlock(&thread_store_lock); + SCSpinUnlock(&t->spin); } bool TmThreadsTimeSubsysIsReady(void) { static SCTime_t nullts = SCTIME_INITIALIZER; bool ready = true; - SCMutexLock(&thread_store_lock); for (size_t s = 0; s < thread_store.threads_size; s++) { Thread *t = &thread_store.threads[s]; - if (!t->in_use) + if (!t->in_use) { break; - if (t->type != TVT_PPT) + } + SCSpinLock(&t->spin); + if 
(t->type != TVT_PPT) { + SCSpinUnlock(&t->spin); continue; + } if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) { ready = false; + SCSpinUnlock(&t->spin); break; } + SCSpinUnlock(&t->spin); } - SCMutexUnlock(&thread_store_lock); return ready; } void TmThreadsInitThreadsTimestamp(const SCTime_t ts) { SCTime_t now = SCTimeGetTime(); - SCMutexLock(&thread_store_lock); for (size_t s = 0; s < thread_store.threads_size; s++) { Thread *t = &thread_store.threads[s]; - if (!t->in_use) + if (!t->in_use) { break; - if (t->type != TVT_PPT) + } + SCSpinLock(&t->spin); + if (t->type != TVT_PPT) { + SCSpinUnlock(&t->spin); continue; - t->pktts = ts; + } + SC_ATOMIC_SET(t->pktts, ts); t->sys_sec_stamp = now; + SCSpinUnlock(&t->spin); } - SCMutexUnlock(&thread_store_lock); } SCTime_t TmThreadsGetThreadTime(const int idx) @@ -2246,7 +2255,7 @@ SCTime_t TmThreadsGetThreadTime(const int idx) BUG_ON(idx == 0); const int i = idx - 1; Thread *t = &thread_store.threads[i]; - return t->pktts; + return SC_ATOMIC_GET(t->pktts); } void TmThreadsGetMinimalTimestamp(struct timeval *ts) @@ -2254,34 +2263,38 @@ void TmThreadsGetMinimalTimestamp(struct timeval *ts) struct timeval local = { 0 }; static SCTime_t nullts = SCTIME_INITIALIZER; bool set = false; - size_t s; SCTime_t now = SCTimeGetTime(); - SCMutexLock(&thread_store_lock); - for (s = 0; s < thread_store.threads_size; s++) { + for (size_t s = 0; s < thread_store.threads_size; s++) { Thread *t = &thread_store.threads[s]; - if (t->in_use == 0) + if (t->in_use == 0) { break; + } + SCSpinLock(&t->spin); /* only packet threads set timestamps based on packets */ - if (t->type != TVT_PPT) + if (t->type != TVT_PPT) { + SCSpinUnlock(&t->spin); continue; - if (SCTIME_CMP_NEQ(t->pktts, nullts)) { + } + SCTime_t pktts = SC_ATOMIC_GET(t->pktts); + if (SCTIME_CMP_NEQ(pktts, nullts)) { SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5); /* ignore sleeping threads */ - if (SCTIME_CMP_LT(sys_sec_stamp, now)) + if (SCTIME_CMP_LT(sys_sec_stamp, 
now)) { + SCSpinUnlock(&t->spin); continue; - + } if (!set) { - SCTIME_TO_TIMEVAL(&local, t->pktts); + SCTIME_TO_TIMEVAL(&local, pktts); set = true; } else { - if (SCTIME_CMP_LT(t->pktts, SCTIME_FROM_TIMEVAL(&local))) { - SCTIME_TO_TIMEVAL(&local, t->pktts); + if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) { + SCTIME_TO_TIMEVAL(&local, pktts); } } } + SCSpinUnlock(&t->spin); } - SCMutexUnlock(&thread_store_lock); *ts = local; SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec); } From 2b8351f5ccd206ce9d7ced52b9e46b48fa75cbe2 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Wed, 18 Sep 2024 10:48:29 +0200 Subject: [PATCH 14/16] threads: seal after setup; unseal at shutdown The idea of sealing the thread store is that its members can be accessed w/o holding a lock to the whole store at runtime. --- src/runmodes.c | 1 + src/suricata.c | 2 ++ src/tm-threads.c | 21 ++++++++++++++++++++- src/tm-threads.h | 4 +++- 4 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/runmodes.c b/src/runmodes.c index b326a96e3a6..e3e7a93311e 100644 --- a/src/runmodes.c +++ b/src/runmodes.c @@ -436,6 +436,7 @@ void RunModeDispatch(int runmode, const char *custom_mode, const char *capture_p BypassedFlowManagerThreadSpawn(); } StatsSpawnThreads(); + TmThreadsSealThreads(); } } diff --git a/src/suricata.c b/src/suricata.c index 4319c6f35d8..e3977504b17 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -2256,6 +2256,8 @@ void PostRunDeinit(const int runmode, struct timeval *start_time) if (runmode == RUNMODE_UNIX_SOCKET) return; + TmThreadsUnsealThreads(); + /* needed by FlowWorkToDoCleanup */ PacketPoolInit(); diff --git a/src/tm-threads.c b/src/tm-threads.c index 23a070e7a88..08ca9aa7d2a 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2022 Open Information Security Foundation +/* Copyright (C) 2007-2024 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the 
GNU General Public License version 2 as published by the Free @@ -2077,9 +2077,26 @@ typedef struct Threads_ { int threads_cnt; } Threads; +static bool thread_store_sealed = false; static Threads thread_store = { NULL, 0, 0 }; static SCMutex thread_store_lock = SCMUTEX_INITIALIZER; +void TmThreadsSealThreads(void) +{ + SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(thread_store_sealed); + thread_store_sealed = true; + SCMutexUnlock(&thread_store_lock); +} + +void TmThreadsUnsealThreads(void) +{ + SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(!thread_store_sealed); + thread_store_sealed = false; + SCMutexUnlock(&thread_store_lock); +} + void TmThreadsListThreads(void) { SCMutexLock(&thread_store_lock); @@ -2107,6 +2124,7 @@ void TmThreadsListThreads(void) int TmThreadsRegisterThread(ThreadVars *tv, const int type) { SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(thread_store_sealed); if (thread_store.threads == NULL) { thread_store.threads = SCCalloc(STEP, sizeof(Thread)); BUG_ON(thread_store.threads == NULL); @@ -2156,6 +2174,7 @@ int TmThreadsRegisterThread(ThreadVars *tv, const int type) void TmThreadsUnregisterThread(const int id) { SCMutexLock(&thread_store_lock); + DEBUG_VALIDATE_BUG_ON(thread_store_sealed); if (id <= 0 || id > (int)thread_store.threads_size) { SCMutexUnlock(&thread_store_lock); return; diff --git a/src/tm-threads.h b/src/tm-threads.h index 1e9d43c2187..dde0f2029ee 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2011 Open Information Security Foundation +/* Copyright (C) 2007-2024 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -276,6 +276,8 @@ static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv) } } +void TmThreadsSealThreads(void); +void TmThreadsUnsealThreads(void); void TmThreadsListThreads(void); int 
TmThreadsRegisterThread(ThreadVars *tv, const int type); void TmThreadsUnregisterThread(const int id); From add7b786b024ff8713fbaae5259db6817b323958 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Wed, 18 Sep 2024 11:50:59 +0200 Subject: [PATCH 15/16] threads: align struct to CLS to avoid false sharing --- src/tm-threads.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/tm-threads.c b/src/tm-threads.c index 08ca9aa7d2a..a02081f3bc6 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -2058,6 +2058,7 @@ static void TmThreadDumpThreads(void) } #endif +/* Aligned to CLS to avoid false sharing between atomic ops. */ typedef struct Thread_ { ThreadVars *tv; /**< threadvars structure */ const char *name; @@ -2069,7 +2070,7 @@ typedef struct Thread_ { SCTime_t sys_sec_stamp; /**< timestamp in real system * time when the pktts was last updated. */ SCSpinlock spin; -} Thread; +} __attribute__((aligned(CLS))) Thread; typedef struct Threads_ { Thread *threads; From b1ea592237a51604c2b5b5d13aa635cfa299f01e Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Wed, 18 Sep 2024 22:19:17 +0200 Subject: [PATCH 16/16] flow: skip lock for skippable flows Some checks can be done w/o holding a lock: - seeing if the flow matches the packet - if the hash row needs a timeout check This patch skips taking a lock in these conditions. --- src/flow-hash.c | 55 ++++++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/src/flow-hash.c b/src/flow-hash.c index e84579071f7..19a58d109e6 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -932,39 +932,42 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow const uint16_t tv_id = GetTvId(tv); const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0; const uint32_t fb_nextts = !emerg ? SC_ATOMIC_GET(fb->next_ts) : 0; + const bool timeout_check = (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts)); /* ok, we have a flow in the bucket. 
Let's find out if it is our flow */ Flow *prev_f = NULL; /* previous flow */ f = fb->head; do { Flow *next_f = NULL; - FLOWLOCK_WRLOCK(f); - const bool timedout = (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts) && - FlowIsTimedOut(tv_id, f, p->ts, emerg)); - if (timedout) { - next_f = f->next; - MoveToWorkQueue(tv, fls, fb, f, prev_f); - FLOWLOCK_UNLOCK(f); - goto flow_removed; - } else if (FlowCompare(f, p) != 0) { - /* found a matching flow that is not timed out */ - if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) { - Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p); - if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */ - prev_f = new_f; - MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */ - FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */ - - if (new_f == NULL) { - FBLOCK_UNLOCK(fb); - return NULL; + const bool our_flow = FlowCompare(f, p) != 0; + if (our_flow || timeout_check) { + FLOWLOCK_WRLOCK(f); + const bool timedout = (timeout_check && FlowIsTimedOut(tv_id, f, p->ts, emerg)); + if (timedout) { + next_f = f->next; + MoveToWorkQueue(tv, fls, fb, f, prev_f); + FLOWLOCK_UNLOCK(f); + goto flow_removed; + } else if (our_flow) { + /* found a matching flow that is not timed out */ + if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) { + Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p); + if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */ + prev_f = new_f; + MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */ + FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */ + + if (new_f == NULL) { + FBLOCK_UNLOCK(fb); + return NULL; + } + f = new_f; } - f = new_f; + FlowReference(dest, f); + FBLOCK_UNLOCK(fb); + return f; /* return w/o releasing flow lock */ + } else { + FLOWLOCK_UNLOCK(f); } - FlowReference(dest, f); - FBLOCK_UNLOCK(fb); - return f; /* return w/o releasing flow lock */ - } else { - FLOWLOCK_UNLOCK(f); } /* unless we removed 'f', prev_f needs to point to * 
current 'f' when adding a new flow below. */