Add support for text/regex based attributes

This commit is contained in:
Adam Goldsmith 2022-12-30 21:46:49 -05:00
parent a1b8025c7e
commit 256529373d

View File

@ -5,6 +5,7 @@
import argparse import argparse
import dataclasses import dataclasses
import logging import logging
import re
import sys import sys
from urllib.parse import urljoin from urllib.parse import urljoin
from typing import Optional from typing import Optional
@ -21,11 +22,12 @@ class Entities(nagiosplugin.Resource):
url: str url: str
token: str token: str
device_class: str device_class: str
min: float numeric: bool
max: float
filters: list[str] filters: list[str]
attribute: Optional[str] attribute: Optional[str]
friendly_name: bool friendly_name: bool
min: Optional[float] = None
max: Optional[float] = None
def hass_get(self, endpoint: str) -> requests.Response: def hass_get(self, endpoint: str) -> requests.Response:
headers = { headers = {
@ -54,10 +56,17 @@ class Entities(nagiosplugin.Resource):
state["attributes"].get(k) == v for k, v in self.filters state["attributes"].get(k) == v for k, v in self.filters
): ):
if self.attribute is not None: if self.attribute is not None:
value = state["attributes"].get(self.attribute, -1) value = state["attributes"].get(
self.attribute, -1 if self.numeric else "attribute missing"
)
uom = None uom = None
else: else:
value = float(state["state"]) if state["state"].isnumeric() else -1 if self.numeric:
value = (
float(state["state"]) if state["state"].isnumeric() else -1
)
else:
value = state["state"]
uom = state["attributes"].get("unit_of_measurement") uom = state["attributes"].get("unit_of_measurement")
if self.friendly_name: if self.friendly_name:
@ -81,6 +90,46 @@ class Entities(nagiosplugin.Resource):
) )
class RegexContext(nagiosplugin.Context):
def __init__(
self,
name: str,
ok: str = None,
warning: str = None,
critical: str = None,
fmt_metric=None,
result_cls=nagiosplugin.Result,
):
super().__init__(name, fmt_metric=fmt_metric, result_cls=result_cls)
# '.^' should match nothing
self.ok = re.compile(ok or ".^")
self.warning = re.compile(warning or ".^")
self.critical = re.compile(critical or ".^")
def fmt_reason(self, regex: re.Pattern, metric: nagiosplugin.Metric) -> str:
return f"{metric.name} matched regex '{regex.pattern}' ({metric.value})"
def evaluate(self, metric: nagiosplugin.Metric, resource):
if self.critical.match(metric.value):
return self.result_cls(
nagiosplugin.Critical, self.fmt_reason(self.critical, metric), metric
)
elif self.warning.match(metric.value):
return self.result_cls(
nagiosplugin.Warn, self.fmt_reason(self.warning, metric), metric
)
elif self.ok.match(metric.value):
return self.result_cls(
nagiosplugin.Ok, self.fmt_reason(self.ok, metric), metric
)
else:
return self.result_cls(
nagiosplugin.Unknown,
f"{metric.name} did not match any defined patterns ({metric.value})",
metric,
)
def key_value(arg: str): def key_value(arg: str):
k, _, v = arg.partition("=") k, _, v = arg.partition("=")
return k, v return k, v
@ -102,19 +151,11 @@ class Icinga2ConfAction(argparse.Action):
help=help, help=help,
) )
def __call__( def _format_actions(self, param_prefix: str, parser: argparse.ArgumentParser, order: Optional[int] = None):
self, parser: argparse.ArgumentParser, namespace, values, option_string=None
):
filename = Path(__file__).absolute()
command_name = filename.stem.removeprefix("check_")
# FIXME: assumes that script will be in a subdirectory of PluginContribDir
command_path = filename.relative_to(filename.parents[1])
print(f'object CheckCommand "{command_name}" {{')
print(f' command = [ PluginContribDir + "/{command_path}" ]')
print(" arguments = {")
for action in parser._actions: for action in parser._actions:
if action.dest in [self.dest, "help", "version"]: if action.dest in [self.dest, "help", "version"] or isinstance(
action, argparse._SubParsersAction
):
continue continue
arg_str = action.option_strings[-1] arg_str = action.option_strings[-1]
@ -132,14 +173,54 @@ class Icinga2ConfAction(argparse.Action):
argparse._AppendConstAction, argparse._AppendConstAction,
), ),
): ):
print(f' set_if = "${command_name}_{icinga2_var}$"') print(f' set_if = "${param_prefix}_{icinga2_var}$"')
else: else:
print(f' value = "${command_name}_{icinga2_var}$"') print(f' value = "${param_prefix}_{icinga2_var}$"')
print(f' description = "{action.help}"') print(f' description = "{action.help}"')
if order is not None:
print(f' order = {order}')
print(" }") print(" }")
def __call__(
self, parser: argparse.ArgumentParser, namespace, values, option_string=None
):
filename = Path(__file__).absolute()
command_name = filename.stem.removeprefix("check_")
# FIXME: assumes that script will be in a subdirectory of PluginContribDir
command_path = filename.relative_to(filename.parents[1])
if parser._subparsers:
# TODO: this seems a bit more hacky than it needs to be
choices = parser._subparsers._group_actions[0].choices
for subname, subparser in choices.items():
print(f'object CheckCommand "{command_name}_{subname}" {{')
print(
f' command = [ PluginContribDir + "/{command_path}" ]'
)
print(" arguments = {")
self._format_actions(command_name, parser, order=-2)
print(f' "{subname}" = {{')
print(' set_if = true')
print(' order = -1')
print(' }')
self._format_actions(f"{command_name}_{subname}", subparser)
print(" }\n}") print(" }\n}")
else:
print(f'object CheckCommand "{command_name}" {{')
print(f' command = [ PluginContribDir + "/{command_path}" ]')
print(" arguments = {")
self._format_actions(command_name, parser)
print(" }\n}")
parser.exit() parser.exit()
@ -162,41 +243,21 @@ def main():
"-u", "--url", required=True, type=str, help="URL for Home Assistant" "-u", "--url", required=True, type=str, help="URL for Home Assistant"
) )
argp.add_argument( shared_parser = argparse.ArgumentParser(add_help=False)
common_args = shared_parser.add_argument_group(
"Common Arguments", "Arguments shared between metric types"
)
common_args.add_argument(
"-d", "-d",
"--device-class", "--device-class",
type=str, type=str,
required=True, required=True,
help="device class of entities to monitor", help="device class of entities to monitor",
) )
argp.add_argument( common_args.add_argument(
"-w",
"--warning",
metavar="RANGE",
required=True,
help="return warning if value is outside %(metavar)s",
)
argp.add_argument(
"-c",
"--critical",
metavar="RANGE",
required=True,
help="return critical if value is outside %(metavar)s",
)
argp.add_argument(
"--min",
type=float,
help="min for performance data",
)
argp.add_argument(
"--max",
type=float,
help="max for performance data",
)
argp.add_argument(
"-a", "--attribute", type=str, help="check attribute instead of value" "-a", "--attribute", type=str, help="check attribute instead of value"
) )
argp.add_argument( common_args.add_argument(
"-f", "-f",
"--filter", "--filter",
type=key_value, type=key_value,
@ -204,27 +265,96 @@ def main():
nargs="*", nargs="*",
help="filter by 'attribute=value' (may be specified multiple times)", help="filter by 'attribute=value' (may be specified multiple times)",
) )
argp.add_argument( common_args.add_argument(
"--friendly", "--friendly",
action="store_true", action="store_true",
help="use friendly name, when available", help="use friendly name, when available",
) )
args = argp.parse_args() subparsers = argp.add_subparsers(
title="Query type", required=True, dest="subparser_name"
)
scalar_parser = subparsers.add_parser(
"scalar", help="Numeric metrics", parents=[shared_parser]
)
scalar_parser.add_argument(
"-w",
"--warning",
metavar="RANGE",
required=True,
help="return warning if value is outside %(metavar)s",
)
scalar_parser.add_argument(
"-c",
"--critical",
metavar="RANGE",
required=True,
help="return critical if value is outside %(metavar)s",
)
scalar_parser.add_argument(
"--min",
type=float,
help="min for performance data",
)
scalar_parser.add_argument(
"--max",
type=float,
help="max for performance data",
)
text_parser = subparsers.add_parser(
"text", help="Textual metrics", parents=[shared_parser]
)
text_parser.add_argument(
"-o",
"--ok",
metavar="REGEX",
help="return ok if value matches %(metavar)s",
)
text_parser.add_argument(
"-w",
"--warning",
metavar="REGEX",
help="return warning if value matches %(metavar)s",
)
text_parser.add_argument(
"-c",
"--critical",
metavar="REGEX",
help="return critical if value matches %(metavar)s",
)
args = argp.parse_args()
if args.subparser_name == "scalar":
check = nagiosplugin.Check( check = nagiosplugin.Check(
Entities( Entities(
args.url, url=args.url,
args.token, token=args.token,
args.device_class, device_class=args.device_class,
args.min, numeric=True,
args.max, min=args.min,
args.filter, max=args.max,
args.attribute, filters=args.filter,
args.friendly, attribute=args.attribute,
friendly_name=args.friendly,
), ),
nagiosplugin.ScalarContext(args.device_class, args.warning, args.critical), nagiosplugin.ScalarContext(args.device_class, args.warning, args.critical),
) )
elif args.subparser_name == "text":
check = nagiosplugin.Check(
Entities(
url=args.url,
token=args.token,
device_class=args.device_class,
numeric=False,
filters=args.filter,
attribute=args.attribute,
friendly_name=args.friendly,
),
RegexContext(args.device_class, args.ok, args.warning, args.critical),
)
check.main(args.verbose) check.main(args.verbose)