Compare commits

...

5 Commits

2 changed files with 77 additions and 23 deletions

View File

@ -4,15 +4,16 @@
import argparse import argparse
import dataclasses import dataclasses
import json
import logging import logging
import re import re
import sys import sys
from urllib.parse import urljoin
from typing import Optional
from pathlib import Path from pathlib import Path
from typing import Any
from urllib.parse import urljoin
import requests
import nagiosplugin import nagiosplugin
import requests
_log = logging.getLogger("nagiosplugin") _log = logging.getLogger("nagiosplugin")
@ -60,9 +61,6 @@ class AttributeFilter:
else: else:
return cls(k, v, False) return cls(k, v, False)
def check(self, attributes: dict[str, str]) -> bool:
return self.negated ^ (attributes.get(self.attribute) == self.value)
@dataclasses.dataclass @dataclasses.dataclass
class Entities(nagiosplugin.Resource): class Entities(nagiosplugin.Resource):
@ -70,16 +68,18 @@ class Entities(nagiosplugin.Resource):
token: str token: str
device_class: str device_class: str
numeric: bool numeric: bool
area: str
label: str
filters: list[AttributeFilter] filters: list[AttributeFilter]
attribute: Optional[str] attribute: str | None
friendly_name: bool friendly_name: bool
ignore_missing: bool ignore_missing: bool
include: list[str] include: list[str]
exclude: list[str] exclude: list[str]
min: Optional[float] = None min: float | None = None
max: Optional[float] = None max: float | None = None
def hass_get(self, endpoint: str) -> requests.Response: def hass_get(self, endpoint: str) -> Any:
headers = { headers = {
"Authorization": "Bearer " + self.token, "Authorization": "Bearer " + self.token,
"Content-Type": "application/json", "Content-Type": "application/json",
@ -91,6 +91,18 @@ class Entities(nagiosplugin.Resource):
return r.json() return r.json()
def hass_post(self, endpoint: str, data: Any) -> Any:
headers = {
"Authorization": "Bearer " + self.token,
"Content-Type": "application/json",
}
r = requests.post(urljoin(self.url, endpoint), headers=headers, json=data)
if not r.ok:
raise Exception("Failed to query Home Assistant API: " + r.text)
return r.json()
def check_api(self): def check_api(self):
message = self.hass_get("/api/").get("message") message = self.hass_get("/api/").get("message")
if message != "API running.": if message != "API running.":
@ -136,24 +148,57 @@ class Entities(nagiosplugin.Resource):
) )
def probe(self): def probe(self):
response = self.hass_get("/api/states") template_filter = f"states|selectattr('attributes.device_class', 'eq', '{ self.device_class }')"
for f in self.filters:
op = "ne" if f.negated else "eq"
template_filter += (
f"|selectattr('attributes.{ f.attribute }', '{ op }', '{ f.value }')"
)
if self.area:
template_filter += (
f"|selectattr('entity_id', 'in', area_entities('{ self.area }'))"
)
if self.label:
template_filter += (
f"|selectattr('entity_id', 'in', label_entities('{ self.label }'))"
)
if self.exclude:
template_filter += (
f"|rejectattr('entity_id', 'in', { json.dumps(self.exclude) })"
)
if self.include:
template_filter = (
f"({template_filter}|list + "
f" states|selectattr('entity_id', 'in', { json.dumps(self.include) })|list"
")|unique(attribute='entity_id')"
)
template = (
"{% set ns = namespace(out=[]) %}\n"
f"{{% for s in { template_filter } %}}\n"
" {% set ns.out = ns.out + [{\n"
' "entity_id": s.entity_id,\n'
' "state": s.state,\n'
' "attributes": s.attributes,\n'
" }]\n"
" %}\n"
"{% endfor %}\n"
"{{ ns.out|tojson }}"
)
response = self.hass_post("/api/template", {"template": template})
for state in response: for state in response:
if ( yield from self._state_to_metric(state)
state["attributes"].get("device_class") == self.device_class
and all(filter.check(state["attributes"]) for filter in self.filters)
and (len(self.include) == 0 or state["entity_id"] in self.include)
and state["entity_id"] not in self.exclude
):
yield from self._state_to_metric(state)
class RegexContext(nagiosplugin.Context): class RegexContext(nagiosplugin.Context):
def __init__( def __init__(
self, self,
name: str, name: str,
ok: str = None, ok: str | None = None,
warning: str = None, warning: str | None = None,
critical: str = None, critical: str | None = None,
fmt_metric=None, fmt_metric=None,
result_cls=nagiosplugin.Result, result_cls=nagiosplugin.Result,
): ):
@ -207,7 +252,7 @@ class Icinga2ConfAction(argparse.Action):
self, self,
param_prefix: str, param_prefix: str,
parser: argparse.ArgumentParser, parser: argparse.ArgumentParser,
order: Optional[int] = None, order: int | None = None,
): ):
for action in parser._actions: for action in parser._actions:
if action.dest in [self.dest, "help", "version"] or isinstance( if action.dest in [self.dest, "help", "version"] or isinstance(
@ -329,6 +374,8 @@ def main():
action="extend", action="extend",
help="explicitly include entities by id. Other entities will not be considered if this is specified. Listed entities must also match the specified device class", help="explicitly include entities by id. Other entities will not be considered if this is specified. Listed entities must also match the specified device class",
) )
common_args.add_argument("--area", type=str, help="area to filter by")
common_args.add_argument("--label", type=str, help="label to filter by")
common_args.add_argument( common_args.add_argument(
"-e", "-e",
"--exclude", "--exclude",
@ -405,6 +452,8 @@ def main():
"device_class": args.device_class, "device_class": args.device_class,
"attribute": args.attribute, "attribute": args.attribute,
"filters": args.filter, "filters": args.filter,
"area": args.area,
"label": args.label,
"include": args.include, "include": args.include,
"exclude": args.exclude, "exclude": args.exclude,
"friendly_name": args.friendly, "friendly_name": args.friendly,

View File

@ -1 +1,6 @@
[tool.black] [tool.ruff]
line-length = 88
[tool.ruff.lint]
select = ["E4", "E7", "E9", "F", "I", "C4", "UP", "PERF", "PL", "SIM"]