Compare commits

...

5 Commits

2 changed files with 77 additions and 23 deletions

View File

@ -4,15 +4,16 @@
import argparse
import dataclasses
import json
import logging
import re
import sys
from urllib.parse import urljoin
from typing import Optional
from pathlib import Path
from typing import Any
from urllib.parse import urljoin
import requests
import nagiosplugin
import requests
_log = logging.getLogger("nagiosplugin")
@ -60,9 +61,6 @@ class AttributeFilter:
else:
return cls(k, v, False)
def check(self, attributes: dict[str, str]) -> bool:
return self.negated ^ (attributes.get(self.attribute) == self.value)
@dataclasses.dataclass
class Entities(nagiosplugin.Resource):
@ -70,16 +68,18 @@ class Entities(nagiosplugin.Resource):
token: str
device_class: str
numeric: bool
area: str
label: str
filters: list[AttributeFilter]
attribute: Optional[str]
attribute: str | None
friendly_name: bool
ignore_missing: bool
include: list[str]
exclude: list[str]
min: Optional[float] = None
max: Optional[float] = None
min: float | None = None
max: float | None = None
def hass_get(self, endpoint: str) -> requests.Response:
def hass_get(self, endpoint: str) -> Any:
headers = {
"Authorization": "Bearer " + self.token,
"Content-Type": "application/json",
@ -91,6 +91,18 @@ class Entities(nagiosplugin.Resource):
return r.json()
def hass_post(self, endpoint: str, data: Any) -> Any:
headers = {
"Authorization": "Bearer " + self.token,
"Content-Type": "application/json",
}
r = requests.post(urljoin(self.url, endpoint), headers=headers, json=data)
if not r.ok:
raise Exception("Failed to query Home Assistant API: " + r.text)
return r.json()
def check_api(self):
message = self.hass_get("/api/").get("message")
if message != "API running.":
@ -136,14 +148,47 @@ class Entities(nagiosplugin.Resource):
)
def probe(self):
response = self.hass_get("/api/states")
template_filter = f"states|selectattr('attributes.device_class', 'eq', '{ self.device_class }')"
for f in self.filters:
op = "ne" if f.negated else "eq"
template_filter += (
f"|selectattr('attributes.{ f.attribute }', '{ op }', '{ f.value }')"
)
if self.area:
template_filter += (
f"|selectattr('entity_id', 'in', area_entities('{ self.area }'))"
)
if self.label:
template_filter += (
f"|selectattr('entity_id', 'in', label_entities('{ self.label }'))"
)
if self.exclude:
template_filter += (
f"|rejectattr('entity_id', 'in', { json.dumps(self.exclude) })"
)
if self.include:
template_filter = (
f"({template_filter}|list + "
f" states|selectattr('entity_id', 'in', { json.dumps(self.include) })|list"
")|unique(attribute='entity_id')"
)
template = (
"{% set ns = namespace(out=[]) %}\n"
f"{{% for s in { template_filter } %}}\n"
" {% set ns.out = ns.out + [{\n"
' "entity_id": s.entity_id,\n'
' "state": s.state,\n'
' "attributes": s.attributes,\n'
" }]\n"
" %}\n"
"{% endfor %}\n"
"{{ ns.out|tojson }}"
)
response = self.hass_post("/api/template", {"template": template})
for state in response:
if (
state["attributes"].get("device_class") == self.device_class
and all(filter.check(state["attributes"]) for filter in self.filters)
and (len(self.include) == 0 or state["entity_id"] in self.include)
and state["entity_id"] not in self.exclude
):
yield from self._state_to_metric(state)
@ -151,9 +196,9 @@ class RegexContext(nagiosplugin.Context):
def __init__(
self,
name: str,
ok: str = None,
warning: str = None,
critical: str = None,
ok: str | None = None,
warning: str | None = None,
critical: str | None = None,
fmt_metric=None,
result_cls=nagiosplugin.Result,
):
@ -207,7 +252,7 @@ class Icinga2ConfAction(argparse.Action):
self,
param_prefix: str,
parser: argparse.ArgumentParser,
order: Optional[int] = None,
order: int | None = None,
):
for action in parser._actions:
if action.dest in [self.dest, "help", "version"] or isinstance(
@ -329,6 +374,8 @@ def main():
action="extend",
help="explicitly include entities by id. Other entities will not be considered if this is specified. Listed entities must also match the specified device class",
)
common_args.add_argument("--area", type=str, help="area to filter by")
common_args.add_argument("--label", type=str, help="label to filter by")
common_args.add_argument(
"-e",
"--exclude",
@ -405,6 +452,8 @@ def main():
"device_class": args.device_class,
"attribute": args.attribute,
"filters": args.filter,
"area": args.area,
"label": args.label,
"include": args.include,
"exclude": args.exclude,
"friendly_name": args.friendly,

View File

@ -1 +1,6 @@
[tool.black]
[tool.ruff]
line-length = 88
[tool.ruff.lint]
select = ["E4", "E7", "E9", "F", "I", "C4", "UP", "PERF", "PL", "SIM"]