diff --git a/log-alert/config.json b/log-alert/config.json index 35e76d1..74f8cc8 100644 --- a/log-alert/config.json +++ b/log-alert/config.json @@ -1,43 +1,72 @@ { - "log-fetcher":{ - "type": "loki", - "config": { - "url": "http://fileserver.home:3100" + "log-fetchers": [ + { + "name": "loki-fileserver", + "type": "loki", + "config": { + "url": "http://fileserver.home:3100" + } } - }, - "alert-manager":{ - "type": "gotify", - "config": { - "url": "https://paris.nappez.com/gotify/message", - "token-from-env": "{GOTIFY_TOKEN}" + ], + "alert-managers":[ + { + "name": "gotify-paris", + "type": "gotify", + "config": { + "url": "https://paris.nappez.com/gotify/message", + "token-from-env": "{GOTIFY_TOKEN}" + } } - }, - "check-interval": 60, + ], "log-alerts": [ { "name": "SSH outside connection", - "filters": { - "labels": { - "container": "openssh-server" - }, - "text": "Accepted", - "match": "Accepted (?P\\w+) for (?P\\w+) from (?P[^\\s]+)" + "check-interval": 60, + "log-fetcher": { + "name": "loki-fileserver", + "filters": { + "labels": { + "container": "openssh-server" + }, + "text": "Accepted" + } }, - "alert": { + "filters": [ + { + "type": "regexp", + "config": { + "match": "Accepted (?P\\w+) for (?P\\w+) from (?P[^\\s]+)" + } + } + ], + "alert-manager": { + "name": "gotify-paris", "title": "Outside SSH login", "message": "New SSH login for {username} on {instance} from ip {ip} (method: {method})" } }, { "name": "SSH local connection", - "filters": { - "labels": { - "filename": "/var/log/host/auth.log" - }, - "text": "Accepted", - "match": "Accepted (?P\\w+) for (?P\\w+) from (?P[^\\s]+)" + "check-interval": 30, + "log-fetcher": { + "name": "loki-fileserver", + "filters": { + "labels": { + "filename": "/var/log/host/auth.log" + }, + "text": "Accepted" + } }, - "alert": { + "filters": [ + { + "type": "regexp", + "config": { + "match": "Accepted (?P\\w+) for (?P\\w+) from (?P[^\\s]+)" + } + } + ], + "alert-manager": { + "name": "gotify-paris", "title": "Local SSH login", "message": "New SSH login for {username} on {instance} from ip {ip} (method: {method})" } diff --git a/log-alert/log-alert.py b/log-alert/log-alert.py index cc625b6..006741b 100755 --- a/log-alert/log-alert.py +++ b/log-alert/log-alert.py @@ -16,6 +16,13 @@ class LogFetcher(ABC): def fetch_logs(self, filters: Dict[str, Any], start_time: int, end_time: int) -> List[Dict[str, Any]]: pass +class Filter(ABC): + """Abstract base class for filters.""" + + @abstractmethod + def filter(self, log: Dict[str, Any]) -> Dict[str, Any]: + pass + class AlertManager(ABC): """Abstract base class for alert managers.""" @@ -44,8 +51,8 @@ class LokiLogFetcher(LogFetcher): payload = { "query": query, "limit": 1000, - "start": str(start_time * 1000000000), # Convert to nanoseconds - "end": str(end_time * 1000000000), + "start": str(int(start_time) * 1000000000), # Convert to nanoseconds + "end": str(int(end_time) * 1000000000), "direction": "forward" } try: @@ -58,13 +65,29 @@ class LokiLogFetcher(LogFetcher): timestamp, log = value logs.append({ "timestamp": timestamp, - "log": log - } | stream.get("stream", {})) + "log": log, + "labels": stream.get("stream", {}) + }) return logs except requests.exceptions.RequestException as e: print(f"Error fetching logs from Loki: {e}") return [] +# Regexp Filter +class RegexpFilter(Filter): + """Concrete implementation for Regexp filter.""" + + def __init__(self, config: Dict[str, Any]): + self.match = config["match"] + + def filter(self, log: Dict[str, Any]) -> Dict[str, Any]: + match = re.search(self.match, log["log"]) + print(f"Regex match for '{self.match}' in log: {match.groupdict()}") + if match: + log["labels"].update(match.groupdict()) + return log + return None + # Gotify Alert Manager class GotifyAlertManager(AlertManager): """Concrete implementation for Gotify alert manager.""" @@ -91,19 +114,39 @@ class GotifyAlertManager(AlertManager): class AlertRule: """Represents an alert rule with filters and alert template.""" - def __init__(self, config: Dict[str, Any]): + def __init__(self, log_fetchers: LogFetcher, alert_managers: AlertManager, config: Dict[str, Any]): self.name = config["name"] - self.filters = config["filters"] - self.alert_title = config["alert"]["title"] - self.alert_message = config["alert"]["message"] + self.log_fetcher = log_fetchers[config["log-fetcher"]["name"]] + self.fetcher_filters = config["log-fetcher"].get("filters", {}) + self.check_interval = config.get("check-interval", 60) + self.filters = [] + for filter in config.get("filters", []): + if filter["type"] == "regexp": + self.filters.append(RegexpFilter(filter["config"])) + else: + raise ValueError(f"Unsupported filter type: {filter['type']}") + self.alert_manager = alert_managers[config["alert-manager"]["name"]] + self.alert_title = config["alert-manager"]["title"] + self.alert_message = config["alert-manager"]["message"] + self.last_run = time.time() - self.check_interval + self.next_run = time.time() - def matches(self, log: str) -> Optional[Dict[str, str]]: - """Check if the log matches the alert rule.""" - match = re.search(self.filters.get("match"), log) - print(f"Regex match for '{self.filters.get('match')}' in log: {match.groupdict()}") - if match: - return match.groupdict() - return None + def run(self) -> None: + print(f"Processing rule: {self.name}") + logs = self.log_fetcher.fetch_logs(self.fetcher_filters, self.last_run, self.next_run) + for log_entry in logs: + print(f"Checking log: {log_entry['log']}") + for filter in self.filters: + log_entry = filter.filter(log_entry) + if log_entry is None: + break + if log_entry is None: + continue + message = self.alert_message.format_map(log_entry["labels"]) + print(f"Sending message: {message}, with params: {log_entry}") + self.alert_manager.send_alert(self.alert_title, message) + self.last_run = self.next_run + self.next_run = time.time() + self.check_interval # Main Application class LogAlertApp: @@ -111,9 +154,14 @@ class LogAlertApp: def __init__(self, config_path: str): self.config = self._load_config(config_path) - self.log_fetcher = self._init_log_fetcher() - self.alert_manager = self._init_alert_manager() - self.alert_rules = [AlertRule(rule) for rule in self.config["log-alerts"]] + print(f"Configuration loaded: {self.config}") + self.log_fetchers = {} + for fetcher in self.config["log-fetchers"]: + self.log_fetchers[fetcher["name"]] = self._init_log_fetcher(fetcher) + self.alert_managers = {} + for manager in self.config["alert-managers"]: + self.alert_managers[manager["name"]] = self._init_alert_manager(manager) + self.alert_rules = [AlertRule(self.log_fetchers, self.alert_managers, rule) for rule in self.config["log-alerts"]] def _load_config(self, config_path: str) -> Dict[str, Any]: """Load the configuration from a JSON file.""" @@ -132,66 +180,44 @@ class LogAlertApp: for key, value in config.items(): if isinstance(value, dict): config[key] = self._update_config_from_env(value) + elif isinstance(value, list): + config[key] = [self._update_config_from_env(item) for item in value] elif isinstance(value, str) and key.endswith("-from-env"): - config[key[0:-9]] = value.format_map(os.environ) + new_key = key[:-9] # Remove '-from-env' + config[new_key] = value.format_map(os.environ) del config[key] return self._update_config_from_env(config) # re-evaluate in case of nested env vars return config - def _init_log_fetcher(self) -> LogFetcher: + def _init_log_fetcher(self, fetcher_config: Dict[str, Any]) -> LogFetcher: """Initialize the log fetcher based on config.""" - fetcher_config = self.config["log-fetcher"] if fetcher_config["type"] == "loki": return LokiLogFetcher(fetcher_config["config"]) else: raise ValueError(f"Unsupported log fetcher type: {fetcher_config['type']}") - def _init_alert_manager(self) -> AlertManager: + def _init_alert_manager(self, manager_config: Dict[str, Any]) -> AlertManager: """Initialize the alert manager based on config.""" - manager_config = self.config["alert-manager"] if manager_config["type"] == "gotify": return GotifyAlertManager(manager_config["config"]) else: raise ValueError(f"Unsupported alert manager type: {manager_config['type']}") - def run_once(self, start_time: int, end_time: int) -> None: - for rule in self.alert_rules: - print(f"Processing rule: {rule.name}") - logs = self.log_fetcher.fetch_logs(rule.filters, start_time, end_time) - for log_entry in logs: - print(f"Checking log: {log_entry['log']}") - match = rule.matches(log_entry["log"]) - log_entry.update(match) - if match: - message = rule.alert_message.format_map(log_entry) - print(f"Sending message: {message}, with params: {log_entry}") - self.alert_manager.send_alert(rule.alert_title, message) - - def run(self, start_time: int, end_time: int) -> None: + def run(self) -> None: """Fetch logs, check for matches, and send alerts.""" - if start_time == 0 and end_time == 0: - last_run = int(time.time())-self.config["check-interval"]; - while True: - now = int(time.time()) - self.run_once(last_run, now) - last_run = now - time.sleep(self.config["check-interval"]) - else: - if start_time == 0: - start_time = int(time.time()) - self.config["check-interval"] - if end_time == 0: - end_time = int(time.time()) - self.run_once(start_time, end_time) + while True: + for rule in self.alert_rules: + if time.time() >= rule.next_run: + rule.run() + time.sleep(5) def main(): parser = argparse.ArgumentParser(description="Log and Alert Management Tool") parser.add_argument("--config", required=True, help="Path to the configuration file") - parser.add_argument("--start", type=int, default=0, help="Start time (Unix timestamp)") - parser.add_argument("--end", type=int, default=0, help="End time (Unix timestamp)") args = parser.parse_args() app = LogAlertApp(args.config) - app.run(args.start, args.end) + app.run() if __name__ == "__main__": main()