From f5577068856ab8bbdb4fab062fe0100dd39cba87 Mon Sep 17 00:00:00 2001 From: Adam Goldsmith Date: Fri, 30 Dec 2022 21:46:49 -0500 Subject: [PATCH] Add support for text/regex based attributes --- check_home_assistant_state.py | 244 +++++++++++++++++++++++++--------- 1 file changed, 183 insertions(+), 61 deletions(-) diff --git a/check_home_assistant_state.py b/check_home_assistant_state.py index b966c80..31bb281 100755 --- a/check_home_assistant_state.py +++ b/check_home_assistant_state.py @@ -5,6 +5,7 @@ import argparse import dataclasses import logging +import re import sys from urllib.parse import urljoin from typing import Optional @@ -21,11 +22,12 @@ class Entities(nagiosplugin.Resource): url: str token: str device_class: str - min: float - max: float + numeric: bool filters: list[str] attribute: Optional[str] friendly_name: bool + min: Optional[float] = None + max: Optional[float] = None def hass_get(self, endpoint: str) -> requests.Response: headers = { @@ -54,10 +56,17 @@ class Entities(nagiosplugin.Resource): state["attributes"].get(k) == v for k, v in self.filters ): if self.attribute is not None: - value = state["attributes"].get(self.attribute, -1) + value = state["attributes"].get( + self.attribute, -1 if self.numeric else "attribute missing" + ) uom = None else: - value = float(state["state"]) if state["state"].isnumeric() else -1 + if self.numeric: + value = ( + float(state["state"]) if state["state"].isnumeric() else -1 + ) + else: + value = state["state"] uom = state["attributes"].get("unit_of_measurement") if self.friendly_name: @@ -81,6 +90,46 @@ class Entities(nagiosplugin.Resource): ) +class RegexContext(nagiosplugin.Context): + def __init__( + self, + name: str, + ok: str = None, + warning: str = None, + critical: str = None, + fmt_metric=None, + result_cls=nagiosplugin.Result, + ): + super().__init__(name, fmt_metric=fmt_metric, result_cls=result_cls) + # '.^' should match nothing + self.ok = re.compile(ok or ".^") + self.warning = re.compile(warning or ".^") + self.critical = re.compile(critical or ".^") + + def fmt_reason(self, regex: re.Pattern, metric: nagiosplugin.Metric) -> str: + return f"{metric.name} matched regex '{regex.pattern}' ({metric.value})" + + def evaluate(self, metric: nagiosplugin.Metric, resource): + if self.critical.match(metric.value): + return self.result_cls( + nagiosplugin.Critical, self.fmt_reason(self.critical, metric), metric + ) + elif self.warning.match(metric.value): + return self.result_cls( + nagiosplugin.Warn, self.fmt_reason(self.warning, metric), metric + ) + elif self.ok.match(metric.value): + return self.result_cls( + nagiosplugin.Ok, self.fmt_reason(self.ok, metric), metric + ) + else: + return self.result_cls( + nagiosplugin.Unknown, + f"{metric.name} did not match any defined patterns ({metric.value})", + metric, + ) + + def key_value(arg: str): k, _, v = arg.partition("=") return k, v @@ -102,19 +151,11 @@ class Icinga2ConfAction(argparse.Action): help=help, ) - def __call__( - self, parser: argparse.ArgumentParser, namespace, values, option_string=None - ): - filename = Path(__file__).absolute() - command_name = filename.stem.removeprefix("check_") - # FIXME: assumes that script will be in a subdirectory of PluginContribDir - command_path = filename.relative_to(filename.parents[1]) - - print(f'object CheckCommand "{command_name}" {{') - print(f' command = [ PluginContribDir + "/{command_path}" ]') - print(" arguments = {") + def _format_actions(self, param_prefix: str, parser: argparse.ArgumentParser): for action in parser._actions: - if action.dest in [self.dest, "help", "version"]: + if action.dest in [self.dest, "help", "version"] or isinstance( + action, argparse._SubParsersAction + ): continue arg_str = action.option_strings[-1] @@ -132,14 +173,46 @@ class Icinga2ConfAction(argparse.Action): argparse._AppendConstAction, ), ): - print(f' set_if = "${command_name}_{icinga2_var}$"') + print(f' set_if = "${param_prefix}_{icinga2_var}$"') else: - print(f' value = "${command_name}_{icinga2_var}$"') + print(f' value = "${param_prefix}_{icinga2_var}$"') print(f' description = "{action.help}"') print(" }") - print(" }\n}") + def __call__( + self, parser: argparse.ArgumentParser, namespace, values, option_string=None + ): + filename = Path(__file__).absolute() + command_name = filename.stem.removeprefix("check_") + # FIXME: assumes that script will be in a subdirectory of PluginContribDir + command_path = filename.relative_to(filename.parents[1]) + + if parser._subparsers: + # TODO: this seems a bit more hacky than it needs to be + choices = parser._subparsers._group_actions[0].choices + for subname, subparser in choices.items(): + print(f'object CheckCommand "{command_name}" {{') + print( + f' command = [ PluginContribDir + "/{command_path}", "{subname}" ]' + ) + + print(" arguments = {") + + self._format_actions(command_name, parser) + self._format_actions(f"{command_name}_{subname}", subparser) + + print(" }\n}") + else: + print(f'object CheckCommand "{command_name}" {{') + print(f' command = [ PluginContribDir + "/{command_path}" ]') + + print(" arguments = {") + + self._format_actions(command_name, parser) + + print(" }\n}") + parser.exit() @@ -162,41 +235,21 @@ def main(): "-u", "--url", required=True, type=str, help="URL for Home Assistant" ) - argp.add_argument( + shared_parser = argparse.ArgumentParser(add_help=False) + common_args = shared_parser.add_argument_group( + "Common Arguments", "Arguments shared between metric types" + ) + common_args.add_argument( "-d", "--device-class", type=str, required=True, help="device class of entities to monitor", ) - argp.add_argument( - "-w", - "--warning", - metavar="RANGE", - required=True, - help="return warning if value is outside %(metavar)s", - ) - argp.add_argument( - "-c", - "--critical", - metavar="RANGE", - required=True, - help="return critical if value is outside %(metavar)s", - ) - argp.add_argument( - "--min", - type=float, - help="min for performance data", - ) - argp.add_argument( - "--max", - type=float, - help="max for performance data", - ) - argp.add_argument( + common_args.add_argument( "-a", "--attribute", type=str, help="check attribute instead of value" ) - argp.add_argument( + common_args.add_argument( "-f", "--filter", type=key_value, @@ -204,27 +257,96 @@ def main(): nargs="*", help="filter by 'attribute=value' (may be specified multiple times)", ) - argp.add_argument( + common_args.add_argument( "--friendly", action="store_true", help="use friendly name, when available", ) - args = argp.parse_args() - - check = nagiosplugin.Check( - Entities( - args.url, - args.token, - args.device_class, - args.min, - args.max, - args.filter, - args.attribute, - args.friendly, - ), - nagiosplugin.ScalarContext(args.device_class, args.warning, args.critical), + subparsers = argp.add_subparsers( + title="Query type", required=True, dest="subparser_name" ) + + scalar_parser = subparsers.add_parser( + "scalar", help="Numeric metrics", parents=[shared_parser] + ) + scalar_parser.add_argument( + "-w", + "--warning", + metavar="RANGE", + required=True, + help="return warning if value is outside %(metavar)s", + ) + scalar_parser.add_argument( + "-c", + "--critical", + metavar="RANGE", + required=True, + help="return critical if value is outside %(metavar)s", + ) + scalar_parser.add_argument( + "--min", + type=float, + help="min for performance data", + ) + scalar_parser.add_argument( + "--max", + type=float, + help="max for performance data", + ) + + text_parser = subparsers.add_parser( + "text", help="Textual metrics", parents=[shared_parser] + ) + text_parser.add_argument( + "-o", + "--ok", + metavar="REGEX", + help="return ok if value matches %(metavar)s", + ) + text_parser.add_argument( + "-w", + "--warning", + metavar="REGEX", + help="return warning if value matches %(metavar)s", + ) + text_parser.add_argument( + "-c", + "--critical", + metavar="REGEX", + help="return critical if value matches %(metavar)s", + ) + + args = argp.parse_args() + if args.subparser_name == "scalar": + check = nagiosplugin.Check( + Entities( + url=args.url, + token=args.token, + device_class=args.device_class, + numeric=True, + min=args.min, + max=args.max, + filters=args.filter, + attribute=args.attribute, + friendly_name=args.friendly, + ), + nagiosplugin.ScalarContext(args.device_class, args.warning, args.critical), + ) + elif args.subparser_name == "text": + check = nagiosplugin.Check( + Entities( + url=args.url, + token=args.token, + device_class=args.device_class, + numeric=False, + filters=args.filter, + attribute=args.attribute, + friendly_name=args.friendly, + ), + RegexContext(args.device_class, args.ok, args.warning, args.critical), + ) + check.main(args.verbose)