62 lines
1.4 KiB
Python
Executable File
62 lines
1.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import include_homekit
|
|
import threading
|
|
|
|
from time import sleep
|
|
from homekit.config import config
|
|
from homekit.api import WebApiClient
|
|
from homekit.api.types import SoundSensorLocation
|
|
from typing import List, Tuple
|
|
|
|
# Module-level flag, initialised to False. Nothing in the visible part of this
# script reads or updates it -- NOTE(review): possibly a leftover; confirm
# before removing.
interrupted = False
|
|
|
|
|
|
class HitCounter:
    """Per-sensor hit accumulator.

    ``sensors`` maps the lower-cased name of every ``SoundSensorLocation``
    member to an integer count.  Increments and the read-and-reset done by
    ``get_all`` are performed while holding ``lock``.
    """

    def __init__(self):
        self.sensors = {}
        self.lock = threading.Lock()
        # Populate one zeroed counter per known sensor location.
        self._reset_sensors()

    def _reset_sensors(self):
        """Set the counter of every ``SoundSensorLocation`` member to zero.

        Does not take ``lock`` itself; ``get_all`` calls it with the lock
        already held.
        """
        self.sensors.update(
            (location.name.lower(), 0) for location in SoundSensorLocation
        )

    def add(self, name: str, hits: int):
        """Increase the counter called ``name`` by ``hits``.

        Raises:
            ValueError: if ``name`` is not a key of ``sensors``.
        """
        if name in self.sensors:
            with self.lock:
                self.sensors[name] += hits
            return

        raise ValueError(f'sensor {name} not found')

    def get_all(self) -> List[Tuple[str, int]]:
        """Return ``(name, hits)`` for every counter above zero, then reset.

        The snapshot and the reset happen under a single acquisition of
        ``lock``.
        """
        with self.lock:
            nonzero = [
                (sensor, count)
                for sensor, count in self.sensors.items()
                if count > 0
            ]
            self._reset_sensors()
        return nonzero
|
|
|
|
|
|
def hits_sender():
    """Poll the hit counter forever and forward non-empty batches to the API.

    Each iteration drains the module-level ``hc`` via ``get_all()``, passes
    the result to ``api.add_sound_sensor_hits`` when it is non-empty, and
    then sleeps for 5 seconds.  The function returns when a
    ``KeyboardInterrupt`` or ``SystemExit`` is raised; any other exception
    propagates to the caller.
    """
    try:
        while True:
            pending = hc.get_all()
            if pending:
                api.add_sound_sensor_hits(pending)
            sleep(5)
    except (KeyboardInterrupt, SystemExit):
        return
|
|
|
|
|
|
if __name__ == '__main__':
    # Load the configuration registered under the 'test_api' application name.
    config.load_app('test_api')

    # hits_sender() reads both of these as module-level globals.
    hc, api = HitCounter(), WebApiClient()

    # Seed one hit so the first polling iteration has something to send.
    hc.add(name='spb1', hits=1)
    # Alternative sample input, left disabled:
    # hc.add('big_house', 123)

    hits_sender()
|