Counting increase in types of messages over days (#77)
* counting increase in types of messages over days

* making tracker compatible with other datetime format

* adding comments

* saving progress

* making daily increase analyzer work by service

* fixing bugs with increase analyzer and turning off header analyzer because it's essentially a fancy echo

* making code more readable
kitrady authored Aug 3, 2024
1 parent 8e9cfbb commit d58e5fc
Showing 4 changed files with 473 additions and 97 deletions.
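The commit message mentions making the tracker compatible with another datetime format, i.e. timestamps that separate date and time with either a space or a `T`. A minimal sketch of that idea follows, assuming every timestamp begins with a `YYYY-MM-DD` date; the helper name `extract_date` is hypothetical and is not part of this commit.

```python
import datetime


def extract_date(timestamp: str) -> datetime.date:
    """Return the calendar date from a timestamp such as
    '2024-06-20 09:00:00' or '2024-06-20T09:00:00.001550+00:00'.

    Assumption: the timestamp starts with a 10-character YYYY-MM-DD date.
    """
    # The first ten characters hold the date in both formats, so no
    # separator-specific branching is needed.
    return datetime.date.fromisoformat(timestamp[:10])


print(extract_date("2024-06-20 09:00:00.001550+00:00"))
print(extract_date("2024-06-21T05:00:00"))
```

Slicing avoids any timezone conversion, so the reported day is always the day written in the log line.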
227 changes: 132 additions & 95 deletions data/test_output_file.txt
@@ -1,56 +1,106 @@
Number of error lines: 22
Daily increases in error log message for Tracking service:
- On 2024-06-21, the number of error messages increased by 10 from the previous day

Number of error lines: 25

Duplicate Log Messages:
STARTING Tracking service
20 05:00:00+00:00 on topic internal from kafka.servers:9092
Could not find instrument for ric UTU.T, using provided identifier 3
instrument not found for sedol OIN38378
instrument not found for ric RUE.ST
prometheus client http server running
Closing client connection.
Could not find instrument for ric YT.ATS, using provided identifier 1
instrument not found for sedol DHI7337
Could not find instrument for sedol UY29387, trying ric RUE.ST
instrument not found for sedol BYP321337
Updating subscribed topics to: frozenset({'internal'})
20 09:00:00.001550+00:00
29 exchange=XSET
tracking_service Closed
market hours frequency
Caught exception N/A. Message: Unclosed client session NoneType: None
Could not find instrument for sedol UY29387, trying ric RUE.ST
sql.co/qty
20
29 exchange=ASDI
Could not find instrument for sedol DHI7337, trying ric YT.ATS
Caught up and static data complete; starting timer.
Version: 2729a
Could not find instrument for sedol BYP321337, trying ric YT.ATS
Updating subscribed topics to: frozenset({'internal'})
instrument not found for ric UYU.T
Caught exception N/A. Message: Unclosed connector NoneType: None
Updating prices
Could not find instrument for sedol RZZZZ2, trying ric UYU.T
====================================================
Could not find instrument for sedol DHI7337, trying ric YT.ATS
Updating subscribed topics to: frozenset({'internal_topic'})
instrument not found for ric RUE.ST
Tracking service is caught up
29 exchange=SXDF
29 exchange=XSS
instrument not found for ric JPQ.CC
20
Scheduling Error Handler in 150.0 seconds
instrument not found for sedol UY29387
Exception in message handler <bound method TrackingService.method of <app.tracking_service.TrackingService object at 0x7feba0d0>> TrackingService.on_order_change() missing 1 required positional argument: 'order_identifier'
instrument not found for ric YT.ATS
version', '2729a']
Kafka source starting for topic internal at current offset 7924032 end offset 7928950 on servers kafka.servers:9092
Could not find instrument for ric RUE.ST, using provided identifier 5
Initialized prometheus server.
instrument not found for ric YT.ATS
instrument not found for sedol BYP321337
prometheus client http server running on http://prometheus.co
Could not find instrument for ric JPQ.CC, using provided identifier 2
20 05:00:00+00:00 on topic internal from kafka.servers:9092
Could not find instrument for ric 837.RIJ, using provided identifier 4
Updating subscribed topics to: frozenset({'topic_internal'})
Closing client connection.
Could not find instrument for sedol BYP321337, trying ric YT.ATS
Could not find instrument for ric YT.ATS, using provided identifier 1
instrument not found for ric 837.RIJ
29
29 exchange=BSTT
29 exchange=ASDN
Caught exception N/A. Message: Unclosed connector NoneType: None
market', 'US']
Adding subscription for pid None
Kafka topic internal is caught up at offset 7928949
Exception in message handler <bound method TrackingService.method of <app.tracking_service.TrackingService object at 0x7feba0d0>> TrackingService.on_order_change() missing 1 required positional argument: 'order_identifier'
====================================================
instrument not found for sedol UY29387
prometheus client http server running
Caught exception N/A. Message: Unclosed client session NoneType: None
Could not find instrument for sedol OIN38378, trying ric 837.RIJ
Kafka source starting for topic internal at current offset 7924032 end offset 7928950 on servers kafka.servers:9092
Updating prices
29.
29 exchange=NMQ
Kafka topic internal is caught up at offset 7928949
29 and symbol not in instrument_list
Could not find instrument for sedol RZZZZ2, trying ric UYU.T
Could not find instrument for ric RUE.ST, using provided identifier 5
29 exchange=UVD
Could not find instrument for ric UTU.T, using provided identifier 3
instrument not found for sedol RZZZZ2
Could not find instrument for ric 837.RIJ, using provided identifier 4
Tracking service is caught up
tracking_service Closed
instrument not found for ric 837.RIJ
Could not find instrument for ric JPQ.CC, using provided identifier 2
instrument not found for sedol DHI7337
20 09:00:00.001550+00:00
Initialized Influx DB Client to host
STARTING Tracking service
Initialized prometheus server.
instrument not found for sedol OIN38378
29 exchange=YS

Number of log lines: 151
Number of log lines: 608

Lines that do not conform to log format:
- Hello I am a bad log line
- NoneType: None

Lines that are part of the startup header:
1 non-fatal stack trace(s) found:
- Traceback (most recent call last):
- File "<frozen runpy>", line 1938, in _run_module_as_main
- File "<frozen runpy>", line 88, in _run_code
- File ".build/2811/execution/execution_service", line 973, in <module>
- app.run(life_cycle_runner.run, life_cycle_runner.stop)
- File ".build/2811/app/application.py", line 219, in run
- run(self.start(my_date, main, stop))
- File ".build/2811/.venv/lib/python3.11/runners.py", line 190, in run
- return runner.run(main)
- ^^^^^^^^^^^^^^^^
- File ".build/2811/.venv/lib/python3.11/runners.py", line 118, in run
- return self._loop.run_until_complete(task)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- File ".build/2811/.venv/lib/python3.11/events.py", line 653, in run_until_done
- return future.result()
- ^^^^^^^^^^^^^^^
- File "2811/app/application.py", line 421, in start
- await self.task
- File "2811/messages/app/runner.py", line 449, in run
- await asyncio.gather(*self.running_tasks)
- File "2811/messages/processor.py", line 340, in run
- await self.dispatch(message)
- File ".build/2811/execution/execution_service", line 1315, in market_test
- if symbol and obj.region != 'NORTHAMERICA'
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- AttributeError: 'NoneType' object has no attribute 'region'

Lines that are part of the startup header(s):
- STARTING Tracking service
- Start time: 2024-06-20 09:00:00.001550+00:00
- Version: 2729a
@@ -63,73 +113,60 @@ Lines that are part of the startup header:
- Start time: 2024-06-21 09:00:00.001550+00:00
- Version: 2729a
- Command line: ['.venv/bin/python3', '-m', 'app.tracking_service', '--market', 'US', '--version', '2729a']
- STARTING PROCESS Quantity Loader
- Start time: 2024-05-30 10:30:00.034654+00:00
- Version: 2673
- Command line: ['.venv/bin/python3', '-m', 'loaders.quantity_loader', '--market', 'US']
- STARTING PROCESS Batch Writer
- Start time: 2024-06-10 10:30:00.153687+00:00
- Version: 2692
- Command line: ['.venv/bin/python3', '-m', 'db.batch_writer', '--market', 'US']
- STARTING PROCESS Access Point
- Start time: 2025-10-01 12:30:00.022996+00:00
- Version: 2808
- Command line: ['.venv/bin/python3', '-m', 'process.access_point', '--market', 'US', '--ap', 'ARC']
- STARTING PROCESS Execution Server
- Start time: 2024-07-23 21:00:00.011070+00:00
- Version: 2811
- Command line: ['.venv/bin/python3', '-m', 'execution/execution_service', '--version', '2811']

------ a report has been reported ------

Number of error lines: 22
There has been no daily increase in specific types of messages for any service

Number of error lines: 0

Duplicate Log Messages:
instrument not found for ric UYU.T
Could not find instrument for sedol OIN38378, trying ric 837.RIJ
prometheus client http server running
instrument not found for sedol RZZZZ2
Caught exception N/A. Message: Unclosed connector NoneType: None
20 05:00:00+00:00 on topic internal from kafka.servers:9092
tracking_service Closed
instrument not found for ric RUE.ST
version', '2729a']
Version: 2729a
Could not find instrument for ric YT.ATS, using provided identifier 1
Adding subscription for pid None
Kafka source starting for topic internal at current offset 7924032 end offset 7928950 on servers kafka.servers:9092
20 09:00:00.001550+00:00
Could not find instrument for sedol UY29387, trying ric RUE.ST
Could not find instrument for ric RUE.ST, using provided identifier 5
market', 'US']
Reading topic java_topic from kafka servers brokers:9092
Starting KafkaReader at offset 1005022
Could not find symbol for sedol 27ER995. Only publishing externally.
Starting KafkaReader at offset 28181084
KafkaReader found no data on topic java_topic. This will only log once.
Could not find symbol for sedol 3393DS8. Only publishing externally.
Writing to topic bosun_internal to kafka servers brokers:9092
STARTING PROCESS Java Price Process
Reading from kafka servers brokers:9092
Starting with config Config{hostName='host'services='[JAVA_PRICE_PROCESS]'}
====================================================
market hours frequency
Closing client connection.
STARTING Tracking service
instrument not found for ric 837.RIJ
Updating subscribed topics to: frozenset({'internal'})
instrument not found for sedol OIN38378
Initialized prometheus server.
Could not find instrument for ric 837.RIJ, using provided identifier 4
Tracking service is caught up
Exception in message handler <bound method TrackingService.method of <app.tracking_service.TrackingService object at 0x7feba0d0>> TrackingService.on_order_change() missing 1 required positional argument: 'order_identifier'
20
Scheduling Error Handler in 150.0 seconds
Updating prices
instrument not found for ric JPQ.CC
Initialized Influx DB Client to host
instrument not found for sedol BYP321337
Could not find instrument for sedol DHI7337, trying ric YT.ATS
Could not find instrument for ric JPQ.CC, using provided identifier 2
instrument not found for sedol DHI7337
Could not find instrument for sedol RZZZZ2, trying ric UYU.T
instrument not found for ric YT.ATS
Could not find instrument for sedol BYP321337, trying ric YT.ATS
Could not find instrument for ric UTU.T, using provided identifier 3
Caught exception N/A. Message: Unclosed client session NoneType: None
Kafka topic internal is caught up at offset 7928949
instrument not found for sedol UY29387
Version: 2757
Writing to topic sip_last_trade_px to kafka servers brokers:9092
Could not find symbol for sedol 344232AS7. Only publishing externally.

Number of log lines: 151
Number of log lines: 157

Lines that do not conform to log format:
- Hello I am a bad log line
All lines conform to log line format

Lines that are part of the startup header:
- STARTING Tracking service
- Start time: 2024-06-20 09:00:00.001550+00:00
- Version: 2729a
- Command line: ['.venv/bin/python3', '-m', 'app.tracking_service', '--market', 'US', '--version', '2729a']
- STARTING Tracking service
- Start time: 2024-06-20 09:00:00.001550+00:00
- Version: 2729a
- Command line: ['.venv/bin/python3', '-m', 'app.tracking_service', '--market', 'US', '--version', '2729a']
- STARTING Tracking service
- Start time: 2024-06-21 09:00:00.001550+00:00
- Version: 2729a
- Command line: ['.venv/bin/python3', '-m', 'app.tracking_service', '--market', 'US', '--version', '2729a']
No non-fatal stack traces were found

Lines that are part of the startup header(s):
- STARTING PROCESS Java Price Process
- Start time: 2024-06-28 12:00:00.060562+00:00
- Version: 2757
- Command line: ['./build/app/java_process', '--market', 'US']
- STARTING PROCESS Java Price Process
- Start time: 2024-06-27 12:00:00.060562+00:00
- Version: 2757
- Command line: ['./build/app/java_process', '--market', 'US']

------ a report has been reported ------
5 changes: 3 additions & 2 deletions src/alogamous/__main__.py
@@ -2,6 +2,7 @@

 from alogamous import (
     analyzer,
+    daily_count_analyzer,
     directory_reader,
     error_counter_analyzer,
     flag_duplicate_log_messages,
@@ -10,7 +11,6 @@
     log_line_parser,
     loginfo_analyzer,
     stack_trace_analyzer,
-    startup_header_analyzer,
     warning_analyzer,
 )

@@ -39,14 +39,15 @@
 analyzer.analyze_log_stream(
     [
         # echo_analyzer.EchoAnalyzer(),
+        daily_count_analyzer.DailyCountAnalyzer(line_parser),
         error_counter_analyzer.ErrorCounterAnalyzer(),
         flag_duplicate_log_messages.FlagDuplicateLogMessages(),
         line_count_analyzer.LineCountAnalyzer(),
         format_analyzer.FormatAnalyzer(line_parser),
         warning_analyzer.WarningAnalyzer(),
         loginfo_analyzer.InfoAnalyzer(line_parser),
         stack_trace_analyzer.StackTraceAnalyzer(line_parser),
-        startup_header_analyzer.StartupHeaderAnalyzer(line_parser),
+        # startup_header_analyzer.StartupHeaderAnalyzer(line_parser),
         warning_analyzer.WarningAnalyzer(),
     ],
     reader.read(),
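Each analyzer in the list passed to `analyze_log_stream` exposes `read_log_line(line)` and `report(out_stream)`, as `DailyCountAnalyzer` does in this commit. The body of `analyze_log_stream` is not part of this diff, so the driver below is only a guess at the general pattern: `run_analyzers` and `LineCounter` are hypothetical names, and the blank-line separator between reports is an assumption.

```python
import io


class LineCounter:
    """Toy analyzer exposing the same two methods as DailyCountAnalyzer."""

    def __init__(self):
        self.count = 0

    def read_log_line(self, line):
        self.count += 1

    def report(self, out_stream):
        out_stream.write(f"Number of log lines: {self.count}")


def run_analyzers(analyzers, lines, out_stream):
    """Hypothetical driver: feed every line to every analyzer, then report."""
    for line in lines:
        for item in analyzers:
            item.read_log_line(line)
    for item in analyzers:
        item.report(out_stream)
        out_stream.write("\n\n")  # assumed separator between reports


out = io.StringIO()
run_analyzers([LineCounter()], ["first line", "second line"], out)
print(out.getvalue())
```

Because every analyzer sees every line, listing the same analyzer twice (as `WarningAnalyzer` is above) would produce its report twice under this pattern.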
77 changes: 77 additions & 0 deletions src/alogamous/daily_count_analyzer.py
@@ -0,0 +1,77 @@
import datetime

from alogamous import analyzer, log_line_parser


class DailyCountAnalyzer(analyzer.Analyzer):
    def __init__(self, line_parser):
        self.parser = line_parser
        self.header_line_count = 0
        self.current_service = ""
        self.info_counts_by_service_by_day = {}
        self.warning_counts_by_service_by_day = {}
        self.error_counts_by_service_by_day = {}

    def read_log_line(self, line):
        parsed_line = self.parser.parse(line)
        self._find_service_line(line, parsed_line)
        if parsed_line["type"] == log_line_parser.LineType.LOG_LINE:
            date_object = self._create_date_object(parsed_line)
            line_level = parsed_line["level"].lower()
            if line_level == "info":
                self._update_dictionary_count(self.info_counts_by_service_by_day, self.current_service, date_object)
            elif "warn" in line_level:  # matches both "warn" and "warning"
                self._update_dictionary_count(self.warning_counts_by_service_by_day, self.current_service, date_object)
            elif line_level == "error":
                self._update_dictionary_count(self.error_counts_by_service_by_day, self.current_service, date_object)

    def report(self, out_stream):
        all_report_messages = []
        all_report_messages.extend(self._format_report(self.info_counts_by_service_by_day, "info"))
        all_report_messages.extend(self._format_report(self.warning_counts_by_service_by_day, "warning"))
        all_report_messages.extend(self._format_report(self.error_counts_by_service_by_day, "error"))
        if all_report_messages:
            out_stream.write("\n".join(all_report_messages))
        else:
            out_stream.write("There has been no daily increase in specific types of messages for any service")

    def _find_service_line(self, line, parsed_line):
        if parsed_line["type"] == log_line_parser.LineType.HEADER_LINE:
            self.header_line_count += 1
        elif self.header_line_count % 2 == 1 and line.count("STARTING") == 1:  # inside a startup header
            self.current_service = line.replace("STARTING ", "")

    @staticmethod
    def _create_date_object(parsed_line):
        if parsed_line["datetime"].count(" ") == 1:
            date_string = parsed_line["datetime"].split(" ")[0]
        else:
            date_string = parsed_line["datetime"].split("T")[0]
        return datetime.datetime.strptime(date_string, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc).date()  # attach UTC without shifting the date

    @staticmethod
    def _update_dictionary_count(counts_by_service_by_day, service, date):
        if counts_by_service_by_day.get(service) is None:
            counts_by_service_by_day[service] = {date: 1}
        elif counts_by_service_by_day[service].get(date) is None:
            counts_by_service_by_day[service][date] = 1
        else:
            counts_by_service_by_day[service][date] += 1

    @staticmethod
    def _format_report(counts_by_service_by_day, level):
        report_messages = []
        for service in counts_by_service_by_day:
            difference_messages = []
            sorted_days = sorted(counts_by_service_by_day[service])
            for previous_day_index, day in enumerate(sorted_days[1:]):
                previous_day = sorted_days[previous_day_index]
                difference = counts_by_service_by_day[service][day] - counts_by_service_by_day[service][previous_day]
                if difference > 0:
                    difference_messages.append(
                        f"- On {day}, the number of {level} messages increased by {difference} from the previous day"
                    )
            if difference_messages:
                report_messages.append(f"Daily increases in {level} log message for {service}:")
                report_messages.extend(difference_messages)
        return report_messages
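The nested-dictionary bookkeeping and day-over-day comparison in `DailyCountAnalyzer` can also be expressed with `collections.defaultdict` and `collections.Counter`. The following is a standalone sketch, not the code from this commit: `count_by_service_and_day` and `daily_increases` are hypothetical names, and the input is assumed to be `(service, date)` pairs already extracted from log lines.

```python
import datetime
from collections import Counter, defaultdict


def count_by_service_and_day(events):
    """Tally (service, date) pairs into {service: Counter({date: n})}."""
    counts = defaultdict(Counter)
    for service, day in events:
        counts[service][day] += 1
    return counts


def daily_increases(counts, level):
    """Build report lines for each day whose count exceeds the previous logged day."""
    messages = []
    for service, per_day in counts.items():
        days = sorted(per_day)
        lines = []
        # zip pairs each day with the one logged immediately before it
        for previous_day, day in zip(days, days[1:]):
            difference = per_day[day] - per_day[previous_day]
            if difference > 0:
                lines.append(
                    f"- On {day}, the number of {level} messages increased by {difference} from the previous day"
                )
        if lines:
            messages.append(f"Daily increases in {level} log message for {service}:")
            messages.extend(lines)
    return messages


# Two error lines on 2024-06-20 and five on 2024-06-21 for one service.
events = (
    [("Tracking service", datetime.date(2024, 6, 20))] * 2
    + [("Tracking service", datetime.date(2024, 6, 21))] * 5
)
report = daily_increases(count_by_service_and_day(events), "error")
print("\n".join(report))
```

As in the committed code, each day is compared with the previous day that has any entries, so a gap in the logs is not treated as a zero count.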