Compare commits

...

8 Commits

2 changed files with 139 additions and 60 deletions

View File

@ -17,15 +17,65 @@ import nagiosplugin
_log = logging.getLogger("nagiosplugin")
class ScalarOrUnknownContext(nagiosplugin.ScalarContext):
"""
Same as a :class:`nagiosplugin.ScalarContext`, but returns UNKNOWN when the
value is not an int or float
"""
def evaluate(self, metric: nagiosplugin.Metric, resource: nagiosplugin.Resource):
if isinstance(metric.value, (int, float)):
return super().evaluate(metric, resource)
else:
return self.result_cls(
nagiosplugin.state.Unknown, "non-scalar value", metric
)
def performance(self, metric: nagiosplugin.Metric, resource: nagiosplugin.Resource):
if isinstance(metric.value, (int, float)):
return super().performance(metric, resource)
else:
return nagiosplugin.Performance(
metric.name,
"U",
metric.uom,
self.warning,
self.critical,
metric.min,
metric.max,
)
@dataclasses.dataclass
class AttributeFilter:
attribute: str
value: str
negated: bool
@classmethod
def from_str(cls, arg: str):
k, _, v = arg.partition("=")
if k.endswith("!"):
return cls(k.removesuffix("!"), v, True)
else:
return cls(k, v, False)
def check(self, attributes: dict[str, str]) -> bool:
return self.negated ^ (attributes.get(self.attribute) == self.value)
@dataclasses.dataclass
class Entities(nagiosplugin.Resource):
url: str
token: str
device_class: str
numeric: bool
filters: list[str]
filters: list[AttributeFilter]
attribute: Optional[str]
friendly_name: bool
ignore_missing: bool
include: list[str]
exclude: list[str]
min: Optional[float] = None
max: Optional[float] = None
@ -49,45 +99,52 @@ class Entities(nagiosplugin.Resource):
else:
print("OK: " + message)
def _state_to_metric(self, state):
"""Convert a state into a metric"""
if self.attribute is not None:
if self.ignore_missing and self.attribute not in state["attributes"]:
return
else:
value = state["attributes"].get(self.attribute, "missing attribute")
uom = None
else:
if self.numeric:
try:
value = float(state["state"])
except ValueError:
value = state["state"]
else:
value = state["state"]
uom = state["attributes"].get("unit_of_measurement")
if self.friendly_name:
name = (
state["attributes"]
.get("friendly_name", state["entity_id"])
.translate({ord(c): "_" for c in "'="})
)
else:
name = state["entity_id"]
yield nagiosplugin.Metric(
name,
value,
uom,
context=self.device_class,
min=self.min,
max=self.max,
)
def probe(self):
response = self.hass_get("/api/states")
for state in response:
if state["attributes"].get("device_class") == self.device_class and all(
state["attributes"].get(k) == v for k, v in self.filters
if (
state["attributes"].get("device_class") == self.device_class
and all(filter.check(state["attributes"]) for filter in self.filters)
and (len(self.include) == 0 or state["entity_id"] in self.include)
and state["entity_id"] not in self.exclude
):
if self.attribute is not None:
value = state["attributes"].get(
self.attribute, -1 if self.numeric else "attribute missing"
)
uom = None
else:
if self.numeric:
value = (
float(state["state"]) if state["state"].isnumeric() else -1
)
else:
value = state["state"]
uom = state["attributes"].get("unit_of_measurement")
if self.friendly_name:
name = (
state["attributes"]
.get("friendly_name", state["entity_id"])
.translate({ord(c): "_" for c in "'="})
)
else:
name = state["entity_id"]
if state["state"] == "unavailable":
_log.info(f"{state['entity_id']} unavailable")
yield nagiosplugin.Metric(
name,
value,
uom,
context=self.device_class,
min=self.min,
max=self.max,
)
yield from self._state_to_metric(state)
class RegexContext(nagiosplugin.Context):
@ -130,11 +187,6 @@ class RegexContext(nagiosplugin.Context):
)
def key_value(arg: str):
k, _, v = arg.partition("=")
return k, v
class Icinga2ConfAction(argparse.Action):
def __init__(
self,
@ -151,7 +203,12 @@ class Icinga2ConfAction(argparse.Action):
help=help,
)
def _format_actions(self, param_prefix: str, parser: argparse.ArgumentParser, order: Optional[int] = None):
def _format_actions(
self,
param_prefix: str,
parser: argparse.ArgumentParser,
order: Optional[int] = None,
):
for action in parser._actions:
if action.dest in [self.dest, "help", "version"] or isinstance(
action, argparse._SubParsersAction
@ -180,7 +237,7 @@ class Icinga2ConfAction(argparse.Action):
print(f' description = "{action.help}"')
if order is not None:
print(f' order = {order}')
print(f" order = {order}")
print(" }")
@ -197,17 +254,15 @@ class Icinga2ConfAction(argparse.Action):
choices = parser._subparsers._group_actions[0].choices
for subname, subparser in choices.items():
print(f'object CheckCommand "{command_name}_{subname}" {{')
print(
f' command = [ PluginContribDir + "/{command_path}" ]'
)
print(f' command = [ PluginContribDir + "/{command_path}" ]')
print(" arguments = {")
self._format_actions(command_name, parser, order=-2)
print(f' "{subname}" = {{')
print(' set_if = true')
print(' order = -1')
print(' }')
print(" set_if = true")
print(" order = -1")
print(" }")
self._format_actions(f"{command_name}_{subname}", subparser)
print(" }\n}")
@ -260,16 +315,36 @@ def main():
common_args.add_argument(
"-f",
"--filter",
type=key_value,
type=AttributeFilter.from_str,
default=[],
nargs="*",
help="filter by 'attribute=value' (may be specified multiple times)",
action="extend",
help="filter by 'attribute=value' (may be specified multiple times). Use != to negate the match",
)
common_args.add_argument(
"-i",
"--include",
default=[],
nargs="*",
action="extend",
help="explicitly include entities by id. Other entities will not be considered if this is specified. Listed entities must also match the specified device class",
)
common_args.add_argument(
"-e",
"--exclude",
default=[],
nargs="*",
action="extend",
help="exclude entities by id",
)
common_args.add_argument(
"--friendly",
action="store_true",
help="use friendly name, when available",
)
common_args.add_argument(
"--ignore-missing", action="store_true", help="Ignore missing attributes"
)
subparsers = argp.add_subparsers(
title="Query type", required=True, dest="subparser_name"
@ -326,31 +401,34 @@ def main():
)
args = argp.parse_args()
parsed_common_args = {
"device_class": args.device_class,
"attribute": args.attribute,
"filters": args.filter,
"include": args.include,
"exclude": args.exclude,
"friendly_name": args.friendly,
"ignore_missing": args.ignore_missing,
}
if args.subparser_name == "scalar":
check = nagiosplugin.Check(
Entities(
url=args.url,
token=args.token,
device_class=args.device_class,
numeric=True,
min=args.min,
max=args.max,
filters=args.filter,
attribute=args.attribute,
friendly_name=args.friendly,
**parsed_common_args,
),
nagiosplugin.ScalarContext(args.device_class, args.warning, args.critical),
ScalarOrUnknownContext(args.device_class, args.warning, args.critical),
)
elif args.subparser_name == "text":
check = nagiosplugin.Check(
Entities(
url=args.url,
token=args.token,
device_class=args.device_class,
numeric=False,
filters=args.filter,
attribute=args.attribute,
friendly_name=args.friendly,
**parsed_common_args,
),
RegexContext(args.device_class, args.ok, args.warning, args.critical),
)

1
pyproject.toml Normal file
View File

@ -0,0 +1 @@
[tool.black]