Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

InfluxDB Metrics Backend #3601

Merged
merged 11 commits into from
Jul 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions code/__DEFINES/subsystems.dm
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@
#define SS_INIT_INPUT 85
#define SS_INIT_FAIL_TO_TOPIC 84
#define SS_INIT_TOPIC 83
#define SS_INIT_RUST 26
#define SS_INIT_RUST 30
#define SS_INIT_INFLUXDRIVER 28
#define SS_INIT_SUPPLY_SHUTTLE 25
#define SS_INIT_GARBAGE 24
#define SS_INIT_EVENTS 23.5
Expand All @@ -133,7 +134,9 @@
#define SS_INIT_MORE_INIT 16
#define SS_INIT_AIR 15
#define SS_INIT_TELEPORTER 13
#define SS_INIT_LIGHTING 12
#define SS_INIT_INFLUXMCSTATS 12
#define SS_INIT_INFLUXSTATS 11
#define SS_INIT_LIGHTING 10
#define SS_INIT_DEFCON 9
#define SS_INIT_LAW 6
#define SS_INIT_FZ_TRANSITIONS 5
Expand Down Expand Up @@ -212,12 +215,15 @@
#define SS_PRIORITY_UNSPECIFIED 30
#define SS_PRIORITY_PROCESS 25
#define SS_PRIORITY_SOUNDSCAPE 24
#define SS_PRIORITY_INFLUXDRIVER 23
#define SS_PRIORITY_PAGER_STATUS 22
#define SS_PRIORITY_LIGHTING 20
#define SS_PRIORITY_TRACKING 19
#define SS_PRIORITY_DATABASE 15
#define SS_PRIORITY_MINIMAPS 11
#define SS_PRIORITY_PING 10
#define SS_PRIORITY_DATABASE 15
#define SS_PRIORITY_INFLUXMCSTATS 9
#define SS_PRIORITY_INFLUXSTATS 8
#define SS_PRIORITY_PLAYTIME 5
#define SS_PRIORITY_PERFLOGGING 4
#define SS_PRIORITY_CORPSESPAWNER 3
Expand Down
5 changes: 5 additions & 0 deletions code/__HELPERS/text.dm
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@
t = "0[t]"
return t

/proc/pad_trailing(text, padding, size)
while (length(text) < size)
text = "[text][padding]"
return text

//Adds 'u' number of spaces ahead of the text 't'
/proc/add_lspace(t, u)
while(length(t) < u)
Expand Down
1 change: 1 addition & 0 deletions code/_globalvars/global_lists.dm
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ GLOBAL_LIST_EMPTY(GeneralFaxes) //Inter-machine faxes
GLOBAL_LIST_EMPTY(fax_contents) //List of fax contents to maintain it even if source paper is deleted

GLOBAL_LIST_EMPTY(failed_fultons) //A list of fultoned items which weren't collected and fell back down
GLOBAL_LIST_EMPTY(larva_burst_by_hive)

GLOBAL_LIST_INIT_TYPED(custom_huds_list, /datum/custom_hud, setup_all_huds())
GLOBAL_LIST_INIT_TYPED(custom_human_huds, /datum/custom_hud, setup_human_huds())
Expand Down
19 changes: 19 additions & 0 deletions code/controllers/configuration/entries/general.dm
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,25 @@ This maintains a list of ip addresses that are able to bypass topic filtering.

/datum/config_entry/string/round_results_webhook_url

/// InfluxDB v2 Host to connect to for sending statistics (over HTTP API)
/datum/config_entry/string/influxdb_host
/// InfluxDB v2 Bucket to send staistics to
/datum/config_entry/string/influxdb_bucket
/// InfluxDB v2 Organization to access buckets of
/datum/config_entry/string/influxdb_org
/// InfluxDB v2 API Token to access the organization and bucket
/datum/config_entry/string/influxdb_token

/// How often to snapshot general game statistics to influxdb driver
/datum/config_entry/number/influxdb_stats_period
config_entry_value = 30
/// How often to snapshot MC statistics
/datum/config_entry/number/influxdb_mcstats_period
config_entry_value = 60
/// How often to send queued influxdb statistics
/datum/config_entry/number/influxdb_send_period
config_entry_value = 10

/// logs all timers in buckets on automatic bucket reset (Useful for timer debugging)
/datum/config_entry/flag/log_timers_on_bucket_reset

Expand Down
132 changes: 132 additions & 0 deletions code/controllers/subsystem/influxdriver.dm
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/// Sends collected statistics to an influxdb v2 backend periodically
SUBSYSTEM_DEF(influxdriver)
name = "InfluxDB Driver"
wait = 10 SECONDS
init_order = SS_INIT_INFLUXDRIVER
priority = SS_PRIORITY_INFLUXDRIVER
runlevels = RUNLEVELS_DEFAULT|RUNLEVEL_LOBBY

var/list/send_queue = list()

/// Maximum amount of metric lines to send at most in one request
/// This is neccessary because sending a lot of metrics can get expensive
/// and drive the subsystem into overtime, but we can't split the work as it'd be even less efficient
var/max_batch = 150

/// Last timestamp in microseconds
var/timestamp_cache_realtime
/// Last tick time the timestamp was taken at
var/timestamp_cache_worldtime

/datum/controller/subsystem/influxdriver/Initialize()
var/period = text2num(CONFIG_GET(number/influxdb_send_period))
if(isnum(period))
wait = max(period * (1 SECONDS), 2 SECONDS)
return SS_INIT_SUCCESS

/datum/controller/subsystem/influxdriver/stat_entry(msg)
msg += "period=[wait] queue=[length(send_queue)]"
return ..()

/datum/controller/subsystem/influxdriver/proc/unix_timestamp_string() // pending change to rust-g
return RUSTG_CALL(RUST_G, "unix_timestamp")()

/datum/controller/subsystem/influxdriver/proc/update_timestamp()
PRIVATE_PROC(TRUE)
// We make only one request to rustg per game tick, so we cache the result per world.time
var/whole_timestamp = unix_timestamp_string() // Format "7129739474.4758981" - timestamp with up to 7-8 decimals
var/list/tsparts = splittext(whole_timestamp, ".")
var/fractional = copytext(pad_trailing(tsparts[2], "0", 6), 1, 7) // in microseconds
timestamp_cache_worldtime = world.time
timestamp_cache_realtime = "[tsparts[1]][fractional]"

/datum/controller/subsystem/influxdriver/fire(resumed)
var/maxlen = min(length(send_queue)+1, max_batch)
var/list/queue = send_queue.Copy(1, maxlen)
send_queue.Cut(1, maxlen)
flush_queue(queue)

/// Flushes measurements batch to InfluxDB backend
/datum/controller/subsystem/influxdriver/proc/flush_queue(list/queue)
PRIVATE_PROC(TRUE)

var/host = CONFIG_GET(string/influxdb_host)
var/token = CONFIG_GET(string/influxdb_token)
var/bucket = CONFIG_GET(string/influxdb_bucket)
var/org = CONFIG_GET(string/influxdb_org)

if(!host || !token || !bucket || !org)
can_fire = FALSE
return

if(!length(queue))
return // Nothing to do

var/url = "[host]/api/v2/write?org=[org]&bucket=[bucket]&precision=us" // microseconds
var/list/headers = list()
headers["Authorization"] = "Token [token]"
headers["Content-Type"] = "text/plain; charset=utf-8"
headers["Accept"] = "application/json"

var/datum/http_request/request = new
var/payload = ""
for(var/line in queue)
payload += "[line]\n"
request.prepare(RUSTG_HTTP_METHOD_POST, url, payload, headers)
request.begin_async()
// TODO possibly check back result of request later

/// Enqueues sending to InfluxDB Backend selected measurement values - round_id and timestamp are filled in automatically
/datum/controller/subsystem/influxdriver/proc/enqueue_stats(measurement, list/tags, list/fields)
. = FALSE
var/valid = FALSE
var/serialized = "[measurement],round_id=[GLOB.round_id]"
if(tags)
for(var/tag in tags)
var/serialized_tag = serialize_field(tag, tags[tag])
if(serialized_tag)
serialized += ",[serialized_tag]"
serialized += " "
var/comma = ""
for(var/field in fields)
var/serialized_field = serialize_field(field, fields[field])
if(serialized_field)
valid = TRUE
serialized += "[comma][serialized_field]"
comma = ","
if(!valid)
CRASH("Attempted to serialize to InfluxDB backend an invalid measurement (likely has no fields)")
if(timestamp_cache_worldtime != world.time)
update_timestamp()
serialized += " [timestamp_cache_realtime]"
send_queue += serialized
return TRUE

/// Enqueues sending varied stats in a dumb and simpler format directly as: measurement count=
/datum/controller/subsystem/influxdriver/proc/enqueue_stats_crude(measurement, value, field_name = "count")
. = FALSE
var/serialized_field = serialize_field(field_name, value)
if(!length(serialized_field))
return
if(timestamp_cache_worldtime != world.time)
update_timestamp()
var/serialized = "[measurement],round_id=[GLOB.round_id] [serialized_field] [timestamp_cache_realtime]"
send_queue += serialized
return TRUE

/// Puts a single field or tag value into InfluxDB Line format
/datum/controller/subsystem/influxdriver/proc/serialize_field(field, value)
var/static/regex/whitelistedCharacters = regex(@{"([^a-zA-Z0-9_]+)"}, "g")
var/sanitized_field = whitelistedCharacters.Replace("[field]", "")
if(!length(sanitized_field) || copytext(sanitized_field, 1, 2) == "_")
CRASH("Invalid tag/field for InfluxDB serialization: '[sanitized_field]' (original: '[field]')")
var/sanitized_value
if(isnum(value))
sanitized_value = value
else if(istext(value))
sanitized_value = whitelistedCharacters.Replace("[value]", "")
if(!length(sanitized_value) || copytext(sanitized_value, 1, 2) == "_")
CRASH("Invalid value for InfluxDB serialization: '[sanitized_value]' (original: '[value]')")
else
CRASH("Invalid value type passed for InfluxDB serialization: '[value]'")
return "[sanitized_field]=[sanitized_value]"
47 changes: 47 additions & 0 deletions code/controllers/subsystem/influxmcstats.dm
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
SUBSYSTEM_DEF(influxmcstats)
name = "InfluxDB MC Stats"
wait = 60 SECONDS
priority = SS_PRIORITY_INFLUXMCSTATS
init_order = SS_INIT_INFLUXMCSTATS
runlevels = RUNLEVEL_LOBBY|RUNLEVELS_DEFAULT
flags = SS_KEEP_TIMING
var/checkpoint = 0
var/list/subsystem_name_cache = list()

/datum/controller/subsystem/influxmcstats/Initialize()
var/period = text2num(CONFIG_GET(number/influxdb_mcstats_period))
if(isnum(period))
wait = max(period * (1 SECONDS), 10 SECONDS)
return SS_INIT_SUCCESS

/datum/controller/subsystem/influxmcstats/stat_entry(msg)
msg += "period=[wait] checkpoint=[checkpoint]"
return ..()

/datum/controller/subsystem/influxmcstats/fire(resumed)
if(!SSinfluxdriver.can_fire)
can_fire = FALSE
return

var/list/data = list()
data["time_dilation_current"] = SStime_track.time_dilation_current
data["time_dilation_avg"] = SStime_track.time_dilation_avg
data["time_dilation_avg_slow"] = SStime_track.time_dilation_avg_slow
data["time_dilation_avg_fast"] = SStime_track.time_dilation_avg_fast
SSinfluxdriver.enqueue_stats("tidi", null, data)

SSinfluxdriver.enqueue_stats("cpu", null, list("cpu" = world.cpu, "map_cpu" = world.map_cpu))

var/static/regex/get_last_path_element = regex(@{"/([^/]+)$"})
checkpoint++
for(var/datum/controller/subsystem/SS in Master.subsystems)
if(!SS.can_fire)
continue
if(!subsystem_name_cache[SS.type])
get_last_path_element.Find("[SS.type]")
subsystem_name_cache[SS.type] = "SS[get_last_path_element.group[1]]"
var/SSname = subsystem_name_cache[SS.type]
if(!SSname)
stack_trace("Influx MC Stats couldnt name a subsystem, type=[SS.type]")
continue
SSinfluxdriver.enqueue_stats("sstimings", list("ss" = SSname), list("cost" = SS.cost, "tick_overrun" = SS.tick_overrun, "tick_usage" = SS.tick_usage, "wait" = SS.wait))
Loading