-
Notifications
You must be signed in to change notification settings - Fork 9
/
workflow_runner.py
167 lines (140 loc) · 5.99 KB
/
workflow_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import logging
import threading
from argparse import Namespace
from dataclasses import dataclass, field
from enum import Enum, auto
from time import sleep
from typing import Dict, List
import prometheus_client
import yaml
import internal_data_probe
from ccloud.org import CCloudOrgList
from helpers import (
env_parse_replace,
logged_method,
set_breadcrumb_flag,
set_logger_level,
)
LOGGER = logging.getLogger(__name__)
@dataclass(kw_only=True)
class AppProps:
days_in_memory: int = field(default=30)
relative_output_dir: str = field(default="output")
loglevel: str = field(default="INFO")
@logged_method
def get_app_props(in_config: Dict):
    """Populate the module-level APP_PROPS from the parsed configuration.

    Reads the "system" section of *in_config*. When that section is missing
    or empty, AppProps defaults are used. Also applies the configured log
    level and breadcrumb flag as side effects.

    Args:
        in_config: Parsed configuration dictionary (the "config" subtree).
    """
    global APP_PROPS
    # .get() avoids a KeyError when the "system" key is absent entirely
    # (the original subscript only tolerated a present-but-falsy value).
    config: Dict = in_config.get("system")
    if not config:
        LOGGER.warning("No System Configuration found. Using default values.")
        APP_PROPS = AppProps()
        return
    LOGGER.debug("System Configuration found. Using values from config file.")
    LOGGER.debug("Parsing loglevel from config file")
    log_lvl: str = config.get("log_level", "INFO").upper()
    # Map configured names to logging levels; unknown names fall back to INFO.
    level_map = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
    }
    if log_lvl in level_map:
        LOGGER.info("Setting loglevel to %s", log_lvl)
        loglevel = level_map[log_lvl]
    else:
        LOGGER.info("Cannot understand log level %s. Setting loglevel to INFO", log_lvl)
        loglevel = logging.INFO
    set_logger_level(loglevel)
    # Breadcrumbs are enabled only on an explicit boolean True in the config;
    # truthy strings such as "yes" deliberately do NOT enable them.
    breadcrumbs = config.get("enable_method_breadcrumbs", False) is True
    set_breadcrumb_flag(breadcrumbs)
    LOGGER.info("Parsing Core Application Properties")
    # NOTE(review): the runtime default for days_in_memory is 7 here while the
    # AppProps dataclass default is 30 — preserved as-is, confirm intent.
    APP_PROPS = AppProps(
        days_in_memory=config.get("days_in_memory", 7),
        relative_output_dir=config.get("output_dir_name", "output"),
        loglevel=loglevel,
    )
class WorkflowStage(Enum):
    """Phases of the application's processing cycle."""

    GATHER = 1  # refresh data from external sources
    CALCULATE_OUTPUT = 2  # run calculations over the gathered data
    SLEEP = 3  # idle until the next cycle
@logged_method
def try_parse_config_file(config_yaml_path: str) -> Dict:
    """Load the YAML config file, resolve environment placeholders, return it.

    Args:
        config_yaml_path: Path to the YAML configuration file.

    Returns:
        The parsed configuration dictionary, mutated in place by
        env_parse_replace to resolve environment-variable references.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the file contents are not valid YAML.
    """
    LOGGER.debug("Trying to parse Configuration File: %s", config_yaml_path)
    # Explicit utf-8 so parsing does not depend on the platform default encoding.
    with open(config_yaml_path, "r", encoding="utf-8") as config_file:
        core_config = yaml.safe_load(config_file)
    LOGGER.debug("Successfully parsed Configuration File: %s", config_yaml_path)
    LOGGER.debug("Parsing Environment Variables")
    # Resolves env-var references throughout the config tree
    # (see helpers.env_parse_replace for the exact substitution rules).
    env_parse_replace(core_config)
    LOGGER.debug("Successfully parsed Environment Variables")
    return core_config
@logged_method
def run_gather_cycle(ccloud_orgs: CCloudOrgList):
    """Refresh and read any data that may be new since the last gather phase.

    The org list has built-in safeguards against repetitive gathering of the
    same datasets: Cloud Objects refresh at most every 30 minutes, the Metrics
    API persistence store skips anything already cached and written to disk,
    and billing CSV data already held in memory is not read in again.
    """
    ccloud_orgs.execute_requests()
@logged_method
def run_calculate_cycle(ccloud_orgs: CCloudOrgList):
    """Execute the calculation phase across every org in *ccloud_orgs*."""
    ccloud_orgs.run_calculations()
@logged_method
def execute_workflow(arg_flags: Namespace):
    """Top-level entry point: parse config, start servers, and block forever.

    Side effects:
        - Populates the module-level APP_PROPS via get_app_props().
        - Starts a Prometheus metrics HTTP server on port 8000.
        - Starts the internal readiness/state API on port 8001.
        - Initializes CCloudOrgList (which performs the first gather run).
        - Blocks indefinitely in the main loop until interrupted; the
          finally block then stops and joins the ticker threads.

    Args:
        arg_flags: Parsed CLI arguments; only ``config_file`` is read here.
    """
    LOGGER.info("Starting Workflow Runner")
    LOGGER.debug("Debug Mode is ON")
    LOGGER.debug("Parsing config file")
    core_config = try_parse_config_file(config_yaml_path=arg_flags.config_file)
    LOGGER.debug("Successfully parsed config file")
    LOGGER.debug("Setting up Core App Properties")
    get_app_props(core_config["config"])
    # Ticker-thread wiring is currently disabled; each (commented) entry is
    # [owner_object, target_func, tick_duration_secs].
    thread_configs: List = [
        # [COMMON_THREAD_RUNNER, current_memory_usage, 5],
        # [METRICS_PERSISTENCE_STORE, sync_to_file, METRICS_PERSISTENCE_STORE.flush_to_disk_interval_sec],
        # [CHARGEBACK_PERSISTENCE_STORE, sync_to_file, CHARGEBACK_PERSISTENCE_STORE.flush_to_disk_interval_sec],
        # [BILLING_PERSISTENCE_STORE, sync_to_file, BILLING_PERSISTENCE_STORE.flush_to_disk_interval_sec],
    ]
    threads_list = list()
    for _, item in enumerate(thread_configs):
        threads_list.append(
            item[0].get_new_thread(target_func=item[1], tick_duration_secs=item[2])
        )
    try:
        LOGGER.info("Starting all threads")
        for item in threads_list:
            item.start()
        LOGGER.debug("Starting Prometheus Server")
        # Metrics exposed on port 8000 for scraping.
        prometheus_client.start_http_server(8000)
        LOGGER.debug("Starting Internal API Server for state sharing and readiness")
        # NOTE(review): this thread is not daemonized, so it can keep the
        # process alive after the main loop exits — confirm that is intended.
        threading.Thread(
            target=internal_data_probe.internal_api.run,
            kwargs={"host": "0.0.0.0", "port": 8001},
        ).start()
        # This step will initialize the CCloudOrg structure along with all the internal Objects in it.
        # Those will include the first run for all the data gather step as well.
        # There are some safeguards already implemented to prevent request choking, it should be safe in most use cases.
        LOGGER.info("Initializing Core CCloudOrgList Object")
        # NOTE(review): the constructed object is not bound to a name — it is
        # presumably registered via side effects in __init__; verify.
        CCloudOrgList(
            in_orgs=core_config["config"]["org_details"],
            in_days_in_memory=APP_PROPS.days_in_memory,
        )
        LOGGER.info("Initialization Complete.")
        internal_data_probe.set_readiness(readiness_flag=True)
        # This is the main loop for the application.
        LOGGER.info("Starting Main Loop")
        # Keeps the main thread alive; 10**8 seconds (~3 years) per tick.
        while True:
            sleep(10**8)
    finally:
        LOGGER.info("Shutting down all threads")
        # Begin shutdown process.
        for item in thread_configs:
            item[0].stop_sync()
        LOGGER.info("Waiting for State Sync ticker for Final sync before exit")
        for item in threads_list:
            item.join()