check_home_assistant_state/check_home_assistant_state.py

494 lines
15 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
2022-11-29 18:26:02 -05:00
"""A check plugin for Nagios/Icinga/etc. that queries Home Assistant for the current state of entities."""
import argparse
2022-11-26 11:11:03 -05:00
import dataclasses
import json
import logging
import re
import sys
from pathlib import Path
2024-04-06 12:43:48 -04:00
from typing import Any
from urllib.parse import urljoin
import nagiosplugin
2024-04-06 12:43:48 -04:00
import requests
# Module-level logger registered under the name "nagiosplugin".
# NOTE(review): presumably chosen so messages share the nagiosplugin library's
# log handling/verbosity -- confirm; nothing in the visible code uses it yet.
_log = logging.getLogger("nagiosplugin")
class ScalarOrUnknownContext(nagiosplugin.ScalarContext):
    """
    A :class:`nagiosplugin.ScalarContext` that tolerates non-numeric values.

    Metrics whose value is an ``int`` or ``float`` are handled by the parent
    class unchanged; anything else evaluates to UNKNOWN and is reported with
    the performance value ``"U"``.
    """

    @staticmethod
    def _is_scalar(value) -> bool:
        """Return True when *value* is an int or float."""
        return isinstance(value, (int, float))

    def evaluate(self, metric: nagiosplugin.Metric, resource: nagiosplugin.Resource):
        """Evaluate *metric*, yielding UNKNOWN for non-scalar values."""
        if not self._is_scalar(metric.value):
            return self.result_cls(
                nagiosplugin.state.Unknown, "non-scalar value", metric
            )
        return super().evaluate(metric, resource)

    def performance(self, metric: nagiosplugin.Metric, resource: nagiosplugin.Resource):
        """Build performance data, substituting "U" for non-scalar values."""
        if not self._is_scalar(metric.value):
            perf_fields = (
                metric.name,
                "U",
                metric.uom,
                self.warning,
                self.critical,
                metric.min,
                metric.max,
            )
            return nagiosplugin.Performance(*perf_fields)
        return super().performance(metric, resource)
2023-03-11 00:55:38 -05:00
@dataclasses.dataclass
class AttributeFilter:
    """
    One ``attribute=value`` (or negated ``attribute!=value``) entity filter.
    """

    # Name of the entity attribute to compare.
    attribute: str
    # Value the attribute is compared against (always kept as text).
    value: str
    # True when the filter was written with ``!=`` (match must NOT be equal).
    negated: bool

    @classmethod
    def from_str(cls, arg: str) -> "AttributeFilter":
        """
        Parse a filter expression of the form ``attribute=value`` or
        ``attribute!=value``.

        Only the first ``=`` separates name from value, so the value itself
        may contain further ``=`` characters. An empty value (``attr=``) is
        allowed.

        :param arg: the raw filter expression
        :returns: the parsed filter
        :raises ValueError: if *arg* contains no ``=`` or names no attribute.
            When used as an argparse ``type=`` callable this is reported to
            the user as an invalid argument instead of silently producing a
            filter that can never match.
        """
        key, separator, value = arg.partition("=")
        if not separator:
            raise ValueError(
                f"filter {arg!r} is not of the form 'attribute=value'"
            )
        # A trailing "!" on the key is the left half of the "!=" operator.
        negated = key.endswith("!")
        if negated:
            key = key.removesuffix("!")
        if not key:
            raise ValueError(f"filter {arg!r} does not name an attribute")
        return cls(key, value, negated)
2022-11-26 11:11:03 -05:00
@dataclasses.dataclass
class Entities(nagiosplugin.Resource):
    """
    Resource that queries Home Assistant for entity states and yields one
    :class:`nagiosplugin.Metric` per selected entity.

    Entities are selected server-side by rendering a template through the
    ``/api/template`` endpoint; see :meth:`probe`.
    """

    # Base URL of the Home Assistant instance; endpoints are joined onto it.
    url: str
    # API token, sent as an HTTP Bearer token.
    token: str
    # Only entities with this device class are selected (falsy disables).
    device_class: str
    # True: parse states as floats and use the "scalar_entities" context;
    # False: keep states as text and use the "text_entities" context.
    numeric: bool
    # Only entities in this area are selected (falsy disables).
    area: str
    # Only entities carrying this label are selected (falsy disables).
    label: str
    # Additional attribute equality / inequality filters.
    filters: list[AttributeFilter]
    # When set, report this attribute instead of the entity state.
    attribute: str | None
    # Use the "friendly_name" attribute as the metric name when available.
    friendly_name: bool
    # Skip entities lacking `attribute` instead of reporting a placeholder.
    ignore_missing: bool
    # Entity ids added to the selection regardless of the other filters.
    include: list[str]
    # Entity ids removed from the filtered selection.
    exclude: list[str]
    # Optional bounds passed through to the metric (performance data).
    min: float | None = None
    max: float | None = None

    def _request_headers(self) -> dict[str, str]:
        """Return the HTTP headers shared by every API request."""
        return {
            "Authorization": "Bearer " + self.token,
            "Content-Type": "application/json",
        }

    @staticmethod
    def _decode_response(r) -> Any:
        """Return the JSON body of *r*, raising if the request failed."""
        if not r.ok:
            raise Exception("Failed to query Home Assistant API: " + r.text)
        return r.json()

    @staticmethod
    def _jinja_literal(value: str) -> str:
        """
        Render *value* as a quoted template string literal.

        JSON string syntax is used (as the include/exclude lists already do)
        so that quotes and backslashes in user-supplied values are escaped
        rather than terminating the literal and corrupting the template.
        """
        return json.dumps(value)

    def hass_get(self, endpoint: str) -> Any:
        """
        GET *endpoint* (joined onto ``self.url``) and return the decoded JSON.

        :raises Exception: if the response status is not OK
        """
        r = requests.get(urljoin(self.url, endpoint), headers=self._request_headers())
        return self._decode_response(r)

    def hass_post(self, endpoint: str, data: Any) -> Any:
        """
        POST *data* as JSON to *endpoint* and return the decoded JSON reply.

        :raises Exception: if the response status is not OK
        """
        r = requests.post(
            urljoin(self.url, endpoint), headers=self._request_headers(), json=data
        )
        return self._decode_response(r)

    def check_api(self):
        """
        Print whether the API reports itself as running; exit with status 1
        if it does not.
        """
        message = self.hass_get("/api/").get("message")
        if message != "API running.":
            # f-string rather than "+": message is None when the reply has no
            # "message" key, and concatenation would raise TypeError.
            print(f"ERROR: {message}")
            sys.exit(1)
        else:
            print(f"OK: {message}")

    def _state_to_metric(self, state):
        """
        Convert a state into a metric

        Generator yielding at most one metric: nothing is yielded when
        ``ignore_missing`` is set and the requested attribute is absent.
        """
        if self.attribute is not None:
            if self.ignore_missing and self.attribute not in state["attributes"]:
                return
            value = state["attributes"].get(self.attribute, "missing attribute")
            uom = None
        else:
            value = state["state"]
            if self.numeric:
                try:
                    value = float(value)
                except ValueError:
                    # Keep the raw text; the context decides how to report it.
                    pass
            uom = state["attributes"].get("unit_of_measurement")

        if self.friendly_name:
            # "'" and "=" are replaced with "_" in friendly names.
            name = (
                state["attributes"]
                .get("friendly_name", state["entity_id"])
                .translate({ord(c): "_" for c in "'="})
            )
        else:
            name = state["entity_id"]

        yield nagiosplugin.Metric(
            name,
            value,
            uom,
            context="scalar_entities" if self.numeric else "text_entities",
            min=self.min,
            max=self.max,
        )

    def probe(self):
        """
        Build a template that selects the requested entities, render it via
        ``/api/template`` and yield a metric for each returned state.

        The selection starts from all ``states`` and is narrowed by device
        class, attribute filters, area, label and the exclude list; entities
        in the include list are then added back and duplicates removed.
        """
        lit = self._jinja_literal
        template_filter = "states"
        if self.device_class:
            template_filter += (
                f"|selectattr('attributes.device_class', 'eq', {lit(self.device_class)})"
            )
        for f in self.filters:
            op = "ne" if f.negated else "eq"
            template_filter += (
                f"|selectattr({lit('attributes.' + f.attribute)}, '{op}', {lit(f.value)})"
            )
        if self.area:
            template_filter += (
                f"|selectattr('entity_id', 'in', area_entities({lit(self.area)}))"
            )
        if self.label:
            template_filter += (
                f"|selectattr('entity_id', 'in', label_entities({lit(self.label)}))"
            )
        if self.exclude:
            template_filter += (
                f"|rejectattr('entity_id', 'in', { json.dumps(self.exclude) })"
            )
        if self.include:
            # Union of the filtered set and the explicitly included ids.
            template_filter = (
                f"({template_filter}|list + "
                f" states|selectattr('entity_id', 'in', { json.dumps(self.include) })|list"
                ")|unique(attribute='entity_id')"
            )
        # Collect the selected states into a list and emit it as JSON.
        template = (
            "{% set ns = namespace(out=[]) %}\n"
            f"{{% for s in { template_filter } %}}\n"
            " {% set ns.out = ns.out + [{\n"
            ' "entity_id": s.entity_id,\n'
            ' "state": s.state,\n'
            ' "attributes": s.attributes,\n'
            " }]\n"
            " %}\n"
            "{% endfor %}\n"
            "{{ ns.out|tojson }}"
        )
        response = self.hass_post("/api/template", {"template": template})
        for state in response:
            yield from self._state_to_metric(state)
class RegexContext(nagiosplugin.Context):
    """
    Context that classifies a metric by matching its value against regular
    expressions for the critical, warning and ok states (checked in that
    order). A value matching none of them evaluates to UNKNOWN.
    """

    def __init__(
        self,
        name: str,
        ok: str | None = None,
        warning: str | None = None,
        critical: str | None = None,
        fmt_metric=None,
        result_cls=nagiosplugin.Result,
    ):
        """
        :param name: context name
        :param ok: pattern whose match yields OK
        :param warning: pattern whose match yields WARNING
        :param critical: pattern whose match yields CRITICAL
        :param fmt_metric: passed through to :class:`nagiosplugin.Context`
        :param result_cls: passed through to :class:`nagiosplugin.Context`
        """
        super().__init__(name, fmt_metric=fmt_metric, result_cls=result_cls)
        # '.^' should match nothing
        self.ok = re.compile(ok or ".^")
        self.warning = re.compile(warning or ".^")
        self.critical = re.compile(critical or ".^")

    def fmt_reason(self, regex: re.Pattern, metric: nagiosplugin.Metric) -> str:
        """Return the human-readable explanation for a successful match."""
        return f"{metric.name} matched regex '{regex.pattern}' ({metric.value})"

    def evaluate(self, metric: nagiosplugin.Metric, resource):
        """
        Return the result for the first pattern (critical, warning, ok) that
        matches the metric value at its start, or UNKNOWN if none does.
        """
        # Values are not guaranteed to be strings (an attribute may hold a
        # number, boolean, list, ...); re.Pattern.match() raises TypeError on
        # those, so match against the textual form instead.
        text = metric.value if isinstance(metric.value, str) else str(metric.value)
        by_severity = (
            (self.critical, nagiosplugin.Critical),
            (self.warning, nagiosplugin.Warn),
            (self.ok, nagiosplugin.Ok),
        )
        for pattern, state in by_severity:
            if pattern.match(text):
                return self.result_cls(
                    state, self.fmt_reason(pattern, metric), metric
                )
        return self.result_cls(
            nagiosplugin.Unknown,
            f"{metric.name} did not match any defined patterns ({metric.value})",
            metric,
        )
class Icinga2ConfAction(argparse.Action):
def __init__(
self,
option_strings,
dest=argparse.SUPPRESS,
default=argparse.SUPPRESS,
help="Generate conf file for icinga2 CheckCommand",
):
super().__init__(
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help,
)
2023-03-10 22:16:43 -05:00
def _format_actions(
self,
param_prefix: str,
parser: argparse.ArgumentParser,
2024-04-06 12:40:57 -04:00
order: int | None = None,
2023-03-10 22:16:43 -05:00
):
for action in parser._actions:
if action.dest in [self.dest, "help", "version"] or isinstance(
action, argparse._SubParsersAction
):
continue
arg_str = action.option_strings[-1]
icinga2_var = arg_str.lstrip("-").replace("-", "_")
print(f' "{arg_str}" = {{')
if action.required:
print(" required = true")
if isinstance(
action,
(
argparse._StoreConstAction,
argparse._CountAction,
argparse._AppendConstAction,
),
):
print(f' set_if = "${param_prefix}_{icinga2_var}$"')
else:
print(f' value = "${param_prefix}_{icinga2_var}$"')
print(f' description = "{action.help}"')
if order is not None:
2023-03-10 22:16:43 -05:00
print(f" order = {order}")
print(" }")
def __call__(
self, parser: argparse.ArgumentParser, namespace, values, option_string=None
):
filename = Path(__file__).absolute()
command_name = filename.stem.removeprefix("check_")
# FIXME: assumes that script will be in a subdirectory of PluginContribDir
command_path = filename.relative_to(filename.parents[1])
if parser._subparsers:
# TODO: this seems a bit more hacky than it needs to be
choices = parser._subparsers._group_actions[0].choices
for subname, subparser in choices.items():
print(f'object CheckCommand "{command_name}_{subname}" {{')
2023-03-10 22:16:43 -05:00
print(f' command = [ PluginContribDir + "/{command_path}" ]')
print(" arguments = {")
self._format_actions(command_name, parser, order=-2)
print(f' "{subname}" = {{')
2023-03-10 22:16:43 -05:00
print(" set_if = true")
print(" order = -1")
print(" }")
self._format_actions(f"{command_name}_{subname}", subparser)
print(" }\n}")
else:
print(f'object CheckCommand "{command_name}" {{')
print(f' command = [ PluginContribDir + "/{command_path}" ]')
print(" arguments = {")
self._format_actions(command_name, parser)
print(" }\n}")
parser.exit()
@nagiosplugin.guarded
def main():
    """
    Parse the command line, build the matching check and run it.

    The ``scalar`` subcommand evaluates numeric values against warning and
    critical ranges; the ``text`` subcommand matches values against regular
    expressions. Both share the entity-selection options.
    """
    parser = argparse.ArgumentParser(description=__doc__)

    # Options that apply regardless of the query type.
    parser.add_argument("--version", action="version", version="0.4.0")
    parser.add_argument("-v", "--verbose", action="count", default=0)
    parser.add_argument("--make-icinga-conf", action=Icinga2ConfAction)
    parser.add_argument(
        "-t",
        "--token",
        required=True,
        type=str,
        help="token for Home Assistant REST API (see https://developers.home-assistant.io/docs/api/rest/)",
    )
    parser.add_argument(
        "-u", "--url", required=True, type=str, help="URL for Home Assistant"
    )

    # Entity-selection options, inherited by both subcommands.
    common_parent = argparse.ArgumentParser(add_help=False)
    common = common_parent.add_argument_group(
        "Common Arguments", "Arguments shared between metric types"
    )
    common.add_argument(
        "-d",
        "--device-class",
        type=str,
        required=False,
        help="device class of entities to monitor",
    )
    common.add_argument(
        "-a", "--attribute", type=str, help="check attribute instead of value"
    )
    common.add_argument(
        "-f",
        "--filter",
        type=AttributeFilter.from_str,
        default=[],
        nargs="*",
        action="extend",
        help="filter by 'attribute=value' (may be specified multiple times). Use != to negate the match",
    )
    common.add_argument(
        "-i",
        "--include",
        default=[],
        nargs="*",
        action="extend",
        help="explicitly include entities by id. Other entities will not be considered if this is specified. Listed entities must also match the specified device class",
    )
    common.add_argument("--area", type=str, help="area to filter by")
    common.add_argument("--label", type=str, help="label to filter by")
    common.add_argument(
        "-e",
        "--exclude",
        default=[],
        nargs="*",
        action="extend",
        help="exclude entities by id",
    )
    common.add_argument(
        "--friendly",
        action="store_true",
        help="use friendly name, when available",
    )
    common.add_argument(
        "--ignore-missing", action="store_true", help="Ignore missing attributes"
    )

    query_types = parser.add_subparsers(
        title="Query type", required=True, dest="subparser_name"
    )

    # "scalar": numeric thresholds plus optional performance-data bounds.
    scalar_cmd = query_types.add_parser(
        "scalar", help="Numeric metrics", parents=[common_parent]
    )
    for short_flag, level in (("-w", "warning"), ("-c", "critical")):
        scalar_cmd.add_argument(
            short_flag,
            f"--{level}",
            metavar="RANGE",
            required=True,
            help=f"return {level} if value is outside %(metavar)s",
        )
    for bound in ("min", "max"):
        scalar_cmd.add_argument(
            f"--{bound}",
            type=float,
            help=f"{bound} for performance data",
        )

    # "text": one optional regex per resulting state.
    text_cmd = query_types.add_parser(
        "text", help="Textual metrics", parents=[common_parent]
    )
    for short_flag, level in (("-o", "ok"), ("-w", "warning"), ("-c", "critical")):
        text_cmd.add_argument(
            short_flag,
            f"--{level}",
            metavar="REGEX",
            help=f"return {level} if value matches %(metavar)s",
        )

    args = parser.parse_args()

    # Constructor arguments that do not depend on the query type.
    entity_kwargs = dict(
        url=args.url,
        token=args.token,
        device_class=args.device_class,
        attribute=args.attribute,
        filters=args.filter,
        area=args.area,
        label=args.label,
        include=args.include,
        exclude=args.exclude,
        friendly_name=args.friendly,
        ignore_missing=args.ignore_missing,
    )

    if args.subparser_name == "scalar":
        resource = Entities(
            numeric=True, min=args.min, max=args.max, **entity_kwargs
        )
        context = ScalarOrUnknownContext(
            "scalar_entities", args.warning, args.critical
        )
        check = nagiosplugin.Check(resource, context)
    elif args.subparser_name == "text":
        resource = Entities(numeric=False, **entity_kwargs)
        context = RegexContext(
            "text_entities", args.ok, args.warning, args.critical
        )
        check = nagiosplugin.Check(resource, context)

    check.main(args.verbose)
# Run the check only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()