diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..5ace460 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,6 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8db6333..d50eced 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -15,17 +15,17 @@ on: jobs: test: name: Python ${{ matrix.python }} - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 strategy: matrix: - python: ['3.6', '3.7', '3.8', '3.9', '3.10'] + python: ['3.11', '3.12', '3.13', '3.14'] fail-fast: false steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v5 - name: Setup Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python }} diff --git a/.gitignore b/.gitignore index 79d4fee..193c1d8 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ MANIFEST /eggs /bin /var +/venv /sdist /develop-eggs /.installed.cfg diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..94c1fe0 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,16 @@ +version: 2 + +sphinx: + configuration: docs/conf.py + +python: + install: + - method: pip + path: . 
+ extra_requirements: + - docs + +build: + os: ubuntu-22.04 + tools: + python: '3.12' diff --git a/docs/about.rst b/docs/about.rst index 3df044d..592f648 100644 --- a/docs/about.rst +++ b/docs/about.rst @@ -13,6 +13,8 @@ It adds the following features: * TCP transport optionally SSL-encrypted * Special formatter ready to be used in Django projects * Special formatter ready to be used in Flask projects + * Support for Elastic Common Schema (ECS, + https://www.elastic.co/blog/introducing-the-elastic-common-schema) Asynchronous processing @@ -92,6 +94,97 @@ License ChangeLog --------- +4.1.0 (Nov 23 2025) ++++++++++++++++++++ + + * Fix handling already closed sockets on shutdown + * Support setting "ssl_verify_flags" to override Python's default + + +4.0.2 (Mar 16 2025) ++++++++++++++++++++ + + * Fix hang on socket errors by not using the logging system + * Do not try to "shutdown" UDP sockets (#107, #108). + + +4.0.1 (Jan 27 2025) ++++++++++++++++++++ + + * Properly require Python >= 3.11. + The previous release had a wrong requirement set unfortunately. + + +4.0.0 (Jan 26 2025) ++++++++++++++++++++ + + * Breaking change: drop support for 3.8, 3.9 and 3.10. + * Catch errors on TCP/UDP socket closing to prevent requeuing of events + (#98, #100). + * Add new setting to configure batch sizes for Beats transport (#93). + * Support URL path in HttpTransport (#103, Seyyed Mohammad Borghei). + * LogstashFormatter: Move top_level_field_set creation to __init__ + to ease setting FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST after + importing the formatter class (#96, #97, feliixx). + + +3.0.0 (Feb 12 2024) ++++++++++++++++++++ + + * Add ECS (Elastic Common Schema) formatter support (#91, Andrii Lahuta). + * On closing UDP/TCP socket, make sure it is reset (#89, #92). + * Ensure to not check for socket write buffer if disabled. + + +2.7.2 (Sep 14 2023) ++++++++++++++++++++ + + * Implement get_non_flushed_event_count() for MemoryCache (#88). 
+ + +2.7.1 (Sep 10 2023) ++++++++++++++++++++ + + * Wait for empty socket write buffer only if "fcntl" is available (#82). + * Wait at most 30s for empty socket write buffer (#83). + * Read pending events from database on startup for immediate flushing (#85). + + +2.7.0 (Aug 20 2023) ++++++++++++++++++++ + + * Add a wait until all data is sent, before closing socket on TCP + transport (#81, Alisher Nazarkhanov). + * Explicitly shutdown the socket connection in Udp/TcpTransport to + prevent peer connection errors. + * Implement optional database VACUUM on shutdown (#80). + + +2.6.0 (Jul 30 2023) ++++++++++++++++++++ + + * Load certificate chain only if a certificate was specified (#79). + * Handle network errors raised by "requests" in HttpTransport (#75). + + +2.5.0 (Apr 16 2022) ++++++++++++++++++++ + + * Handle database disk errors (#72, Peter Mazarovich). + * Use Python's PriorityQueue to reduce memory consumption + (#73, Peter Mazarovich). + + +2.4.0 (Apr 04 2022) ++++++++++++++++++++ + + * Update link to docs for rate limit string notation + (#70, Garrett Hyde). + * Log connection and network errors as warnings (#71). + * Fix support for Django 4.0 by using Request.build_absolute_uri(). + * Use datetime as column type for "entry_date" in database. + + 2.3.0 (May 23 2021) +++++++++++++++++++ diff --git a/docs/conf.py b/docs/conf.py index 9c3ba33..34b77e7 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # python-logstash-async documentation build configuration file, created by # sphinx-quickstart on Sun Dec 25 04:59:44 2016. # @@ -65,7 +63,7 @@ # # This is also used if you do content translation via gettext catalogs. # Usually you set "language" from the command line for these cases. -language = None +language = 'en' # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. 
diff --git a/docs/config.rst b/docs/config.rst index 72050ed..2ff359e 100644 --- a/docs/config.rst +++ b/docs/config.rst @@ -90,6 +90,21 @@ Options for configuring the log handler *Default*: ``True`` +``ssl_verify_flags`` + + Specify verify flags for Python's `ssl.SSLContext`. + See the Python documentation for valid values. + This can be useful to override Python 3.13's strict verify flags by passing `0`. + + Only used for ``logstash_async.transport.TcpTransport``, + ``logstash_async.transport.BeatsTransport`` and + ``logstash_async.transport.HttpTransport``. + + *Type*: ``integer`` + + *Default*: None + + ``keyfile`` The path to client side SSL key file. @@ -150,10 +165,19 @@ Options for configuring the log handler Options for configuring the log formatter ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -The following settings are only valid for the provided formatters -`logstash_async.handler.LogstashFormatter`, -`logstash_async.handler.DjangoLogstashFormatter` and -`logstash_async.handler.FlaskLogstashFormatter`. +The following settings are only valid for the provided formatters: + +- `logstash_async.handler.LogstashFormatter` +- `logstash_async.handler.LogstashEcsFormatter` +- `logstash_async.handler.DjangoLogstashFormatter` +- `logstash_async.handler.DjangoLogstashEcsFormatter` +- `logstash_async.handler.FlaskLogstashFormatter` +- `logstash_async.handler.FlaskLogstashEcsFormatter` + +The included formatter classes with the "Ecs" infix generate +events to be stored using the Elastic Common Schema +(ECS, https://www.elastic.co/blog/introducing-the-elastic-common-schema) +and should be used if the target ElasticSearch index uses this schema. You can use any other formatter by configuring Python's logging system accordingly. Any other formatter's `format()` method just @@ -267,6 +291,16 @@ for easy modification. *Default*: ``5.0`` +``constants.SOCKET_CLOSE_WAIT_TIMEOUT`` + + Maximum time in seconds to wait for the socket's write buffer to get empty. 
+ Set to 0 to disable waiting for the socket write buffer to get empty. + + *Type*: ``float`` + + *Default*: ``30.0`` + + ``constants.QUEUE_CHECK_INTERVAL`` Interval in seconds to check the internal queue for new messages @@ -310,6 +344,17 @@ for easy modification. *Default*: ``50`` +``constants.QUEUED_EVENTS_BEATS_BATCH_SIZE`` + + Maximum number of events to be sent to Logstash in one batch when using the Beats transport. + Each batch of events is sent using the same connection and can be considered as a kind + of transaction. Should be smaller than `QUEUED_EVENTS_BATCH_SIZE`. + + *Type*: ``integer`` + + *Default*: ``25`` + + ``constants.DATABASE_EVENT_CHUNK_SIZE`` Maximum number of events to be updated within one SQLite statement @@ -351,6 +396,39 @@ for easy modification. *Default*: +``constants.FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST`` + + Fields to be set on the top-level of a Logstash event/message, do not modify this + unless you know what you are doing. This list is used by "ECS" formatters + (Elastic Common Schema). + + *Type*: ``list`` + + *Default*: + + +``constants.FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE`` + + Whether to adapt dotted ECS fields into nested objects in the Logstash event structure. + Logstash/ElasticSearch can process both variants; however, using nested objects is + encouraged. + + Example: `field.nested.key` becomes: + + .. code-block:: json + + "field": { + "nested": { + "key": "..." + } + } + + + *Type*: ``boolean`` + + *Default*: True + + ``constants.ERROR_LOG_RATE_LIMIT`` Enable rate limiting for error messages (e.g. network errors) emitted by the logger @@ -375,6 +453,16 @@ for easy modification. *Default*: None +``constants.DATABASE_VACUUM_ON_SHUTDOWN`` + + Vacuum SQLite database on shutdown - when enabled, the database will be vacuumed on shutdown + to reduce its size on disk. + + *Type*: ``boolean`` + + *Default*: False + + Example usage: .. 
code-block:: python diff --git a/example1.py b/example1.py index 9b1c52f..178f8f3 100644 --- a/example1.py +++ b/example1.py @@ -1,10 +1,9 @@ -# -*- coding: utf-8 -*- - import logging import sys from logstash_async.handler import AsynchronousLogstashHandler + host = 'localhost' port = 5959 @@ -21,7 +20,7 @@ try: 1 / 0 except Exception as e: - test_logger.exception(u'Exception: %s', e) + test_logger.exception('Exception: %s', e) # add extra field to logstash message extra = { diff --git a/example2.py b/example2.py index 791f4d6..8c6abfc 100644 --- a/example2.py +++ b/example2.py @@ -1,10 +1,9 @@ -# -*- coding: utf-8 -*- - import logging import sys from logstash_async.handler import AsynchronousLogstashHandler + host = 'localhost' port = 5959 diff --git a/example3.py b/example3.py index 5f92603..3a7fdda 100644 --- a/example3.py +++ b/example3.py @@ -1,9 +1,8 @@ -# -*- coding: utf-8 -*- - import logging from logstash_async.handler import AsynchronousLogstashHandler + host = 'localhost' port = 5959 diff --git a/logstash_async/__init__.py b/logstash_async/__init__.py index e1cb8a0..11d6e90 100644 --- a/logstash_async/__init__.py +++ b/logstash_async/__init__.py @@ -1,9 +1,7 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. -__version__ = '2.3.0' +__version__ = '4.1.0' # When using an in-memory only cache, this persists the cache through # thread failures, shutdowns, and restarts. diff --git a/logstash_async/cache.py b/logstash_async/cache.py index 0ef8031..0c63f9e 100644 --- a/logstash_async/cache.py +++ b/logstash_async/cache.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. 
@@ -20,7 +18,6 @@ def add_event(self, event): :param str event: A log message :return: """ - pass # ---------------------------------------------------------------------- @abstractmethod @@ -29,7 +26,6 @@ def get_queued_events(self): :return: A list of events to be published """ - pass # ---------------------------------------------------------------------- @abstractmethod @@ -42,7 +38,6 @@ def requeue_queued_events(self, events): :param events: :return: """ - pass # ---------------------------------------------------------------------- @abstractmethod @@ -51,7 +46,6 @@ def delete_queued_events(self): :return: """ - pass # ---------------------------------------------------------------------- @abstractmethod @@ -60,4 +54,11 @@ def expire_events(self): :return: """ - pass + + # ---------------------------------------------------------------------- + @abstractmethod + def get_non_flushed_event_count(self): + """Determine the count of pending events in the cache which need to be sent to Logstash. + + :return: Count of pending events + """ diff --git a/logstash_async/constants.py b/logstash_async/constants.py index f04167c..cc4e326 100644 --- a/logstash_async/constants.py +++ b/logstash_async/constants.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. @@ -13,6 +11,8 @@ class Constants: """ # timeout in seconds for TCP connections SOCKET_TIMEOUT = 5.0 + # maximum time in seconds to wait for the socket's write buffer to get empty + SOCKET_CLOSE_WAIT_TIMEOUT = 30.0 # interval in seconds to check the internal queue for new messages to be cached in the database QUEUE_CHECK_INTERVAL = 2.0 # interval in seconds to send cached events from the database to Logstash @@ -23,6 +23,10 @@ class Constants: QUEUED_EVENTS_FLUSH_COUNT = 50 # maximum number of events to be sent to Logstash in one batch (i.e. 
using a single connection) QUEUED_EVENTS_BATCH_SIZE = 50 + # maximum number of events to be sent to Logstash in one batch when using the Beats transport, + # each batch of events is sent using the same connection and can be considered as a kind + # of transaction. Should be smaller than QUEUED_EVENTS_BATCH_SIZE. + QUEUED_EVENTS_BEATS_BATCH_SIZE = 25 # maximum number of events to be updated within one SQLite statement DATABASE_EVENT_CHUNK_SIZE = 750 # timeout in seconds to "connect" (i.e. open) the SQLite database @@ -36,17 +40,24 @@ class Constants: 'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename', 'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module', 'msecs', 'msg', 'name', 'pathname', 'process', - 'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName'] + 'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName', 'taskName'] # fields to be set on the top-level of a Logstash event/message, do not modify this # unless you know what you are doing FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST = [ '@timestamp', '@version', 'host', 'level', 'logsource', 'message', 'pid', 'program', 'type', 'tags', '@metadata'] + FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST = [ + '@timestamp', '@version', '@metadata', 'message', 'labels', 'tags'] + # convert dotted ECS fields into nested objects + FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE = True # enable rate limiting for error messages (e.g. network errors) emitted by the logger # used in LogProcessingWorker, i.e. when transmitting log messages to the Logstash server. 
# Use a string like '5 per minute' or None to disable (default), for details see # http://limits.readthedocs.io/en/stable/string-notation.html ERROR_LOG_RATE_LIMIT = None + # Vacuum SQLite database on shutdown - when enabled, the database will be vacuumed on shutdown + # to reduce its size on disk + DATABASE_VACUUM_ON_SHUTDOWN = False constants = Constants() # pylint: disable=invalid-name diff --git a/logstash_async/database.py b/logstash_async/database.py index 5ca4b7a..f64e871 100644 --- a/logstash_async/database.py +++ b/logstash_async/database.py @@ -1,11 +1,9 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. -from contextlib import contextmanager import sqlite3 import sys +from contextlib import contextmanager from logstash_async.cache import Cache from logstash_async.constants import constants @@ -18,7 +16,7 @@ `event_id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, `event_text` TEXT NOT NULL, `pending_delete` INTEGER NOT NULL, - `entry_date` TEXT NOT NULL); + `entry_date` DATETIME NOT NULL); ''', '''CREATE INDEX IF NOT EXISTS `idx_pending_delete` ON `event` (pending_delete);''', '''CREATE INDEX IF NOT EXISTS `idx_entry_date` ON `event` (entry_date);''', @@ -29,6 +27,10 @@ class DatabaseLockedError(Exception): pass +class DatabaseDiskIOError(Exception): + pass + + class DatabaseCache(Cache): """ Backend implementation for python-logstash-async. 
Keeps messages on disk in a SQL-lite DB @@ -47,8 +49,8 @@ def __init__(self, path, event_ttl=None): @contextmanager def _connect(self): - self._open() try: + self._open() with self._connection as connection: yield connection except sqlite3.OperationalError: @@ -96,6 +98,12 @@ def _handle_sqlite_error(self): _, exc, _ = sys.exc_info() if str(exc) == 'database is locked': raise DatabaseLockedError from exc + if str(exc) == 'disk I/O error': + raise DatabaseDiskIOError from exc + if str(exc) == 'unable to open database file': + raise DatabaseDiskIOError from exc + if str(exc) == 'attempt to write a readonly database': + raise DatabaseDiskIOError from exc # ---------------------------------------------------------------------- def get_queued_events(self): @@ -137,8 +145,23 @@ def expire_events(self): if self._event_ttl is None: return - query_delete = "DELETE FROM `event` WHERE " \ - f"`entry_date` < datetime('now', '-{self._event_ttl} seconds');" + query_delete = ("DELETE FROM `event` WHERE " # noqa: S608 + f"`entry_date` < datetime('now', '-{self._event_ttl} seconds');") with self._connect() as connection: cursor = connection.cursor() cursor.execute(query_delete) + + # ---------------------------------------------------------------------- + def vacuum(self): + with self._connect() as connection: + cursor = connection.cursor() + cursor.execute('VACUUM;') + + # ---------------------------------------------------------------------- + def get_non_flushed_event_count(self): + query_fetch = 'SELECT count(*) FROM `event` WHERE `pending_delete` = 0;' + with self._connect() as connection: + cursor = connection.cursor() + cursor.execute(query_fetch) + count = cursor.fetchone()[0] + return count diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index 5c16834..0677a6e 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -1,18 +1,18 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the 
MIT license. See the LICENSE file for details. -from datetime import date, datetime +import importlib.metadata import logging import socket import sys import time import traceback import uuid +from datetime import date, datetime, UTC -from logstash_async.constants import constants import logstash_async +from logstash_async.constants import constants +from logstash_async.utils import normalize_ecs_dict try: @@ -25,6 +25,31 @@ class LogstashFormatter(logging.Formatter): _basic_data_types = (type(None), bool, str, int, float) + class MessageSchema: + TIMESTAMP = '@timestamp' + VERSION = '@version' + METADATA = '@metadata' + HOST = 'host' + LOG_LEVEL = 'level' + LOG_SOURCE = 'logsource' + LOGGER_NAME = 'logger_name' + LINE = 'line' + MESSAGE = 'message' + MESSAGE_TYPE = 'type' + FUNC_NAME = 'func_name' + TASK_NAME = 'task_name' + THREAD_NAME = 'thread_name' + PROCESS_NAME = 'process_name' + INTERPRETER = 'interpreter' + INTERPRETER_VERSION = 'interpreter_version' + PATH = 'path' + PID = 'pid' + PROGRAM = 'program' + STACK_TRACE = 'stack_trace' + ERROR_TYPE = 'error_type' + TAGS = 'tags' + LOGSTASH_ASYNC_VERSION = 'logstash_async_version' + # ---------------------------------------------------------------------- # pylint: disable=too-many-arguments def __init__( @@ -59,6 +84,9 @@ def __init__( self._prefetch_logsource() self._prefetch_program_name() + self.field_skip_set = set(constants.FORMATTER_RECORD_FIELD_SKIP_LIST) + self.top_level_field_set = set(constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST) + # ---------------------------------------------------------------------- def _prefetch_interpreter(self): """Override when needed""" @@ -90,43 +118,29 @@ def _prefetch_program_name(self): # ---------------------------------------------------------------------- def format(self, record): - message = { - '@timestamp': self._format_timestamp(record.created), - '@version': '1', - 'host': self._host, - 'level': record.levelname, - 'logsource': self._logsource, - 'message': 
record.getMessage(), - 'pid': record.process, - 'program': self._program_name, - 'type': self._message_type, - } - if self._metadata: - message['@metadata'] = self._metadata - if self._tags: - message['tags'] = self._tags + message = self._format_to_dict(record) + return self._serialize(message) + # ---------------------------------------------------------------------- + def _format_to_dict(self, record): + message = self._get_primary_fields(record) # record fields record_fields = self._get_record_fields(record) message.update(record_fields) # prepare dynamic extra fields extra_fields = self._get_extra_fields(record) - # remove all fields to be excluded - self._remove_excluded_fields(message, extra_fields) - # wrap extra fields in configurable namespace - if self._extra_prefix: - message[self._extra_prefix] = extra_fields - else: - message.update(extra_fields) + message.update(extra_fields) + # remove all fields to be excluded + self._remove_excluded_fields(message) # move existing extra record fields into the configured prefix self._move_extra_record_fields_to_prefix(message) - return self._serialize(message) + return message # ---------------------------------------------------------------------- def _format_timestamp(self, time_): - timestamp = datetime.utcfromtimestamp(time_) + timestamp = datetime.fromtimestamp(time_, UTC) formatted_timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%S') microsecond = int(timestamp.microsecond / 1000) return f'{formatted_timestamp}.{microsecond:03}Z' @@ -150,25 +164,49 @@ def _value_repr(self, value): else: return repr(value) + # ---------------------------------------------------------------------- + def _get_primary_fields(self, record): + schema = self.MessageSchema + primary_fields = { + schema.TIMESTAMP: self._format_timestamp(record.created), + schema.VERSION: '1', + schema.HOST: self._host, + schema.LOG_LEVEL: record.levelname, + schema.LOG_SOURCE: self._logsource, + schema.MESSAGE: record.getMessage(), + schema.PID: 
record.process, + schema.PROGRAM: self._program_name, + schema.MESSAGE_TYPE: self._message_type, + } + if self._metadata: + primary_fields[schema.METADATA] = self._metadata + if self._tags: + primary_fields[schema.TAGS] = self._tags + return primary_fields + # ---------------------------------------------------------------------- def _get_extra_fields(self, record): + schema = self.MessageSchema extra_fields = { - 'func_name': record.funcName, - 'interpreter': self._interpreter, - 'interpreter_version': self._interpreter_version, - 'line': record.lineno, - 'logger_name': record.name, - 'logstash_async_version': logstash_async.__version__, - 'path': record.pathname, - 'process_name': record.processName, - 'thread_name': record.threadName, + schema.FUNC_NAME: record.funcName, + schema.INTERPRETER: self._interpreter, + schema.INTERPRETER_VERSION: self._interpreter_version, + schema.LINE: record.lineno, + schema.LOGGER_NAME: record.name, + schema.LOGSTASH_ASYNC_VERSION: logstash_async.__version__, + schema.PATH: record.pathname, + schema.PROCESS_NAME: record.processName, + schema.THREAD_NAME: record.threadName, } # static extra fields if self._extra: extra_fields.update(self._extra) + if getattr(record, 'taskName', None): + extra_fields[schema.TASK_NAME] = record.taskName # exceptions if record.exc_info: - extra_fields['stack_trace'] = self._format_exception(record.exc_info) + extra_fields[schema.ERROR_TYPE] = record.exc_info[0].__name__ + extra_fields[schema.STACK_TRACE] = self._format_exception(record.exc_info) return extra_fields # ---------------------------------------------------------------------- @@ -182,16 +220,15 @@ def _format_exception(self, exc_info): return stack_trace # ---------------------------------------------------------------------- - def _remove_excluded_fields(self, message, extra_fields): - for fields in (message, extra_fields): - for field_name in list(fields): - if field_name in constants.FORMATTER_RECORD_FIELD_SKIP_LIST: - del 
fields[field_name] + def _remove_excluded_fields(self, message): + for field_name in list(message): + if field_name in self.field_skip_set: + del message[field_name] # ---------------------------------------------------------------------- def _move_extra_record_fields_to_prefix(self, message): """ - Anythng added by the "extra" keyword in the logging call will be moved into the + Anything added by the "extra" keyword in the logging call will be moved into the configured "extra" prefix. This way the event in Logstash will be clean and any extras will be paired together in the configured extra prefix. If not extra prefix is configured, the message will be kept as is. @@ -199,9 +236,10 @@ def _move_extra_record_fields_to_prefix(self, message): if not self._extra_prefix: return # early out if no prefix is configured - field_skip_list = constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST + [self._extra_prefix] + message.setdefault(self._extra_prefix, {}) + field_skip_set = self.top_level_field_set | {self._extra_prefix} for key in list(message): - if key not in field_skip_list: + if key not in field_skip_set: message[self._extra_prefix][key] = message.pop(key) # ---------------------------------------------------------------------- @@ -209,7 +247,65 @@ def _serialize(self, message): return json.dumps(message, ensure_ascii=self._ensure_ascii) +class LogstashEcsFormatter(LogstashFormatter): + ecs_version = '8.11.0' + __schema_dict = { + 'ECS_VERSION': 'ecs.version', + 'MESSAGE_TYPE': 'event.module', + 'HOST': 'host.hostname', + 'LOG_LEVEL': 'log.level', + 'LOGGER_NAME': 'log.logger', + 'LOG_SOURCE': 'log.syslog.hostname', + 'LINE': 'log.origin.file.line', + 'PATH': 'log.origin.file.name', + 'FUNC_NAME': 'log.origin.function', + 'STACK_TRACE': 'error.stack_trace', + 'ERROR_TYPE': 'error.type', + 'PROGRAM': 'process.executable', + 'PROCESS_NAME': 'process.name', + 'PID': 'process.pid', + 'THREAD_NAME': 'process.thread.name', + } + + normalize_ecs_message = 
constants.FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE + MessageSchema = type('MessageSchema', (LogstashFormatter.MessageSchema,), __schema_dict) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.top_level_field_set = {*constants.FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST, + *self.__schema_dict.values()} + + def _get_primary_fields(self, record): + message = super()._get_primary_fields(record) + schema = self.MessageSchema + message[schema.ECS_VERSION] = self.ecs_version + return message + + def _format_to_dict(self, record): + message = super()._format_to_dict(record) + if self.normalize_ecs_message: + # pylint: disable-next=redefined-variable-type + message = normalize_ecs_dict(message) + return message + + class DjangoLogstashFormatter(LogstashFormatter): + class MessageSchema(LogstashFormatter.MessageSchema): + DJANGO_VERSION = 'django_version' + RESP_STATUS_CODE = 'status_code' + REQ_USER_AGENT = 'req_useragent' + REQ_REMOTE_ADDRESS = 'req_remote_address' + REQ_HOST = 'req_host' + REQ_URI = 'req_uri' + REQ_USER = 'req_user' + REQ_METHOD = 'req_method' + REQ_REFERER = 'req_referer' + REQ_FORWARDED_PROTO = 'req_forwarded_proto' + REQ_FORWARDED_FOR = 'req_forwarded_for' + TMPL_NAME = 'tmpl_name' + TMPL_LINE = 'tmpl_line' + TMPL_MESSAGE = 'tmpl_message' + TMPL_DURING = 'tmpl_during' # ---------------------------------------------------------------------- def __init__(self, *args, **kwargs): @@ -225,9 +321,10 @@ def _fetch_django_version(self): # ---------------------------------------------------------------------- def _get_extra_fields(self, record): extra_fields = super()._get_extra_fields(record) + schema = self.MessageSchema if hasattr(record, 'status_code'): - extra_fields['status_code'] = record.status_code + extra_fields[schema.RESP_STATUS_CODE] = record.status_code # Django's runserver command passes socketobject and WSGIRequest instances as "request". # Hence the check for the META attribute. 
@@ -236,34 +333,34 @@ def _get_extra_fields(self, record): request = record.request request_user = self._get_attribute_with_default(request, 'user', '') - extra_fields['django_version'] = self._django_version - extra_fields['req_useragent'] = request.META.get('HTTP_USER_AGENT', '') - extra_fields['req_remote_address'] = request.META.get('REMOTE_ADDR', '') - extra_fields['req_host'] = self._try_to_get_host_from_remote(request) - extra_fields['req_uri'] = request.get_raw_uri() - extra_fields['req_user'] = str(request_user) - extra_fields['req_method'] = request.META.get('REQUEST_METHOD', '') - extra_fields['req_referer'] = request.META.get('HTTP_REFERER', '') + extra_fields[schema.DJANGO_VERSION] = self._django_version + extra_fields[schema.REQ_USER_AGENT] = request.META.get('HTTP_USER_AGENT', '') + extra_fields[schema.REQ_REMOTE_ADDRESS] = request.META.get('REMOTE_ADDR', '') + extra_fields[schema.REQ_HOST] = self._try_to_get_host_from_remote(request) + extra_fields[schema.REQ_URI] = self._try_to_get_full_request_uri(request) + extra_fields[schema.REQ_USER] = str(request_user) + extra_fields[schema.REQ_METHOD] = request.META.get('REQUEST_METHOD', '') + extra_fields[schema.REQ_REFERER] = request.META.get('HTTP_REFERER', '') forwarded_proto = request.META.get('HTTP_X_FORWARDED_PROTO', None) if forwarded_proto is not None: - extra_fields['req_forwarded_proto'] = forwarded_proto + extra_fields[schema.REQ_FORWARDED_PROTO] = forwarded_proto forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', None) if forwarded_for is not None: # make it a list forwarded_for_list = forwarded_for.replace(' ', '').split(',') - extra_fields['req_forwarded_for'] = forwarded_for_list + extra_fields[schema.REQ_FORWARDED_FOR] = forwarded_for_list # template debug if isinstance(record.exc_info, tuple): exc_value = record.exc_info[1] template_info = getattr(exc_value, 'template_debug', None) if template_info: - extra_fields['tmpl_name'] = template_info['name'] - extra_fields['tmpl_line'] = 
template_info['line'] - extra_fields['tmpl_message'] = template_info['message'] - extra_fields['tmpl_during'] = template_info['during'] + extra_fields[schema.TMPL_NAME] = template_info['name'] + extra_fields[schema.TMPL_LINE] = template_info['line'] + extra_fields[schema.TMPL_MESSAGE] = template_info['message'] + extra_fields[schema.TMPL_DURING] = template_info['during'] return extra_fields @@ -290,53 +387,131 @@ def _try_to_get_host_from_remote(self, request): else: return request.META['SERVER_NAME'] + # ---------------------------------------------------------------------- + def _try_to_get_full_request_uri(self, request): + try: + return request.build_absolute_uri() + except Exception: + # build_absolute_uri() may fail with DisallowedHost errors and maybe more + return None + + +class DjangoLogstashEcsFormatter(DjangoLogstashFormatter, LogstashEcsFormatter): + __schema_dict = { + 'RESP_STATUS_CODE': 'http.response.status_code', + 'REQ_USER_AGENT': 'user_agent.original', + 'REQ_REMOTE_ADDRESS': 'client.ip', + 'REQ_HOST': 'client.domain', + 'REQ_URI': 'url.original', + 'REQ_USER': 'user.name', + 'REQ_METHOD': 'http.request.method', + 'REQ_REFERER': 'http.request.referrer', + } + + MessageSchema = type( + 'MessageSchema', + (DjangoLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema), + __schema_dict, + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.top_level_field_set = self.top_level_field_set | set(self.__schema_dict.values()) + + def _remove_excluded_fields(self, message): + message.pop('status_code', None) + super()._remove_excluded_fields(message) + class FlaskLogstashFormatter(LogstashFormatter): + class MessageSchema(LogstashFormatter.MessageSchema): + FLASK_VERSION = 'flask_version' + RESP_STATUS_CODE = 'status_code' + REQ_USER_AGENT = 'req_useragent' + REQ_REMOTE_ADDRESS = 'req_remote_address' + REQ_HOST = 'req_host' + REQ_URI = 'req_uri' + REQ_USER = 'req_user' + REQ_METHOD = 'req_method' + 
REQ_REFERER = 'req_referer' + REQ_ID = 'request_id' + REQ_FORWARDED_PROTO = 'req_forwarded_proto' + REQ_FORWARDED_FOR = 'req_forwarded_for' # ---------------------------------------------------------------------- def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._django_version = None + self._flask_version = None self._fetch_flask_version() # ---------------------------------------------------------------------- def _fetch_flask_version(self): - from flask import __version__ # pylint: disable=import-error,import-outside-toplevel - self._flask_version = __version__ + self._flask_version = importlib.metadata.version('flask') # ---------------------------------------------------------------------- def _get_extra_fields(self, record): - from flask import request # pylint: disable=import-error,import-outside-toplevel + # pylint: disable-next=import-error,import-outside-toplevel + from flask import request extra_fields = super()._get_extra_fields(record) + schema = self.MessageSchema - extra_fields['flask_version'] = self._flask_version + extra_fields[schema.FLASK_VERSION] = self._flask_version if request: # request might be unbound in other threads - extra_fields['req_useragent'] = str(request.user_agent) if request.user_agent else '' - extra_fields['req_remote_address'] = request.remote_addr - extra_fields['req_host'] = request.host.split(':', 1)[0] - extra_fields['req_uri'] = request.url - extra_fields['req_method'] = request.method - extra_fields['req_referer'] = request.referrer + extra_fields[schema.REQ_USER_AGENT] = (str(request.user_agent) + if request.user_agent else '') + extra_fields[schema.REQ_REMOTE_ADDRESS] = request.remote_addr + extra_fields[schema.REQ_HOST] = request.host.split(':', 1)[0] + extra_fields[schema.REQ_URI] = request.url + extra_fields[schema.REQ_METHOD] = request.method + extra_fields[schema.REQ_REFERER] = request.referrer if 'X-Request-ID' in request.headers: - extra_fields['request_id'] = 
request.headers.get('X-Request-ID') + extra_fields[schema.REQ_ID] = request.headers.get('X-Request-ID') if request.remote_user: - extra_fields['req_user'] = request.remote_user + extra_fields[schema.REQ_USER] = request.remote_user forwarded_proto = request.headers.get('X-Forwarded-Proto', None) if forwarded_proto is not None: - extra_fields['req_forwarded_proto'] = forwarded_proto + extra_fields[schema.REQ_FORWARDED_PROTO] = forwarded_proto forwarded_for = request.headers.get('X-Forwarded-For', None) if forwarded_for is not None: # make it a list forwarded_for_list = forwarded_for.replace(' ', '').split(',') - extra_fields['req_forwarded_for'] = forwarded_for_list + extra_fields[schema.REQ_FORWARDED_FOR] = forwarded_for_list # check if we have a status code somewhere if hasattr(record, 'status_code'): - extra_fields['status_code'] = record.status_code + extra_fields[schema.RESP_STATUS_CODE] = record.status_code if hasattr(record, 'response'): - extra_fields['status_code'] = record.response.status_code + extra_fields[schema.RESP_STATUS_CODE] = record.response.status_code return extra_fields + + +class FlaskLogstashEcsFormatter(FlaskLogstashFormatter, LogstashEcsFormatter): + __schema_dict = { + 'RESP_STATUS_CODE': 'http.response.status_code', + 'REQ_USER_AGENT': 'user_agent.original', + 'REQ_REMOTE_ADDRESS': 'client.ip', + 'REQ_HOST': 'client.domain', + 'REQ_URI': 'url.original', + 'REQ_USER': 'user.name', + 'REQ_METHOD': 'http.request.method', + 'REQ_REFERER': 'http.request.referrer', + 'REQ_ID': 'http.request.id', + } + + MessageSchema = type( + 'MessageSchema', + (FlaskLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema), + __schema_dict, + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.top_level_field_set = self.top_level_field_set | set(self.__schema_dict.values()) + + def _remove_excluded_fields(self, message): + message.pop('status_code', None) + super()._remove_excluded_fields(message) diff --git 
a/logstash_async/handler.py b/logstash_async/handler.py index 131a5ce..0c8aeaf 100644 --- a/logstash_async/handler.py +++ b/logstash_async/handler.py @@ -1,15 +1,13 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. from logging import Handler +import logstash_async from logstash_async.constants import constants from logstash_async.formatter import LogstashFormatter from logstash_async.utils import import_string, safe_log_via_print from logstash_async.worker import LogProcessingWorker -import logstash_async class ProcessingError(Exception): @@ -23,6 +21,7 @@ class SynchronousLogstashHandler(Handler): :param transport: Callable or path to a compatible transport class. :param ssl_enable: Should SSL be enabled for the connection? Default is False. :param ssl_verify: Should the server's SSL certificate be verified? + :param ssl_verify_flags: Verification flags for ssl.SSLContext (Default: None) :param keyfile: The path to client side SSL key file (default is None). :param certfile: The path to client side SSL certificate file (default is None). :param ca_certs: The path to the file containing recognized CA certificates. 
@@ -31,16 +30,17 @@ class SynchronousLogstashHandler(Handler): """ # ---------------------------------------------------------------------- - # pylint: disable=too-many-arguments + # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__(self, host, port, transport='logstash_async.transport.TcpTransport', - ssl_enable=False, ssl_verify=True, keyfile=None, certfile=None, ca_certs=None, - enable=True, encoding='utf-8', **kwargs): + ssl_enable=False, ssl_verify=True, ssl_verify_flags=None, keyfile=None, + certfile=None, ca_certs=None, enable=True, encoding='utf-8', **kwargs): super().__init__() self._host = host self._port = port self._transport_path = transport self._ssl_enable = ssl_enable self._ssl_verify = ssl_verify + self._ssl_verify_flags = ssl_verify_flags self._keyfile = keyfile self._certfile = certfile self._ca_certs = ca_certs @@ -74,6 +74,7 @@ def _setup_transport(self, **kwargs): timeout=constants.SOCKET_TIMEOUT, ssl_enable=self._ssl_enable, ssl_verify=self._ssl_verify, + ssl_verify_flags=self._ssl_verify_flags, keyfile=self._keyfile, certfile=self._certfile, ca_certs=self._ca_certs, @@ -86,9 +87,9 @@ def _setup_transport(self, **kwargs): elif hasattr(self._transport_path, 'send'): self._transport = self._transport_path else: - raise RuntimeError( - 'Invalid transport path: must be an importable module path, ' - 'a class or factory function or an instance.') + error_message = ('Invalid transport path: must be an importable module path, ' + 'a class or factory function or an instance.') + raise RuntimeError(error_message) # ---------------------------------------------------------------------- def _format_record(self, record): @@ -137,16 +138,17 @@ class AsynchronousLogstashHandler(SynchronousLogstashHandler): _worker_thread = None # ---------------------------------------------------------------------- - # pylint: disable=too-many-arguments + # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__(self, 
host, port, database_path, transport='logstash_async.transport.TcpTransport', - ssl_enable=False, ssl_verify=True, keyfile=None, certfile=None, ca_certs=None, - enable=True, event_ttl=None, encoding='utf-8', **kwargs): + ssl_enable=False, ssl_verify=True, ssl_verify_flags=None, keyfile=None, + certfile=None, ca_certs=None, enable=True, event_ttl=None, encoding='utf-8', + **kwargs): self._database_path = database_path self._event_ttl = event_ttl super().__init__(host, port, transport, - ssl_enable, ssl_verify, keyfile, certfile, ca_certs, + ssl_enable, ssl_verify, ssl_verify_flags, keyfile, certfile, ca_certs, enable, encoding, **kwargs) # ---------------------------------------------------------------------- @@ -180,6 +182,7 @@ def _start_worker_thread(self): transport=self._transport, ssl_enable=self._ssl_enable, ssl_verify=self._ssl_verify, + ssl_verify_flags=self._ssl_verify_flags, keyfile=self._keyfile, certfile=self._certfile, ca_certs=self._ca_certs, @@ -192,10 +195,8 @@ def _start_worker_thread(self): @staticmethod def _worker_thread_is_running(): worker_thread = AsynchronousLogstashHandler._worker_thread - if worker_thread is not None and worker_thread.is_alive(): - return True - return False + return worker_thread is not None and worker_thread.is_alive() # ---------------------------------------------------------------------- def shutdown(self): diff --git a/logstash_async/memory_cache.py b/logstash_async/memory_cache.py index ce8a370..9a25156 100644 --- a/logstash_async/memory_cache.py +++ b/logstash_async/memory_cache.py @@ -1,11 +1,9 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. 
-from datetime import datetime, timedelta -from logging import getLogger as get_logger import uuid +from datetime import datetime, timedelta, UTC +from logging import getLogger as get_logger # noqa: N813 from logstash_async.cache import Cache from logstash_async.constants import constants @@ -31,10 +29,10 @@ def __init__(self, cache, event_ttl=None): def add_event(self, event): event_id = uuid.uuid4() self._cache[event_id] = { - "event_text": event, - "pending_delete": False, - "entry_date": datetime.now(), - "id": event_id + 'event_text': event, + 'pending_delete': False, + 'entry_date': datetime.now(tz=UTC), + 'id': event_id } # ---------------------------------------------------------------------- @@ -63,7 +61,7 @@ def requeue_queued_events(self, events): event_to_queue['pending_delete'] = False else: self.logger.warning( - "Could not requeue event with id %s. It does not appear to be in the cache.", + 'Could not requeue event with id %s. It does not appear to be in the cache.', event['id']) # ---------------------------------------------------------------------- @@ -76,7 +74,7 @@ def expire_events(self): if self._event_ttl is None: return - delete_time = datetime.now() - timedelta(seconds=self._event_ttl) + delete_time = datetime.now(tz=UTC) - timedelta(seconds=self._event_ttl) ids_to_delete = [ event['id'] for event in self._cache.values() @@ -91,5 +89,9 @@ def _delete_events(self, ids_to_delete): event = self._cache.pop(event_id, None) if not event: self.logger.warning( - "Could not delete event with id %s. It does not appear to be in the cache.", + 'Could not delete event with id %s. 
It does not appear to be in the cache.', event_id) + + # ---------------------------------------------------------------------- + def get_non_flushed_event_count(self): + return len([event for event in self._cache.values() if not event['pending_delete']]) diff --git a/logstash_async/transport.py b/logstash_async/transport.py index 2543ec5..7e4245b 100644 --- a/logstash_async/transport.py +++ b/logstash_async/transport.py @@ -1,20 +1,30 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. -from abc import ABC, abstractmethod -from typing import Iterator, Union import json import logging import socket import ssl +import time +from abc import ABC, abstractmethod +from collections.abc import Iterator +from typing import Union + + +# Use fcntl to control socket buffering if available +try: + import fcntl + import struct + import termios +except ImportError: + fcntl = None -from requests.auth import HTTPBasicAuth import pylogbeat import requests +from requests.auth import HTTPBasicAuth -from logstash_async.utils import ichunked +from logstash_async.constants import constants +from logstash_async.utils import ichunked, safe_log_via_print logger = logging.getLogger(__name__) @@ -38,6 +48,8 @@ class Transport(ABC): :type ssl_enable: bool :param ssl_verify: Activates the TLS certificate verification. :type ssl_verify: bool or str + :param ssl_verify_flags: Verification flags for ssl.SSLContext or None + :type ssl_verify_flags: int :param use_logging: Use logging for debugging. 
:type use_logging: bool """ @@ -49,6 +61,7 @@ def __init__( timeout: Union[None, float], ssl_enable: bool, ssl_verify: Union[bool, str], + ssl_verify_flags: Union[None, int], use_logging: bool, ): self._host = host @@ -56,6 +69,7 @@ def __init__( self._timeout = None if timeout is TimeoutNotSet else timeout self._ssl_enable = ssl_enable self._ssl_verify = ssl_verify + self._ssl_verify_flags = ssl_verify_flags self._use_logging = use_logging super().__init__() @@ -122,8 +136,53 @@ def _convert_data_to_send(self, data): def _close(self, force=False): if not self._keep_connection or force: if self._sock: - self._sock.close() - self._sock = None + try: + self._wait_for_socket_buffer_empty() + self._try_to_close_socket() + finally: + self._sock = None + + # ---------------------------------------------------------------------- + def _wait_for_socket_buffer_empty(self): + wait_timeout = constants.SOCKET_CLOSE_WAIT_TIMEOUT + interval = 0.05 + time_waited = 0 + # wait until the socket's write buffer is empty + # but do not wait longer than SOCKET_CLOSE_WAIT_TIMEOUT + while time_waited < wait_timeout and not self._is_sock_write_buff_empty(): + time_waited += interval + time.sleep(interval) + + # ---------------------------------------------------------------------- + def _is_sock_write_buff_empty(self): + if fcntl is None: + return True + + socket_fd = self._sock.fileno() + if socket_fd == -1: + return True + + buffer_size = struct.pack('I', 0) + ioctl_result = fcntl.ioctl(socket_fd, termios.TIOCOUTQ, buffer_size) + buffer_size = struct.unpack('I', ioctl_result)[0] + return not buffer_size + + # ---------------------------------------------------------------------- + def _try_to_close_socket(self): + self._shutdown_socket() + try: + self._sock.close() + except Exception as exc: + message = f'Error on closing the transport socket: {exc}' + self._log_close_socket_error(message) + + # ---------------------------------------------------------------------- + def 
_shutdown_socket(self): + pass # not necessary for UDP sockets + + # ---------------------------------------------------------------------- + def _log_close_socket_error(self, message): + safe_log_via_print('warning', message) # ---------------------------------------------------------------------- def close(self): @@ -139,6 +198,7 @@ def __init__( # pylint: disable=too-many-arguments port, ssl_enable, ssl_verify, + ssl_verify_flags, keyfile, certfile, ca_certs, @@ -147,6 +207,7 @@ def __init__( # pylint: disable=too-many-arguments super().__init__(host, port) self._ssl_enable = ssl_enable self._ssl_verify = ssl_verify + self._ssl_verify_flags = ssl_verify_flags self._keyfile = keyfile self._certfile = certfile self._ca_certs = ca_certs @@ -171,16 +232,18 @@ def _create_socket(self): cert_reqs = ssl.CERT_REQUIRED ssl_context = ssl.create_default_context(cafile=self._ca_certs) if not self._ssl_verify: - if self._ca_certs: - cert_reqs = ssl.CERT_OPTIONAL - else: - cert_reqs = ssl.CERT_NONE + cert_reqs = ssl.CERT_OPTIONAL if self._ca_certs else ssl.CERT_NONE - ssl_context.verify_mode = cert_reqs ssl_context.check_hostname = False - ssl_context.load_cert_chain(self._certfile, self._keyfile) + ssl_context.verify_mode = cert_reqs + + if self._ssl_verify_flags is not None: + ssl_context.verify_flags = self._ssl_verify_flags + + if self._certfile and self._keyfile: + ssl_context.load_cert_chain(self._certfile, self._keyfile) self._sock = ssl_context.wrap_socket(self._sock, server_side=False) - except socket.error: + except OSError: self._close() raise @@ -189,11 +252,17 @@ def _send_via_socket(self, data): data_to_send = self._convert_data_to_send(data) self._sock.sendall(data_to_send) + # ---------------------------------------------------------------------- + def _shutdown_socket(self): + try: + self._sock.shutdown(socket.SHUT_WR) + except Exception as exc: + message = f'Error on shutting down the transport socket: {exc}' + self._log_close_socket_error(message) + class 
BeatsTransport: - _batch_size = 10 - # ---------------------------------------------------------------------- def __init__( # pylint: disable=too-many-arguments self, @@ -201,6 +270,7 @@ def __init__( # pylint: disable=too-many-arguments port, ssl_enable, ssl_verify, + ssl_verify_flags, keyfile, certfile, ca_certs, @@ -213,6 +283,7 @@ def __init__( # pylint: disable=too-many-arguments timeout=timeout_, ssl_enable=ssl_enable, ssl_verify=ssl_verify, + ssl_verify_flags=ssl_verify_flags, keyfile=keyfile, certfile=certfile, ca_certs=ca_certs, @@ -226,7 +297,7 @@ def close(self): def send(self, events, use_logging=False): client = pylogbeat.PyLogBeatClient(use_logging=use_logging, **self._client_arguments) with client: - for events_subset in ichunked(events, self._batch_size): + for events_subset in ichunked(events, constants.QUEUED_EVENTS_BEATS_BATCH_SIZE): client.send(events_subset) @@ -241,6 +312,8 @@ class HttpTransport(Transport): :type host: str :param port: The TCP port of the logstash HTTP server. :type port: int + :param path: The path of the logstash HTTP server. + :type path: str :param timeout: The connection timeout. (Default: None) :type timeout: float :param ssl_enable: Activates TLS. (Default: True) @@ -250,6 +323,8 @@ class HttpTransport(Transport): pass a string with a file location to CA certificate the class tries to validate it against it. (Default: True) :type ssl_verify: bool or str + :param ssl_verify_flags: Verification flags for ssl.SSLContext (Default: None) + :type ssl_verify_flags: int :param use_logging: Use logging for debugging. :type use_logging: bool :param username: Username for basic authorization. 
(Default: "") @@ -261,6 +336,7 @@ class HttpTransport(Transport): :type max_content_length: int """ + # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__( self, host: str, @@ -268,13 +344,16 @@ def __init__( timeout: Union[None, float] = TimeoutNotSet, ssl_enable: bool = True, ssl_verify: Union[bool, str] = True, + ssl_verify_flags: Union[None, int] = None, use_logging: bool = False, + path: str = '', **kwargs ): - super().__init__(host, port, timeout, ssl_enable, ssl_verify, use_logging) - self._username = kwargs.get('username', None) - self._password = kwargs.get('password', None) + super().__init__(host, port, timeout, ssl_enable, ssl_verify, ssl_verify_flags, use_logging) + self._username = kwargs.get('username') + self._password = kwargs.get('password') self._max_content_length = kwargs.get('max_content_length', 100 * 1024 * 1024) + self._path = path self.__session = None @property @@ -288,7 +367,7 @@ def url(self) -> str: protocol = 'http' if self._ssl_enable: protocol = 'https' - return f'{protocol}://{self._host}:{self._port}' + return f'{protocol}://{self._host}:{self._port}/{self._path}' def __batches(self, events: list) -> Iterator[list]: """Generate dynamic sized batches based on the max content length. diff --git a/logstash_async/utils.py b/logstash_async/utils.py index f2fd676..f6f686f 100644 --- a/logstash_async/utils.py +++ b/logstash_async/utils.py @@ -1,15 +1,12 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. 
-from __future__ import print_function - -from datetime import datetime -from importlib import import_module -from itertools import chain, islice import sys import traceback +from copy import deepcopy +from datetime import datetime, UTC +from importlib import import_module +from itertools import chain, islice # ---------------------------------------------------------------------- @@ -29,16 +26,16 @@ def ichunked(seq, chunksize): # ---------------------------------------------------------------------- def safe_log_via_print(log_level, message, *args, **kwargs): - timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + timestamp = datetime.now(tz=UTC).strftime('%Y-%m-%d %H:%M:%S') log_message = f'{timestamp}: {log_level}: {message}' - print(log_message % args, file=sys.stderr) + print(log_message % args, file=sys.stderr) # noqa: T201 # print stack trace if available - exc_info = kwargs.get('exc_info', None) + exc_info = kwargs.get('exc_info') if exc_info or log_level == 'exception': if not isinstance(exc_info, tuple): exc_info = sys.exc_info() stack_trace = ''.join(traceback.format_exception(*exc_info)) - print(stack_trace, file=sys.stderr) + print(stack_trace, file=sys.stderr) # noqa: T201 # ---------------------------------------------------------------------- @@ -52,11 +49,70 @@ def import_string(dotted_path): try: module_path, class_name = dotted_path.rsplit('.', 1) except ValueError as exc: - raise ImportError(f'{dotted_path} does not look like a module path') from exc + error_message = f'{dotted_path} does not look like a module path' + raise ImportError(error_message) from exc module = import_module(module_path) try: return getattr(module, class_name) except AttributeError as exc: - raise ImportError( - f'Module "{module_path}" does not define a "{class_name}" attribute/class') from exc + error_message = f'Module "{module_path}" does not define a "{class_name}" attribute/class' + raise ImportError(error_message) from exc + + +# 
---------------------------------------------------------------------- +# pylint: disable-next=invalid-name +class normalize_ecs_dict: # noqa: N801 + """ + Convert dotted ecs fields into nested objects. + """ + + def __new__(cls, ecs_dict): + new_dict = deepcopy(ecs_dict) + cls.normalize_dict(new_dict) + return new_dict + + @classmethod + def normalize_dict(cls, ecs_dict): + for key in list(ecs_dict): + if '.' in key: + cls.merge_dicts(ecs_dict, cls.de_dot_record(key, ecs_dict.pop(key))) + for val in ecs_dict.values(): + cls.normalize_value(val) + + @classmethod + def normalize_sequence(cls, ecs_sequence): + for val in ecs_sequence: + cls.normalize_value(val) + + @classmethod + def normalize_value(cls, ecs_value): + if isinstance(ecs_value, dict): + cls.normalize_dict(ecs_value) + if isinstance(ecs_value, (list, tuple, set)): + cls.normalize_sequence(ecs_value) + + @classmethod + def merge_dicts(cls, target, src): + """ + Merge dicts recursively. + Mutates `target`. + Uses references from `src` which may lead to `src` mutation. + """ + for key, src_value in src.items(): + if key in target: + target_value = target[key] + if isinstance(target_value, dict) and isinstance(src_value, dict): + cls.merge_dicts(target_value, src_value) + else: + target[key] = src_value + else: + target[key] = src_value + + @classmethod + def de_dot_record(cls, key, value): + keys = key.split('.') + res = {keys.pop(): value} + for k in reversed(keys): + res = {k: res} + return res diff --git a/logstash_async/worker.py b/logstash_async/worker.py index aeb8dcf..3d84d1e 100644 --- a/logstash_async/worker.py +++ b/logstash_async/worker.py @@ -1,24 +1,40 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. 
-from datetime import datetime -from logging import getLogger as get_logger -from queue import Empty, Queue +import contextlib +from datetime import datetime, UTC +from logging import getLogger as get_logger # noqa: N813 +from queue import Empty, PriorityQueue from socket import gaierror as socket_gaierror from threading import Event, Thread from limits import parse as parse_rate_limit from limits.storage import MemoryStorage from limits.strategies import FixedWindowRateLimiter +from requests.exceptions import ConnectionError as RequestsConnectionError +from requests.exceptions import ConnectTimeout, HTTPError, ProxyError, RetryError, Timeout from logstash_async.constants import constants -from logstash_async.database import DatabaseCache, DatabaseLockedError +from logstash_async.database import DatabaseCache, DatabaseDiskIOError, DatabaseLockedError from logstash_async.memory_cache import MemoryCache from logstash_async.utils import safe_log_via_print +NETWORK_EXCEPTIONS = ( + # Python + ConnectionError, + TimeoutError, + socket_gaierror, + # Requests + ConnectTimeout, + RequestsConnectionError, + HTTPError, + ProxyError, + RetryError, + Timeout, +) + + class ProcessingError(Exception): """""" @@ -33,6 +49,7 @@ def __init__(self, *args, **kwargs): self._transport = kwargs.pop('transport') self._ssl_enable = kwargs.pop('ssl_enable') self._ssl_verify = kwargs.pop('ssl_verify') + self._ssl_verify_flags = kwargs.pop('ssl_verify_flags') self._keyfile = kwargs.pop('keyfile') self._certfile = kwargs.pop('certfile') self._ca_certs = kwargs.pop('ca_certs') @@ -46,7 +63,7 @@ def __init__(self, *args, **kwargs): self._shutdown_event = Event() self._flush_event = Event() - self._queue = Queue() + self._queue = PriorityQueue() self._event = None self._database = None @@ -80,6 +97,7 @@ def run(self): self._log_general_error(exc) # check for empty queue and report if not self._warn_about_non_empty_queue_on_shutdown() + self._vaccum_database() # 
---------------------------------------------------------------------- def force_flush_queued_events(self): @@ -87,7 +105,7 @@ def force_flush_queued_events(self): # ---------------------------------------------------------------------- def _reset_flush_counters(self): - self._last_event_flush_date = datetime.now() + self._last_event_flush_date = datetime.now(tz=UTC) self._non_flushed_event_count = 0 # ---------------------------------------------------------------------- @@ -111,13 +129,15 @@ def _setup_database(self): else: self._database = MemoryCache(cache=self._memory_cache, event_ttl=self._event_ttl) + self._non_flushed_event_count = self._database.get_non_flushed_event_count() + # ---------------------------------------------------------------------- def _fetch_events(self): while True: try: self._fetch_event() self._process_event() - except Empty: + except Empty: # noqa: PERF203 # Flush queued (in database) events after internally queued events has been # processed, i.e. the queue is empty. if self._shutdown_requested(): @@ -128,7 +148,7 @@ def _fetch_events(self): self._flush_queued_events(force=force_flush) self._delay_processing() self._expire_events() - except (DatabaseLockedError, ProcessingError): + except (DatabaseLockedError, ProcessingError, DatabaseDiskIOError): if self._shutdown_requested(): return @@ -150,6 +170,13 @@ def _process_event(self): self._queue.qsize(), exc=exc) raise + except DatabaseDiskIOError as exc: + self._safe_log( + 'debug', + 'Disk I/O error, will try again later (queue length %d)', + self._queue.qsize(), + exc=exc) + raise except Exception as exc: self._log_processing_error(exc) raise ProcessingError from exc @@ -158,12 +185,10 @@ def _process_event(self): # ---------------------------------------------------------------------- def _expire_events(self): - try: + # Nothing to handle, if it fails, we will either successfully publish + # these messages next time or we will delete them on the next pass. 
+ with contextlib.suppress(DatabaseLockedError, DatabaseDiskIOError): self._database.expire_events() - except DatabaseLockedError: - # Nothing to handle, if it fails, we will either successfully publish - # these messages next time or we will delete them on the next pass. - pass # ---------------------------------------------------------------------- def _log_processing_error(self, exception): @@ -212,10 +237,10 @@ def _flush_queued_events(self, force=False): try: events = [event['event_text'] for event in queued_events] self._send_events(events) - # exception types for which we do not want a stack trace - except (ConnectionError, TimeoutError, socket_gaierror) as exc: + # Log connection and network errors as warnings as they are rather harmless + except NETWORK_EXCEPTIONS as exc: self._safe_log( - 'error', + 'warning', 'An error occurred while sending events: %s', exc) self._database.requeue_queued_events(queued_events) @@ -242,6 +267,13 @@ def _fetch_queued_events_for_flush(self): 'Database is locked, will try again later (queue length %d)', self._queue.qsize(), exc=exc) + except DatabaseDiskIOError as exc: + self._safe_log( + 'debug', + 'Disk I/O error, will try again later (queue length %d)', + self._queue.qsize(), + exc=exc) + raise except Exception as exc: # just log the exception and hope we can recover from the error self._safe_log('exception', 'Error retrieving queued events: %s', exc, exc=exc) @@ -250,14 +282,13 @@ def _fetch_queued_events_for_flush(self): # ---------------------------------------------------------------------- def _delete_queued_events_from_database(self): - try: + # Nothing to handle, if it fails, we delete those events in a later run + with contextlib.suppress(DatabaseLockedError, DatabaseDiskIOError): self._database.delete_queued_events() - except DatabaseLockedError: - pass # nothing to handle, if it fails, we delete those events in a later run # ---------------------------------------------------------------------- def 
_queued_event_interval_reached(self): - delta = datetime.now() - self._last_event_flush_date + delta = datetime.now(tz=UTC) - self._last_event_flush_date return delta.total_seconds() > constants.QUEUED_EVENTS_FLUSH_INTERVAL # ---------------------------------------------------------------------- @@ -303,7 +334,7 @@ def _rate_limit_check(self, kwargs): return 2 # any value greater than 1 means allowed # ---------------------------------------------------------------------- - def _factor_rate_limit_key(self, exc): # pylint: disable=no-self-use + def _factor_rate_limit_key(self, exc): module_name = getattr(exc, '__module__', '__no_module__') class_name = exc.__class__.__name__ key_items = [module_name, class_name] @@ -325,4 +356,14 @@ def _warn_about_non_empty_queue_on_shutdown(self): 'warn', f'Non-empty queue while shutting down ({queue_size} events pending). ' 'This indicates a previous error.', - extra=dict(queue_size=queue_size)) + extra={'queue_size': queue_size}) + + # ---------------------------------------------------------------------- + def _vaccum_database(self): + if not constants.DATABASE_VACUUM_ON_SHUTDOWN: + return + + try: + self._database.vacuum() + except DatabaseLockedError: + self._safe_log('debug', 'Database is locked, ignore vacuuming database') diff --git a/setup.cfg b/setup.cfg index fef1030..b5c2f9f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. 
@@ -24,23 +22,21 @@ known_first_party = logstash_async known_third_party = limits,pylogbeat,requests sections = FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER lines_after_imports = 2 -from_first = true +from_first = false include_trailing_comma = true # the following sections are for pylint -[MASTER] +[pylint.main] ignore=.git persistent=no load-plugins= pylint.extensions.bad_builtin, pylint.extensions.check_elif, - pylint.extensions.comparetozero, - pylint.extensions.emptystring, pylint.extensions.mccabe, pylint.extensions.overlapping_exceptions, pylint.extensions.redefined_variable_type -[MESSAGES CONTROL] +[pylint] disable= fixme, duplicate-code, @@ -49,25 +45,28 @@ disable= missing-docstring, no-else-raise, no-else-return, - no-self-use, - unnecessary-pass + unnecessary-pass, + use-dict-literal -[REPORTS] +[pylint.reports] output-format=parseable -files-output=no reports=no -[FORMAT] +[pylint.format] max-line-length=100 -[VARIABLES] +[pylint.variables] dummy-variables-rgx=_|dummy -[DESIGN] +[pylint.basic] +good-names=i,QUEUED_EVENTS_BATCH_SIZE + +[pylint.design] min-public-methods=0 max-attributes=15 -max-args=7 +max-args=8 +max-positional-arguments=13 max-parents=9 -[EXCEPTIONS] +[pylint.exceptions] overgeneral-exceptions= diff --git a/setup.py b/setup.py index 2929551..db163a0 100644 --- a/setup.py +++ b/setup.py @@ -1,15 +1,15 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. 
from os import path -from setuptools import setup from shutil import rmtree import sys +from setuptools import setup + + NAME = 'python-logstash-async' -VERSION = '2.3.0' +VERSION = '4.1.0' here = path.abspath(path.dirname(__file__)) with open(path.join(here, 'README.rst'), 'rb') as f: @@ -36,13 +36,16 @@ author_email='enrico.troeger@uvena.de', url='https://github.com/eht16/python-logstash-async', project_urls={ - 'Travis CI': 'https://travis-ci.org/eht16/python-logstash-async/', 'Source code': 'https://github.com/eht16/python-logstash-async/', 'Documentation': 'https://python-logstash-async.readthedocs.io/en/stable/', }, keywords='logging logstash asynchronous', - install_requires=['limits', 'pylogbeat', 'requests'], - python_requires='>3.5', + install_requires=['limits', 'pylogbeat>=2.1.0', 'requests'], + extras_require={ + 'dev': ['django', 'flask'], + 'docs': ['sphinx-rtd-theme'], + }, + python_requires='>=3.11', include_package_data=True, classifiers=[ 'Development Status :: 4 - Beta', diff --git a/tests/database_test.py b/tests/database_test.py index 52fbdc4..14c9b92 100644 --- a/tests/database_test.py +++ b/tests/database_test.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. 
@@ -7,17 +5,17 @@ import sqlite3 import time import unittest +from stat import S_IREAD, S_IRGRP, S_IROTH, S_IWUSR from logstash_async.constants import constants -from logstash_async.database import DATABASE_SCHEMA_STATEMENTS, DatabaseCache +from logstash_async.database import DATABASE_SCHEMA_STATEMENTS, DatabaseCache, DatabaseDiskIOError -# pylint: disable=protected-access +# ruff: noqa: PT009, SLF001 pylint: disable=protected-access class DatabaseCacheTest(unittest.TestCase): - - TEST_DB_FILENAME = "test.db" + TEST_DB_FILENAME = 'test.db' _connection = None # ---------------------------------------------------------------------- @@ -32,7 +30,7 @@ def setUp(self): # ---------------------------------------------------------------------- def tearDown(self): - stmt = "DELETE FROM `event`;" + stmt = 'DELETE FROM `event`;' self.cache._open() with self.cache._connection as conn: conn.execute(stmt) @@ -63,9 +61,18 @@ def close_connection(cls): cls._connection.close() cls._connection = None + # ---------------------------------------------------------------------- + def test_disk_io_exception(self): + self.cache.add_event('message') + with self.assertRaises(DatabaseDiskIOError): # noqa: PT027 + # change permissions to produce error + os.chmod(os.path.abspath('test.db'), S_IREAD | S_IRGRP | S_IROTH) + self.cache.add_event('message') + os.chmod(os.path.abspath('test.db'), S_IWUSR | S_IREAD) + # ---------------------------------------------------------------------- def test_add_event(self): - self.cache.add_event("message") + self.cache.add_event('message') conn = self.get_connection() cursor = conn.cursor() events = cursor.execute('SELECT `event_text`, `pending_delete` FROM `event`;').fetchall() @@ -75,7 +82,7 @@ def test_add_event(self): # ---------------------------------------------------------------------- def test_get_queued_events(self): - self.cache.add_event("message") + self.cache.add_event('message') events = self.cache.get_queued_events() 
self.assertEqual(len(events), 1) @@ -83,11 +90,11 @@ def test_get_queued_events(self): def test_get_queued_events_batch_size(self): constants.QUEUED_EVENTS_BATCH_SIZE = 3 - self.cache.add_event("message 1") - self.cache.add_event("message 2") - self.cache.add_event("message 3") - self.cache.add_event("message 4") - self.cache.add_event("message 5") + self.cache.add_event('message 1') + self.cache.add_event('message 2') + self.cache.add_event('message 3') + self.cache.add_event('message 4') + self.cache.add_event('message 5') events = self.cache.get_queued_events() # expect only 3 events according to QUEUED_EVENTS_BATCH_SIZE @@ -97,7 +104,7 @@ def test_get_queued_events_batch_size(self): def test_get_queued_events_batch_size_underrun(self): constants.QUEUED_EVENTS_BATCH_SIZE = 3 - self.cache.add_event("message 1") + self.cache.add_event('message 1') events = self.cache.get_queued_events() # expect only 1 event as there are no more available @@ -105,7 +112,7 @@ def test_get_queued_events_batch_size_underrun(self): # ---------------------------------------------------------------------- def test_get_queued_events_set_delete_flag(self): - self.cache.add_event("message") + self.cache.add_event('message') events = self.cache.get_queued_events() self.assertEqual(len(events), 1) events = self.cache.get_queued_events() @@ -113,7 +120,7 @@ def test_get_queued_events_set_delete_flag(self): # ---------------------------------------------------------------------- def test_requeue_queued_events(self): - self.cache.add_event("message") + self.cache.add_event('message') events = self.cache.get_queued_events() self.assertEqual(len(events), 1) self.cache.requeue_queued_events(events) diff --git a/tests/formatter_test.py b/tests/formatter_test.py index 0cba802..32cb33f 100644 --- a/tests/formatter_test.py +++ b/tests/formatter_test.py @@ -1,17 +1,47 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. 
See the LICENSE file for details. -from logging import FileHandler, makeLogRecord +import importlib.metadata import os +import socket import sys import unittest +from contextlib import suppress +from logging import FileHandler, makeLogRecord +from types import SimpleNamespace +from unittest.mock import patch + +import logstash_async +from logstash_async.formatter import ( + DjangoLogstashEcsFormatter, + DjangoLogstashFormatter, + FlaskLogstashEcsFormatter, + FlaskLogstashFormatter, + LogstashEcsFormatter, + LogstashFormatter, +) + -from logstash_async.formatter import LogstashFormatter +# ruff: noqa: PT009, SLF001 pylint: disable=protected-access +INTERPRETER_VERSION = f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}' -# pylint: disable=protected-access + +def create_log_record(**kwargs): + return makeLogRecord({ + 'msg': 'test', + 'created': 1635082335.024747, + 'levelname': 'INFO', + 'process': 1, + 'funcName': 'f', + 'lineno': 2, + 'name': 'foo', + 'pathname': 'a/b/c', + 'processName': 'bar', + 'threadName': 'baz', + 'exc_info': (ValueError, None, None), + **kwargs, + }) class ExceptionCatchingFileHandler(FileHandler): @@ -19,7 +49,7 @@ def __init__(self, *args, **kwargs): FileHandler.__init__(self, *args, **kwargs) self.exception = None - def handleError(self, record): + def handleError(self, record): # noqa: N802 self.exception = sys.exc_info() @@ -60,6 +90,376 @@ def test_format_timestamp_microsecond_2(self): result = formatter._format_timestamp(test_time_microsecond2) self.assertEqual(result, '2021-10-24T13:32:15.024Z') + @patch.object(LogstashFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + formatter = LogstashFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record()) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'host': socket.gethostname(), + 'level': 'INFO', + 'logsource': socket.gethostname(), + 'message': 'test', 
+ 'pid': 1, + 'program': sys.argv[0], + 'type': 'python-logstash', + 'tags': ['t1', 't2'], + 'extra': { + 'func_name': 'f', + 'interpreter': sys.executable, + 'interpreter_version': INTERPRETER_VERSION, + 'line': 2, + 'logger_name': 'foo', + 'logstash_async_version': logstash_async.__version__, + 'path': 'a/b/c', + 'process_name': 'bar', + 'thread_name': 'baz', + 'stack_trace': (ValueError, None, None), + 'error_type': 'ValueError', + } + }) + + +@patch.object(LogstashEcsFormatter, '_format_exception', lambda s, e: e) +class LogstashEcsFormatterTest(unittest.TestCase): + def test_default_schema(self): + formatter = LogstashEcsFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record()) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs': {'version': '8.11.0'}, + 'event': {'module': 'python-logstash'}, + 'host': {'hostname': socket.gethostname()}, + 'log': { + 'level': 'INFO', + 'syslog': {'hostname': socket.gethostname()}, + 'origin': { + 'file': {'line': 2, 'name': 'a/b/c'}, + 'function': 'f', + }, + 'logger': 'foo', + }, + 'message': 'test', + 'process': { + 'thread': {'name': 'baz'}, + 'name': 'bar', + 'pid': 1, + 'executable': sys.argv[0], + }, + 'error': {'stack_trace': (ValueError, None, None), 'type': 'ValueError'}, + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': INTERPRETER_VERSION, + 'logstash_async_version': logstash_async.__version__, + } + }) + + def test_dotted_schema(self): + class _LogstashEcsFormatter(LogstashEcsFormatter): + normalize_ecs_message = False + + formatter = _LogstashEcsFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record()) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs.version': '8.11.0', + 'event.module': 'python-logstash', + 'host.hostname': socket.gethostname(), + 'log.level': 'INFO', + 'log.syslog.hostname': 
socket.gethostname(), + 'log.origin.file.line': 2, + 'log.origin.file.name': 'a/b/c', + 'log.origin.function': 'f', + 'log.logger': 'foo', + 'message': 'test', + 'process.thread.name': 'baz', + 'process.name': 'bar', + 'process.pid': 1, + 'process.executable': sys.argv[0], + 'error.stack_trace': (ValueError, None, None), + 'error.type': 'ValueError', + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': INTERPRETER_VERSION, + 'logstash_async_version': logstash_async.__version__, + } + }) + + +class DjangoTestMixin: + @classmethod + def setUpClass(cls): # pylint: disable=invalid-name + super().setUpClass() + + # pylint: disable=import-outside-toplevel + import django + from django.conf import settings + from django.http import HttpRequest + + # pylint: enable=import-outside-toplevel + + with suppress(RuntimeError): + settings.configure() + cls.HttpRequest = HttpRequest + cls.django_version = django.get_version() + + def _create_request(self): + request = self.HttpRequest() + request.user = 'usr' + request.META.update({ + 'HTTP_USER_AGENT': 'dj-agent', + 'REMOTE_ADDR': 'dj-addr', + 'HTTP_HOST': 'dj-host', + 'HTTP_REFERER': 'dj-ref', + 'REQUEST_METHOD': 'GET', + 'HTTP_X_FORWARDED_PROTO': 'dj-f-proto', + 'HTTP_X_FORWARDED_FOR': 'dj-f1, dj-f2', + }) + return request + + +class DjangoLogstashFormatterTest(DjangoTestMixin, unittest.TestCase): + @patch.object(DjangoLogstashFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + formatter = DjangoLogstashFormatter(tags=['t1', 't2']) + exc_info = (ValueError, SimpleNamespace(template_debug={ + 'name': 'tpl', + 'line': 3, + 'message': 'tmsg', + 'during': 'd', + }), None) + result = formatter._format_to_dict(create_log_record( + status_code=500, + request=self._create_request(), + exc_info=exc_info, + )) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'host': socket.gethostname(), + 'level': 'INFO', + 
'logsource': socket.gethostname(), + 'message': 'test', + 'pid': 1, + 'program': sys.argv[0], + 'type': 'python-logstash', + 'tags': ['t1', 't2'], + 'extra': { + 'func_name': 'f', + 'interpreter': sys.executable, + 'interpreter_version': INTERPRETER_VERSION, + 'line': 2, + 'logger_name': 'foo', + 'logstash_async_version': logstash_async.__version__, + 'path': 'a/b/c', + 'process_name': 'bar', + 'thread_name': 'baz', + 'stack_trace': exc_info, + 'error_type': 'ValueError', + 'status_code': 500, + 'django_version': self.django_version, + 'req_useragent': 'dj-agent', + 'req_remote_address': 'dj-addr', + 'req_host': 'dj-host', + 'req_uri': None, + 'req_user': 'usr', + 'req_method': 'GET', + 'req_referer': 'dj-ref', + 'req_forwarded_proto': 'dj-f-proto', + 'req_forwarded_for': ['dj-f1', 'dj-f2'], + 'tmpl_name': 'tpl', + 'tmpl_line': 3, + 'tmpl_message': 'tmsg', + 'tmpl_during': 'd', + 'request': '', + } + }) + + +class DjangoLogstashEcsFormatterTest(DjangoTestMixin, unittest.TestCase): + @patch.object(DjangoLogstashEcsFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + formatter = DjangoLogstashEcsFormatter(tags=['t1', 't2']) + exc_info = (ValueError, SimpleNamespace(template_debug={ + 'name': 'tpl', + 'line': 3, + 'message': 'tmsg', + 'during': 'd', + }), None) + result = formatter._format_to_dict(create_log_record( + status_code=500, + request=self._create_request(), + exc_info=exc_info, + )) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs': {'version': '8.11.0'}, + 'event': {'module': 'python-logstash'}, + 'host': {'hostname': socket.gethostname()}, + 'client': {'domain': 'dj-host', 'ip': 'dj-addr'}, + 'http': { + 'request': {'method': 'GET', 'referrer': 'dj-ref'}, + 'response': {'status_code': 500}, + }, + 'url': {'original': None}, + 'user': {'name': 'usr'}, + 'user_agent': {'original': 'dj-agent'}, + 'log': { + 'level': 'INFO', + 'syslog': {'hostname': socket.gethostname()}, + 
'origin': { + 'file': {'line': 2, 'name': 'a/b/c'}, + 'function': 'f', + }, + 'logger': 'foo', + }, + 'message': 'test', + 'process': { + 'thread': {'name': 'baz'}, + 'name': 'bar', + 'pid': 1, + 'executable': sys.argv[0], + }, + 'error': {'stack_trace': exc_info, 'type': 'ValueError'}, + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': INTERPRETER_VERSION, + 'logstash_async_version': logstash_async.__version__, + 'req_forwarded_proto': 'dj-f-proto', + 'req_forwarded_for': ['dj-f1', 'dj-f2'], + 'tmpl_name': 'tpl', + 'tmpl_line': 3, + 'tmpl_message': 'tmsg', + 'tmpl_during': 'd', + 'request': '', + 'django_version': self.django_version, + } + }) + + +class FlaskTestMixin: + @classmethod + def setUpClass(cls): # pylint: disable=invalid-name + super().setUpClass() + cls.flask_version = importlib.metadata.version('flask') + + def _create_request(self): + return SimpleNamespace( + user_agent='f-agent', + remote_addr='f-addr', + host='f-host:80', + url='f-url', + method='GET', + referrer='f-ref', + remote_user='usr', + headers={ + 'X-Request-ID': 'x-id', + 'X-Forwarded-Proto': 'f-proto', + 'X-Forwarded-For': 'f1, f2', + }, + ) + + +class FlaskLogstashFormatterTest(FlaskTestMixin, unittest.TestCase): + @patch.object(FlaskLogstashFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + with patch('flask.request', self._create_request()): + formatter = FlaskLogstashFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record(status_code=500)) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'host': socket.gethostname(), + 'level': 'INFO', + 'logsource': socket.gethostname(), + 'message': 'test', + 'pid': 1, + 'program': sys.argv[0], + 'type': 'python-logstash', + 'tags': ['t1', 't2'], + 'extra': { + 'func_name': 'f', + 'interpreter': sys.executable, + 'interpreter_version': INTERPRETER_VERSION, + 'line': 2, + 'logger_name': 'foo', + 
'logstash_async_version': logstash_async.__version__, + 'path': 'a/b/c', + 'process_name': 'bar', + 'thread_name': 'baz', + 'error_type': 'ValueError', + 'stack_trace': (ValueError, None, None), + 'status_code': 500, + 'flask_version': self.flask_version, + 'req_useragent': 'f-agent', + 'req_remote_address': 'f-addr', + 'req_host': 'f-host', + 'req_uri': 'f-url', + 'req_user': 'usr', + 'req_method': 'GET', + 'req_referer': 'f-ref', + 'req_forwarded_proto': 'f-proto', + 'req_forwarded_for': ['f1', 'f2'], + 'request_id': 'x-id', + } + }) + + +class FlaskLogstashEcsFormatterTest(FlaskTestMixin, unittest.TestCase): + @patch.object(FlaskLogstashEcsFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + with patch('flask.request', self._create_request()): + formatter = FlaskLogstashEcsFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record(status_code=500)) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs': {'version': '8.11.0'}, + 'event': {'module': 'python-logstash'}, + 'host': {'hostname': socket.gethostname()}, + 'client': {'domain': 'f-host', 'ip': 'f-addr'}, + 'http': { + 'request': {'id': 'x-id', 'method': 'GET', 'referrer': 'f-ref'}, + 'response': {'status_code': 500}, + }, + 'url': {'original': 'f-url'}, + 'user': {'name': 'usr'}, + 'user_agent': {'original': 'f-agent'}, + 'log': { + 'level': 'INFO', + 'syslog': {'hostname': socket.gethostname()}, + 'origin': { + 'file': {'line': 2, 'name': 'a/b/c'}, + 'function': 'f', + }, + 'logger': 'foo', + }, + 'message': 'test', + 'process': { + 'thread': {'name': 'baz'}, + 'name': 'bar', + 'pid': 1, + 'executable': sys.argv[0], + }, + 'error': {'stack_trace': (ValueError, None, None), 'type': 'ValueError'}, + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': INTERPRETER_VERSION, + 'logstash_async_version': logstash_async.__version__, + 'req_forwarded_proto': 'f-proto', + 
'req_forwarded_for': ['f1', 'f2'], + 'flask_version': self.flask_version, + } + }) + if __name__ == '__main__': unittest.main() diff --git a/tests/ichunked_test.py b/tests/ichunked_test.py index 8a3d4a8..d856311 100644 --- a/tests/ichunked_test.py +++ b/tests/ichunked_test.py @@ -1,10 +1,8 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. -from random import randint import unittest +from random import randint from logstash_async.utils import ichunked @@ -14,6 +12,8 @@ CHUNK_SIZE_BIG = 750 CHUNK_ITERATIONS = 5 +# ruff: noqa: PT009, S311 pylint: disable=protected-access + class IChunkedTest(unittest.TestCase): diff --git a/tests/memory_cache_test.py b/tests/memory_cache_test.py index 8c26872..2994f28 100644 --- a/tests/memory_cache_test.py +++ b/tests/memory_cache_test.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. 
@@ -10,7 +8,7 @@ from logstash_async.memory_cache import MemoryCache -# pylint: disable=protected-access +# ruff: noqa: PT009, SLF001 pylint: disable=protected-access class MemoryCacheTest(unittest.TestCase): @@ -18,7 +16,7 @@ class MemoryCacheTest(unittest.TestCase): # ---------------------------------------------------------------------- def test_add_event(self): cache = MemoryCache({}) - cache.add_event("message") + cache.add_event('message') self.assertEqual(len(cache._cache), 1) event = list(cache._cache.values())[0] self.assertEqual(event['event_text'], 'message') @@ -27,8 +25,8 @@ def test_add_event(self): # ---------------------------------------------------------------------- def test_get_queued_events(self): cache = MemoryCache({ - "id1": {"pending_delete": True}, - "id2": {"pending_delete": False} + 'id1': {'pending_delete': True}, + 'id2': {'pending_delete': False} }) self.assertEqual(len(cache.get_queued_events()), 1) @@ -37,12 +35,12 @@ def test_get_queued_events_batch_size(self): constants.QUEUED_EVENTS_BATCH_SIZE = 3 cache = MemoryCache({ - "id1": {"pending_delete": True}, - "id2": {"pending_delete": False}, - "id3": {"pending_delete": False}, - "id4": {"pending_delete": False}, - "id5": {"pending_delete": False}, - "id6": {"pending_delete": False}, + 'id1': {'pending_delete': True}, + 'id2': {'pending_delete': False}, + 'id3': {'pending_delete': False}, + 'id4': {'pending_delete': False}, + 'id5': {'pending_delete': False}, + 'id6': {'pending_delete': False}, }) events = cache.get_queued_events() # expect only 3 events according to QUEUED_EVENTS_BATCH_SIZE @@ -53,8 +51,8 @@ def test_get_queued_events_batch_size_underrun(self): constants.QUEUED_EVENTS_BATCH_SIZE = 3 cache = MemoryCache({ - "id1": {"pending_delete": True}, - "id2": {"pending_delete": False}, + 'id1': {'pending_delete': True}, + 'id2': {'pending_delete': False}, }) events = cache.get_queued_events() # expect only 1 event as there are no more available @@ -63,7 +61,7 @@ def 
test_get_queued_events_batch_size_underrun(self): # ---------------------------------------------------------------------- def test_get_queued_events_pending_delete_check(self): cache = MemoryCache({ - "id1": {"pending_delete": False} + 'id1': {'pending_delete': False} }) queued_events = cache.get_queued_events() self.assertEqual(len(queued_events), 1) @@ -72,17 +70,17 @@ def test_get_queued_events_pending_delete_check(self): # ---------------------------------------------------------------------- def test_requeue_queued_events(self): cache = MemoryCache({ - "id1": {"pending_delete": True} + 'id1': {'pending_delete': True} }) self.assertEqual(len(cache.get_queued_events()), 0) - cache.requeue_queued_events([{"id": "id1"}]) + cache.requeue_queued_events([{'id': 'id1'}]) self.assertEqual(len(cache.get_queued_events()), 1) # ---------------------------------------------------------------------- def test_delete_queued_events(self): cache = MemoryCache({ - "id1": {"pending_delete": True, "id": "id1"}, - "id2": {"pending_delete": False, "id": "id2"} + 'id1': {'pending_delete': True, 'id': 'id1'}, + 'id2': {'pending_delete': False, 'id': 'id2'} }) cache.delete_queued_events() self.assertEqual(len(cache._cache), 1) @@ -90,14 +88,14 @@ def test_delete_queued_events(self): # ---------------------------------------------------------------------- def test_expire_events(self): cache = MemoryCache({ - "id1": { - "pending_delete": False, - "id": "id1", - "entry_date": datetime.datetime.fromtimestamp(0)}, - "id2": { - "pending_delete": False, - "id": "id2", - "entry_date": datetime.datetime.now()} + 'id1': { + 'pending_delete': False, + 'id': 'id1', + 'entry_date': datetime.datetime.fromtimestamp(0, tz=datetime.UTC)}, + 'id2': { + 'pending_delete': False, + 'id': 'id2', + 'entry_date': datetime.datetime.now(tz=datetime.UTC)} }, event_ttl=100) cache.expire_events() self.assertEqual(len(cache._cache), 1) diff --git a/tests/utils_test.py b/tests/utils_test.py new file mode 100644 
index 0000000..f04d849 --- /dev/null +++ b/tests/utils_test.py @@ -0,0 +1,53 @@ +import unittest +from copy import deepcopy + +from logstash_async.utils import normalize_ecs_dict + + +# ruff: noqa: PT009 + + +class NormalizeEcsDictTest(unittest.TestCase): + def test_de_dot(self): + with self.subTest('no dots'): + result = normalize_ecs_dict.de_dot_record('a', {'x': [1]}) + self.assertDictEqual(result, {'a': {'x': [1]}}) + with self.subTest('dots'): + result = normalize_ecs_dict.de_dot_record('a.b.c', {'x': [1]}) + self.assertDictEqual(result, {'a': {'b': {'c': {'x': [1]}}}}) + + def test_normalization(self): + d = { + 'a': 1, + 'b': 11, + 'b.c': { + 'd.e': [2, ({'f.g': 3}, 4), 5], + 'h': None, + }, + 'b.c.x': {'y': 6}, + 'c': {'d': [1], 'e': 2}, + 'c.d': [2], + 'c.f': 3, + } + d_copy = deepcopy(d) + expected = { + 'a': 1, + 'b': { + 'c': { + 'd': { + 'e': [2, ({'f': {'g': 3}}, 4), 5], + }, + 'h': None, + 'x': {'y': 6}, + }, + }, + 'c': {'d': [2], 'e': 2, 'f': 3}, + } + result = normalize_ecs_dict(d) + self.assertDictEqual(result, expected) + + with self.subTest('source dict not mutated'): + self.assertDictEqual(d, d_copy) + # pylint: disable-next=unsubscriptable-object + result['c']['d'].append(22) + self.assertDictEqual(d, d_copy) diff --git a/tox.ini b/tox.ini index c349b0d..380f0c5 100644 --- a/tox.ini +++ b/tox.ini @@ -1,12 +1,10 @@ -# -*- coding: utf-8 -*- -# # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. 
[tox] skip_missing_interpreters = true envlist = - docs,py36,py37,py38,py39,py310,pypy3 + docs,py311,py312,py313,py314 logstash_async_modules = logstash_async tests @@ -15,6 +13,8 @@ deps = flake8 isort pylint + Django + Flask commands = # linting and code analysis {envbindir}/flake8 {[tox]logstash_async_modules} @@ -24,9 +24,8 @@ commands = {envbindir}/python -m unittest discover --start-directory tests --pattern '*_test.py' [testenv:docs] -basepython = python3 deps = sphinx sphinx_rtd_theme -whitelist_externals = make +allowlist_externals = make commands = make -C docs html BUILDDIR={envtmpdir} "SPHINXOPTS=-W -E"