Improved logging

This commit is contained in:
2025-10-17 23:32:11 +02:00
parent 7de812d002
commit b06a39df3a

View File

@@ -2,6 +2,7 @@
import argparse
import json
import jsonschema
import logging
import os
import re
import requests
@@ -10,6 +11,13 @@ import time
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
# Configure root logger
logging.basicConfig(
level=os.environ.get("LOGLEVEL", "INFO"),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("log-alert")
# Base Classes
class LogFetcher(ABC):
"""Abstract base class for log fetchers."""
@@ -49,7 +57,7 @@ class LokiLogFetcher(LogFetcher):
query += '}'
if "text" in filters:
query += f' |= "{filters["text"]}"'
print(f"Executing Loki query: {query}")
logger.debug(f"Executing Loki query: {query}")
payload = {
"query": query,
"limit": 1000,
@@ -72,7 +80,7 @@ class LokiLogFetcher(LogFetcher):
})
return logs
except requests.exceptions.RequestException as e:
print(f"Error fetching logs from Loki: {e}")
logger.error(f"Error fetching logs from Loki: {e}")
return []
# Regexp Filter
@@ -87,12 +95,12 @@ class RegexpFilter(Filter):
if match:
# Only call groupdict() when there is a match
groups = match.groupdict()
print(f"Regex match for '{self.match}' in log: {groups}")
logger.debug(f"Regex match for '{self.match}' in log: {groups}")
if groups:
log.setdefault("labels", {}).update(groups)
return log
# no match
print(f"Regex did not match for pattern '{self.match}' in log: {log.get('log')}")
logger.debug(f"Regex did not match for pattern '{self.match}' in log: {log.get('log')}")
return None
# Gotify Alert Manager
@@ -113,9 +121,8 @@ class GotifyAlertManager(AlertManager):
try:
response = requests.post(f"{self.url}?token={self.token}", json=payload)
response.raise_for_status()
print(f"Alert sent to Gotify: {title}")
except requests.exceptions.RequestException as e:
print(f"Error sending alert to Gotify: {e}")
logger.error(f"Error sending alert to Gotify: {e}")
# Alert Rule
class AlertRule:
@@ -139,10 +146,10 @@ class AlertRule:
self.next_run = time.time()
def run(self) -> None:
print(f"Processing rule: {self.name}")
logger.debug(f"Processing rule: {self.name}")
logs = self.log_fetcher.fetch_logs(self.fetcher_filters, self.last_run, self.next_run)
for log_entry in logs:
print(f"Checking log: {log_entry['log']}")
logger.debug(f"Checking log: {log_entry['log']}")
for filter in self.filters:
log_entry = filter.filter(log_entry)
if log_entry is None:
@@ -150,7 +157,7 @@ class AlertRule:
if log_entry is None:
continue
message = self.alert_message.format_map(log_entry.get("labels", {}))
print(f"Sending message: {message}, with params: {log_entry}")
logger.info(f"Sending message: {message}, with params: {log_entry}")
self.alert_manager.send_alert(self.alert_title, message)
self.last_run = self.next_run
self.next_run = time.time() + self.check_interval
@@ -161,7 +168,7 @@ class LogAlertApp:
def __init__(self, config_path: str):
self.config = self._load_config(config_path)
print(f"Configuration loaded: {self.config}")
logger.debug(f"Configuration loaded: {self.config}")
self.log_fetchers = {}
for fetcher in self.config["log-fetchers"]:
self.log_fetchers[fetcher["name"]] = self._init_log_fetcher(fetcher)
@@ -181,10 +188,10 @@ class LogAlertApp:
# Update config to load env variable where required
return self._update_config_from_env(config)
except FileNotFoundError:
print(f"Error: Configuration file '{config_path}' not found.")
logger.error(f"Error: Configuration file '{config_path}' not found.")
sys.exit(1)
except json.JSONDecodeError:
print(f"Error: Invalid JSON in configuration file '{config_path}'.")
logger.error(f"Error: Invalid JSON in configuration file '{config_path}'.")
sys.exit(1)
def _validate_config_with_schema(self, config: Dict[str, Any]) -> None:
@@ -195,13 +202,14 @@ class LogAlertApp:
schema = json.load(sf)
jsonschema.validate(instance=config, schema=schema)
except FileNotFoundError:
print(f"Warning: Schema file '{schema_path}' not found; skipping config validation.")
logger.error(f"Schema file '{schema_path}' not found")
sys
except jsonschema.exceptions.ValidationError as e:
print(f"Configuration validation error: {e.message}")
print("Detailed error:", e)
logger.error(f"Configuration validation error: {e.message}")
logger.error("Detailed error:", e)
sys.exit(1)
except Exception as e:
print(f"Unexpected error while validating configuration: {e}")
logger.error(f"Unexpected error while validating configuration: {e}")
sys.exit(1)
def _update_config_from_env(self, config: Dict[str, Any]) -> Dict[str, Any]: