Use a template to filter entities server-side

This commit is contained in:
Adam Goldsmith 2024-06-20 00:54:28 -04:00
parent bac092e430
commit 5a4b04ffac

View File

@ -4,6 +4,7 @@
import argparse
import dataclasses
import json
import logging
import re
import sys
@ -60,9 +61,6 @@ class AttributeFilter:
else:
return cls(k, v, False)
def check(self, attributes: dict[str, str]) -> bool:
return self.negated ^ (attributes.get(self.attribute) == self.value)
@dataclasses.dataclass
class Entities(nagiosplugin.Resource):
@ -91,6 +89,18 @@ class Entities(nagiosplugin.Resource):
return r.json()
def hass_post(self, endpoint: str, data: Any) -> Any:
headers = {
"Authorization": "Bearer " + self.token,
"Content-Type": "application/json",
}
r = requests.post(urljoin(self.url, endpoint), headers=headers, json=data)
if not r.ok:
raise Exception("Failed to query Home Assistant API: " + r.text)
return r.json()
def check_api(self):
message = self.hass_get("/api/").get("message")
if message != "API running.":
@ -136,14 +146,37 @@ class Entities(nagiosplugin.Resource):
)
def probe(self):
response = self.hass_get("/api/states")
template_filter = f"states|selectattr('attributes.device_class', 'eq', '{ self.device_class }')"
for f in self.filters:
op = "ne" if f.negated else "eq"
template_filter += (
f"|selectattr('attributes.{ f.attribute }', '{ op }', '{ f.value }')"
)
if self.exclude:
template_filter += (
f"|rejectattr('entity_id', 'in', { json.dumps(self.exclude) })"
)
if self.include:
template_filter = (
f"({template_filter}|list + "
f" states|selectattr('entity_id', 'in', { json.dumps(self.include) })|list"
")|unique(attribute='entity_id')"
)
template = (
"{% set ns = namespace(out=[]) %}\n"
f"{{% for s in { template_filter } %}}\n"
" {% set ns.out = ns.out + [{\n"
' "entity_id": s.entity_id,\n'
' "state": s.state,\n'
' "attributes": s.attributes,\n'
" }]\n"
" %}\n"
"{% endfor %}\n"
"{{ ns.out|tojson }}"
)
response = self.hass_post("/api/template", {"template": template})
for state in response:
if (
state["attributes"].get("device_class") == self.device_class
and all(filter.check(state["attributes"]) for filter in self.filters)
and (len(self.include) == 0 or state["entity_id"] in self.include)
and state["entity_id"] not in self.exclude
):
yield from self._state_to_metric(state)