|
8 | 8 |
|
9 | 9 | ## Anatomy of a `csp.graph` |
10 | 10 |
|
11 | | -To reiterate, `csp.graph` methods are called in order to construct the graph and are only executed before the engine is run. |
12 | | -`csp.graph` methods don't do anything special, they are essentially regular python methods, but they can be defined to accept inputs and generate outputs similar to `csp.nodes`. |
13 | | -This is solely used for type checking. |
| 11 | +`csp.graph` methods are called in order to construct the graph and are only executed before the engine is run. A graph is a collection of nodes and adapters which can either be executed as an argument to `csp.run` or composed into a larger graph. |
| 12 | +The `csp.graph` decorator is only used for type validation and it is optional when creating a CSP program. A standard Python function without the decorator can also be passed as an argument to `csp.run` if type validation is not required. |
14 | 13 | `csp.graph` methods can be created to encapsulate components of a graph, and can be called from other `csp.graph` methods in order to help facilitate graph building. |
15 | 14 |
|
16 | 15 | Simple example: |
17 | 16 |
|
18 | 17 | ```python |
19 | 18 | @csp.graph |
20 | | -def calc_symbol_pnl(symbol: str, trades: ts[Trade]) -> ts[float]: |
21 | | - # sub-graph code needed to compute pnl for given symbol and symbol's trades |
22 | | - # sub-graph can subscribe to market data for the symbol as needed |
23 | | - ... |
| 19 | +def calc_user_time(session_data: ts[UserSession]) -> ts[float]: |
| 20 | + # sub-graph code needed to compute the time a user spends on a website |
| 21 | + session_time = session_data.logout_time - session_data.login_time |
| 22 | + time_online = csp.stats.sum(session_time) |
| 23 | + return time_online |
24 | 24 |
|
25 | 25 |
|
26 | 26 | @csp.graph |
27 | | -def calc_portfolio_pnl(symbols: [str]) -> ts[float]: |
28 | | - symbol_pnl = [] |
29 | | - for symbol in symbols: |
30 | | - symbol_trades = trade_adapter.subscribe(symbol) |
31 | | - symbol_pnl.append(calc_symbol_pnl(symbol, symbol_trades)) |
| 27 | +def calc_site_traffic(users: List[str]) -> ts[float]: |
| 28 | + user_time = [] |
| 29 | + for user in users: |
| 30 | + user_sessions = get_session(user) |
| 31 | + user_time.append(calc_user_time(user_sessions)) |
32 | 32 |
|
33 | | - return csp.sum(symbol_pnl) |
| 33 | + return csp.sum(user_time) |
34 | 34 | ``` |
35 | 35 |
|
36 | | -In this simple example we have a `csp.graph` component `calc_symbol_pnl` which encapsulates computing pnl for a single symbol. |
37 | | -`calc_portfolio_pnl` is a graph that computes portfolio level pnl, it invokes the symbol-level pnl calc for every symbol, then sums up the results for the portfolio level pnl. |
| 36 | +In this simple example we compute the total time all users spend on a website. We have a `csp.graph` subcomponent `calc_user_time` which computes the time a single user spends on the site throughout the run. |
| 37 | +Then, in `calc_site_traffic` we compute the total user traffic by creating the user-level subgraph for each account and aggregating the results. |
38 | 38 |
|
39 | | -## Graph Propagation and Single-dispatch |
| 39 | +## Graph Propagation and Single-Dispatch |
40 | 40 |
|
41 | | -The CSP graph propagation algorithm ensures that all nodes are executed *once* per engine cycle, and in the correct order. |
42 | | -Correct order means, that all input dependencies of a given node are guaranteed to have been evaluated before a given node is executed. |
43 | | -Take this graph for example: |
| 41 | +The CSP graph propagation algorithm ensures that all nodes are executed *after* any of their dependencies on a given engine cycle. |
| 42 | + |
| 43 | +> \[!IMPORTANT\] |
| 44 | +> An *engine cycle* refers to a single execution of a CSP graph. There can be multiple engine cycles at the same *timestamp*; for example, a single data source may have two events both at `2020-01-01 00:00:00`. These events will be executed in two *cycles* that both occur at the same timestamp. Another case where multiple cycles can occur is [csp.feedback](Add-Cycles-in-Graphs). |
| 45 | +
|
| 46 | +For example, consider the graph below: |
44 | 47 |
|
45 | 48 |  |
46 | 49 |
|
47 | | -On a given cycle lets say the `bid` input ticks. |
48 | | -The CSP engine will ensure that **`mid`** is executed, followed by **`spread`** and only once **`spread`**'s output is updated will **`quote`** be called. |
49 | | -When **`quote`** executes it will have the latest values of the `mid` and `spread` calc for this cycle. |
| 50 | +Individuals nodes are executed in *rank order* where the rank of a node is defined as the longest path between the node and an input adapter. The "mid" node is at rank 1, while "spread" is at rank 2 and "quote" is rank 3. Therefore, if "bid" ticks on a given engine cycle then "mid" will be executed before "spread" and "quote". Note that the order of node execution *within* a rank is undefined, and users should never rely on the execution order of nodes at the same rank. |
50 | 51 |
|
51 | 52 | ## Graph Pruning |
52 | 53 |
|
53 | | -One should note a subtle optimization technique in CSP graphs. |
54 | | -Any part of a graph that is created at graph building time, but is NOT connected to any output nodes, will be pruned from the graph and will not exist during runtime. |
| 54 | +Any node in a graph that is not connected to an output will be pruned from the graph and will not exist during runtime. |
55 | 55 | An output is defined as either an output adapter or a `csp.node` without any outputs of its own. |
56 | | -The idea here is that we can avoid doing work if it doesn't result in any output being generated. |
57 | | -In general its best practice for all `csp.nodes` to be \***side-effect free**, in other words they shouldn't mutate any state outside of the node. |
58 | | -Assuming all nodes are side-effect free, pruning the graph would not have any noticeable effects. |
| 56 | +Pruning is an optimization which avoids executing nodes whose result will be discarded. |
| 57 | +As a result, it's best practice for any `csp.node` to be \***side-effect free**; they shouldn't mutate any state outside of the node. |
| 58 | + |
| 59 | +## Executing a Graph |
| 60 | + |
| 61 | +Graphs can be executed using the `csp.run` function. Execution takes place in either real-time or historical mode (see [Execution Modes](Execution-Modes)) depending on the `realtime` argument. Graph execution begin at a `starttime` and ends at an `endtime`; the `endtime` argument can either be a `datetime` which is past the start *or* a `timedelta` which is the duration of the run. For example, if we wish to run our `calc_site_traffic` graph over one week of historical data we can execute it with: |
| 62 | + |
| 63 | +```python |
| 64 | +csp.run(calc_site_traffic, users=['alice', 'bob'], starttime=start, endtime=timedelta(weeks=1), realtime=False) |
| 65 | +``` |
59 | 66 |
|
60 | 67 | ## Collecting Graph Outputs |
61 | 68 |
|
62 | | -If the `csp.graph` passed to `csp.run` has outputs, the full timeseries will be returned from `csp.run` like so: |
| 69 | +There are multiple methods of getting in-process outputs after executing a `csp.graph`. If the graph returns one or more time-series, the full history of those values will be returned from `csp.run`. |
63 | 70 |
|
64 | | -**outputs example** |
| 71 | +**return example** |
65 | 72 |
|
66 | 73 | ```python |
67 | 74 | import csp |
68 | 75 | from datetime import datetime, timedelta |
69 | 76 |
|
70 | 77 | @csp.graph |
71 | 78 | def my_graph() -> ts[int]: |
72 | | - return csp.merge(csp.const(1), csp.const(2, timedelta(seconds=1))) |
| 79 | + return csp.merge(csp.const(1), csp.const(2, delay=timedelta(seconds=1))) |
73 | 80 |
|
74 | | -if __name__ == '__main__': |
75 | | - res = csp.run(my_graph, starttime=datetime(2021,11,8)) |
76 | | - print(res) |
| 81 | +res = csp.run(my_graph, starttime=datetime(2021,11,8)) |
77 | 82 | ``` |
78 | 83 |
|
79 | | -result: |
| 84 | +res: |
80 | 85 |
|
81 | 86 | ```raw |
82 | 87 | {0: [(datetime.datetime(2021, 11, 8, 0, 0), 1), (datetime.datetime(2021, 11, 8, 0, 0, 1), 2)]} |
83 | 88 | ``` |
84 | 89 |
|
85 | | -Note that the result is a list of `(datetime, value)` tuples. |
| 90 | +Note that the result is a list of `(time, value)` tuples. You can have the result returned as two separate NumPy arrays, one for the times and one for the values, by setting `output_numpy=True` in the `run` call. |
86 | 91 |
|
87 | | -You can also use [csp.add_graph_output](Base-Adapters-API#cspadd_graph_output) to add outputs. |
88 | | -These do not need to be in the top-level graph called directly from `csp.run`. |
| 92 | +```python |
| 93 | +res = csp.run(my_graph, starttime=datetime(2021,11,8), output_numpy=True) |
| 94 | +``` |
| 95 | + |
| 96 | +res: |
89 | 97 |
|
90 | | -This gives the same result: |
| 98 | +```raw |
| 99 | +{0: (array(['2021-11-08T00:00:00.000000000', '2021-11-08T00:00:01.000000000'], dtype='datetime64[ns]'), array([1, 2], dtype=int64))} |
| 100 | +``` |
| 101 | + |
| 102 | +You can also use [csp.add_graph_output](Base-Adapters-API#cspadd_graph_output) to add outputs. |
| 103 | +These do not need to be in the top-level graph called directly from `csp.run`. Users can also specify the amount of history they want stored in the output using the `tick_count` and `tick_history` arguments to `add_graph_output`. For example, if only the last value needs to be stored set `tick_count=1`. |
91 | 104 |
|
92 | 105 | **add_graph_output example** |
93 | 106 |
|
94 | 107 | ```python |
95 | 108 | @csp.graph |
96 | 109 | def my_graph(): |
97 | | - csp.add_graph_output('a', csp.merge(csp.const(1), csp.const(2, timedelta(seconds=1)))) |
98 | | -``` |
| 110 | + same_thing = csp.merge(csp.const(1), csp.const(2, delay=timedelta(seconds=1))) |
| 111 | + csp.add_graph_output('my_name', same_thing) |
99 | 112 |
|
100 | | -In addition to python outputs like above, you can set the optional `csp.run` argument `output_numpy` to `True` to get outputs as numpy arrays: |
101 | | - |
102 | | -**numpy outputs** |
103 | | - |
104 | | -```python |
105 | | -result = csp.run(my_graph, starttime=datetime(2021,11,8), output_numpy=True) |
| 113 | +res = csp.run(my_graph, starttime=datetime(2021,11,8)) |
106 | 114 | ``` |
107 | 115 |
|
108 | | -result: |
| 116 | +res: |
109 | 117 |
|
110 | 118 | ```raw |
111 | | -{0: (array(['2021-11-08T00:00:00.000000000', '2021-11-08T00:00:01.000000000'], dtype='datetime64[ns]'), array([1, 2], dtype=int64))} |
| 119 | +{'my_name': [(datetime.datetime(2021, 11, 8, 0, 0), 1), (datetime.datetime(2021, 11, 8, 0, 0, 1), 2)]} |
112 | 120 | ``` |
113 | | - |
114 | | -Note that the result there is a tuple per output, containing two numpy arrays, one with the datetimes and one with the values. |
|
0 commit comments