#!/usr/bin/env python3 """A check plugin for Nagios/Icinga/etc. that queries Home Assistant for the current state of entities.""" import argparse import dataclasses import logging import re import sys from urllib.parse import urljoin from typing import Optional from pathlib import Path import requests import nagiosplugin _log = logging.getLogger("nagiosplugin") @dataclasses.dataclass class Entities(nagiosplugin.Resource): url: str token: str device_class: str numeric: bool filters: list[str] attribute: Optional[str] friendly_name: bool min: Optional[float] = None max: Optional[float] = None def hass_get(self, endpoint: str) -> requests.Response: headers = { "Authorization": "Bearer " + self.token, "Content-Type": "application/json", } r = requests.get(urljoin(self.url, endpoint), headers=headers) if not r.ok: raise Exception("Failed to query Home Assistant API: " + r.text) return r.json() def check_api(self): message = self.hass_get("/api/").get("message") if message != "API running.": print("ERROR: " + message) sys.exit(1) else: print("OK: " + message) def probe(self): response = self.hass_get("/api/states") for state in response: if state["attributes"].get("device_class") == self.device_class and all( state["attributes"].get(k) == v for k, v in self.filters ): if self.attribute is not None: value = state["attributes"].get( self.attribute, -1 if self.numeric else "attribute missing" ) uom = None else: if self.numeric: value = ( float(state["state"]) if state["state"].isnumeric() else -1 ) else: value = state["state"] uom = state["attributes"].get("unit_of_measurement") if self.friendly_name: name = ( state["attributes"] .get("friendly_name", state["entity_id"]) .translate({ord(c): "_" for c in "'="}) ) else: name = state["entity_id"] if state["state"] == "unavailable": _log.info(f"{state['entity_id']} unavailable") yield nagiosplugin.Metric( name, value, uom, context=self.device_class, min=self.min, max=self.max, ) class RegexContext(nagiosplugin.Context): def __init__( self, name: str, ok: str = None, warning: str = None, critical: str = None, fmt_metric=None, result_cls=nagiosplugin.Result, ): super().__init__(name, fmt_metric=fmt_metric, result_cls=result_cls) # '.^' should match nothing self.ok = re.compile(ok or ".^") self.warning = re.compile(warning or ".^") self.critical = re.compile(critical or ".^") def fmt_reason(self, regex: re.Pattern, metric: nagiosplugin.Metric) -> str: return f"{metric.name} matched regex '{regex.pattern}' ({metric.value})" def evaluate(self, metric: nagiosplugin.Metric, resource): if self.critical.match(metric.value): return self.result_cls( nagiosplugin.Critical, self.fmt_reason(self.critical, metric), metric ) elif self.warning.match(metric.value): return self.result_cls( nagiosplugin.Warn, self.fmt_reason(self.warning, metric), metric ) elif self.ok.match(metric.value): return self.result_cls( nagiosplugin.Ok, self.fmt_reason(self.ok, metric), metric ) else: return self.result_cls( nagiosplugin.Unknown, f"{metric.name} did not match any defined patterns ({metric.value})", metric, ) def key_value(arg: str): k, _, v = arg.partition("=") return k, v class Icinga2ConfAction(argparse.Action): def __init__( self, option_strings, dest=argparse.SUPPRESS, default=argparse.SUPPRESS, help="Generate conf file for icinga2 CheckCommand", ): super().__init__( option_strings=option_strings, dest=dest, default=default, nargs=0, help=help, ) def _format_actions(self, param_prefix: str, parser: argparse.ArgumentParser): for action in parser._actions: if action.dest in [self.dest, "help", "version"] or isinstance( action, argparse._SubParsersAction ): continue arg_str = action.option_strings[-1] icinga2_var = arg_str.lstrip("-").replace("-", "_") print(f' "{arg_str}" = {{') if action.required: print(" required = true") if isinstance( action, ( argparse._StoreConstAction, argparse._CountAction, argparse._AppendConstAction, ), ): print(f' set_if = "${param_prefix}_{icinga2_var}$"') else: print(f' value = "${param_prefix}_{icinga2_var}$"') print(f' description = "{action.help}"') print(" }") def __call__( self, parser: argparse.ArgumentParser, namespace, values, option_string=None ): filename = Path(__file__).absolute() command_name = filename.stem.removeprefix("check_") # FIXME: assumes that script will be in a subdirectory of PluginContribDir command_path = filename.relative_to(filename.parents[1]) if parser._subparsers: # TODO: this seems a bit more hacky than it needs to be choices = parser._subparsers._group_actions[0].choices for subname, subparser in choices.items(): print(f'object CheckCommand "{command_name}" {{') print( f' command = [ PluginContribDir + "/{command_path}", "{subname}" ]' ) print(" arguments = {") self._format_actions(command_name, parser) self._format_actions(f"{command_name}_{subname}", subparser) print(" }\n}") else: print(f'object CheckCommand "{command_name}" {{') print(f' command = [ PluginContribDir + "/{command_path}" ]') print(" arguments = {") self._format_actions(command_name, parser) print(" }\n}") parser.exit() @nagiosplugin.guarded def main(): argp = argparse.ArgumentParser(description=__doc__) argp.add_argument("--version", action="version", version="0.1.0") argp.add_argument("-v", "--verbose", action="count", default=0) argp.add_argument("--make-icinga-conf", action=Icinga2ConfAction) argp.add_argument( "-t", "--token", required=True, type=str, help="token for Home Assistant REST API (see https://developers.home-assistant.io/docs/api/rest/)", ) argp.add_argument( "-u", "--url", required=True, type=str, help="URL for Home Assistant" ) shared_parser = argparse.ArgumentParser(add_help=False) common_args = shared_parser.add_argument_group( "Common Arguments", "Arguments shared between metric types" ) common_args.add_argument( "-d", "--device-class", type=str, required=True, help="device class of entities to monitor", ) common_args.add_argument( "-a", "--attribute", type=str, help="check attribute instead of value" ) common_args.add_argument( "-f", "--filter", type=key_value, default=[], nargs="*", help="filter by 'attribute=value' (may be specified multiple times)", ) common_args.add_argument( "--friendly", action="store_true", help="use friendly name, when available", ) subparsers = argp.add_subparsers( title="Query type", required=True, dest="subparser_name" ) scalar_parser = subparsers.add_parser( "scalar", help="Numeric metrics", parents=[shared_parser] ) scalar_parser.add_argument( "-w", "--warning", metavar="RANGE", required=True, help="return warning if value is outside %(metavar)s", ) scalar_parser.add_argument( "-c", "--critical", metavar="RANGE", required=True, help="return critical if value is outside %(metavar)s", ) scalar_parser.add_argument( "--min", type=float, help="min for performance data", ) scalar_parser.add_argument( "--max", type=float, help="max for performance data", ) text_parser = subparsers.add_parser( "text", help="Textual metrics", parents=[shared_parser] ) text_parser.add_argument( "-o", "--ok", metavar="REGEX", help="return ok if value matches %(metavar)s", ) text_parser.add_argument( "-w", "--warning", metavar="REGEX", help="return warning if value matches %(metavar)s", ) text_parser.add_argument( "-c", "--critical", metavar="REGEX", help="return critical if value matches %(metavar)s", ) args = argp.parse_args() if args.subparser_name == "scalar": check = nagiosplugin.Check( Entities( url=args.url, token=args.token, device_class=args.device_class, numeric=True, min=args.min, max=args.max, filters=args.filter, attribute=args.attribute, friendly_name=args.friendly, ), nagiosplugin.ScalarContext(args.device_class, args.warning, args.critical), ) elif args.subparser_name == "text": check = nagiosplugin.Check( Entities( url=args.url, token=args.token, device_class=args.device_class, numeric=False, filters=args.filter, attribute=args.attribute, friendly_name=args.friendly, ), RegexContext(args.device_class, args.ok, args.warning, args.critical), ) check.main(args.verbose) if __name__ == "__main__": main()