Skip to content

Commit d2b4108

Browse files
author
Hoang Phan
committed
Add support for session window
1 parent 2d4bcb9 commit d2b4108

File tree

6 files changed

+1505
-9
lines changed

6 files changed

+1505
-9
lines changed

docs/windowing.md

Lines changed: 228 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ With windows, you can calculate such aggregations as:
99
- Total of website visitors for every hour
1010
- The average speed of a vehicle over the last 10 minutes
1111
- Maximum temperature of a sensor observed over 30 second ranges
12-
- Give an user a reward after 10 succesful actions
12+
- Give an user a reward after 10 succesful actions
13+
- Track user activity sessions on a website
14+
- Detect fraud patterns in financial transactions
1315

1416

1517
## Types of Time in Streaming
@@ -500,6 +502,220 @@ sdf = (
500502

501503
```
502504

505+
## Session Windows
506+
507+
Session windows group events that occur within a specified timeout period. Unlike fixed-time windows (tumbling, hopping, sliding), session windows have dynamic durations based on the actual timing of events, making them ideal for user activity tracking, fraud detection, and other event-driven scenarios.
508+
509+
A session starts with the first event and extends each time a new event arrives within the timeout period. The session closes after the timeout period with no new events.
510+
511+
Key characteristics of session windows:
512+
513+
- **Dynamic boundaries**: Each session can have different start and end times based on actual events
514+
- **Activity-based**: Sessions extend automatically when events arrive within the timeout period
515+
- **Event-driven closure**: Sessions close when no events arrive within the timeout period
516+
- **Grace period support**: Late events can still extend sessions if they arrive within the grace period
517+
518+
### How Session Windows Work
519+
520+
```
521+
Time: 0 5 10 15 20 25 30 35 40 45 50
522+
Events: A B C D E
523+
524+
Timeout: 10 seconds
525+
Grace: 2 seconds
526+
527+
Session 1: [0, 20] - Events A, B (B extends the session from A)
528+
Session 2: [25, 35] - Events C, D (D extends the session from C)
529+
Session 3: [45, 55] - Event E (session will close at 55 if no more events)
530+
```
531+
532+
In this example:
533+
- Event A starts Session 1 at time 0, session would timeout at time 10
534+
- Event B arrives at time 10, extending Session 1 to timeout at time 20
535+
- Event C arrives at time 25, starting Session 2 (too late for Session 1)
536+
- Event D arrives at time 30, extending Session 2 to timeout at time 40
537+
- Event E arrives at time 45, starting Session 3
538+
539+
### Basic Session Window Example
540+
541+
Imagine you want to track user activity sessions on a website, where a session continues as long as user actions occur within 30 minutes of each other:
542+
543+
Input:
544+
```json
545+
{"user_action": "page_view", "page": "/home", "timestamp": 1000}
546+
{"user_action": "click", "element": "button", "timestamp": 800000}
547+
{"user_action": "page_view", "page": "/products", "timestamp": 1200000}
548+
{"user_action": "purchase", "amount": 50, "timestamp": 2000000}
549+
```
550+
551+
Here's how to track user sessions using session windows:
552+
553+
```python
554+
from datetime import timedelta
555+
from quixstreams import Application
556+
from quixstreams.dataframe.windows import Count, Collect
557+
558+
app = Application(...)
559+
sdf = app.dataframe(...)
560+
561+
sdf = (
562+
# Define a session window with 30-minute timeout and 5-minute grace period
563+
.session_window(
564+
timeout_ms=timedelta(minutes=30),
565+
grace_ms=timedelta(minutes=5)
566+
)
567+
568+
# Count the number of actions in each session and collect all actions
569+
.agg(
570+
action_count=Count(),
571+
actions=Collect("user_action")
572+
)
573+
574+
# Emit results when sessions are complete
575+
.final()
576+
)
577+
578+
# Expected output (when session expires):
579+
# {
580+
# "start": 1000,
581+
# "end": 2000000 + 1800000, # last event + timeout
582+
# "action_count": 4,
583+
# "actions": ["page_view", "click", "page_view", "purchase"]
584+
# }
585+
```
586+
587+
### Session Window with Current Mode
588+
589+
For real-time monitoring, you can use `.current()` mode to get updates as the session progresses:
590+
591+
```python
592+
from datetime import timedelta
593+
from quixstreams import Application
594+
from quixstreams.dataframe.windows import Sum, Count
595+
596+
app = Application(...)
597+
sdf = app.dataframe(...)
598+
599+
sdf = (
600+
# Define a session window with 10-second timeout
601+
.session_window(timeout_ms=timedelta(seconds=10))
602+
603+
# Track total purchase amount and count in each session
604+
.agg(
605+
total_amount=Sum("amount"),
606+
purchase_count=Count()
607+
)
608+
609+
# Emit updates for each message (real-time session tracking)
610+
.current()
611+
)
612+
613+
# Output for each incoming event:
614+
# Event 1: {"start": 1000, "end": 11000, "total_amount": 25, "purchase_count": 1}
615+
# Event 2: {"start": 1000, "end": 15000, "total_amount": 75, "purchase_count": 2} # session extended
616+
# Event 3: {"start": 1000, "end": 18000, "total_amount": 125, "purchase_count": 3} # session extended again
617+
```
618+
619+
### Handling Late Events in Sessions
620+
621+
Session windows support grace periods to handle out-of-order events:
622+
623+
```python
624+
from datetime import timedelta
625+
from quixstreams import Application
626+
from quixstreams.dataframe.windows import Count
627+
628+
def on_late_session_event(
629+
value, key, timestamp_ms, late_by_ms, start, end, name, topic, partition, offset
630+
):
631+
"""Handle late events that couldn't extend any session"""
632+
print(f"Late event for key {key}: {late_by_ms}ms late")
633+
print(f"Event would have belonged to session [{start}, {end}]")
634+
return False # Suppress default logging
635+
636+
app = Application(...)
637+
sdf = app.dataframe(...)
638+
639+
sdf = (
640+
# Session window with 5-minute timeout and 1-minute grace period
641+
.session_window(
642+
timeout_ms=timedelta(minutes=5),
643+
grace_ms=timedelta(minutes=1),
644+
on_late=on_late_session_event
645+
)
646+
.agg(event_count=Count())
647+
.final()
648+
)
649+
```
650+
651+
### Session Window Use Cases
652+
653+
**1. User Activity Tracking**
654+
```python
655+
# Track user sessions on a website or app
656+
.session_window(timeout_ms=timedelta(minutes=30))
657+
.agg(
658+
page_views=Count(),
659+
unique_pages=Count("page_url", unique=True),
660+
session_duration=Max("timestamp") - Min("timestamp")
661+
)
662+
```
663+
664+
**2. Fraud Detection**
665+
```python
666+
# Detect suspicious transaction patterns
667+
.session_window(timeout_ms=timedelta(minutes=10))
668+
.agg(
669+
transaction_count=Count(),
670+
total_amount=Sum("amount"),
671+
locations=Collect("location")
672+
)
673+
```
674+
675+
**3. IoT Device Monitoring**
676+
```python
677+
# Monitor device activity sessions
678+
.session_window(timeout_ms=timedelta(hours=1))
679+
.agg(
680+
readings_count=Count(),
681+
avg_temperature=Mean("temperature"),
682+
max_pressure=Max("pressure")
683+
)
684+
```
685+
686+
**4. Gaming Analytics**
687+
```python
688+
# Track gaming sessions
689+
.session_window(timeout_ms=timedelta(minutes=20))
690+
.agg(
691+
actions_performed=Count(),
692+
points_earned=Sum("points"),
693+
levels_completed=Count("level_completed")
694+
)
695+
```
696+
697+
### Session Window Parameters
698+
699+
- **`timeout_ms`**: The session timeout period. If no new events arrive within this period, the session will be closed. Can be specified as either an `int` (milliseconds) or a `timedelta` object.
700+
701+
- **`grace_ms`**: The grace period for data arrival. Allows late-arriving data to be included in the session, even if it arrives after the session has theoretically timed out. Can be specified as either an `int` (milliseconds) or a `timedelta` object.
702+
703+
- **`name`**: Optional unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.
704+
705+
- **`on_late`**: Optional callback to react to late records that cannot extend any existing session. Use this to customize logging or route late events to a dead-letter queue.
706+
707+
### Session Window Behavior
708+
709+
**Session Creation**: A new session starts when an event arrives and no existing session can accommodate it (i.e., all existing sessions have timed out).
710+
711+
**Session Extension**: An existing session is extended when an event arrives within `timeout + grace_period` of the session's last activity.
712+
713+
**Session Closure**: A session closes when the current time exceeds `session_end_time + grace_period`, where `session_end_time = last_event_time + timeout`.
714+
715+
**Key Grouping**: Like all windows in Quix Streams, sessions are grouped by message key. Each key maintains its own independent sessions.
716+
717+
**Event Time**: Sessions use event time (from Kafka message timestamps) rather than processing time.
718+
503719
## Lateness and Out-of-Order Processing
504720
When working with event time, some events may be processed later than they're supposed to.
505721
Such events are called **"out-of-order"** because they violate the expected order of time in the data stream.
@@ -540,7 +756,7 @@ The appropriate value for a grace period varies depending on the use case.
540756
### Reacting on late events
541757
!!! info New in v3.8.0
542758

543-
To react on late records coming into time windows, you can pass the `on_late` callbacks to `.tumbling_window()`, `.hopping_window()` and `.sliding_window()` methods.
759+
To react on late records coming into time windows, you can pass the `on_late` callbacks to `.tumbling_window()`, `.hopping_window()`, `.sliding_window()`, and `.session_window()` methods.
544760

545761
You can use this callback to customize the logging of such messages or to send them to some dead-letter queue, for example.
546762

@@ -667,6 +883,8 @@ In this strategy, messages advance time and close only windows with the **same**
667883

668884
If some message keys appear irregularly in the stream, the latest windows can remain unprocessed until the message with the same key is received.
669885

886+
Session windows also support both closing strategies. With **key** strategy, sessions for each key close independently. With **partition** strategy, any message can advance time and close sessions for all keys in the partition.
887+
670888
```python
671889
from datetime import timedelta
672890
from quixstreams import Application
@@ -780,7 +998,7 @@ described in [the "Updating Kafka Headers" section](./processing.md#updating-kaf
780998

781999
Here are some general concepts about how windowed aggregations are implemented in Quix Streams:
7821000

783-
- Only time-based windows are supported.
1001+
- Time-based windows (tumbling, hopping, sliding, session) and count-based windows are supported.
7841002
- Every window is grouped by the current Kafka message key.
7851003
- Messages with `None` key will be ignored.
7861004
- The minimal window unit is a **millisecond**. More fine-grained values (e.g. microseconds) will be rounded towards the closest millisecond number.
@@ -794,10 +1012,12 @@ window specification.
7941012

7951013
The state store name is auto-generated by default using the following window attributes:
7961014

797-
- Window type: `"tumbling"` or `"hopping"`
798-
- Window parameters: `duration_ms` and `step_ms`
1015+
- Window type: `"tumbling"`, `"hopping"`, `"sliding"`, or `"session"`
1016+
- Window parameters: `duration_ms` and `step_ms` for time-based windows, `timeout_ms` for session windows
7991017

800-
E.g. a store name for a hopping window of 30 seconds with a 5 second step will be `hopping_window_30000_5000`.
1018+
Examples:
1019+
- A hopping window of 30 seconds with a 5 second step: `hopping_window_30000_5000`
1020+
- A session window with 30 second timeout: `session_window_30000`
8011021

8021022
### Updating Window Definitions
8031023

@@ -807,8 +1027,8 @@ When you change the definition of the window (e.g. its size), the data in the st
8071027

8081028
Quix Streams handles some of the situations, like:
8091029

810-
- Updating window type (e.g. from tumbling to hopping)
811-
- Updating window period or step
1030+
- Updating window type (e.g. from tumbling to hopping, from hopping to session)
1031+
- Updating window period, step, or timeout
8121032
- Adding/Removing/Updating an aggregation function (except `Reduce()`)
8131033

8141034
Updating the window type and parameters will change the name of the underlying state store, and the new window definition will use a different one.

quixstreams/dataframe/dataframe.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
from .windows import (
6767
HoppingCountWindowDefinition,
6868
HoppingTimeWindowDefinition,
69+
SessionWindowDefinition,
6970
SlidingCountWindowDefinition,
7071
SlidingTimeWindowDefinition,
7172
TumblingCountWindowDefinition,
@@ -1484,6 +1485,103 @@ def sliding_count_window(
14841485
name=name,
14851486
)
14861487

1488+
def session_window(
1489+
self,
1490+
timeout_ms: Union[int, timedelta],
1491+
grace_ms: Union[int, timedelta] = 0,
1492+
name: Optional[str] = None,
1493+
on_late: Optional[WindowOnLateCallback] = None,
1494+
) -> SessionWindowDefinition:
1495+
"""
1496+
Create a session window transformation on this StreamingDataFrame.
1497+
1498+
Session windows group events that occur within a specified timeout period.
1499+
A session starts with the first event and extends each time a new event arrives
1500+
within the timeout period. The session closes after the timeout period with no
1501+
new events.
1502+
1503+
Unlike fixed-time windows, session windows have dynamic durations based on the
1504+
actual events and their timing, making them ideal for user activity tracking,
1505+
fraud detection, and other event-driven scenarios.
1506+
1507+
They allow performing stateful aggregations like `sum`, `reduce`, etc.
1508+
on top of the data and emit results downstream.
1509+
1510+
Notes:
1511+
1512+
- The timestamp of the aggregation result is set to the session start timestamp.
1513+
- Every session is grouped by the current Kafka message key.
1514+
- Messages with `None` key will be ignored.
1515+
- Sessions always use the current event time.
1516+
1517+
Example Snippet:
1518+
1519+
```python
1520+
from quixstreams import Application
1521+
import quixstreams.dataframe.windows.aggregations as agg
1522+
1523+
app = Application()
1524+
sdf = app.dataframe(...)
1525+
1526+
sdf = (
1527+
# Define a session window with 30-second timeout and 10-second grace period
1528+
sdf.session_window(
1529+
timeout_ms=timedelta(seconds=30),
1530+
grace_ms=timedelta(seconds=10)
1531+
)
1532+
1533+
# Specify the aggregation function
1534+
.agg(value=agg.Sum())
1535+
1536+
# Specify how the results should be emitted downstream.
1537+
# "current()" will emit results as they come for each updated session,
1538+
# possibly producing multiple messages per key-session pair
1539+
# "final()" will emit sessions only when they are closed and cannot
1540+
# receive any updates anymore.
1541+
.final()
1542+
)
1543+
```
1544+
1545+
:param timeout_ms: The session timeout period.
1546+
If no new events arrive within this period, the session will be closed.
1547+
Can be specified as either an `int` representing milliseconds
1548+
or a `timedelta` object.
1549+
>***NOTE:*** `timedelta` objects will be rounded to the closest millisecond
1550+
value.
1551+
1552+
:param grace_ms: The grace period for data arrival.
1553+
It allows late-arriving data to be included in the session,
1554+
even if it arrives after the session has theoretically timed out.
1555+
Can be specified as either an `int` representing milliseconds
1556+
or a `timedelta` object.
1557+
>***NOTE:*** `timedelta` objects will be rounded to the closest millisecond
1558+
value.
1559+
1560+
:param name: The unique identifier for the window. If not provided, it will be
1561+
automatically generated based on the window's properties.
1562+
1563+
:param on_late: an optional callback to react on late records in sessions and
1564+
to configure the logging of such events.
1565+
If the callback returns `True`, the message about a late record will be logged
1566+
(default behavior).
1567+
Otherwise, no message will be logged.
1568+
1569+
:return: `SessionWindowDefinition` instance representing the session window
1570+
configuration.
1571+
This object can be further configured with aggregation functions
1572+
like `sum`, `count`, etc. applied to the StreamingDataFrame.
1573+
"""
1574+
timeout_ms = ensure_milliseconds(timeout_ms)
1575+
grace_ms = ensure_milliseconds(grace_ms)
1576+
1577+
return SessionWindowDefinition(
1578+
timeout_ms=timeout_ms,
1579+
grace_ms=grace_ms,
1580+
dataframe=self,
1581+
name=name,
1582+
on_late=on_late,
1583+
)
1584+
14871585
def fill(self, *columns: str, **mapping: Any) -> "StreamingDataFrame":
14881586
"""
14891587
Fill missing values in the message value with a constant value.

0 commit comments

Comments
 (0)