diff --git a/.gitignore b/.gitignore
index b0a81d4..482d53f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,27 +1,21 @@
 *.py[cod]
 MANIFEST
-.idea
-
-# C extensions
-*.so
 
 # Packages
+/.eggs
 *.egg
 *.egg-info
-dist
-build
-eggs
-parts
-bin
-var
-sdist
-develop-eggs
-.installed.cfg
-lib
-lib64
+/dist
+/build
+/eggs
+/bin
+/var
+/sdist
+/develop-eggs
+/.installed.cfg
 
 # Installer logs
-pip-log.txt
+/pip-log.txt
 
 # Unit test / coverage reports
 .coverage
@@ -31,7 +25,11 @@ nosetests.xml
 # Translations
 *.mo
 
-# Mr Developer
-.mr.developer.cfg
-.project
-.pydevproject
+# IDEs
+/.idea
+/.geany
+/.mr.developer.cfg
+/.project
+/.pydevproject
+/.settings
+/.release
diff --git a/README.rst b/README.rst
index e637785..03d92a1 100644
--- a/README.rst
+++ b/README.rst
@@ -1,65 +1,85 @@
-python-logstash
-===============
+=====================
+python-logstash-async
+=====================
 
-Python logging handler for Logstash.
+Python logging handler for asynchronous event processing and transport to Logstash.
 http://logstash.net/
 
-Changelog
-=========
-0.4.6
-  - Updated field names to match java counterparts supported by logstash crew
-0.4.5
-  - Allow passing exchange's routing key to AMQP handler
-0.4.4
-  - Fixed urllib import python3 compatibility.
-  - Added long type to easy_types.
-0.4.3
-  - Added AMQP handler.
-0.4.2
-  - Updated README
-  - Added ``tags`` parameter to handler
-0.4.1
-  - Added TCP handler.
-0.3.1
-  - Added support for Python 3
-0.2.2
-  - Split Handler into Handler and Formatter classes
-0.2.1
-  - Added support for the new JSON schema in Logstash 1.2.x. See details in
-    http://tobrunet.ch/2013/09/logstash-1-2-0-upgrade-notes-included/ and
-    https://logstash.jira.com/browse/LOGSTASH-675
-
-  - Added ``version`` parameter. Available values: 1 (Logstash 1.2.x version format), 0 - default (previous version).
+
+About
+-----
+
+This Python logging handler is a fork of
+https://github.com/vklochan/python-logstash.
+
+It adds the following features:
+
+  * Asynchronous transport of log events
+  * Temporary storage of log events in a SQLite database until transport
+    to the Logstash server has been successful
+  * Transport of events via TCP and UDP, in the future hopefully via
+    the Beats protocol
+  * TCP transport optionally SSL-encrypted
+  * Special formatter ready to be used in Django projects
+
+
+Asynchronous processing
+^^^^^^^^^^^^^^^^^^^^^^^
+
+Unlike the original ``python-logstash``, this handler will try to
+handle log events as fast as possible so that the sending program
+code can continue with its primary job.
+In other words, for web applications or web services it is important
+to not slow down request times due to logging delays, e.g. waiting
+for network timeouts to the Logstash server or similar.
+
+So this handler will accept log events and pass them for further
+processing to a separate worker thread which will try to send
+the events to the configured Logstash server asynchronously.
+If sending the events fails, the events are stored in a
+local SQLite database for a later sending attempt.
+
+Whenever the application stops, or to be more exact, whenever
+Python's logging subsystem is shut down, the worker thread
+is signaled to send any queued events and to clean itself up
+before shutdown.
+
+The sending intervals and timeouts can be configured in the
+``logstash_async.constants`` module by the corresponding
+module-level constants, see below for details.
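+
+For example, a minimal sketch of adjusting one of these constants
+(the chosen value is arbitrary; note that the worker and transport
+modules bind the constants at import time, so they should be changed
+before ``logstash_async.handler`` is imported):
+
+.. code:: python
+
+  from logstash_async import constants
+
+  # flush cached events to Logstash at most every 30 seconds
+  constants.QUEUED_EVENTS_FLUSH_INTERVAL = 30.0
+
+  from logstash_async.handler import AsynchronousLogstashHandler
+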
 
 Installation
-============
+------------
 
 Using pip::
 
-  pip install python-logstash
+  pip install python-logstash-async
 
 Usage
-=====
+-----
 
-``LogstashHandler`` is a custom logging handler which sends Logstash messages using UDP.
+``AsynchronousLogstashHandler`` is a custom logging handler which
+sends Logstash messages using UDP and TCP. For example:
 
-For example::
+.. code:: python
 
   import logging
-  import logstash
   import sys
+  from logstash_async.handler import AsynchronousLogstashHandler
 
   host = 'localhost'
+  port = 5959
 
   test_logger = logging.getLogger('python-logstash-logger')
   test_logger.setLevel(logging.INFO)
-  test_logger.addHandler(logstash.LogstashHandler(host, 5959, version=1))
-  # test_logger.addHandler(logstash.TCPLogstashHandler(host, 5959, version=1))
+  test_logger.addHandler(AsynchronousLogstashHandler(
+      host, port, database_path='logstash.db'))
+  # test_logger.addHandler(AsynchronousLogstashHandler(host, port))
 
-  test_logger.error('python-logstash: test logstash error message.')
-  test_logger.info('python-logstash: test logstash info message.')
-  test_logger.warning('python-logstash: test logstash warning message.')
+  test_logger.error('python-logstash-async: test logstash error message.')
+  test_logger.info('python-logstash-async: test logstash info message.')
+  test_logger.warning('python-logstash-async: test logstash warning message.')
 
   # add extra field to logstash message
   extra = {
@@ -72,47 +92,56 @@ For example::
   }
   test_logger.info('python-logstash: test extra fields', extra=extra)
 
-When using ``extra`` field make sure you don't use reserved names. From `Python documentation `_.
-    | "The keys in the dictionary passed in extra should not clash with the keys used by the logging system. (See the `Formatter `_ documentation for more information on which keys are used by the logging system.)"
-
-To use the AMQPLogstashHandler you will need to install pika first.
-
-    pip install pika
-
-For example::
+When using the ``extra`` field make sure you don't use reserved names.
+From `Python documentation `_::
 
-    import logging
-    import logstash
-
-    test_logger = logging.getLogger('python-logstash-logger')
-    test_logger.setLevel(logging.INFO)
-    test_logger.addHandler(logstash.AMQPLogstashHandler(host='localhost', version=1))
+    "The keys in the dictionary passed in extra should not clash
+    with the keys used by the logging system.
+    (See the `Formatter `_ documentation
+    for more information on which keys are used by the logging system.)"
 
-    test_logger.info('python-logstash: test logstash info message.')
-    try:
-        1/0
-    except:
-        test_logger.exception('python-logstash-logger: Exception with stack trace!')
+You can also specify an additional extra dictionary in the logging configuration with static
+values like the application name, environment, etc. These values will be merged with any
+extra dictionary items passed in the logging call and put into the configured extra prefix.
 
 
+Usage with Django
+-----------------
 
-Using with Django
-=================
+Modify your ``settings.py`` to integrate ``python-logstash-async`` with Django's logging:
 
-Modify your ``settings.py`` to integrate ``python-logstash`` with Django's logging::
+.. code:: python
 
   LOGGING = {
     ...
+    'formatters': {
+        ...
+        'logstash': {
+            '()': 'logstash_async.formatter.DjangoLogstashFormatter',
+            'message_type': 'python-logstash',
+            'fqdn': False,  # Fully qualified domain name. Default value: false.
+            'extra_prefix': 'dev',  # prefix for the extra fields, see the 'extra_prefix' option below
+            'extra': {
+                'application': PROJECT_APP,
+                'project_path': PROJECT_APP_PATH,
+                'environment': 'production'
+            }
+        },
+    },
     'handlers': {
+        ...
         'logstash': {
             'level': 'DEBUG',
-            'class': 'logstash.LogstashHandler',
-            'host': 'localhost',
-            'port': 5959,  # Default value: 5959
-            'version': 1,  # Version of logstash event schema. Default value: 0 (for backward compatibility of the library)
-            'message_type': 'logstash',  # 'type' field in logstash message. Default value: 'logstash'.
-            'fqdn': False,  # Fully qualified domain name. Default value: false.
-            'tags': ['tag1', 'tag2'],  # list of tags. Default: None.
+            'class': 'logstash_async.handler.AsynchronousLogstashHandler',
+            'transport': 'logstash_async.transport.TcpTransport',
+            'host': 'logstash.host.tld',
+            'port': 5959,
+            'ssl_enable': True,
+            'ssl_verify': True,
+            'ca_certs': '/etc/ssl/certs/logstash_ca.crt',
+            'certfile': '/etc/ssl/certs/logstash.crt',
+            'keyfile': '/etc/ssl/private/logstash.key',
+            'database_path': '{}/logstash.db'.format(PROJECT_ROOT),
         },
     },
     'loggers': {
@@ -125,19 +154,198 @@ Modify your ``settings.py`` to integrate ``python-logstash`` with Django's loggi
     ...
   }
 
-Note
-====
+This would result in a Logstash event like the following
+(note: to some extent dependent on your Logstash configuration):
 
-Example Logstash Configuration (``logstash.conf``) for Receiving Events from python-logstash is::
+.. code:: json
 
-    input {
-        tcp {
-            port => 5000
-            codec => json
+    {
+      "@timestamp": "2016-10-23T15:11:16.853Z",
+      "@version": "1",
+      "extra": {
+        "application": "django_example",
+        "django_version": "1.10.2",
+        "environment": "production",
+        "func_name": "get_response",
+        "interpreter": "/home/enrico/example/venv/bin/python",
+        "interpreter_version": "2.7.12",
+        "line": 152,
+        "logger_name": "django.request",
+        "path": "/home/enrico/example/venv/lib/python2.7/site-packages/django/core/handlers/base.py",
+        "process_name": "MainProcess",
+        "project_path": "/home/enrico/example/app",
+        "req_host": "localhost",
+        "req_method": "GET",
+        "req_referer": "",
+        "req_remote_address": "127.0.0.1",
+        "req_uri": "http://localhost/hosts/nonexistent/",
+        "req_user": "enrico",
+        "req_useragent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1",
+        "request": "",
+        "status_code": 404,
+        "thread_name": "Thread-1"
+      },
+      "host": "my.host.tld",
+      "level": "WARNING",
+      "logsource": "endor.l8failed.net",
+      "message": "Not Found: /hosts/nonexistent/",
+      "pid": 23605,
+      "port": 56170,
+      "program": "manage.py",
+      "type": "python-logstash"
+    }
-    }
-    output {
-        stdout {
-            codec => rubydebug
+
+
+Configuration
+-------------
+
+Options for configuring the log handler
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+host
+  The host of the Logstash server (no default)
+
+port
+  The port of the Logstash server (default 5959)
+
+transport
+  Callable or path to a compatible transport class
+  (default: 'logstash_async.transport.TcpTransport').
+
+  You can specify your own transport class, e.g. to implement
+  a transport via Redis or the Beats protocol.
+  If you pass a string, it should be a path to a
+  class which can be imported.
+  If you pass anything else, it should be an object of a class
+  with an interface similar to ``logstash_async.transport.TcpTransport``.
+  In particular, it should provide a ``close()`` and a ``send()`` method.
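+
+  As an illustration, a minimal custom transport might look like the
+  following sketch (the class is made up for this example; it only has
+  to accept the keyword arguments the handler passes and to implement
+  ``send()`` and ``close()``):
+
+  .. code:: python
+
+    class StdoutTransport(object):
+        """Hypothetical transport which just writes events to stdout."""
+
+        def __init__(self, host, port, **kwargs):
+            # the handler also passes ssl_enable, ssl_verify, keyfile,
+            # certfile and ca_certs, collected in kwargs here
+            self._host = host
+            self._port = port
+
+        def send(self, events):
+            # "events" is a list of already formatted log event strings
+            for event in events:
+                print(event)
+
+        def close(self):
+            pass  # nothing to clean up for stdout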
+
+ssl_enable
+  Should SSL be enabled for the connection? (default: False)
+  Only used for ``logstash_async.transport.TcpTransport``.
+
+ssl_verify
+  Should the server's SSL certificate be verified? (default: True)
+  Only used for ``logstash_async.transport.TcpTransport``.
+
+keyfile
+  The path to the client-side SSL key file (default: None)
+  Only used for ``logstash_async.transport.TcpTransport``.
+
+certfile
+  The path to the client-side SSL certificate file (default: None)
+  Only used for ``logstash_async.transport.TcpTransport``.
+
+ca_certs
+  The path to the file containing recognized CA certificates
+  (default: None)
+  Only used for ``logstash_async.transport.TcpTransport``.
+
+database_path
+  The path to the file containing queued events (default: ':memory:')
+
+enable
+  Flag to enable log processing (default is True, disabling
+  might be handy for local testing, etc.)
+
+
+Options for configuring the log formatter
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The following settings are only valid for the provided formatters
+``logstash_async.formatter.LogstashFormatter`` and
+``logstash_async.formatter.DjangoLogstashFormatter``.
+
+You can use any other formatter by configuring Python's logging
+system accordingly. Any other formatter's ``format()`` method just
+has to return valid JSON suitable to be sent to Logstash
+(see `Example Logstash Configuration`_ below).
+
+Options:
+
+message_type
+  The ``type`` field in the message sent to Logstash
+  (default: 'python-logstash')
+
+tags
+  Additional tags to include in the Logstash message (default: None)
+
+fqdn
+  Use the system's FQDN (fully qualified domain name) in the ``host``
+  field of the message sent to Logstash.
+  If enabled, ``socket.getfqdn()`` is used to retrieve the FQDN,
+  otherwise ``socket.gethostname()`` is used for the default hostname.
+  (default: False)
+
+extra_prefix
+  Name of the field in the resulting message sent to Logstash under
+  which all additional fields are grouped. Consider it as some sort
+  of namespace for all non-standard fields in the log event.
+  This field will take any items passed in as extra fields via
+  the ``extra`` configuration option (see below) as well as any extra
+  items passed in the logging call.
+
+  To disable grouping of the extra items and have them on the top
+  level of the log event message, simply set this option to ``None``
+  or the empty string.
+  (default: 'extra')
+
+extra
+  Dictionary with static items to be included in the message sent
+  to Logstash. This dictionary will be merged with any other extra
+  items passed in the logging call; see the example after this list.
+  (default: None)
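+
+To illustrate how ``extra_prefix`` and ``extra`` play together, a small
+sketch (the logger name, field names and values are examples only):
+
+.. code:: python
+
+  import logging
+
+  from logstash_async.formatter import LogstashFormatter
+  from logstash_async.handler import AsynchronousLogstashHandler
+
+  handler = AsynchronousLogstashHandler(
+      'localhost', 5959, database_path='logstash.db')
+  handler.setFormatter(LogstashFormatter(
+      extra_prefix='app',
+      extra={'environment': 'production'}))
+
+  logger = logging.getLogger('python-logstash-logger')
+  logger.addHandler(handler)
+  logger.error('Something failed', extra={'request_id': 'abc123'})
+
+  # the resulting event groups "environment", "request_id" and all
+  # other non-standard fields in a field called "app"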
+
+
+Options for the asynchronous processing (in module logstash_async.constants)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+SOCKET_TIMEOUT
+  Timeout in seconds for TCP connections (default: 5.0)
+
+QUEUE_CHECK_INTERVAL
+  Interval in seconds to check the internal queue for new messages
+  to be cached in the database (default: 2.0)
+
+QUEUED_EVENTS_FLUSH_INTERVAL
+  Interval in seconds to send cached events from the database
+  to Logstash (default: 10.0)
+
+QUEUED_EVENTS_FLUSH_COUNT
+  Number of cached events in the database which triggers sending them
+  to Logstash; events are sent to Logstash whenever
+  `QUEUED_EVENTS_FLUSH_COUNT` or `QUEUED_EVENTS_FLUSH_INTERVAL` is reached,
+  whichever happens first (default: 50)
+
+
+Example Logstash Configuration
+------------------------------
+
+Example ``logstash.conf`` for unencrypted TCP transport::
+
+    input {
+        tcp {
+            host => "127.0.0.1"
+            port => 5959
+            mode => server
+            codec => json
+        }
+    }
+
+
+Example ``logstash.conf`` for SSL-encrypted TCP transport::
+
+    input {
+        tcp {
+            host => "127.0.0.1"
+            port => 5958
+            mode => server
+            codec => json
+
+            ssl_enable => true
+            ssl_verify => true
+            ssl_extra_chain_certs => ["/etc/ssl/certs/logstash_ca.crt"]
+            ssl_cert => "/etc/ssl/certs/logstash.crt"
+            ssl_key => "/etc/ssl/private/logstash.key"
+        }
+    }
-    }
diff --git a/example1.py b/example1.py
index ad4d8e9..6e4cfe6 100644
--- a/example1.py
+++ b/example1.py
@@ -1,17 +1,27 @@
+# -*- coding: utf-8 -*-
+
 import logging
-import logstash
 import sys
+
+from logstash_async.handler import AsynchronousLogstashHandler
 
 host = 'localhost'
+port = 5959
 
 test_logger = logging.getLogger('python-logstash-logger')
 test_logger.setLevel(logging.INFO)
-test_logger.addHandler(logstash.LogstashHandler(host, 5959, version=1))
-# test_logger.addHandler(logstash.TCPLogstashHandler(host, 5959, version=1))
+test_logger.addHandler(AsynchronousLogstashHandler(host, port, database_path='logstash_test.db'))
+
+test_logger.error('python-logstash-async: test logstash error message.')
+test_logger.info('python-logstash-async: test logstash info message.')
+test_logger.warning('python-logstash-async: test logstash warning message.')
+test_logger.debug('python-logstash-async: test logstash debug message.')
 
-test_logger.error('python-logstash: test logstash error message.')
-test_logger.info('python-logstash: test logstash info message.')
-test_logger.warning('python-logstash: test logstash warning message.')
+try:
+    1 / 0
+except Exception as e:
+    test_logger.exception(u'Exception: %s', e)
 
 # add extra field to logstash message
 extra = {
diff --git a/example2.py b/example2.py
index 165ed6d..4feacf7 100644
--- a/example2.py
+++ b/example2.py
@@ -1,30 +1,39 @@
+# -*- coding: utf-8 -*-
+
 import logging
-import logstash
+import sys
+
+from logstash_async.handler import AsynchronousLogstashHandler
 
-# AMQP parameters
 host = 'localhost'
-username = 'guest'
-password= 'guest'
-exchange = 'logstash.py'
+port = 5959
 
-# get a logger and set logging level
 test_logger = logging.getLogger('python-logstash-logger')
 test_logger.setLevel(logging.INFO)
+test_logger.addHandler(AsynchronousLogstashHandler(
+    host,
+    port,
+    ssl_enable=True,
+    ssl_verify=True,
+    keyfile='/etc/ssl/private/logstash.key',
+    certfile='/etc/ssl/certs/logstash.crt',
+    ca_certs='/etc/ssl/certs/logstash_ca.crt',
+    database_path='logstash_test.db'))
 
-# add the handler
-test_logger.addHandler(logstash.AMQPLogstashHandler(version=1, - host=host, - durable=True, - username=username, - password=password, - exchange=exchange)) +test_logger.error('python-logstash-async: test logstash error message.') +test_logger.info('python-logstash-async: test logstash info message.') +test_logger.warning('python-logstash-async: test logstash warning message.') +test_logger.debug('python-logstash-async: test logstash debug message.') -# log -test_logger.error('python-logstash: test logstash error message.') -test_logger.info('python-logstash: test logstash info message.') -test_logger.warning('python-logstash: test logstash warning message.') +# add extra field to logstash message +extra = { + 'test_string': 'python version: ' + repr(sys.version_info), + 'test_boolean': True, + 'test_dict': {'a': 1, 'b': 'c'}, + 'test_float': 1.23, + 'test_integer': 123, + 'test_list': [1, 2, '3'], +} +test_logger.info('python-logstash: test extra fields', extra=extra) -try: - 1/0 -except: - test_logger.exception('python-logstash: test logstash exception with stack trace') diff --git a/logstash/__init__.py b/logstash/__init__.py deleted file mode 100644 index 2111741..0000000 --- a/logstash/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ - -from logstash.formatter import LogstashFormatterVersion0, LogstashFormatterVersion1 - -from logstash.handler_tcp import TCPLogstashHandler -from logstash.handler_udp import UDPLogstashHandler, LogstashHandler -try: - from logstash.handler_amqp import AMQPLogstashHandler -except: - # you need to install AMQP support to enable this handler. - pass - - - diff --git a/logstash/formatter.py b/logstash/formatter.py deleted file mode 100644 index 6d0887a..0000000 --- a/logstash/formatter.py +++ /dev/null @@ -1,141 +0,0 @@ -import traceback -import logging -import socket -import sys -from datetime import datetime -try: - import json -except ImportError: - import simplejson as json - - -class LogstashFormatterBase(logging.Formatter): - - def __init__(self, message_type='Logstash', tags=None, fqdn=False): - self.message_type = message_type - self.tags = tags if tags is not None else [] - - if fqdn: - self.host = socket.getfqdn() - else: - self.host = socket.gethostname() - - def get_extra_fields(self, record): - # The list contains all the attributes listed in - # http://docs.python.org/library/logging.html#logrecord-attributes - skip_list = ( - 'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename', - 'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module', - 'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process', - 'processName', 'relativeCreated', 'thread', 'threadName', 'extra') - - if sys.version_info < (3, 0): - easy_types = (basestring, bool, dict, float, int, long, list, type(None)) - else: - easy_types = (str, bool, dict, float, int, list, type(None)) - - fields = {} - - for key, value in record.__dict__.items(): - if key not in skip_list: - if isinstance(value, easy_types): - fields[key] = value - else: - fields[key] = repr(value) - - return fields - - def get_debug_fields(self, record): - fields = { - 'stack_trace': self.format_exception(record.exc_info), - 'lineno': record.lineno, - 'process': record.process, - 'thread_name': record.threadName, - } - - # funcName was added in 2.5 - if not getattr(record, 'funcName', None): - fields['funcName'] = record.funcName - - # processName was added in 2.6 - if not getattr(record, 'processName', None): - fields['processName'] = record.processName - - return fields - - @classmethod - def 
format_source(cls, message_type, host, path): - return "%s://%s/%s" % (message_type, host, path) - - @classmethod - def format_timestamp(cls, time): - tstamp = datetime.utcfromtimestamp(time) - return tstamp.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (tstamp.microsecond / 1000) + "Z" - - @classmethod - def format_exception(cls, exc_info): - return ''.join(traceback.format_exception(*exc_info)) if exc_info else '' - - @classmethod - def serialize(cls, message): - if sys.version_info < (3, 0): - return json.dumps(message) - else: - return bytes(json.dumps(message), 'utf-8') - -class LogstashFormatterVersion0(LogstashFormatterBase): - version = 0 - - def format(self, record): - # Create message dict - message = { - '@timestamp': self.format_timestamp(record.created), - '@message': record.getMessage(), - '@source': self.format_source(self.message_type, self.host, - record.pathname), - '@source_host': self.host, - '@source_path': record.pathname, - '@tags': self.tags, - '@type': self.message_type, - '@fields': { - 'levelname': record.levelname, - 'logger': record.name, - }, - } - - # Add extra fields - message['@fields'].update(self.get_extra_fields(record)) - - # If exception, add debug info - if record.exc_info: - message['@fields'].update(self.get_debug_fields(record)) - - return self.serialize(message) - - -class LogstashFormatterVersion1(LogstashFormatterBase): - - def format(self, record): - # Create message dict - message = { - '@timestamp': self.format_timestamp(record.created), - '@version': '1', - 'message': record.getMessage(), - 'host': self.host, - 'path': record.pathname, - 'tags': self.tags, - 'type': self.message_type, - - # Extra Fields - 'level': record.levelname, - 'logger_name': record.name, - } - - # Add extra fields - message.update(self.get_extra_fields(record)) - - # If exception, add debug info - if record.exc_info: - message.update(self.get_debug_fields(record)) - - return self.serialize(message) diff --git a/logstash/handler_amqp.py b/logstash/handler_amqp.py deleted file mode 100644 index 99d190e..0000000 --- a/logstash/handler_amqp.py +++ /dev/null @@ -1,126 +0,0 @@ -import json -try: - from urllib import urlencode -except ImportError: - from urllib.parse import urlencode - -from logging import Filter -from logging.handlers import SocketHandler - -import pika -from logstash import formatter - - -class AMQPLogstashHandler(SocketHandler, object): - """AMQP Log Format handler - - :param host: AMQP host (default 'localhost') - :param port: AMQP port (default 5672) - :param username: AMQP user name (default 'guest', which is the default for - RabbitMQ) - :param password: AMQP password (default 'guest', which is the default for - RabbitMQ) - - :param exchange: AMQP exchange. Default 'logging.gelf'. - A queue binding must be defined on the server to prevent - log messages from being dropped. - :param exchange_type: AMQP exchange type (default 'fanout'). - :param durable: AMQP exchange is durable (default False) - :param virtual_host: AMQP virtual host (default '/'). - - :param tags: list of tags for a logger (default is None). - :param message_type: The type of the message (default logstash). - :param version: version of logstash event schema (default is 0). - - :param extra_fields: Send extra fields on the log record to graylog - if true (the default) - :param fqdn: Use fully qualified domain name of localhost as source - host (socket.getfqdn()). - :param facility: Replace facility with specified value. If specified, - record.name will be passed as `logger` parameter. 
- """ - - def __init__(self, host='localhost', port=5672, username='guest', - password='guest', exchange='logstash', exchange_type='fanout', - virtual_host='/', message_type='logstash', tags=None, - durable=False, version=0, extra_fields=True, fqdn=False, - facility=None, exchange_routing_key=''): - - - # AMQP parameters - self.host = host - self.port = port - self.username = username - self.password = password - self.exchange_type = exchange_type - self.exchange = exchange - self.exchange_is_durable = durable - self.virtual_host = virtual_host - self.routing_key = exchange_routing_key - - SocketHandler.__init__(self, host, port) - - # Extract Logstash paramaters - self.tags = tags or [] - fn = formatter.LogstashFormatterVersion1 if version == 1 \ - else formatter.LogstashFormatterVersion0 - self.formatter = fn(message_type, tags, fqdn) - - # Standard logging parameters - self.extra_fields = extra_fields - self.fqdn = fqdn - self.facility = facility - - def makeSocket(self, **kwargs): - - return PikaSocket(self.host, - self.port, - self.username, - self.password, - self.virtual_host, - self.exchange, - self.routing_key, - self.exchange_is_durable, - self.exchange_type) - - def makePickle(self, record): - return self.formatter.format(record) - - -class PikaSocket(object): - - def __init__(self, host, port, username, password, virtual_host, exchange, - routing_key, durable, exchange_type): - - # create connection parameters - credentials = pika.PlainCredentials(username, password) - parameters = pika.ConnectionParameters(host, port, virtual_host, - credentials) - - # create connection & channel - self.connection = pika.BlockingConnection(parameters) - self.channel = self.connection.channel() - - # create an exchange, if needed - self.channel.exchange_declare(exchange=exchange, - exchange_type=exchange_type, - durable=durable) - - # needed when publishing - self.spec = pika.spec.BasicProperties(delivery_mode=2) - self.routing_key = routing_key - self.exchange = exchange - - - def sendall(self, data): - - self.channel.basic_publish(self.exchange, - self.routing_key, - data, - properties=self.spec) - - def close(self): - try: - self.connection.close() - except Exception: - pass diff --git a/logstash/handler_tcp.py b/logstash/handler_tcp.py deleted file mode 100644 index e5b7d5f..0000000 --- a/logstash/handler_tcp.py +++ /dev/null @@ -1,53 +0,0 @@ -import ssl -from logging.handlers import SocketHandler -from logstash import formatter - -from pprint import pprint - -# Derive from object to force a new-style class and thus allow super() to work -# on Python 2.6 -class TCPLogstashHandler(SocketHandler, object): - """Python logging handler for Logstash. Sends events over TCP. - :param host: The host of the logstash server. - :param port: The port of the logstash server (default 5959). - :param message_type: The type of the message (default logstash). - :param fqdn; Indicates whether to show fully qualified domain name or not (default False). - :param version: version of logstash event schema (default is 0). - :param tags: list of tags for a logger (default is None). - :param ssl: Should SSL be enabled for the connection? Default is True. - :param ssl_verify: Should the server's SSL certificate be verified? - :param keyfile: The path to client side SSL key file (default is None). - :param certfile: The path to client side SSL certificate file (default is None). - :param ca_certs: The path to the file containing recognised CA certificates. 
- """ - - def __init__(self, host, port=5959, message_type='logstash', tags=None, fqdn=False, version=0, ssl=True, ssl_verify=True, keyfile=None, certfile=None, ca_certs=None): - super(TCPLogstashHandler, self).__init__(host, port) - - self.ssl = ssl - self.ssl_verify = ssl_verify - self.keyfile = keyfile - self.certfile = certfile - self.ca_certs = ca_certs - - if version == 1: - self.formatter = formatter.LogstashFormatterVersion1(message_type, tags, fqdn) - else: - self.formatter = formatter.LogstashFormatterVersion0(message_type, tags, fqdn) - - def makePickle(self, record): - return self.formatter.format(record) + b'\n' - - def makeSocket(self, timeout=1): - s = super(TCPLogstashHandler, self).makeSocket(timeout) - if not self.ssl: - return s - - cert_reqs = ssl.CERT_REQUIRED - if not self.ssl_verify: - if self.ca_certs: - cert_reqs = ssl.CERT_OPTIONAL - else: - cert_reqs = ssl.CERT_NONE - - return ssl.wrap_socket(s, keyfile=self.keyfile, certfile=self.certfile, ca_certs=self.ca_certs, cert_reqs=cert_reqs) diff --git a/logstash/handler_udp.py b/logstash/handler_udp.py deleted file mode 100644 index 71789f5..0000000 --- a/logstash/handler_udp.py +++ /dev/null @@ -1,22 +0,0 @@ -from logging.handlers import DatagramHandler, SocketHandler -from logstash.handler_tcp import TCPLogstashHandler -from logstash import formatter - - -class UDPLogstashHandler(TCPLogstashHandler, DatagramHandler): - """Python logging handler for Logstash. Sends events over UDP. - :param host: The host of the logstash server. - :param port: The port of the logstash server (default 5959). - :param message_type: The type of the message (default logstash). - :param fqdn; Indicates whether to show fully qualified domain name or not (default False). - :param version: version of logstash event schema (default is 0). - :param tags: list of tags for a logger (default is None). - """ - - def makePickle(self, record): - return self.formatter.format(record) - - -# For backward compatibility -LogstashHandler = UDPLogstashHandler - diff --git a/logstash_async/__init__.py b/logstash_async/__init__.py new file mode 100644 index 0000000..86ac267 --- /dev/null +++ b/logstash_async/__init__.py @@ -0,0 +1,6 @@ +# -*- coding: utf-8 -*- +# +# This software may be modified and distributed under the terms +# of the MIT license. See the LICENSE file for details. + +__version__ = '1.0.0' diff --git a/logstash_async/constants.py b/logstash_async/constants.py new file mode 100644 index 0000000..3dfbe85 --- /dev/null +++ b/logstash_async/constants.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- + + +# timeout in seconds for TCP connections +SOCKET_TIMEOUT = 5.0 +# interval in seconds to check the internal queue for new messages to be cached in the database +QUEUE_CHECK_INTERVAL = 2.0 +# interval in seconds to send cached events from the database to Logstash +QUEUED_EVENTS_FLUSH_INTERVAL = 10.0 +# count of cached events to send cached events from the database to Logstash; events are sent +# to Logstash whenever QUEUED_EVENTS_FLUSH_COUNT or QUEUED_EVENTS_FLUSH_INTERVAL is reached, +# whatever happens first +QUEUED_EVENTS_FLUSH_COUNT = 50 diff --git a/logstash_async/database.py b/logstash_async/database.py new file mode 100644 index 0000000..ea07a80 --- /dev/null +++ b/logstash_async/database.py @@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +# +# This software may be modified and distributed under the terms +# of the MIT license. See the LICENSE file for details. 
+ +import sqlite3 +import sys + +import six + +from logstash_async.utils import ichunked + + +DATABASE_SCHEMA_STATEMENTS = [ + ''' + CREATE TABLE IF NOT EXISTS `event` ( + `event_id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + `event_text` TEXT NOT NULL, + `pending_delete` INTEGER NOT NULL, + `entry_date` TEXT NOT NULL); + ''', + '''CREATE INDEX IF NOT EXISTS `idx_pending_delete` ON `event` (pending_delete);''', + '''CREATE INDEX IF NOT EXISTS `idx_entry_date` ON `event` (entry_date);''', +] + +EVENT_CHUNK_SIZE = 750 # maximum number of events to be updated within one SQLite statement + + +class DatabaseLockedError(Exception): + pass + + +class DatabaseCache(object): + + # ---------------------------------------------------------------------- + def __init__(self, path): + self._database_path = path + self._connection = None + + # ---------------------------------------------------------------------- + def _open(self): + self._connection = sqlite3.connect( + self._database_path, + timeout=5.0, + isolation_level='EXCLUSIVE') + self._connection.row_factory = sqlite3.Row + self._initialize_schema() + + # ---------------------------------------------------------------------- + def _close(self): + if self._connection is not None: + self._connection.close() + self._connection = None + + # ---------------------------------------------------------------------- + def _initialize_schema(self): + cursor = self._connection.cursor() + for statement in DATABASE_SCHEMA_STATEMENTS: + cursor.execute(statement) + + # ---------------------------------------------------------------------- + def add_event(self, event): + query = u''' + INSERT INTO `event` + (`event_text`, `pending_delete`, `entry_date`) VALUES (?, ?, datetime('now'))''' + self._open() + try: + with self._connection: # implicit commit/rollback + self._connection.execute(query, (event, False)) + except sqlite3.OperationalError: + self._handle_sqlite_error() + raise + finally: + self._close() + + # ---------------------------------------------------------------------- + def _handle_sqlite_error(self): + _, e, traceback = sys.exc_info() + if str(e) == 'database is locked': + six.reraise(DatabaseLockedError, DatabaseLockedError(e), traceback) + + # ---------------------------------------------------------------------- + def get_queued_events(self): + """ + Fetch pending events and mark them to be deleted soon, so other threads/processes + won't fetch them as well. + """ + query_fetch = 'SELECT `event_id`, `event_text` FROM `event` WHERE `pending_delete` = 0;' + query_update_base = 'UPDATE `event` SET `pending_delete`=1 WHERE `event_id` IN (%s);' + self._open() + try: + with self._connection: # implicit commit/rollback + cursor = self._connection.cursor() + cursor.execute(query_fetch) + events = cursor.fetchall() + # mark retrieved events as pending_delete + self._bulk_update_events(cursor, events, query_update_base) + except sqlite3.OperationalError: + self._handle_sqlite_error() + raise + finally: + self._close() + + return events + + # ---------------------------------------------------------------------- + def _bulk_update_events(self, cursor, events, statement_base): + event_ids = [event[0] for event in events] + # split into multiple queries as SQLite has a maximum 1000 variables per query + for event_ids_subset in ichunked(event_ids, EVENT_CHUNK_SIZE): + statement = statement_base % ','.join('?' 
* len(event_ids_subset))
+            cursor.execute(statement, event_ids_subset)
+
+    # ----------------------------------------------------------------------
+    def requeue_queued_events(self, events):
+        query_update_base = 'UPDATE `event` SET `pending_delete`=0 WHERE `event_id` IN (%s);'
+        self._open()
+        try:
+            with self._connection:  # implicit commit/rollback
+                cursor = self._connection.cursor()
+                self._bulk_update_events(cursor, events, query_update_base)
+        except sqlite3.OperationalError:
+            self._handle_sqlite_error()
+            raise
+        finally:
+            self._close()
+
+        return events
+
+    # ----------------------------------------------------------------------
+    def delete_queued_events(self):
+        query_delete = 'DELETE FROM `event` WHERE `pending_delete`=1;'
+        self._open()
+        try:
+            with self._connection:  # implicit commit/rollback
+                cursor = self._connection.cursor()
+                cursor.execute(query_delete)
+        except sqlite3.OperationalError:
+            self._handle_sqlite_error()
+            raise
+        finally:
+            self._close()
diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py
new file mode 100644
index 0000000..ecddc59
--- /dev/null
+++ b/logstash_async/formatter.py
@@ -0,0 +1,261 @@
+# -*- coding: utf-8 -*-
+#
+# This software may be modified and distributed under the terms
+# of the MIT license. See the LICENSE file for details.
+
+from datetime import date, datetime
+import logging
+import socket
+import sys
+import time
+import traceback
+import uuid
+try:
+    import json
+except ImportError:
+    import simplejson as json
+
+import six
+
+import logstash_async
+
+# The list contains all the attributes listed in
+# http://docs.python.org/library/logging.html#logrecord-attributes
+RECORD_FIELD_SKIP_LIST = (
+    'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
+    'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',
+    'msecs', 'message', 'msg', 'name', 'pathname', 'process',
+    'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName')
+LOGSTASH_MESSAGE_FIELD_LIST = [
+    '@timestamp', '@version', 'host', 'level', 'logsource', 'message',
+    'pid', 'program', 'type', 'tags']
+
+
+class LogstashFormatter(logging.Formatter):
+
+    # ----------------------------------------------------------------------
+    def __init__(self, message_type='python-logstash', tags=None, fqdn=False, extra_prefix='extra', extra=None):
+        super(LogstashFormatter, self).__init__()
+        self._message_type = message_type
+        self._tags = tags if tags is not None else []
+        self._extra_prefix = extra_prefix
+        self._extra = extra
+
+        self._interpreter = None
+        self._interpreter_version = None
+        self._host = None
+        self._logsource = None
+        self._program_name = None
+
+        # fetch static and process-related information once,
+        # as it won't change during the process' lifetime
+        self._prefetch_interpreter()
+        self._prefetch_interpreter_version()
+        self._prefetch_host(fqdn)
+        self._prefetch_logsource()
+        self._prefetch_program_name()
+
+    # ----------------------------------------------------------------------
+    def _prefetch_interpreter(self):
+        """Override when needed"""
+        self._interpreter = sys.executable
+
+    # ----------------------------------------------------------------------
+    def _prefetch_interpreter_version(self):
+        """Override when needed"""
+        self._interpreter_version = u'{}.{}.{}'.format(
+            sys.version_info.major,
+            sys.version_info.minor,
+            sys.version_info.micro)
+
+    # ----------------------------------------------------------------------
+    def _prefetch_host(self, fqdn):
+        """Override when needed"""
+        if fqdn:
+            self._host =
socket.getfqdn() + else: + self._host = socket.gethostname() + + # ---------------------------------------------------------------------- + def _prefetch_logsource(self): + """Override when needed""" + self._logsource = self._host + + # ---------------------------------------------------------------------- + def _prefetch_program_name(self): + """Override when needed""" + self._program_name = sys.argv[0] + + # ---------------------------------------------------------------------- + def format(self, record): + message = { + '@timestamp': self._format_timestamp(record.created), + '@version': '1', + 'host': self._host, + 'level': record.levelname, + 'logsource': self._logsource, + 'message': record.getMessage(), + 'pid': record.process, + 'program': self._program_name, + 'type': self._message_type, + } + if self._tags: + message['tags'] = self._tags + + # record fields + record_fields = self._get_record_fields(record) + message.update(record_fields) + # prepare dynamic extra fields + extra_fields = self._get_extra_fields(record) + # wrap extra fields in configurable namespace + if self._extra_prefix: + message[self._extra_prefix] = extra_fields + else: + message.update(extra_fields) + + # move existing extra record fields into the configured prefix + self._move_extra_record_fields_to_prefix(message) + + return self._serialize(message) + + # ---------------------------------------------------------------------- + def _format_timestamp(self, time_): + tstamp = datetime.utcfromtimestamp(time_) + return tstamp.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (tstamp.microsecond / 1000) + "Z" + + # ---------------------------------------------------------------------- + def _get_record_fields(self, record): + def value_repr(value): + if sys.version_info < (3, 0): + easy_types = (basestring, bool, float, int, long, type(None)) + else: + easy_types = (str, bool, float, int, type(None)) + + if isinstance(value, dict): + return {k: value_repr(v) for k, v in value.items()} + elif isinstance(value, (tuple, list)): + return [value_repr(v) for v in value] + elif isinstance(value, (datetime, date)): + return self._format_timestamp(time.mktime(value.timetuple())) + elif isinstance(value, uuid.UUID): + return value.hex + elif isinstance(value, easy_types): + return value + else: + return repr(value) + + fields = {} + + for key, value in record.__dict__.items(): + if key not in RECORD_FIELD_SKIP_LIST: + fields[key] = value_repr(value) + return fields + + # ---------------------------------------------------------------------- + def _get_extra_fields(self, record): + extra_fields = { + 'func_name': record.funcName, + 'interpreter': self._interpreter, + 'interpreter_version': self._interpreter_version, + 'line': record.lineno, + 'logger_name': record.name, + 'logstash_async_version': logstash_async.__version__, + 'path': record.pathname, + 'process_name': record.processName, + 'thread_name': record.threadName, + } + # static extra fields + if self._extra: + extra_fields.update(self._extra) + # exceptions + if record.exc_info: + extra_fields['stack_trace'] = self._format_exception(record.exc_info) + return extra_fields + + # ---------------------------------------------------------------------- + def _format_exception(self, exc_info): + if isinstance(exc_info, tuple): + stack_trace = ''.join(traceback.format_exception(*exc_info)) + elif exc_info: + stack_trace = ''.join(traceback.format_stack()) + else: + stack_trace = '' + return stack_trace + + # ---------------------------------------------------------------------- 
+    def _move_extra_record_fields_to_prefix(self, message):
+        """
+        Anything added by the "extra" keyword in the logging call will be moved into the
+        configured "extra" prefix. This way the event in Logstash will be clean and any extras
+        will be paired together in the configured extra prefix.
+        If no extra prefix is configured, the message will be kept as is.
+        """
+        if not self._extra_prefix:
+            return  # early out if no prefix is configured
+
+        field_skip_list = LOGSTASH_MESSAGE_FIELD_LIST + [self._extra_prefix]
+        # iterate over a copy of the items as the dict is modified in the loop
+        for key, value in list(message.items()):
+            if key not in field_skip_list:
+                message[self._extra_prefix][key] = value
+                del message[key]
+
+    # ----------------------------------------------------------------------
+    def _serialize(self, message):
+        if sys.version_info < (3, 0):
+            return json.dumps(message)
+        else:
+            return bytes(json.dumps(message), 'utf-8')
+
+
+class DjangoLogstashFormatter(LogstashFormatter):
+
+    # ----------------------------------------------------------------------
+    def __init__(self, *args, **kwargs):
+        super(DjangoLogstashFormatter, self).__init__(*args, **kwargs)
+        self._django_version = None
+        self._fetch_django_version()
+
+    # ----------------------------------------------------------------------
+    def _fetch_django_version(self):
+        from django import get_version
+        self._django_version = get_version()
+
+    # ----------------------------------------------------------------------
+    def _get_extra_fields(self, record):
+        extra_fields = super(DjangoLogstashFormatter, self)._get_extra_fields(record)
+
+        if hasattr(record, 'status_code'):
+            extra_fields['status_code'] = record.status_code
+
+        # Django's runserver command passes socketobject and WSGIRequest instances as "request".
+        # Hence the check for the META attribute.
+        # For details see https://code.djangoproject.com/ticket/27234
+        if hasattr(record, 'request') and hasattr(record.request, 'META'):
+            request = record.request
+            extra_fields['django_version'] = self._django_version
+            extra_fields['req_useragent'] = request.META.get('HTTP_USER_AGENT', '')
+            extra_fields['req_remote_address'] = request.META.get('REMOTE_ADDR', '')
+            extra_fields['req_host'] = request.get_host()
+            extra_fields['req_uri'] = request.get_raw_uri()
+            extra_fields['req_user'] = six.text_type(request.user) if request.user else ''
+            extra_fields['req_method'] = request.META.get('REQUEST_METHOD', '')
+            extra_fields['req_referer'] = request.META.get('HTTP_REFERER', '')
+
+            forwarded_proto = request.META.get('HTTP_X_FORWARDED_PROTO', None)
+            if forwarded_proto is not None:
+                extra_fields['req_forwarded_proto'] = forwarded_proto
+
+            forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', None)
+            if forwarded_for is not None:
+                # make it a list
+                forwarded_for_list = forwarded_for.replace(' ', '').split(',')
+                extra_fields['req_forwarded_for'] = forwarded_for_list
+
+            # template debug
+            if isinstance(record.exc_info, tuple):
+                exc_value = record.exc_info[1]
+                template_info = getattr(exc_value, 'template_debug', None)
+                if template_info:
+                    extra_fields['tmpl_name'] = template_info['name']
+                    extra_fields['tmpl_line'] = template_info['line']
+                    extra_fields['tmpl_message'] = template_info['message']
+                    extra_fields['tmpl_during'] = template_info['during']
+
+        return extra_fields
diff --git a/logstash_async/handler.py b/logstash_async/handler.py
new file mode 100644
index 0000000..c45ab31
--- /dev/null
+++ b/logstash_async/handler.py
@@ -0,0 +1,160 @@
+# -*- coding: utf-8 -*-
+#
+# This software may be modified and distributed under the terms
+# of the MIT license. See the LICENSE file for details.
+
+from logging import Handler
+
+from six import string_types
+
+from logstash_async.formatter import LogstashFormatter
+from logstash_async.utils import import_string, safe_log_via_print
+from logstash_async.worker import LogProcessingWorker
+
+
+class ProcessingError(Exception):
+    """"""
+
+
+class AsynchronousLogstashHandler(Handler):
+    """Python logging handler for Logstash. Sends events asynchronously
+    via the configured transport (TCP by default).
+    :param host: The host of the Logstash server.
+    :param port: The port of the Logstash server (default 5959).
+    :param transport: Callable or path to a compatible transport class.
+    :param ssl_enable: Should SSL be enabled for the connection? Default is False.
+    :param ssl_verify: Should the server's SSL certificate be verified?
+    :param keyfile: The path to client side SSL key file (default is None).
+    :param certfile: The path to client side SSL certificate file (default is None).
+    :param ca_certs: The path to the file containing recognized CA certificates.
+    :param database_path: The path to the file containing queued events.
+    :param enable: Flag to enable log processing (default is True, disabling
+                   might be handy for local testing, etc.)
+    """
+
+    _worker_thread = None
+
+    # ----------------------------------------------------------------------
+    def __init__(self, host, port=5959, transport='logstash_async.transport.TcpTransport',
+                 ssl_enable=False, ssl_verify=True, keyfile=None, certfile=None, ca_certs=None,
+                 database_path=':memory:', enable=True):
+        super(AsynchronousLogstashHandler, self).__init__()
+        self._host = host
+        self._port = port
+        self._transport_path = transport
+        self._ssl_enable = ssl_enable
+        self._ssl_verify = ssl_verify
+        self._keyfile = keyfile
+        self._certfile = certfile
+        self._ca_certs = ca_certs
+        self._database_path = database_path
+        self._enable = enable
+        self._transport = None
+        self._setup_transport()
+
+    # ----------------------------------------------------------------------
+    def emit(self, record):
+        if not self._enable:
+            return  # we should not do anything, so just leave
+
+        self._setup_transport()
+        self._start_worker_thread()
+
+        # basically same implementation as in logging.handlers.SocketHandler.emit()
+        try:
+            data = self._format_record(record)
+            AsynchronousLogstashHandler._worker_thread.enqueue_event(data)
+        except (KeyboardInterrupt, SystemExit):
+            raise
+        except Exception:
+            self.handleError(record)
+
+    # ----------------------------------------------------------------------
+    def _setup_transport(self):
+        if self._transport is not None:
+            return
+
+        if isinstance(self._transport_path, string_types):
+            transport_class = import_string(self._transport_path)
+            self._transport = transport_class(
+                host=self._host,
+                port=self._port,
+                ssl_enable=self._ssl_enable,
+                ssl_verify=self._ssl_verify,
+                keyfile=self._keyfile,
+                certfile=self._certfile,
+                ca_certs=self._ca_certs)
+        else:
+            self._transport = self._transport_path
+
+    # ----------------------------------------------------------------------
+    def _start_worker_thread(self):
+        if self._worker_thread_is_running():
+            return
+
+        AsynchronousLogstashHandler._worker_thread = LogProcessingWorker(
+            host=self._host,
+            port=self._port,
+            transport=self._transport,
+            ssl_enable=self._ssl_enable,
+            ssl_verify=self._ssl_verify,
+            keyfile=self._keyfile,
+            certfile=self._certfile,
+            ca_certs=self._ca_certs,
+            database_path=self._database_path)
+        AsynchronousLogstashHandler._worker_thread.start()
+
+    # ----------------------------------------------------------------------
+    @staticmethod
+    def _worker_thread_is_running():
+        worker_thread = AsynchronousLogstashHandler._worker_thread
+        return worker_thread is not None and worker_thread.is_alive()
+
+    # ----------------------------------------------------------------------
+    def _format_record(self, record):
+        self._create_formatter_if_necessary()
+        return self.formatter.format(record) + b'\n'
+
+    # ----------------------------------------------------------------------
+    def _create_formatter_if_necessary(self):
+        if self.formatter is None:
+            self.formatter = LogstashFormatter()
+
+    # ----------------------------------------------------------------------
+    def close(self):
+        self.acquire()
+        try:
+            self.shutdown()
+        finally:
+            self.release()
+        super(AsynchronousLogstashHandler, self).close()
+
+    # ----------------------------------------------------------------------
+    def shutdown(self):
+        if self._worker_thread_is_running():
+            self._trigger_worker_shutdown()
+            self._wait_for_worker_thread()
+            self._reset_worker_thread()
+            self._close_transport()
+
+    # ----------------------------------------------------------------------
+    def _trigger_worker_shutdown(self):
+        AsynchronousLogstashHandler._worker_thread.shutdown()
+
+    # ----------------------------------------------------------------------
+    def _wait_for_worker_thread(self):
+        AsynchronousLogstashHandler._worker_thread.join()
+
+    # ----------------------------------------------------------------------
+    def _reset_worker_thread(self):
+        AsynchronousLogstashHandler._worker_thread = None
+
+    # ----------------------------------------------------------------------
+    def _close_transport(self):
+        try:
+            if self._transport is not None:
+                self._transport.close()
+        except Exception as e:
+            safe_log_via_print('error', u'Error on closing transport: {}'.format(e))
diff --git a/logstash_async/transport.py b/logstash_async/transport.py
new file mode 100644
index 0000000..6e75a90
--- /dev/null
+++ b/logstash_async/transport.py
@@ -0,0 +1,116 @@
+# -*- coding: utf-8 -*-
+#
+# This software may be modified and distributed under the terms
+# of the MIT license. See the LICENSE file for details.
+
+import socket
+import ssl
+import sys
+
+from logstash_async.constants import SOCKET_TIMEOUT
+
+
+class UdpTransport(object):
+
+    _keep_connection = False
+
+    # ----------------------------------------------------------------------
+    def __init__(self, host, port, **kwargs):
+        self._host = host
+        self._port = port
+        self._sock = None
+
+    # ----------------------------------------------------------------------
+    def send(self, events):
+        # Ideally we would keep the socket open but this is risky because we might not notice
+        # a broken TCP connection and send events into the dark.
+ # On UDP we push into the dark by design :) + self._create_socket() + try: + self._send(events) + finally: + self._close() + + # ---------------------------------------------------------------------- + def _create_socket(self, timeout=SOCKET_TIMEOUT): + if self._sock is not None: + return + + # from logging.handlers.DatagramHandler + self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._sock.settimeout(timeout) + + # ---------------------------------------------------------------------- + def _send(self, events): + for event in events: + self._send_via_socket(event) + + # ---------------------------------------------------------------------- + def _send_via_socket(self, data): + data_to_send = self._convert_data_to_send(data) + self._sock.sendto(data_to_send, (self._host, self._port)) + + # ---------------------------------------------------------------------- + def _convert_data_to_send(self, data): + if sys.version_info < (3, 0): + return data + # Python3 + elif not isinstance(data, bytes): + return bytes(data, 'utf-8') + + return data + + # ---------------------------------------------------------------------- + def _close(self, force=False): + if not self._keep_connection or force: + if self._sock: + self._sock.close() + self._sock = None + + # ---------------------------------------------------------------------- + def close(self): + self._close(force=True) + + +class TcpTransport(UdpTransport): + + # ---------------------------------------------------------------------- + def __init__(self, host, port, ssl_enable, ssl_verify, keyfile, certfile, ca_certs): + super(TcpTransport, self).__init__(host, port) + self._ssl_enable = ssl_enable + self._ssl_verify = ssl_verify + self._keyfile = keyfile + self._certfile = certfile + self._ca_certs = ca_certs + + # ---------------------------------------------------------------------- + def _create_socket(self, timeout=SOCKET_TIMEOUT): + if self._sock is not None: + return + + # from logging.handlers.SocketHandler + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + sock.connect((self._host, self._port)) + # non-SSL + if not self._ssl_enable: + self._sock = sock + return + # SSL + cert_reqs = ssl.CERT_REQUIRED + if not self._ssl_verify: + if self._ca_certs: + cert_reqs = ssl.CERT_OPTIONAL + else: + cert_reqs = ssl.CERT_NONE + self._sock = ssl.wrap_socket( + sock, + keyfile=self._keyfile, + certfile=self._certfile, + ca_certs=self._ca_certs, + cert_reqs=cert_reqs) + + # ---------------------------------------------------------------------- + def _send_via_socket(self, data): + data_to_send = self._convert_data_to_send(data) + self._sock.sendall(data_to_send) diff --git a/logstash_async/utils.py b/logstash_async/utils.py new file mode 100644 index 0000000..f377734 --- /dev/null +++ b/logstash_async/utils.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# +# This software may be modified and distributed under the terms +# of the MIT license. See the LICENSE file for details. + +from __future__ import print_function + +from datetime import datetime +from importlib import import_module +from itertools import chain, islice +import sys +import traceback + +import six + + +# ---------------------------------------------------------------------- +def ichunked(seq, chunksize): + """Yields items from an iterator in iterable chunks. 
+ http://stackoverflow.com/a/1335572 + """ + iterable = iter(seq) + while True: + yield list(chain([next(iterable)], islice(iterable, chunksize - 1))) + + +# ---------------------------------------------------------------------- +def safe_log_via_print(log_level, message, *args, **kwargs): + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + log_message = u'{}: {}: {}'.format(timestamp, log_level, message) + print(log_message % args, file=sys.stderr) + # print stack trace if available + exc_info = kwargs.get('exc_info', None) + if exc_info or log_level == 'exception': + if not isinstance(exc_info, tuple): + exc_info = sys.exc_info() + stack_trace = ''.join(traceback.format_exception(*exc_info)) + print(stack_trace, file=sys.stderr) + + +# ---------------------------------------------------------------------- +def import_string(dotted_path): + """ + Import a dotted module path and return the attribute/class designated by the + last name in the path. Raise ImportError if the import failed. + + (stolen from Django) + """ + try: + module_path, class_name = dotted_path.rsplit('.', 1) + except ValueError: + msg = "{} doesn't look like a module path".format(dotted_path) + six.reraise(ImportError, ImportError(msg), sys.exc_info()[2]) + + module = import_module(module_path) + try: + return getattr(module, class_name) + except AttributeError: + msg = 'Module "{}" does not define a "{}" attribute/class'.format(module_path, class_name) + six.reraise(ImportError, ImportError(msg), sys.exc_info()[2]) diff --git a/logstash_async/worker.py b/logstash_async/worker.py new file mode 100644 index 0000000..79e4986 --- /dev/null +++ b/logstash_async/worker.py @@ -0,0 +1,220 @@ +# -*- coding: utf-8 -*- +# +# This software may be modified and distributed under the terms +# of the MIT license. See the LICENSE file for details. 
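+#
+# Overview of the thread implemented below: log events are taken from an
+# in-memory queue, cached in the SQLite database and flushed to the
+# transport whenever QUEUED_EVENTS_FLUSH_INTERVAL seconds have passed or
+# QUEUED_EVENTS_FLUSH_COUNT events have accumulated, whichever happens
+# first; events which could not be sent are requeued for a later attempt.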
+
+from datetime import datetime
+from logging import getLogger as get_logger
+from threading import Event, Thread
+
+from six.moves.queue import Queue, Empty
+
+from logstash_async.constants import (
+    QUEUED_EVENTS_FLUSH_COUNT,
+    QUEUED_EVENTS_FLUSH_INTERVAL,
+    QUEUE_CHECK_INTERVAL)
+from logstash_async.database import DatabaseCache, DatabaseLockedError
+from logstash_async.utils import safe_log_via_print
+
+
+class ProcessingError(Exception):
+    """"""
+
+
+class LogProcessingWorker(Thread):
+    """"""
+
+    # ----------------------------------------------------------------------
+    def __init__(self, *args, **kwargs):
+        self._host = kwargs.pop('host')
+        self._port = kwargs.pop('port')
+        self._transport = kwargs.pop('transport')
+        self._ssl_enable = kwargs.pop('ssl_enable')
+        self._ssl_verify = kwargs.pop('ssl_verify')
+        self._keyfile = kwargs.pop('keyfile')
+        self._certfile = kwargs.pop('certfile')
+        self._ca_certs = kwargs.pop('ca_certs')
+        self._database_path = kwargs.pop('database_path')
+
+        super(LogProcessingWorker, self).__init__(*args, **kwargs)
+        self.daemon = True
+        self.name = self.__class__.__name__
+
+        self._shutdown_event = Event()
+        self._queue = Queue()
+
+        self._event = None
+        self._database = None
+        self._last_event_flush_date = None
+        self._non_flushed_event_count = None
+        self._logger = None
+
+    # ----------------------------------------------------------------------
+    def enqueue_event(self, event):
+        # called from other threads
+        self._queue.put(event)
+
+    # ----------------------------------------------------------------------
+    def shutdown(self):
+        # called from other threads
+        self._shutdown_event.set()
+
+    # ----------------------------------------------------------------------
+    def run(self):
+        self._reset_flush_counters()
+        self._setup_logger()
+        self._setup_database()
+        try:
+            self._fetch_events()
+        except Exception as e:
+            # we really should not get anything here, and if we do, the worker thread is
+            # dying too early, resulting in undefined application behaviour
+            self._log_general_error(e)
+        # check for empty queue and report if not
+        self._warn_about_non_empty_queue_on_shutdown()
+
+    # ----------------------------------------------------------------------
+    def _reset_flush_counters(self):
+        self._last_event_flush_date = datetime.now()
+        self._non_flushed_event_count = 0
+
+    # ----------------------------------------------------------------------
+    def _setup_logger(self):
+        self._logger = get_logger(self.name)
+
+    # ----------------------------------------------------------------------
+    def _setup_database(self):
+        self._database = DatabaseCache(self._database_path)
+
+    # ----------------------------------------------------------------------
+    def _fetch_events(self):
+        while True:
+            try:
+                self._fetch_event()
+                self._process_event()
+            except Empty:
+                # Flush queued (in database) events after all internally queued events have
+                # been processed, i.e. the queue is empty.
+ if self._shutdown_requested(): + self._flush_queued_events(force=True) + return + + self._flush_queued_events() + self._delay_processing() + except (DatabaseLockedError, ProcessingError): + if self._shutdown_requested(): + return + else: + self._requeue_event() + self._delay_processing() + + # ---------------------------------------------------------------------- + def _fetch_event(self): + self._event = self._queue.get(block=False) + + # ---------------------------------------------------------------------- + def _process_event(self): + try: + self._write_event_to_database() + except DatabaseLockedError: + self._safe_log( + u'debug', + u'Database is locked, will try again later (queue length %d)', + self._queue.qsize()) + raise + except Exception as e: + self._log_processing_error(e) + raise ProcessingError() + else: + self._event = None + + # ---------------------------------------------------------------------- + def _log_processing_error(self, exception): + self._safe_log( + u'exception', + u'Log processing error (queue size: %3s): %s', + self._queue.qsize(), + exception) + + # ---------------------------------------------------------------------- + def _delay_processing(self): + self._shutdown_event.wait(QUEUE_CHECK_INTERVAL) + + # ---------------------------------------------------------------------- + def _shutdown_requested(self): + return self._shutdown_event.is_set() + + # ---------------------------------------------------------------------- + def _requeue_event(self): + self._queue.put(self._event) + + # ---------------------------------------------------------------------- + def _write_event_to_database(self): + self._database.add_event(self._event) + self._non_flushed_event_count += 1 + + # ---------------------------------------------------------------------- + def _flush_queued_events(self, force=False): + # check if necessary and abort if not + if not force and not self._queued_event_interval_reached() and not self._queued_event_count_reached(): + return + + try: + queued_events = self._database.get_queued_events() + except DatabaseLockedError: + self._safe_log( + u'debug', + u'Database is locked, will try again later (queue length %d)', + self._queue.qsize()) + return # try again later + except Exception as e: + # just log the exception and hope we can recover from the error + self._safe_log(u'exception', u'Error retrieving queued events: %s', e) + return + + if queued_events: + try: + events = [event['event_text'] for event in queued_events] + self._send_events(events) + except Exception as e: + self._safe_log(u'exception', u'An error occurred while sending events: %s', e) + self._database.requeue_queued_events(queued_events) + else: + self._database.delete_queued_events() + self._reset_flush_counters() + + # ---------------------------------------------------------------------- + def _queued_event_interval_reached(self): + delta = datetime.now() - self._last_event_flush_date + return delta.total_seconds() > QUEUED_EVENTS_FLUSH_INTERVAL + + # ---------------------------------------------------------------------- + def _queued_event_count_reached(self): + return self._non_flushed_event_count > QUEUED_EVENTS_FLUSH_COUNT + + # ---------------------------------------------------------------------- + def _send_events(self, events): + self._transport.send(events) + + # ---------------------------------------------------------------------- + def _log_general_error(self, exc): + self._safe_log(u'exception', u'An unexpected error occurred: %s', exc) + + # 
----------------------------------------------------------------------
+    def _safe_log(self, log_level, message, *args, **kwargs):
+        # we cannot log via the logging subsystem any longer once it has been shut down
+        if self._shutdown_requested():
+            safe_log_via_print(log_level, message, *args, **kwargs)
+        else:
+            log_func = getattr(self._logger, log_level)
+            return log_func(message, *args, **kwargs)
+
+    # ----------------------------------------------------------------------
+    def _warn_about_non_empty_queue_on_shutdown(self):
+        queue_size = self._queue.qsize()
+        if queue_size:
+            self._safe_log(
+                u'warning',
+                u'Non-empty queue while shutting down ({} events pending). '
+                u'This indicates a previous error.'.format(queue_size),
+                extra=dict(queue_size=queue_size))
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..5991632
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,7 @@
+[bdist_wheel]
+universal=1
+
+[flake8]
+exclude = build,.git
+ignore = E127,E128
+max-line-length = 120
diff --git a/setup.py b/setup.py
index bcf4eb8..9ae9c9e 100644
--- a/setup.py
+++ b/setup.py
@@ -1,13 +1,42 @@
-from distutils.core import setup
+# -*- coding: utf-8 -*-
+
+import io
+import sys
+from os import path
+from setuptools import setup
+from shutil import rmtree
+
+NAME = 'python-logstash-async'
+VERSION = '1.0.0'
+
+here = path.abspath(path.dirname(__file__))
+with io.open(path.join(here, 'README.rst'), encoding='utf-8') as f:
+    LONG_DESCRIPTION = f.read()
+
+
+if 'bdist_wheel' in sys.argv:
+    # Remove previous build dirs when creating a wheel build: files which have been
+    # removed from the project would otherwise still be cached in the build dir and
+    # end up as part of the build, which is not what we want.
+    for directory in ('build', 'dist', 'python_logstash_async.egg-info'):
+        try:
+            rmtree(directory)
+        except OSError:
+            pass
+
+
 setup(
-    name='python-logstash',
-    packages=['logstash'],
-    version='0.4.6',
-    description='Python logging handler for Logstash.',
-    long_description=open('README.rst').read(),
-    author='Volodymyr Klochan',
-    author_email='vklochan@gmail.com',
-    url='https://github.com/vklochan/python-logstash',
+    name=NAME,
+    packages=['logstash_async'],
+    version=VERSION,
+    description='Asynchronous Python logging handler for Logstash.',
+    long_description=LONG_DESCRIPTION,
+    license='MIT',
+    author='Enrico Tröger',
+    author_email='enrico.troeger@uvena.de',
+    url='https://github.com/eht16/python-logstash-async',
+    keywords='logging logstash asynchronous',
+    install_requires=['six'],
+    setup_requires=['flake8'],
     classifiers=[
         'Development Status :: 4 - Beta',
         'Intended Audience :: Developers',