Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow timeout timing/v14 #12084

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 73 additions & 36 deletions src/flow-hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -763,10 +763,9 @@ static Flow *TcpReuseReplace(ThreadVars *tv, FlowLookupStruct *fls, FlowBucket *
#ifdef UNITTESTS
}
#endif
/* time out immediately */
old_f->timeout_at = 0;
/* get some settings that we move over to the new flow */
FlowThreadId thread_id[2] = { old_f->thread_id[0], old_f->thread_id[1] };
old_f->flow_end_flags |= FLOW_END_FLAG_TCPREUSE;

/* flow is unlocked by caller */

Expand Down Expand Up @@ -831,19 +830,52 @@ static inline void MoveToWorkQueue(ThreadVars *tv, FlowLookupStruct *fls,
}
}

static inline bool FlowIsTimedOut(const Flow *f, const uint32_t sec, const bool emerg)
static inline bool FlowIsTimedOut(
const FlowThreadId ftid, const Flow *f, const SCTime_t pktts, const bool emerg)
{
if (unlikely(f->timeout_at < sec)) {
return true;
} else if (unlikely(emerg)) {
extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX];
SCTime_t timesout_at;
if (emerg) {
extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX];
timesout_at = SCTIME_ADD_SECS(f->lastts,
FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap));
} else {
timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy);
}
/* if time is live, we just use the pktts */
if (TimeModeIsLive()) {
if (SCTIME_CMP_LT(pktts, timesout_at)) {
return false;
}
} else {
if (ftid == f->thread_id[0] || f->thread_id[0] == 0) {
/* do the timeout check */
if (SCTIME_CMP_LT(pktts, timesout_at)) {
return false;
}
Comment on lines +845 to +854
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to do the same thing in both the blocks? Can we merge them? Then it becomes similar to flow manager..

} else {
SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]);
/* do the timeout check */
if (SCTIME_CMP_LT(checkts, timesout_at)) {
return false;
}
}
}
return true;
}

int64_t timeout_at = f->timeout_at -
FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
if ((int64_t)sec >= timeout_at)
return true;
static inline uint16_t GetTvId(const ThreadVars *tv)
{
uint16_t tv_id;
#ifdef UNITTESTS
if (RunmodeIsUnittests()) {
tv_id = 0;
} else {
tv_id = (uint16_t)tv->id;
}
return false;
#else
tv_id = (uint16_t)tv->id;
#endif
return tv_id;
}

/** \brief Get Flow for packet
Expand Down Expand Up @@ -897,40 +929,45 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow
return f;
}

const uint16_t tv_id = GetTvId(tv);
const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0;
const uint32_t fb_nextts = !emerg ? SC_ATOMIC_GET(fb->next_ts) : 0;
const bool timeout_check = (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts));
/* ok, we have a flow in the bucket. Let's find out if it is our flow */
Flow *prev_f = NULL; /* previous flow */
f = fb->head;
do {
Flow *next_f = NULL;
const bool timedout = (fb_nextts < (uint32_t)SCTIME_SECS(p->ts) &&
FlowIsTimedOut(f, (uint32_t)SCTIME_SECS(p->ts), emerg));
if (timedout) {
FLOWLOCK_WRLOCK(f);
next_f = f->next;
MoveToWorkQueue(tv, fls, fb, f, prev_f);
FLOWLOCK_UNLOCK(f);
goto flow_removed;
} else if (FlowCompare(f, p) != 0) {
const bool our_flow = FlowCompare(f, p) != 0;
if (our_flow || timeout_check) {
FLOWLOCK_WRLOCK(f);
/* found a matching flow that is not timed out */
if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) {
Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p);
if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */
prev_f = new_f;
MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */
FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */

if (new_f == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;
const bool timedout = (timeout_check && FlowIsTimedOut(tv_id, f, p->ts, emerg));
if (timedout) {
next_f = f->next;
MoveToWorkQueue(tv, fls, fb, f, prev_f);
FLOWLOCK_UNLOCK(f);
goto flow_removed;
} else if (our_flow) {
/* found a matching flow that is not timed out */
if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) {
Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p);
if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */
prev_f = new_f;
MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */
FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */

if (new_f == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;
}
f = new_f;
}
f = new_f;
FlowReference(dest, f);
FBLOCK_UNLOCK(fb);
return f; /* return w/o releasing flow lock */
} else {
FLOWLOCK_UNLOCK(f);
}
FlowReference(dest, f);
FBLOCK_UNLOCK(fb);
return f; /* return w/o releasing flow lock */
}
/* unless we removed 'f', prev_f needs to point to
* current 'f' when adding a new flow below. */
Expand Down
37 changes: 27 additions & 10 deletions src/flow-manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ void FlowDisableFlowManagerThread(void)

/** \internal
* \brief check if a flow is timed out
* Takes lastts, adds the timeout policy to it, compared to current time `ts`.
* In case of emergency mode, timeout_policy is ignored and the emerg table
* is used.
*
* \param f flow
* \param ts timestamp
Expand All @@ -182,17 +185,30 @@ void FlowDisableFlowManagerThread(void)
*/
static bool FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
{
uint32_t flow_times_out_at = f->timeout_at;
SCTime_t timesout_at; // = f->lastts;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About the comment: why?


if (emerg) {
extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX];
flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
extern FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX];
timesout_at = SCTIME_ADD_SECS(f->lastts,
FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap));
} else {
timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy);
}
if (*next_ts == 0 || flow_times_out_at < *next_ts)
*next_ts = flow_times_out_at;
if (*next_ts == 0 || (uint32_t)SCTIME_SECS(timesout_at) < *next_ts)
*next_ts = (uint32_t)SCTIME_SECS(timesout_at);

/* do the timeout check */
if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) {
return false;
/* if time is live, we just use the tts */
if (TimeModeIsLive() || f->thread_id[0] == 0) {
/* do the timeout check */
if (SCTIME_CMP_LT(ts, timesout_at)) {
return false;
}
} else {
SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]);
/* do the timeout check */
if (SCTIME_CMP_LT(checkts, timesout_at)) {
return false;
}
}

return true;
Expand Down Expand Up @@ -326,22 +342,23 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT
do {
checked++;

FLOWLOCK_WRLOCK(f);

/* check flow timeout based on lastts and state. Both can be
* accessed w/o Flow lock as we do have the hash row lock (so flow
* can't disappear) and flow_state is atomic. lastts can only
* be modified when we have both the flow and hash row lock */

/* timeout logic goes here */
if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == false) {
FLOWLOCK_UNLOCK(f);
counters->flows_notimeout++;

prev_f = f;
f = f->next;
continue;
}

FLOWLOCK_WRLOCK(f);

Flow *next_flow = f->next;

#ifdef CAPTURE_OFFLOAD
Expand Down
2 changes: 0 additions & 2 deletions src/flow-util.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,6 @@ void FlowInit(Flow *f, const Packet *p)

f->protomap = FlowGetProtoMapping(f->proto);
f->timeout_policy = FlowGetTimeoutPolicy(f);
const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->startts) + f->timeout_policy;
f->timeout_at = timeout_at;

if (MacSetFlowStorageEnabled()) {
DEBUG_VALIDATE_BUG_ON(FlowGetStorageById(f, MacSetGetFlowStorageID()) != NULL);
Expand Down
2 changes: 0 additions & 2 deletions src/flow-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
(f)->dp = 0; \
(f)->proto = 0; \
(f)->livedev = NULL; \
(f)->timeout_at = 0; \
(f)->timeout_policy = 0; \
(f)->vlan_idx = 0; \
(f)->next = NULL; \
Expand Down Expand Up @@ -88,7 +87,6 @@
(f)->vlan_idx = 0; \
(f)->ffr = 0; \
(f)->next = NULL; \
(f)->timeout_at = 0; \
(f)->timeout_policy = 0; \
(f)->flow_state = 0; \
(f)->tenant_id = 0; \
Expand Down
14 changes: 9 additions & 5 deletions src/flow-worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -554,11 +554,6 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)

SCLogDebug("packet %"PRIu64, p->pcap_cnt);

/* update time */
if (!(PKT_IS_PSEUDOPKT(p))) {
TimeSetByThread(tv->id, p->ts);
}

/* handle Flow */
if (p->flags & PKT_WANTS_FLOW) {
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);
Expand All @@ -567,6 +562,10 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
if (likely(p->flow != NULL)) {
DEBUG_ASSERT_FLOW_LOCKED(p->flow);
if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
/* update time */
if (!(PKT_IS_PSEUDOPKT(p))) {
TimeSetByThread(tv->id, p->ts);
}
goto housekeeping;
}
}
Expand All @@ -581,6 +580,11 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);
}

/* update time */
if (!(PKT_IS_PSEUDOPKT(p))) {
TimeSetByThread(tv->id, p->ts);
}

Comment on lines +583 to +587
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we merge it w the else if condition above?

SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no");

/* handle TCP and app layer */
Expand Down
7 changes: 0 additions & 7 deletions src/flow.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,6 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars
/* update the last seen timestamp of this flow */
if (SCTIME_CMP_GT(p->ts, f->lastts)) {
f->lastts = p->ts;
const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + f->timeout_policy;
if (timeout_at != f->timeout_at) {
f->timeout_at = timeout_at;
}
}
#ifdef CAPTURE_OFFLOAD
} else {
Expand Down Expand Up @@ -1166,9 +1162,6 @@ void FlowUpdateState(Flow *f, const enum FlowState s)
const uint32_t timeout_policy = FlowGetTimeoutPolicy(f);
if (timeout_policy != f->timeout_policy) {
f->timeout_policy = timeout_policy;
const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + timeout_policy;
if (timeout_at != f->timeout_at)
f->timeout_at = timeout_at;
}
}
#ifdef UNITTESTS
Expand Down
8 changes: 3 additions & 5 deletions src/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ typedef struct AppLayerParserState_ AppLayerParserState;
#define FLOW_END_FLAG_TIMEOUT 0x02
#define FLOW_END_FLAG_FORCED 0x04
#define FLOW_END_FLAG_SHUTDOWN 0x08
#define FLOW_END_FLAG_TCPREUSE 0x10

/** Mutex or RWLocks for the flow. */
//#define FLOWLOCK_RWLOCK
Expand Down Expand Up @@ -390,11 +391,6 @@ typedef struct Flow_
uint8_t ffr;
};

/** timestamp in seconds of the moment this flow will timeout
* according to the timeout policy. Does *not* take emergency
* mode into account. */
uint32_t timeout_at;

/** Thread ID for the stream/detect portion of this flow */
FlowThreadId thread_id[2];

Expand All @@ -407,6 +403,8 @@ typedef struct Flow_

/** timeout policy value in seconds to add to the lastts.tv_sec
* when a packet has been received. */
/** timeout in seconds by policy, add to lastts to get actual time this times out. Ignored in
* emergency mode. */
uint32_t timeout_policy;

/* time stamp of last update (last packet). Set/updated under the
Expand Down
1 change: 1 addition & 0 deletions src/output-eve-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ static OutputInitResult EveStreamLogInitCtxSub(ConfNode *conf, OutputCtx *parent
ctx->trigger_flags |= SetFlag(conf, "state-update", STREAM_PKT_FLAG_STATE_UPDATE);
ctx->trigger_flags |=
SetFlag(conf, "spurious-retransmission", STREAM_PKT_FLAG_SPURIOUS_RETRANSMISSION);
ctx->trigger_flags |= SetFlag(conf, "tcp-port-reuse", STREAM_PKT_FLAG_TCP_PORT_REUSE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Why is it called "port" reuse?


ctx->trigger_flags |= SetFlag(conf, "all", 0xFFFF);
SCLogDebug("trigger_flags %04x", ctx->trigger_flags);
Expand Down
4 changes: 3 additions & 1 deletion src/output-json-flow.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ static void EveFlowLogJSON(OutputJsonThreadCtx *aft, JsonBuilder *jb, Flow *f)
}

const char *reason = NULL;
if (f->flow_end_flags & FLOW_END_FLAG_FORCED)
if (f->flow_end_flags & FLOW_END_FLAG_TCPREUSE)
reason = "tcp_reuse";
else if (f->flow_end_flags & FLOW_END_FLAG_FORCED)
reason = "forced";
else if (f->flow_end_flags & FLOW_END_FLAG_SHUTDOWN)
reason = "shutdown";
Expand Down
1 change: 1 addition & 0 deletions src/runmodes.c
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ void RunModeDispatch(int runmode, const char *custom_mode, const char *capture_p
BypassedFlowManagerThreadSpawn();
}
StatsSpawnThreads();
TmThreadsSealThreads();
}
}

Expand Down
Loading
Loading