Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade logging #113

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
137 changes: 128 additions & 9 deletions megalista_dataflow/config/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@

import logging
import sys
from error.logging_handler import LoggingHandler
import io
import traceback
from types import FrameType
from typing import Optional, Tuple, List, Any

from models.execution import Execution

class LoggingConfig:
@staticmethod
def config_logging(show_lines: bool = False):
# If there is a FileHandler, the execution is running on Dataflow
# In this scenario, we shouldn't change the formatter
logging_handler = LoggingHandler()
logging.getLogger().addHandler(logging_handler)
file_handler = LoggingConfig.get_file_handler()
if file_handler is None:
log_format = "[%(levelname)s] %(name)s: %(message)s"
Expand All @@ -36,9 +39,7 @@ def config_logging(show_lines: bool = False):
stream_handler = logging.StreamHandler(stream=sys.stderr)
logging.getLogger().addHandler(stream_handler)
stream_handler.setFormatter(formatter)

logging_handler.setFormatter(formatter)


logging.getLogger().setLevel(logging.ERROR)
logging.getLogger("megalista").setLevel(logging.INFO)

Expand All @@ -50,10 +51,9 @@ def get_stream_handler():
def get_file_handler():
return LoggingConfig.get_handler(logging.FileHandler)


@staticmethod
def get_logging_handler():
return LoggingConfig.get_handler(LoggingHandler)
return None

@staticmethod
def get_handler(type: type):
Expand All @@ -63,4 +63,123 @@ def get_handler(type: type):
result_handler = handler
break

return result_handler
return result_handler

class _LogWrapper:
    """Facade over a standard-library logger that decorates messages and records errors.

    Every public method mirrors the ``logging.Logger`` method of the same name and
    accepts two extra keyword arguments that are consumed here and never forwarded
    to the underlying logger:

    * ``execution``: an object exposing ``source.source_name`` and
      ``destination.destination_name``; adds an ``[Execution: ...]`` prefix.
    * ``context``: any value; adds a ``[Context: ...]`` prefix.

    Messages at ERROR level or above are also handed to ``_add_error`` before
    being emitted.
    """

    def __init__(self, name: Optional[str]):
        # NOTE: str(None) is 'None', so a wrapper around the root logger reports
        # the name 'None' to _add_error.
        self._name = str(name)
        self._logger = logging.getLogger(name)

    def debug(self, msg: str, *args, **kwargs):
        """Log ``msg`` at DEBUG level."""
        self._log(msg, logging.DEBUG, *args, **kwargs)

    def info(self, msg: str, *args, **kwargs):
        """Log ``msg`` at INFO level."""
        self._log(msg, logging.INFO, *args, **kwargs)

    def warning(self, msg: str, *args, **kwargs):
        """Log ``msg`` at WARNING level."""
        self._log(msg, logging.WARNING, *args, **kwargs)

    def error(self, msg: str, *args, **kwargs):
        """Record ``msg`` as an error; it is emitted at WARNING level (see ``_log``)."""
        self._log(msg, logging.ERROR, *args, **kwargs)

    def critical(self, msg: str, *args, **kwargs):
        """Record ``msg`` as an error and log it at CRITICAL level."""
        self._log(msg, logging.CRITICAL, *args, **kwargs)

    def exception(self, msg: str, *args, **kwargs):
        """Record ``msg`` as an error and log it at CRITICAL level with the active traceback.

        Like ``logging.Logger.exception``, ``exc_info`` defaults to True so the
        exception currently being handled is attached to the emitted record.
        Callers may still pass ``exc_info`` explicitly to override that.
        """
        kwargs.setdefault('exc_info', True)
        self._log(msg, logging.CRITICAL, *args, **kwargs)

    def _log(self, msg: str, level: int, *args, **kwargs):
        """Decorate ``msg``, record it when it is an error, then emit it.

        The execution prefix is applied first and the context prefix second, so
        the context prefix ends up outermost in the final message.
        """
        stacklevel = self._get_stacklevel(**kwargs)
        msg = self._get_msg_execution(msg, **kwargs)
        msg = self._get_msg_context(msg, **kwargs)
        if level >= logging.ERROR:
            _add_error(self._name, msg, stacklevel, level, args)
        # ERROR records are emitted one level lower, as WARNING; CRITICAL is kept.
        # NOTE(review): the reason for the downgrade is not visible here - confirm.
        if level == logging.ERROR:
            level = logging.WARNING
        # Logger.log() rejects unknown keyword arguments, so drop the wrapper-only ones.
        for key in ('execution', 'context'):
            kwargs.pop(key, None)
        self._logger.log(level, msg, *args, **self._change_stacklevel(**kwargs))

    def _change_stacklevel(self, **kwargs):
        """Return a copy of ``kwargs`` whose ``stacklevel`` skips the wrapper frames."""
        stacklevel = self._get_stacklevel(**kwargs)
        return dict(kwargs, stacklevel=stacklevel)

    def _get_stacklevel(self, **kwargs):
        """Translate the caller's ``stacklevel`` into one relative to ``Logger.log``.

        Two wrapper frames (the public method and ``_log``) sit between the
        caller and the underlying logger, hence the offset of 2. Without an
        explicit value the result is 3.
        """
        if 'stacklevel' in kwargs:
            return 2 + kwargs['stacklevel']
        return 3

    def _get_msg_context(self, msg: str, **kwargs):
        """Prefix ``msg`` with ``[Context: ...]`` when a ``context`` keyword is given."""
        if 'context' in kwargs:
            context = kwargs['context']
            msg = f'[Context: {context}] {msg}'
        return msg

    def _get_msg_execution(self, msg: str, **kwargs):
        """Prefix ``msg`` with ``[Execution: source -> destination]`` when ``execution`` is given."""
        if 'execution' in kwargs:
            execution = kwargs['execution']
            msg = f'[Execution: {execution.source.source_name} -> {execution.destination.destination_name}] {msg}'
        return msg

def getLogger(name: Optional[str] = None):
    """camelCase alias of ``get_logger``, matching the spelling of ``logging.getLogger``."""
    return get_logger(name=name)

def get_logger(name: Optional[str] = None):
    """Build a new ``_LogWrapper`` around the standard logger called ``name``."""
    wrapper = _LogWrapper(name)
    return wrapper

# Module-level accumulator of the LogRecord objects appended by _add_error().
# It is read by has_errors(), error_list() and get_formatted_error_list().
_error_list: List[logging.LogRecord] = []

def _add_error(name: str, msg: str, stacklevel: int, level: int, args):
    """Append a ``LogRecord`` describing an error to the module-level ``_error_list``.

    Args:
        name: Logger name stored on the record.
        msg: Message stored on the record (not yet %-formatted with ``args``).
        stacklevel: Forwarded unchanged to ``_get_stack_trace`` to pick the frame
            whose file, line and function are stored on the record.
        level: Numeric logging level of the record.
        args: Positional %-format arguments stored alongside ``msg``.
    """
    pathname, lineno, func_name, stack_text = _get_stack_trace(stacklevel)
    record = logging.LogRecord(
        name, level, pathname, lineno, msg, args, None, func=func_name, sinfo=stack_text
    )
    _error_list.append(record)

def _get_stack_trace(stacklevel: int, stack_info: bool = True):
    """Describe the frame that issued a log call.

    Adapted from ``logging.Logger.findCaller``. The function is reached through
    the chain ``_LogWrapper._log -> _add_error -> _get_stack_trace`` and receives
    the same ``stacklevel`` that ``_log`` hands to ``Logger.log``. That value
    counts ``_log`` itself as level 1, so the walk starts at the ``_log`` frame;
    otherwise the reported location would not match the one on the emitted record.

    Args:
        stacklevel: 1 selects the frame two calls above this function; each
            increment moves one frame further out. Walking stops at the
            outermost frame if the stack is shallower than requested.
        stack_info: When True, also render the stack from the selected frame.

    Returns:
        A ``(filename, line number, function name, stack text)`` tuple. The
        stack text is None when ``stack_info`` is False. Placeholder values are
        returned when no suitable frame can be found.
    """
    rv: Tuple[str, int, str, Optional[str]] = ("(unknown file)", 0, "(unknown function)", None)
    try:
        # Frame 0 is this function, 1 its caller (_add_error), 2 the caller's
        # caller (_LogWrapper._log), which is what stacklevel == 1 refers to.
        f: Optional[FrameType] = sys._getframe(2)
    except ValueError:
        # The call stack is shallower than the expected call chain.
        return rv
    while stacklevel > 1 and f is not None and f.f_back is not None:
        f = f.f_back
        stacklevel -= 1
    if f is not None and hasattr(f, "f_code"):
        co = f.f_code
        sinfo = None
        if stack_info:
            with io.StringIO() as sio:
                sio.write('Stack (most recent call last):\n')
                traceback.print_stack(f, file=sio)
                sinfo = sio.getvalue()
            # print_stack ends with a newline; drop it as the logging module does.
            if sinfo.endswith('\n'):
                sinfo = sinfo[:-1]
        rv = (co.co_filename, f.f_lineno, co.co_name, sinfo)
    return rv

def has_errors() -> bool:
    """Tell whether any record has been stored in ``_error_list``."""
    return bool(_error_list)

def error_list() -> List[logging.LogRecord]:
    """Return the module-level list of recorded errors.

    The list object itself is returned, not a copy, so changes made by the
    caller are visible to every other reader of ``_error_list``.
    """
    return _error_list

def get_formatted_error_list() -> Optional[str]:
    """Render the recorded errors as a numbered, human-readable list.

    Returns:
        None when nothing has been recorded. Otherwise one entry per record,
        numbered from 1, showing the record's message followed by the file and
        line stored on it.
    """
    if not _error_list:
        return None
    entries = [
        f'{position}. {record.msg}\n... in {record.pathname}:{record.lineno}\n'
        for position, record in enumerate(_error_list, start=1)
    ]
    return ''.join(entries)

def null_filter(el: Any) -> Any:
    """Log ``el`` at INFO level on the 'megalista.LOG' logger and return it unchanged."""
    passthrough_logger = get_logger('megalista.LOG')
    passthrough_logger.info(f'Logging: {el}')
    return el
9 changes: 8 additions & 1 deletion megalista_dataflow/data_sources/base_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ def retrieve_data(self, executions: ExecutionsGroupedBySource) -> List[DataRowsG
raise NotImplementedError("Source Type not implemented. Please check your configuration (sheet / json / firestore).")

def write_transactional_info(self, rows, execution):
raise NotImplementedError("Source Type not implemented. Please check your configuration (sheet / json / firestore).")
raise NotImplementedError("Source Type not implemented. Please check your configuration (sheet / json / firestore).")

@staticmethod
def _convert_row_to_dict(row):
    """Copy the key/value pairs of a mapping-like ``row`` into a plain ``dict``.

    Args:
        row: Any object exposing ``items()`` that yields ``(key, value)`` pairs.

    Returns:
        A new ``dict`` holding the same pairs; the values are not copied.
    """
    # A comprehension avoids a local named ``dict``, which would shadow the builtin.
    return {key: value for key, value in row.items()}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from string import Template

import apache_beam as beam
import logging
from config import logging
from google.cloud import bigquery
from google.cloud.bigquery import SchemaField, Client
from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
# limitations under the License.

import datetime
import pytest

from models.execution import AccountConfig, ExecutionsGroupedBySource
from models.execution import Destination
from models.execution import DestinationType
from models.execution import Execution
from models.execution import Source
from models.execution import SourceType
from models.execution import Batch
import pytest

from models.execution import TransactionalType

Expand Down
1 change: 0 additions & 1 deletion megalista_dataflow/data_sources/data_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from configparser import MissingSectionHeaderError
from typing import List, Dict, Any
from models.execution import Destination, DestinationType, Execution, Batch
import logging
import functools
import pandas as pd
import ast
Expand Down
2 changes: 1 addition & 1 deletion megalista_dataflow/data_sources/file/file_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from apache_beam.typehints.decorators import with_output_types
import numpy as np

import logging
from config import logging

from models.execution import SourceType, DestinationType, Execution, Batch, TransactionalType, ExecutionsGroupedBySource, DataRowsGroupedBySource
from models.options import DataflowOptions
Expand Down
2 changes: 1 addition & 1 deletion megalista_dataflow/data_sources/file/file_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"""

import io
import logging
from config import logging
from os.path import exists
from urllib.parse import ParseResultBytes

Expand Down
4 changes: 2 additions & 2 deletions megalista_dataflow/error/error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import logging
from config import logging
from email.mime.text import MIMEText
from typing import Iterable, Optional, Dict

Expand Down Expand Up @@ -163,7 +163,7 @@ def add_error(self, execution: Execution, error_message: str):

if execution.destination.destination_type != self._destination_type:
raise ValueError(
f'Received a error of destination type: {execution.destination.destination_type}'
f'Received an error of destination type: {execution.destination.destination_type}'
f' but this error handler is initialized with {self._destination_type} destination type')

error = Error(execution, error_message)
Expand Down
54 changes: 0 additions & 54 deletions megalista_dataflow/error/logging_handler.py

This file was deleted.

39 changes: 0 additions & 39 deletions megalista_dataflow/error/logging_handler_test.py

This file was deleted.

19 changes: 6 additions & 13 deletions megalista_dataflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import warnings

import apache_beam as beam
Expand All @@ -23,6 +22,7 @@

from error.error_handling import GmailNotifier
from config.logging import LoggingConfig
from config import logging

from models.execution import DataRowsGroupedBySource, Execution, ExecutionsGroupedBySource
from sources.batches_from_executions import ExecutionsGroupedBySourceCoder, DataRowsGroupedBySourceCoder, ExecutionCoder
Expand Down Expand Up @@ -90,16 +90,9 @@ def run(argv=None):
if __name__ == "__main__":
run()

logging_handler = LoggingConfig.get_logging_handler()
if logging_handler is None:
logging.getLogger("megalista").info(
f"MEGALISTA build {MEGALISTA_VERSION}: Clould not find error interception handler. Skipping error intereception.")
else:
if logging_handler.has_errors:
logging.getLogger("megalista").critical(
f'MEGALISTA build {MEGALISTA_VERSION}: Completed with errors')
raise SystemExit(1)
else:
logging.getLogger("megalista").info(
f"MEGALISTA build {MEGALISTA_VERSION}: Completed successfully!")
if logging.has_errors():
logging.get_logger("megalista").critical(f'MEGALISTA build {MEGALISTA_VERSION}: Completed with errors')
raise SystemExit(1)

logging.get_logger("megalista").info(f"MEGALISTA build {MEGALISTA_VERSION}: Completed successfully!")
exit(0)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import hashlib
import logging
from config import logging
import re

from models.execution import Batch
Expand Down
Loading