Compare commits
8 Commits
3e9d362805
...
97d07142d8
Author | SHA1 | Date | |
---|---|---|---|
97d07142d8 | |||
a0ad62c4f2 | |||
c7f1b9c7c9 | |||
8ce50398e9 | |||
5d4f9fb98b | |||
0c9eb910a6 | |||
780edf0423 | |||
82acfd3718 |
@ -17,15 +17,65 @@ import nagiosplugin
|
||||
_log = logging.getLogger("nagiosplugin")
|
||||
|
||||
|
||||
class ScalarOrUnknownContext(nagiosplugin.ScalarContext):
|
||||
"""
|
||||
Same as a :class:`nagiosplugin.ScalarContext`, but returns UNKNOWN when the
|
||||
value is not an int or float
|
||||
"""
|
||||
|
||||
def evaluate(self, metric: nagiosplugin.Metric, resource: nagiosplugin.Resource):
|
||||
if isinstance(metric.value, (int, float)):
|
||||
return super().evaluate(metric, resource)
|
||||
else:
|
||||
return self.result_cls(
|
||||
nagiosplugin.state.Unknown, "non-scalar value", metric
|
||||
)
|
||||
|
||||
def performance(self, metric: nagiosplugin.Metric, resource: nagiosplugin.Resource):
|
||||
if isinstance(metric.value, (int, float)):
|
||||
return super().performance(metric, resource)
|
||||
else:
|
||||
return nagiosplugin.Performance(
|
||||
metric.name,
|
||||
"U",
|
||||
metric.uom,
|
||||
self.warning,
|
||||
self.critical,
|
||||
metric.min,
|
||||
metric.max,
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class AttributeFilter:
|
||||
attribute: str
|
||||
value: str
|
||||
negated: bool
|
||||
|
||||
@classmethod
|
||||
def from_str(cls, arg: str):
|
||||
k, _, v = arg.partition("=")
|
||||
if k.endswith("!"):
|
||||
return cls(k.removesuffix("!"), v, True)
|
||||
else:
|
||||
return cls(k, v, False)
|
||||
|
||||
def check(self, attributes: dict[str, str]) -> bool:
|
||||
return self.negated ^ (attributes.get(self.attribute) == self.value)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Entities(nagiosplugin.Resource):
|
||||
url: str
|
||||
token: str
|
||||
device_class: str
|
||||
numeric: bool
|
||||
filters: list[str]
|
||||
filters: list[AttributeFilter]
|
||||
attribute: Optional[str]
|
||||
friendly_name: bool
|
||||
ignore_missing: bool
|
||||
include: list[str]
|
||||
exclude: list[str]
|
||||
min: Optional[float] = None
|
||||
max: Optional[float] = None
|
||||
|
||||
@ -49,45 +99,52 @@ class Entities(nagiosplugin.Resource):
|
||||
else:
|
||||
print("OK: " + message)
|
||||
|
||||
def _state_to_metric(self, state):
|
||||
"""Convert a state into a metric"""
|
||||
if self.attribute is not None:
|
||||
if self.ignore_missing and self.attribute not in state["attributes"]:
|
||||
return
|
||||
else:
|
||||
value = state["attributes"].get(self.attribute, "missing attribute")
|
||||
uom = None
|
||||
else:
|
||||
if self.numeric:
|
||||
try:
|
||||
value = float(state["state"])
|
||||
except ValueError:
|
||||
value = state["state"]
|
||||
else:
|
||||
value = state["state"]
|
||||
uom = state["attributes"].get("unit_of_measurement")
|
||||
|
||||
if self.friendly_name:
|
||||
name = (
|
||||
state["attributes"]
|
||||
.get("friendly_name", state["entity_id"])
|
||||
.translate({ord(c): "_" for c in "'="})
|
||||
)
|
||||
else:
|
||||
name = state["entity_id"]
|
||||
|
||||
yield nagiosplugin.Metric(
|
||||
name,
|
||||
value,
|
||||
uom,
|
||||
context=self.device_class,
|
||||
min=self.min,
|
||||
max=self.max,
|
||||
)
|
||||
|
||||
def probe(self):
|
||||
response = self.hass_get("/api/states")
|
||||
for state in response:
|
||||
if state["attributes"].get("device_class") == self.device_class and all(
|
||||
state["attributes"].get(k) == v for k, v in self.filters
|
||||
if (
|
||||
state["attributes"].get("device_class") == self.device_class
|
||||
and all(filter.check(state["attributes"]) for filter in self.filters)
|
||||
and (len(self.include) == 0 or state["entity_id"] in self.include)
|
||||
and state["entity_id"] not in self.exclude
|
||||
):
|
||||
if self.attribute is not None:
|
||||
value = state["attributes"].get(
|
||||
self.attribute, -1 if self.numeric else "attribute missing"
|
||||
)
|
||||
uom = None
|
||||
else:
|
||||
if self.numeric:
|
||||
value = (
|
||||
float(state["state"]) if state["state"].isnumeric() else -1
|
||||
)
|
||||
else:
|
||||
value = state["state"]
|
||||
uom = state["attributes"].get("unit_of_measurement")
|
||||
|
||||
if self.friendly_name:
|
||||
name = (
|
||||
state["attributes"]
|
||||
.get("friendly_name", state["entity_id"])
|
||||
.translate({ord(c): "_" for c in "'="})
|
||||
)
|
||||
else:
|
||||
name = state["entity_id"]
|
||||
|
||||
if state["state"] == "unavailable":
|
||||
_log.info(f"{state['entity_id']} unavailable")
|
||||
yield nagiosplugin.Metric(
|
||||
name,
|
||||
value,
|
||||
uom,
|
||||
context=self.device_class,
|
||||
min=self.min,
|
||||
max=self.max,
|
||||
)
|
||||
yield from self._state_to_metric(state)
|
||||
|
||||
|
||||
class RegexContext(nagiosplugin.Context):
|
||||
@ -130,11 +187,6 @@ class RegexContext(nagiosplugin.Context):
|
||||
)
|
||||
|
||||
|
||||
def key_value(arg: str):
|
||||
k, _, v = arg.partition("=")
|
||||
return k, v
|
||||
|
||||
|
||||
class Icinga2ConfAction(argparse.Action):
|
||||
def __init__(
|
||||
self,
|
||||
@ -151,7 +203,12 @@ class Icinga2ConfAction(argparse.Action):
|
||||
help=help,
|
||||
)
|
||||
|
||||
def _format_actions(self, param_prefix: str, parser: argparse.ArgumentParser, order: Optional[int] = None):
|
||||
def _format_actions(
|
||||
self,
|
||||
param_prefix: str,
|
||||
parser: argparse.ArgumentParser,
|
||||
order: Optional[int] = None,
|
||||
):
|
||||
for action in parser._actions:
|
||||
if action.dest in [self.dest, "help", "version"] or isinstance(
|
||||
action, argparse._SubParsersAction
|
||||
@ -180,7 +237,7 @@ class Icinga2ConfAction(argparse.Action):
|
||||
print(f' description = "{action.help}"')
|
||||
|
||||
if order is not None:
|
||||
print(f' order = {order}')
|
||||
print(f" order = {order}")
|
||||
|
||||
print(" }")
|
||||
|
||||
@ -197,17 +254,15 @@ class Icinga2ConfAction(argparse.Action):
|
||||
choices = parser._subparsers._group_actions[0].choices
|
||||
for subname, subparser in choices.items():
|
||||
print(f'object CheckCommand "{command_name}_{subname}" {{')
|
||||
print(
|
||||
f' command = [ PluginContribDir + "/{command_path}" ]'
|
||||
)
|
||||
print(f' command = [ PluginContribDir + "/{command_path}" ]')
|
||||
|
||||
print(" arguments = {")
|
||||
|
||||
self._format_actions(command_name, parser, order=-2)
|
||||
print(f' "{subname}" = {{')
|
||||
print(' set_if = true')
|
||||
print(' order = -1')
|
||||
print(' }')
|
||||
print(" set_if = true")
|
||||
print(" order = -1")
|
||||
print(" }")
|
||||
self._format_actions(f"{command_name}_{subname}", subparser)
|
||||
|
||||
print(" }\n}")
|
||||
@ -260,16 +315,36 @@ def main():
|
||||
common_args.add_argument(
|
||||
"-f",
|
||||
"--filter",
|
||||
type=key_value,
|
||||
type=AttributeFilter.from_str,
|
||||
default=[],
|
||||
nargs="*",
|
||||
help="filter by 'attribute=value' (may be specified multiple times)",
|
||||
action="extend",
|
||||
help="filter by 'attribute=value' (may be specified multiple times). Use != to negate the match",
|
||||
)
|
||||
common_args.add_argument(
|
||||
"-i",
|
||||
"--include",
|
||||
default=[],
|
||||
nargs="*",
|
||||
action="extend",
|
||||
help="explicitly include entities by id. Other entities will not be considered if this is specified. Listed entities must also match the specified device class",
|
||||
)
|
||||
common_args.add_argument(
|
||||
"-e",
|
||||
"--exclude",
|
||||
default=[],
|
||||
nargs="*",
|
||||
action="extend",
|
||||
help="exclude entities by id",
|
||||
)
|
||||
common_args.add_argument(
|
||||
"--friendly",
|
||||
action="store_true",
|
||||
help="use friendly name, when available",
|
||||
)
|
||||
common_args.add_argument(
|
||||
"--ignore-missing", action="store_true", help="Ignore missing attributes"
|
||||
)
|
||||
|
||||
subparsers = argp.add_subparsers(
|
||||
title="Query type", required=True, dest="subparser_name"
|
||||
@ -326,31 +401,34 @@ def main():
|
||||
)
|
||||
|
||||
args = argp.parse_args()
|
||||
parsed_common_args = {
|
||||
"device_class": args.device_class,
|
||||
"attribute": args.attribute,
|
||||
"filters": args.filter,
|
||||
"include": args.include,
|
||||
"exclude": args.exclude,
|
||||
"friendly_name": args.friendly,
|
||||
"ignore_missing": args.ignore_missing,
|
||||
}
|
||||
if args.subparser_name == "scalar":
|
||||
check = nagiosplugin.Check(
|
||||
Entities(
|
||||
url=args.url,
|
||||
token=args.token,
|
||||
device_class=args.device_class,
|
||||
numeric=True,
|
||||
min=args.min,
|
||||
max=args.max,
|
||||
filters=args.filter,
|
||||
attribute=args.attribute,
|
||||
friendly_name=args.friendly,
|
||||
**parsed_common_args,
|
||||
),
|
||||
nagiosplugin.ScalarContext(args.device_class, args.warning, args.critical),
|
||||
ScalarOrUnknownContext(args.device_class, args.warning, args.critical),
|
||||
)
|
||||
elif args.subparser_name == "text":
|
||||
check = nagiosplugin.Check(
|
||||
Entities(
|
||||
url=args.url,
|
||||
token=args.token,
|
||||
device_class=args.device_class,
|
||||
numeric=False,
|
||||
filters=args.filter,
|
||||
attribute=args.attribute,
|
||||
friendly_name=args.friendly,
|
||||
**parsed_common_args,
|
||||
),
|
||||
RegexContext(args.device_class, args.ok, args.warning, args.critical),
|
||||
)
|
||||
|
1
pyproject.toml
Normal file
1
pyproject.toml
Normal file
@ -0,0 +1 @@
|
||||
[tool.black]
|
Loading…
Reference in New Issue
Block a user