check_home_assistant_state/check_home_assistant_state.py

225 lines
6.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""A check plugin for Nagios/Icinga/etc. that queries Home Assistant for the current state of entities."""
import argparse
import dataclasses
import logging
import sys
from urllib.parse import urljoin
from typing import Optional
from pathlib import Path
import requests
import nagiosplugin
_log = logging.getLogger("nagiosplugin")
@dataclasses.dataclass
class Entities(nagiosplugin.Resource):
url: str
token: str
device_class: str
min: float
max: float
filters: list[str]
attribute: Optional[str]
friendly_name: bool
def hass_get(self, endpoint: str) -> requests.Response:
headers = {
"Authorization": "Bearer " + self.token,
"Content-Type": "application/json",
}
r = requests.get(urljoin(self.url, endpoint), headers=headers)
if not r.ok:
raise Exception("Failed to query Home Assistant API: " + r.text)
return r.json()
def check_api(self):
message = self.hass_get("/api/").get("message")
if message != "API running.":
print("ERROR: " + message)
sys.exit(1)
else:
print("OK: " + message)
def probe(self):
response = self.hass_get("/api/states")
for state in response:
if state["attributes"].get("device_class") == self.device_class and all(
state["attributes"].get(k) == v for k, v in self.filters
):
if self.attribute is not None:
value = state["attributes"].get(self.attribute, -1)
uom = None
else:
value = float(state["state"]) if state["state"].isnumeric() else -1
uom = state["attributes"].get("unit_of_measurement")
if self.friendly_name:
name = (
state["attributes"]
.get("friendly_name", state["entity_id"])
.translate({ord(c): "_" for c in "'="})
)
else:
name = state["entity_id"]
if state["state"] == "unavailable":
_log.info(f"{state['entity_id']} unavailable")
yield nagiosplugin.Metric(
name,
value,
uom,
context=self.device_class,
min=self.min,
max=self.max,
)
def key_value(arg: str):
k, _, v = arg.partition("=")
return k, v
class Icinga2ConfAction(argparse.Action):
def __init__(
self,
option_strings,
dest=argparse.SUPPRESS,
default=argparse.SUPPRESS,
help="Generate conf file for icinga2 CheckCommand",
):
super().__init__(
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help,
)
def __call__(
self, parser: argparse.ArgumentParser, namespace, values, option_string=None
):
filename = Path(__file__).absolute()
command_name = filename.stem.removeprefix("check_")
# FIXME: assumes that script will be in a subdirectory of PluginContribDir
command_path = filename.relative_to(filename.parents[1])
print(f'object CheckCommand "{command_name}" {{')
print(f' command = [ PluginContribDir + "/{command_path}" ]')
print(" arguments = {")
for action in parser._actions:
if action.dest in [self.dest, "help", "version"]:
continue
arg_str = action.option_strings[-1]
icinga2_var = arg_str.lstrip("-").replace("-", "_")
print(f' "{arg_str}" = {{')
if action.required:
print(" required = true")
if isinstance(action, argparse._StoreConstAction):
print(f' set_if = "${command_name}_{icinga2_var}$"')
else:
print(f' value = "${command_name}_{icinga2_var}$"')
print(f' description = "{action.help}"')
print(" }")
print(" }\n}")
parser.exit()
@nagiosplugin.guarded
def main():
argp = argparse.ArgumentParser(description=__doc__)
argp.add_argument("-v", "--verbose", action="count", default=0)
argp.add_argument("--make-icinga-conf", action=Icinga2ConfAction)
argp.add_argument(
"-t",
"--token",
required=True,
type=str,
help="token for Home Assistant REST API (see https://developers.home-assistant.io/docs/api/rest/)",
)
argp.add_argument(
"-u", "--url", required=True, type=str, help="URL for Home Assistant"
)
argp.add_argument(
"-d",
"--device-class",
type=str,
required=True,
help="device class of entities to monitor",
)
argp.add_argument(
"-w",
"--warning",
metavar="RANGE",
required=True,
help="return warning if value is outside %(metavar)s",
)
argp.add_argument(
"-c",
"--critical",
metavar="RANGE",
required=True,
help="return critical if value is outside %(metavar)s",
)
argp.add_argument(
"--min",
type=float,
help="min for performance data",
)
argp.add_argument(
"--max",
type=float,
help="max for performance data",
)
argp.add_argument(
"-a", "--attribute", type=str, help="check attribute instead of value"
)
argp.add_argument(
"-f",
"--filter",
type=key_value,
default=[],
nargs="*",
help="filter by 'attribute=value' (may be specified multiple times)",
)
argp.add_argument(
"--friendly",
action="store_true",
help="use friendly name, when available",
)
args = argp.parse_args()
check = nagiosplugin.Check(
Entities(
args.url,
args.token,
args.device_class,
args.min,
args.max,
args.filter,
args.attribute,
args.friendly,
),
nagiosplugin.ScalarContext(args.device_class, args.warning, args.critical),
)
check.main(args.verbose)
if __name__ == "__main__":
main()