Add jsonschema validation (#6)

* Add JSON Schema validation for log-alert config and require jsonschema

* Add jsonschema dependency to requirements

* Enhance config loading with schema validation

Added JSON schema validation for configuration and improved error handling.

* Simplify token assignment and config loading

Refactor token retrieval and configuration loading.
This commit is contained in:
2025-10-16 21:02:28 +02:00
committed by GitHub
parent c67b2b54c8
commit 7de812d002
3 changed files with 156 additions and 11 deletions

View File

@@ -1,5 +1,7 @@
#!/usr/bin/env python3
import argparse
import json
import jsonschema
import os
import re
import requests
@@ -7,7 +9,7 @@ import sys
import time
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
# Base Classes
class LogFetcher(ABC):
"""Abstract base class for log fetchers."""
@@ -82,10 +84,15 @@ class RegexpFilter(Filter):
def filter(self, log: Dict[str, Any]) -> Dict[str, Any]:
match = re.search(self.match, log["log"])
print(f"Regex match for '{self.match}' in log: {match.groupdict()}")
if match:
log["labels"].update(match.groupdict())
# Only call groupdict() when there is a match
groups = match.groupdict()
print(f"Regex match for '{self.match}' in log: {groups}")
if groups:
log.setdefault("labels", {}).update(groups)
return log
# no match
print(f"Regex did not match for pattern '{self.match}' in log: {log.get('log')}")
return None
# Gotify Alert Manager
@@ -94,7 +101,7 @@ class GotifyAlertManager(AlertManager):
def __init__(self, config: Dict[str, Any]):
self.url = config["url"]
self.token = config["token"]
self.token = config.get("token")
def send_alert(self, title: str, message: str) -> None:
"""Send an alert to Gotify."""
@@ -142,7 +149,7 @@ class AlertRule:
break
if log_entry is None:
continue
message = self.alert_message.format_map(log_entry["labels"])
message = self.alert_message.format_map(log_entry.get("labels", {}))
print(f"Sending message: {message}, with params: {log_entry}")
self.alert_manager.send_alert(self.alert_title, message)
self.last_run = self.next_run
@@ -162,12 +169,17 @@ class LogAlertApp:
for manager in self.config["alert-managers"]:
self.alert_managers[manager["name"]] = self._init_alert_manager(manager)
self.alert_rules = [AlertRule(self.log_fetchers, self.alert_managers, rule) for rule in self.config["log-alerts"]]
def _load_config(self, config_path: str) -> Dict[str, Any]:
"""Load the configuration from a JSON file."""
"""Load the configuration from a JSON file and validate it with JSON Schema."""
try:
with open(config_path, 'r') as config_file:
return self._update_config_from_env(json.load(config_file))
# read JSON first
config = json.load(config_file)
# Perform schema validation if jsonschema is available
self._validate_config_with_schema(config)
# Update config to load env variable where required
return self._update_config_from_env(config)
except FileNotFoundError:
print(f"Error: Configuration file '{config_path}' not found.")
sys.exit(1)
@@ -175,9 +187,26 @@ class LogAlertApp:
print(f"Error: Invalid JSON in configuration file '{config_path}'.")
sys.exit(1)
def _validate_config_with_schema(self, config: Dict[str, Any]) -> None:
"""Validate a loaded config dict against log-alert/config.schema.json if jsonschema is installed."""
schema_path = os.path.join(os.path.dirname(__file__), 'config.schema.json')
try:
with open(schema_path, 'r') as sf:
schema = json.load(sf)
jsonschema.validate(instance=config, schema=schema)
except FileNotFoundError:
print(f"Warning: Schema file '{schema_path}' not found; skipping config validation.")
except jsonschema.exceptions.ValidationError as e:
print(f"Configuration validation error: {e.message}")
print("Detailed error:", e)
sys.exit(1)
except Exception as e:
print(f"Unexpected error while validating configuration: {e}")
sys.exit(1)
def _update_config_from_env(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Update config values from environment variables if specified."""
for key, value in config.items():
for key, value in list(config.items()):
if isinstance(value, dict):
config[key] = self._update_config_from_env(value)
elif isinstance(value, list):
@@ -210,7 +239,7 @@ class LogAlertApp:
if time.time() >= rule.next_run:
rule.run()
time.sleep(5)
def main():
parser = argparse.ArgumentParser(description="Log and Alert Management Tool")
parser.add_argument("--config", required=True, help="Path to the configuration file")