Compare commits

..

No commits in common. "97d07142d8c442dd54bf15e9d5473725d24f8883" and "3e9d362805d57731e0ddc5ec84a72f1c9499f18d" have entirely different histories.

2 changed files with 60 additions and 139 deletions

View File

@ -17,65 +17,15 @@ import nagiosplugin
_log = logging.getLogger("nagiosplugin") _log = logging.getLogger("nagiosplugin")
class ScalarOrUnknownContext(nagiosplugin.ScalarContext):
"""
Same as a :class:`nagiosplugin.ScalarContext`, but returns UNKNOWN when the
value is not an int or float
"""
def evaluate(self, metric: nagiosplugin.Metric, resource: nagiosplugin.Resource):
if isinstance(metric.value, (int, float)):
return super().evaluate(metric, resource)
else:
return self.result_cls(
nagiosplugin.state.Unknown, "non-scalar value", metric
)
def performance(self, metric: nagiosplugin.Metric, resource: nagiosplugin.Resource):
if isinstance(metric.value, (int, float)):
return super().performance(metric, resource)
else:
return nagiosplugin.Performance(
metric.name,
"U",
metric.uom,
self.warning,
self.critical,
metric.min,
metric.max,
)
@dataclasses.dataclass
class AttributeFilter:
attribute: str
value: str
negated: bool
@classmethod
def from_str(cls, arg: str):
k, _, v = arg.partition("=")
if k.endswith("!"):
return cls(k.removesuffix("!"), v, True)
else:
return cls(k, v, False)
def check(self, attributes: dict[str, str]) -> bool:
return self.negated ^ (attributes.get(self.attribute) == self.value)
@dataclasses.dataclass @dataclasses.dataclass
class Entities(nagiosplugin.Resource): class Entities(nagiosplugin.Resource):
url: str url: str
token: str token: str
device_class: str device_class: str
numeric: bool numeric: bool
filters: list[AttributeFilter] filters: list[str]
attribute: Optional[str] attribute: Optional[str]
friendly_name: bool friendly_name: bool
ignore_missing: bool
include: list[str]
exclude: list[str]
min: Optional[float] = None min: Optional[float] = None
max: Optional[float] = None max: Optional[float] = None
@ -99,20 +49,22 @@ class Entities(nagiosplugin.Resource):
else: else:
print("OK: " + message) print("OK: " + message)
def _state_to_metric(self, state): def probe(self):
"""Convert a state into a metric""" response = self.hass_get("/api/states")
for state in response:
if state["attributes"].get("device_class") == self.device_class and all(
state["attributes"].get(k) == v for k, v in self.filters
):
if self.attribute is not None: if self.attribute is not None:
if self.ignore_missing and self.attribute not in state["attributes"]: value = state["attributes"].get(
return self.attribute, -1 if self.numeric else "attribute missing"
else: )
value = state["attributes"].get(self.attribute, "missing attribute")
uom = None uom = None
else: else:
if self.numeric: if self.numeric:
try: value = (
value = float(state["state"]) float(state["state"]) if state["state"].isnumeric() else -1
except ValueError: )
value = state["state"]
else: else:
value = state["state"] value = state["state"]
uom = state["attributes"].get("unit_of_measurement") uom = state["attributes"].get("unit_of_measurement")
@ -126,6 +78,8 @@ class Entities(nagiosplugin.Resource):
else: else:
name = state["entity_id"] name = state["entity_id"]
if state["state"] == "unavailable":
_log.info(f"{state['entity_id']} unavailable")
yield nagiosplugin.Metric( yield nagiosplugin.Metric(
name, name,
value, value,
@ -135,17 +89,6 @@ class Entities(nagiosplugin.Resource):
max=self.max, max=self.max,
) )
def probe(self):
response = self.hass_get("/api/states")
for state in response:
if (
state["attributes"].get("device_class") == self.device_class
and all(filter.check(state["attributes"]) for filter in self.filters)
and (len(self.include) == 0 or state["entity_id"] in self.include)
and state["entity_id"] not in self.exclude
):
yield from self._state_to_metric(state)
class RegexContext(nagiosplugin.Context): class RegexContext(nagiosplugin.Context):
def __init__( def __init__(
@ -187,6 +130,11 @@ class RegexContext(nagiosplugin.Context):
) )
def key_value(arg: str):
k, _, v = arg.partition("=")
return k, v
class Icinga2ConfAction(argparse.Action): class Icinga2ConfAction(argparse.Action):
def __init__( def __init__(
self, self,
@ -203,12 +151,7 @@ class Icinga2ConfAction(argparse.Action):
help=help, help=help,
) )
def _format_actions( def _format_actions(self, param_prefix: str, parser: argparse.ArgumentParser, order: Optional[int] = None):
self,
param_prefix: str,
parser: argparse.ArgumentParser,
order: Optional[int] = None,
):
for action in parser._actions: for action in parser._actions:
if action.dest in [self.dest, "help", "version"] or isinstance( if action.dest in [self.dest, "help", "version"] or isinstance(
action, argparse._SubParsersAction action, argparse._SubParsersAction
@ -237,7 +180,7 @@ class Icinga2ConfAction(argparse.Action):
print(f' description = "{action.help}"') print(f' description = "{action.help}"')
if order is not None: if order is not None:
print(f" order = {order}") print(f' order = {order}')
print(" }") print(" }")
@ -254,15 +197,17 @@ class Icinga2ConfAction(argparse.Action):
choices = parser._subparsers._group_actions[0].choices choices = parser._subparsers._group_actions[0].choices
for subname, subparser in choices.items(): for subname, subparser in choices.items():
print(f'object CheckCommand "{command_name}_{subname}" {{') print(f'object CheckCommand "{command_name}_{subname}" {{')
print(f' command = [ PluginContribDir + "/{command_path}" ]') print(
f' command = [ PluginContribDir + "/{command_path}" ]'
)
print(" arguments = {") print(" arguments = {")
self._format_actions(command_name, parser, order=-2) self._format_actions(command_name, parser, order=-2)
print(f' "{subname}" = {{') print(f' "{subname}" = {{')
print(" set_if = true") print(' set_if = true')
print(" order = -1") print(' order = -1')
print(" }") print(' }')
self._format_actions(f"{command_name}_{subname}", subparser) self._format_actions(f"{command_name}_{subname}", subparser)
print(" }\n}") print(" }\n}")
@ -315,36 +260,16 @@ def main():
common_args.add_argument( common_args.add_argument(
"-f", "-f",
"--filter", "--filter",
type=AttributeFilter.from_str, type=key_value,
default=[], default=[],
nargs="*", nargs="*",
action="extend", help="filter by 'attribute=value' (may be specified multiple times)",
help="filter by 'attribute=value' (may be specified multiple times). Use != to negate the match",
)
common_args.add_argument(
"-i",
"--include",
default=[],
nargs="*",
action="extend",
help="explicitly include entities by id. Other entities will not be considered if this is specified. Listed entities must also match the specified device class",
)
common_args.add_argument(
"-e",
"--exclude",
default=[],
nargs="*",
action="extend",
help="exclude entities by id",
) )
common_args.add_argument( common_args.add_argument(
"--friendly", "--friendly",
action="store_true", action="store_true",
help="use friendly name, when available", help="use friendly name, when available",
) )
common_args.add_argument(
"--ignore-missing", action="store_true", help="Ignore missing attributes"
)
subparsers = argp.add_subparsers( subparsers = argp.add_subparsers(
title="Query type", required=True, dest="subparser_name" title="Query type", required=True, dest="subparser_name"
@ -401,34 +326,31 @@ def main():
) )
args = argp.parse_args() args = argp.parse_args()
parsed_common_args = {
"device_class": args.device_class,
"attribute": args.attribute,
"filters": args.filter,
"include": args.include,
"exclude": args.exclude,
"friendly_name": args.friendly,
"ignore_missing": args.ignore_missing,
}
if args.subparser_name == "scalar": if args.subparser_name == "scalar":
check = nagiosplugin.Check( check = nagiosplugin.Check(
Entities( Entities(
url=args.url, url=args.url,
token=args.token, token=args.token,
device_class=args.device_class,
numeric=True, numeric=True,
min=args.min, min=args.min,
max=args.max, max=args.max,
**parsed_common_args, filters=args.filter,
attribute=args.attribute,
friendly_name=args.friendly,
), ),
ScalarOrUnknownContext(args.device_class, args.warning, args.critical), nagiosplugin.ScalarContext(args.device_class, args.warning, args.critical),
) )
elif args.subparser_name == "text": elif args.subparser_name == "text":
check = nagiosplugin.Check( check = nagiosplugin.Check(
Entities( Entities(
url=args.url, url=args.url,
token=args.token, token=args.token,
device_class=args.device_class,
numeric=False, numeric=False,
**parsed_common_args, filters=args.filter,
attribute=args.attribute,
friendly_name=args.friendly,
), ),
RegexContext(args.device_class, args.ok, args.warning, args.critical), RegexContext(args.device_class, args.ok, args.warning, args.critical),
) )

View File

@ -1 +0,0 @@
[tool.black]