#!/usr/bin/env python3
"""A check plugin for Nagios/Icinga/etc. that queries Home Assistant for the current state of entities."""

import argparse
import dataclasses
import logging
import math
import sys
from urllib.parse import urljoin
from typing import Any, Iterator, Optional
from pathlib import Path

import requests
import nagiosplugin

_log = logging.getLogger("nagiosplugin")

# Seconds to wait for the Home Assistant API before giving up, so that an
# unresponsive server cannot block the check indefinitely.
_REQUEST_TIMEOUT = 10

# Placeholder reported when an entity has no usable numeric reading.
_NO_VALUE = -1

# "'" and "=" are replaced with "_" in friendly names; presumably because
# they delimit labels in Nagios performance data -- TODO confirm.
_NAME_TRANSLATION = str.maketrans("'=", "__")


def _to_float(raw: Any) -> float:
    """Return *raw* as a finite float, or ``_NO_VALUE`` if it is not one.

    ``str.isnumeric()`` is deliberately not used: it rejects ordinary
    readings such as ``"21.5"`` and ``"-3"`` while accepting characters
    like ``"½"`` that ``float()`` cannot parse.
    """
    try:
        number = float(raw)
    except (TypeError, ValueError):
        return _NO_VALUE
    return number if math.isfinite(number) else _NO_VALUE


@dataclasses.dataclass
class Entities(nagiosplugin.Resource):
    """Resource yielding one metric per matching Home Assistant entity.

    An entity matches when its ``device_class`` attribute equals
    ``device_class`` and every ``(attribute, value)`` pair in ``filters``
    equals the corresponding entity attribute.
    """

    url: str  # base URL of the Home Assistant instance
    token: str  # bearer token sent in the Authorization header
    device_class: str  # device class to select; also used as metric context
    min: Optional[float]  # minimum passed through to each metric
    max: Optional[float]  # maximum passed through to each metric
    filters: list[tuple[str, str]]  # extra attribute == value requirements
    attribute: Optional[str]  # report this attribute instead of the state
    friendly_name: bool  # label metrics with friendly_name when available

    def hass_get(self, endpoint: str) -> Any:
        """Send a GET request to *endpoint* and return the decoded JSON body.

        Raises:
            nagiosplugin.CheckError: if the API answers with an error status.
        """
        headers = {
            "Authorization": "Bearer " + self.token,
            "Content-Type": "application/json",
        }
        r = requests.get(
            urljoin(self.url, endpoint),
            headers=headers,
            timeout=_REQUEST_TIMEOUT,
        )
        if not r.ok:
            # CheckError is an Exception subclass, so existing handlers
            # that catch Exception keep working.
            raise nagiosplugin.CheckError(
                "Failed to query Home Assistant API: " + r.text
            )
        return r.json()

    def check_api(self) -> None:
        """Print the API status message; exit with status 1 if not running."""
        message = self.hass_get("/api/").get("message")
        if message != "API running.":
            # f-string rather than "+": message is None when the key is absent.
            print(f"ERROR: {message}")
            sys.exit(1)
        print(f"OK: {message}")

    def probe(self) -> Iterator[nagiosplugin.Metric]:
        """Yield a metric for every entity matching the configured selection."""
        for state in self.hass_get("/api/states"):
            attrs = state["attributes"]
            if attrs.get("device_class") != self.device_class:
                continue
            if not all(attrs.get(k) == v for k, v in self.filters):
                continue

            if self.attribute is not None:
                value = attrs.get(self.attribute, _NO_VALUE)
                uom = None
            else:
                value = _to_float(state["state"])
                uom = attrs.get("unit_of_measurement")

            entity_id = state["entity_id"]
            if self.friendly_name:
                # Fall back to the entity id when the name is missing,
                # None or empty.
                label = attrs.get("friendly_name") or entity_id
                name = str(label).translate(_NAME_TRANSLATION)
            else:
                name = entity_id

            if state["state"] == "unavailable":
                _log.info("%s unavailable", entity_id)

            yield nagiosplugin.Metric(
                name,
                value,
                uom,
                context=self.device_class,
                min=self.min,
                max=self.max,
            )


def key_value(arg: str) -> tuple[str, str]:
    """Parse an ``attribute=value`` command-line argument into a pair.

    Raises:
        argparse.ArgumentTypeError: if *arg* has no ``=`` or an empty key.
    """
    key, sep, value = arg.partition("=")
    if not sep or not key:
        raise argparse.ArgumentTypeError(
            f"expected 'attribute=value', got {arg!r}"
        )
    return key, value


class Icinga2ConfAction(argparse.Action):
    """Argparse action that prints an Icinga 2 CheckCommand and exits."""

    def __init__(
        self,
        option_strings,
        dest=argparse.SUPPRESS,
        default=argparse.SUPPRESS,
        help="Generate conf file for icinga2 CheckCommand",
    ):
        super().__init__(
            option_strings=option_strings,
            dest=dest,
            default=default,
            nargs=0,
            help=help,
        )

    @staticmethod
    def _description(action: argparse.Action, prog: str) -> Optional[str]:
        """Return the help text of *action* with ``%(...)s`` expanded.

        Returns ``None`` when the action has no help text to show.
        """
        text = action.help
        if not text or text == argparse.SUPPRESS:
            return None
        params = {
            key: val
            for key, val in vars(action).items()
            if val is not argparse.SUPPRESS
        }
        params["prog"] = prog
        try:
            return text % params
        except (KeyError, TypeError, ValueError):
            # Best effort: emit the raw text rather than abort generation.
            return text

    def __call__(
        self, parser: argparse.ArgumentParser, namespace, values, option_string=None
    ):
        """Print a CheckCommand definition for *parser*'s options, then exit."""
        filename = Path(__file__).absolute()
        command_name = filename.stem.removeprefix("check_")
        # FIXME: assumes that script will be in a subdirectory of PluginContribDir
        command_path = filename.relative_to(filename.parents[1])

        # Flag-style actions take no value, so they are emitted as set_if.
        flag_actions = (
            argparse._StoreConstAction,
            argparse._CountAction,
            argparse._AppendConstAction,
        )

        print(f'object CheckCommand "{command_name}" {{')
        print(f' command = [ PluginContribDir + "/{command_path}" ]')
        print(" arguments = {")
        for action in parser._actions:
            if action.dest in [self.dest, "help", "version"]:
                continue
            if not action.option_strings:
                # Positional arguments have no option string to key on.
                continue
            arg_str = action.option_strings[-1]
            icinga2_var = arg_str.lstrip("-").replace("-", "_")
            print(f' "{arg_str}" = {{')
            if action.required:
                print(" required = true")
            if isinstance(action, flag_actions):
                print(f' set_if = "${command_name}_{icinga2_var}$"')
            else:
                print(f' value = "${command_name}_{icinga2_var}$"')
            description = self._description(action, parser.prog)
            if description is not None:
                print(f' description = "{description}"')
            print(" }")
        print(" }\n}")
        parser.exit()


@nagiosplugin.guarded
def main():
    """Parse the command line and run the Home Assistant entity check."""
    argp = argparse.ArgumentParser(description=__doc__)
    argp.add_argument("-v", "--verbose", action="count", default=0)
    argp.add_argument("--make-icinga-conf", action=Icinga2ConfAction)
    argp.add_argument(
        "-t",
        "--token",
        required=True,
        type=str,
        help="token for Home Assistant REST API (see https://developers.home-assistant.io/docs/api/rest/)",
    )
    argp.add_argument(
        "-u", "--url", required=True, type=str, help="URL for Home Assistant"
    )
    argp.add_argument(
        "-d",
        "--device-class",
        type=str,
        required=True,
        help="device class of entities to monitor",
    )
    argp.add_argument(
        "-w",
        "--warning",
        metavar="RANGE",
        required=True,
        help="return warning if value is outside %(metavar)s",
    )
    argp.add_argument(
        "-c",
        "--critical",
        metavar="RANGE",
        required=True,
        help="return critical if value is outside %(metavar)s",
    )
    argp.add_argument(
        "--min",
        type=float,
        help="min for performance data",
    )
    argp.add_argument(
        "--max",
        type=float,
        help="max for performance data",
    )
    argp.add_argument(
        "-a", "--attribute", type=str, help="check attribute instead of value"
    )
    argp.add_argument(
        "-f",
        "--filter",
        type=key_value,
        # "extend" accumulates across repeated -f options; the default
        # store action would keep only the last occurrence.
        action="extend",
        default=[],
        nargs="*",
        help="filter by 'attribute=value' (may be specified multiple times)",
    )
    argp.add_argument(
        "--friendly",
        action="store_true",
        help="use friendly name, when available",
    )
    args = argp.parse_args()

    check = nagiosplugin.Check(
        Entities(
            args.url,
            args.token,
            args.device_class,
            args.min,
            args.max,
            args.filter,
            args.attribute,
            args.friendly,
        ),
        nagiosplugin.ScalarContext(args.device_class, args.warning, args.critical),
    )
    check.main(args.verbose)


if __name__ == "__main__":
    main()