check_home_assistant_state/check_home_assistant_state.py

520 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
"""A check plugin for Nagios/Icinga/etc. that queries Home Assistant for the current state of entities."""
import argparse
import dataclasses
import json
import logging
import re
import sys
from pathlib import Path
from typing import Any, Literal
from urllib.parse import urljoin
import nagiosplugin
import requests
_log = logging.getLogger("nagiosplugin")
class ScalarOrUnknownContext(nagiosplugin.ScalarContext):
"""
Same as a :class:`nagiosplugin.ScalarContext`, but returns UNKNOWN when the
value is not an int or float
"""
def evaluate(self, metric: nagiosplugin.Metric, resource: nagiosplugin.Resource):
if isinstance(metric.value, (int, float)):
return super().evaluate(metric, resource)
else:
return self.result_cls(
nagiosplugin.state.Unknown, "non-scalar value", metric
)
def performance(self, metric: nagiosplugin.Metric, resource: nagiosplugin.Resource):
if isinstance(metric.value, (int, float)):
return super().performance(metric, resource)
else:
return nagiosplugin.Performance(
metric.name,
"U",
metric.uom,
self.warning,
self.critical,
metric.min,
metric.max,
)
@dataclasses.dataclass
class AttributeFilter:
attribute: str
value: str
negated: bool
@classmethod
def from_str(cls, arg: str):
k, _, v = arg.partition("=")
if k.endswith("!"):
return cls(k.removesuffix("!"), v, True)
else:
return cls(k, v, False)
@dataclasses.dataclass
class Entities(nagiosplugin.Resource):
url: str
token: str
domain: str
device_class: str
numeric: bool
area: str
label: str
filters: list[AttributeFilter]
attribute: str | None
friendly_name: bool
ignore_missing: bool
include: list[str]
exclude: list[str]
min: float | None = None
max: float | None = None
def hass_get(self, endpoint: str) -> Any:
headers = {
"Authorization": "Bearer " + self.token,
"Content-Type": "application/json",
}
r = requests.get(urljoin(self.url, endpoint), headers=headers)
if not r.ok:
raise Exception("Failed to query Home Assistant API: " + r.text)
return r.json()
def hass_post(self, endpoint: str, data: Any) -> Any:
headers = {
"Authorization": "Bearer " + self.token,
"Content-Type": "application/json",
}
r = requests.post(urljoin(self.url, endpoint), headers=headers, json=data)
if not r.ok:
raise Exception("Failed to query Home Assistant API: " + r.text)
return r.json()
def check_api(self):
message = self.hass_get("/api/").get("message")
if message != "API running.":
print("ERROR: " + message)
sys.exit(1)
else:
print("OK: " + message)
def _state_to_metric(self, state):
"""Convert a state into a metric"""
if self.attribute is not None:
if self.ignore_missing and self.attribute not in state["attributes"]:
return
else:
value = state["attributes"].get(self.attribute, "missing attribute")
uom = None
else:
if self.numeric:
try:
value = float(state["state"])
except ValueError:
value = state["state"]
else:
value = state["state"]
uom = state["attributes"].get("unit_of_measurement")
if self.friendly_name:
name = (
state["attributes"]
.get("friendly_name", state["entity_id"])
.translate({ord(c): "_" for c in "'="})
)
else:
name = state["entity_id"]
yield nagiosplugin.Metric(
name,
value,
uom,
context="scalar_entities" if self.numeric else "text_entities",
min=self.min,
max=self.max,
)
@staticmethod
def _parse_negatable(filter: str) -> tuple[Literal["select", "reject"], str]:
if filter.startswith("!"):
filter = filter[1:]
op = "reject"
else:
op = "select"
return op, filter
def probe(self):
template_filter = "states"
if self.domain:
template_filter += f"|selectattr('domain', 'eq', '{ self.domain }')"
if self.device_class:
template_filter += (
f"|selectattr('attributes.device_class', 'eq', '{ self.device_class }')"
)
for f in self.filters:
op = "ne" if f.negated else "eq"
template_filter += (
f"|selectattr('attributes.{ f.attribute }', '{ op }', '{ f.value }')"
)
if self.area:
op, area = self._parse_negatable(self.area)
template_filter += (
f"|{op}attr('entity_id', 'in', area_entities('{ area }'))"
)
if self.label:
op, label = self._parse_negatable(self.label)
template_filter += (
f"|{op}attr('entity_id', 'in', label_entities('{ label }'))"
)
if self.exclude:
template_filter += (
f"|rejectattr('entity_id', 'in', { json.dumps(self.exclude) })"
)
if self.include:
template_filter = (
f"({template_filter}|list + "
f" states|selectattr('entity_id', 'in', { json.dumps(self.include) })|list"
")|unique(attribute='entity_id')"
)
template = (
"{% set ns = namespace(out=[]) %}\n"
f"{{% for s in { template_filter } %}}\n"
" {% set ns.out = ns.out + [{\n"
' "entity_id": s.entity_id,\n'
' "state": s.state,\n'
' "attributes": s.attributes,\n'
" }]\n"
" %}\n"
"{% endfor %}\n"
"{{ ns.out|tojson }}"
)
response = self.hass_post("/api/template", {"template": template})
for state in response:
yield from self._state_to_metric(state)
class RegexContext(nagiosplugin.Context):
def __init__(
self,
name: str,
ok: str | None = None,
warning: str | None = None,
critical: str | None = None,
fmt_metric=None,
result_cls=nagiosplugin.Result,
):
super().__init__(name, fmt_metric=fmt_metric, result_cls=result_cls)
# '.^' should match nothing
self.ok = re.compile(ok or ".^")
self.warning = re.compile(warning or ".^")
self.critical = re.compile(critical or ".^")
def fmt_reason(self, regex: re.Pattern, metric: nagiosplugin.Metric) -> str:
return f"{metric.name} matched regex '{regex.pattern}' ({metric.value})"
def evaluate(self, metric: nagiosplugin.Metric, resource):
if self.critical.match(metric.value):
return self.result_cls(
nagiosplugin.Critical, self.fmt_reason(self.critical, metric), metric
)
elif self.warning.match(metric.value):
return self.result_cls(
nagiosplugin.Warn, self.fmt_reason(self.warning, metric), metric
)
elif self.ok.match(metric.value):
return self.result_cls(
nagiosplugin.Ok, self.fmt_reason(self.ok, metric), metric
)
else:
return self.result_cls(
nagiosplugin.Unknown,
f"{metric.name} did not match any defined patterns ({metric.value})",
metric,
)
class Icinga2ConfAction(argparse.Action):
def __init__(
self,
option_strings,
dest=argparse.SUPPRESS,
default=argparse.SUPPRESS,
help="Generate conf file for icinga2 CheckCommand",
):
super().__init__(
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help,
)
def _format_actions(
self,
param_prefix: str,
parser: argparse.ArgumentParser,
order: int | None = None,
):
for action in parser._actions:
if action.dest in [self.dest, "help", "version"] or isinstance(
action, argparse._SubParsersAction
):
continue
arg_str = action.option_strings[-1]
icinga2_var = arg_str.lstrip("-").replace("-", "_")
print(f' "{arg_str}" = {{')
if action.required:
print(" required = true")
if isinstance(
action,
(
argparse._StoreConstAction,
argparse._CountAction,
argparse._AppendConstAction,
),
):
print(f' set_if = "${param_prefix}_{icinga2_var}$"')
else:
print(f' value = "${param_prefix}_{icinga2_var}$"')
print(f' description = "{action.help}"')
if order is not None:
print(f" order = {order}")
print(" }")
def __call__(
self, parser: argparse.ArgumentParser, namespace, values, option_string=None
):
filename = Path(__file__).absolute()
command_name = filename.stem.removeprefix("check_")
# FIXME: assumes that script will be in a subdirectory of PluginContribDir
command_path = filename.relative_to(filename.parents[1])
if parser._subparsers:
# TODO: this seems a bit more hacky than it needs to be
choices = parser._subparsers._group_actions[0].choices
for subname, subparser in choices.items():
print(f'object CheckCommand "{command_name}_{subname}" {{')
print(f' command = [ PluginContribDir + "/{command_path}" ]')
print(" arguments = {")
self._format_actions(command_name, parser, order=-2)
print(f' "{subname}" = {{')
print(" set_if = true")
print(" order = -1")
print(" }")
self._format_actions(f"{command_name}_{subname}", subparser)
print(" }\n}")
else:
print(f'object CheckCommand "{command_name}" {{')
print(f' command = [ PluginContribDir + "/{command_path}" ]')
print(" arguments = {")
self._format_actions(command_name, parser)
print(" }\n}")
parser.exit()
@nagiosplugin.guarded
def main():
argp = argparse.ArgumentParser(description=__doc__)
argp.add_argument("--version", action="version", version="0.4.0")
argp.add_argument("-v", "--verbose", action="count", default=0)
argp.add_argument("--make-icinga-conf", action=Icinga2ConfAction)
argp.add_argument(
"-t",
"--token",
required=True,
type=str,
help="token for Home Assistant REST API (see https://developers.home-assistant.io/docs/api/rest/)",
)
argp.add_argument(
"-u", "--url", required=True, type=str, help="URL for Home Assistant"
)
shared_parser = argparse.ArgumentParser(add_help=False)
common_args = shared_parser.add_argument_group(
"Common Arguments", "Arguments shared between metric types"
)
common_args.add_argument(
"--domain",
type=str,
required=False,
help="domain of entities to monitor",
)
common_args.add_argument(
"-d",
"--device-class",
type=str,
required=False,
help="device class of entities to monitor",
)
common_args.add_argument(
"-a", "--attribute", type=str, help="check attribute instead of value"
)
common_args.add_argument(
"-f",
"--filter",
type=AttributeFilter.from_str,
default=[],
nargs="*",
action="extend",
help="filter by 'attribute=value' (may be specified multiple times). Use != to negate the match",
)
common_args.add_argument(
"-i",
"--include",
default=[],
nargs="*",
action="extend",
help="explicitly include entities by id. Other entities will not be considered if this is specified. Listed entities must also match the specified device class",
)
common_args.add_argument(
"--area", type=str, help="area to filter by. '!' before negates"
)
common_args.add_argument(
"--label", type=str, help="label to filter by. '!' before negates"
)
common_args.add_argument(
"-e",
"--exclude",
default=[],
nargs="*",
action="extend",
help="exclude entities by id",
)
common_args.add_argument(
"--friendly",
action="store_true",
help="use friendly name, when available",
)
common_args.add_argument(
"--ignore-missing", action="store_true", help="Ignore missing attributes"
)
subparsers = argp.add_subparsers(
title="Query type", required=True, dest="subparser_name"
)
scalar_parser = subparsers.add_parser(
"scalar", help="Numeric metrics", parents=[shared_parser]
)
scalar_parser.add_argument(
"-w",
"--warning",
metavar="RANGE",
required=True,
help="return warning if value is outside %(metavar)s",
)
scalar_parser.add_argument(
"-c",
"--critical",
metavar="RANGE",
required=True,
help="return critical if value is outside %(metavar)s",
)
scalar_parser.add_argument(
"--min",
type=float,
help="min for performance data",
)
scalar_parser.add_argument(
"--max",
type=float,
help="max for performance data",
)
text_parser = subparsers.add_parser(
"text", help="Textual metrics", parents=[shared_parser]
)
text_parser.add_argument(
"-o",
"--ok",
metavar="REGEX",
help="return ok if value matches %(metavar)s",
)
text_parser.add_argument(
"-w",
"--warning",
metavar="REGEX",
help="return warning if value matches %(metavar)s",
)
text_parser.add_argument(
"-c",
"--critical",
metavar="REGEX",
help="return critical if value matches %(metavar)s",
)
args = argp.parse_args()
parsed_common_args = {
"domain": args.domain,
"device_class": args.device_class,
"attribute": args.attribute,
"filters": args.filter,
"area": args.area,
"label": args.label,
"include": args.include,
"exclude": args.exclude,
"friendly_name": args.friendly,
"ignore_missing": args.ignore_missing,
}
if args.subparser_name == "scalar":
check = nagiosplugin.Check(
Entities(
url=args.url,
token=args.token,
numeric=True,
min=args.min,
max=args.max,
**parsed_common_args,
),
ScalarOrUnknownContext("scalar_entities", args.warning, args.critical),
)
elif args.subparser_name == "text":
check = nagiosplugin.Check(
Entities(
url=args.url,
token=args.token,
numeric=False,
**parsed_common_args,
),
RegexContext("text_entities", args.ok, args.warning, args.critical),
)
check.main(args.verbose)
if __name__ == "__main__":
main()