This commit is contained in:
Evgeny Zinoviev 2023-06-08 01:38:58 +03:00
parent 5aad97192d
commit 3ae1c3b5a7
17 changed files with 185 additions and 162 deletions

View File

@ -14,7 +14,7 @@ apscheduler~=3.9.1
psutil~=5.9.1
aioshutil~=1.1
scikit-image~=0.19.3
cerberus~=1.3.4
# following can be installed from debian repositories
# matplotlib~=3.5.0

View File

@ -0,0 +1,46 @@
import logging
import inspect
from cerberus import Validator, DocumentError
__all__ = [
'linux_boards_validator'
]
_logger = logging.getLogger(__name__)
def validate(schema, data):
v = Validator(schema)
if not v.validate(data):
frame = inspect.currentframe().f_back
caller_name = frame.f_code.co_name
raise DocumentError(f'{caller_name}: failed to validate data: ' + v.errors)
def linux_boards_validator(data) -> None:
validate({
'type': 'dict',
'valuesrules': {
'type': 'dict',
'schema': {
'mdns': {'type': 'string', 'required': True},
'board': {'type': 'string', 'required': True},
'network': {'type': 'list', 'required': True, 'empty': False},
'ram': {'type': 'integer', 'required': True},
'ext_hdd': {
'type': 'list',
'schema': {
'type': 'dict',
'schema': {
'mountpoint': {'type': 'string', 'required': True},
'size': {'type': 'integer', 'required': True}
}
},
},
'services': {'type': 'list', 'empty': False},
'online': {'type': 'boolean', 'required': True}
}
}
}, data)

View File

@ -7,39 +7,50 @@ from os.path import join, isdir, isfile
from typing import Optional, Any, MutableMapping
from argparse import ArgumentParser
from ..util import parse_addr
import _validators as validators
_validators = {}
def _get_validator(name: str) -> Optional[callable]:
if hasattr(validators, f'{name}_validator'):
return getattr(validators, f'{name}_validator')
if name in _validators:
return _validators[name]
return None
def add_validator(name: str, f: callable):
_validators[name] = f
def _get_config_path(name: str) -> str:
formats = ['toml', 'yaml']
dirname = join(os.environ['HOME'], '.config', name)
dirnames = [
join(os.environ['HOME'], '.config', 'homekit'),
'/etc/homekit'
]
if isdir(dirname):
for fmt in formats:
filename = join(dirname, f'config.{fmt}')
if isfile(filename):
return filename
for dirname in dirnames:
if isdir(dirname):
for fmt in formats:
filename = join(dirname, f'{name}.{fmt}')
if isfile(filename):
return filename
raise IOError(f'config not found in {dirname}')
else:
filenames = [join(os.environ['HOME'], '.config', f'{name}.{format}') for format in formats]
for file in filenames:
if isfile(file):
return file
raise IOError(f'config not found')
raise IOError(f'config \'{name}\' not found')
class ConfigStore:
class SingleConfig:
data: MutableMapping[str, Any]
app_name: Optional[str]
def __init__(self):
self.data = {}
self.app_name = None
def load(self, name: Optional[str] = None,
def load(self,
name: Optional[str] = None,
use_cli=True,
parser: ArgumentParser = None):
self.app_name = name
@ -126,6 +137,16 @@ class ConfigStore:
return self.data.items()
class Config:
app_name: Optional[str]
def __init__(self):
self.app_name = None
config = ConfigStore()

View File

@ -14,13 +14,17 @@ class MqttNode:
_modules: List[MqttModule]
_module_subscriptions: dict[str, MqttModule]
_node_id: str
_node_secret: str
_payload_callbacks: list[callable]
_wrapper: Optional[MqttWrapper]
def __init__(self, node_id: str):
def __init__(self,
node_id: str,
node_secret: Optional[str] = None):
self._modules = []
self._module_subscriptions = {}
self._node_id = node_id
self._node_secret = node_secret
self._payload_callbacks = []
self._logger = logging.getLogger(self.__class__.__name__)
self._wrapper = None
@ -42,7 +46,7 @@ class MqttNode:
payload = self._module_subscriptions[topic].handle_payload(self, topic, payload)
if isinstance(payload, MqttPayload):
for f in self._payload_callbacks:
f(payload)
f(self, payload)
def load_module(self, module_name: str, *args, **kwargs) -> MqttModule:
module = importlib.import_module(f'..module.{module_name}', __name__)
@ -78,3 +82,11 @@ class MqttNode:
@property
def id(self) -> str:
return self._node_id
@property
def secret(self) -> str:
return self._node_secret
@secret.setter
def secret(self, secret: str) -> None:
self._node_secret = secret

View File

@ -9,11 +9,15 @@ from ..util import strgen
class MqttWrapper(Mqtt):
_nodes: list[MqttNode]
def __init__(self, topic_prefix='hk', randomize_client_id=False):
def __init__(self,
topic_prefix='hk',
randomize_client_id=False,
clean_session=True):
client_id = config['mqtt']['client_id']
if randomize_client_id:
client_id += '_'+strgen(6)
super().__init__(clean_session=True, client_id=client_id)
super().__init__(clean_session=clean_session,
client_id=client_id)
self._nodes = []
self._topic_prefix = topic_prefix

View File

@ -41,7 +41,7 @@ class OtaPayload(MqttPayload):
class MqttOtaModule(MqttModule):
_ota_request: Optional[tuple[str, str, int]]
_ota_request: Optional[tuple[str, int]]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@ -52,9 +52,9 @@ class MqttOtaModule(MqttModule):
mqtt.subscribe_module("otares", self)
if self._ota_request is not None:
secret, filename, qos = self._ota_request
filename, qos = self._ota_request
self._ota_request = None
self.do_push_ota(secret, filename, qos)
self.do_push_ota(self._mqtt_node_ref.secret, filename, qos)
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
if topic == 'otares':
@ -69,10 +69,9 @@ class MqttOtaModule(MqttModule):
qos=qos)
def push_ota(self,
secret: str,
filename: str,
qos: int):
if not self._initialized:
self._ota_request = (secret, filename, qos)
self._ota_request = (filename, qos)
else:
self.do_push_ota(secret, filename, qos)
self.do_push_ota(filename, qos)

View File

@ -64,9 +64,9 @@ class MqttRelayModule(MqttModule):
mqtt.subscribe_module('relay/status', self)
def switchpower(self,
enable: bool,
secret: str):
payload = MqttPowerSwitchPayload(secret=secret, state=enable)
enable: bool):
payload = MqttPowerSwitchPayload(secret=self._mqtt_node_ref.secret,
state=enable)
self._mqtt_node_ref.publish('relay/switch', payload=payload.pack())
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:

View File

@ -48,6 +48,7 @@ class MqttTemphumDataPayload(MqttPayload):
class MqttTempHumModule(MqttModule):
def __init__(self,
sensor: Optional[BaseSensor] = None,
write_to_database=False,
*args, **kwargs):
if sensor is not None:
kwargs['tick_interval'] = 10

View File

@ -11,7 +11,7 @@ if __name__ == '__main__':
config.load('inverter_mqtt_util', parser=parser)
arg = parser.parse_args()
mqtt = MqttWrapper()
mqtt = MqttWrapper(clean_session=arg.mode[0] != 'receiver')
node = MqttNode(node_id='inverter')
module_kwargs = {}
if arg.mode[0] == 'sender':

View File

@ -30,7 +30,7 @@ if __name__ == '__main__':
raise ArgumentError(None, '--relay is only allowed when \'relay\' module included in --modules')
mqtt = MqttWrapper(randomize_client_id=True)
mqtt_node = MqttNode(node_id=arg.node_id)
mqtt_node = MqttNode(node_id=arg.node_id, node_secret=arg.node_secret)
mqtt.add_node(mqtt_node)
@ -44,9 +44,7 @@ if __name__ == '__main__':
if m == 'relay' and arg.switch_relay is not None:
if not arg.node_secret:
raise ArgumentError(None, '--switch-relay requires --node-secret')
module_instance.switchpower(mqtt_node,
arg.switch_relay == 1,
arg.node_secret)
module_instance.switchpower(arg.switch_relay == 1)
mqtt.configure_tls()
try:
@ -58,7 +56,7 @@ if __name__ == '__main__':
if not arg.node_secret:
raise ArgumentError(None, 'pushing OTA requires --node-secret')
ota_module.push_ota(arg.node_secret, arg.push_ota, 1)
ota_module.push_ota(arg.push_ota, 1)
while True:
sleep(0.1)

View File

@ -208,7 +208,7 @@ def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
def mqtt_payload_callback(payload: MqttPayload):
def mqtt_payload_callback(mqtt_node: MqttNode, payload: MqttPayload):
global watering_mcu_status
types_the_node_can_send = (

View File

@ -6,10 +6,9 @@ from functools import partial
from home.config import config
from home.telegram import bot
from home.mqtt import MqttRelay, MqttRelayState
from home.mqtt.esp import MqttEspDevice
from home.mqtt.payload import MqttPayload
from home.mqtt.payload.relay import InitialDiagnosticsPayload, DiagnosticsPayload
from home.mqtt import MqttPayload, MqttNode, MqttWrapper
from home.mqtt.module.relay import MqttRelayModule, MqttRelayState
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
config.load('relay_mqtt_bot')
@ -36,7 +35,9 @@ status_emoji = {
}
mqtt_relay: Optional[MqttRelay] = None
# mqtt_relay: Optional[MqttRelayModule] = None
mqtt: Optional[MqttWrapper] = None
relay_nodes: dict[str, MqttRelayModule] = {}
relay_states: dict[str, MqttRelayState] = {}
@ -45,23 +46,24 @@ class UserAction(Enum):
OFF = 'off'
def on_mqtt_message(home_id, message: MqttPayload):
def on_mqtt_message(node: MqttNode,
message: MqttPayload):
if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload):
kwargs = dict(rssi=message.rssi, enabled=message.flags.state)
if isinstance(message, InitialDiagnosticsPayload):
kwargs['fw_version'] = message.fw_version
if home_id not in relay_states:
relay_states[home_id] = MqttRelayState()
relay_states[home_id].update(**kwargs)
if node.id not in relay_states:
relay_states[node.id] = MqttRelayState()
relay_states[node.id].update(**kwargs)
def enable_handler(home_id: str, ctx: bot.Context) -> None:
mqtt_relay.set_power(home_id, True)
def enable_handler(node_id: str, ctx: bot.Context) -> None:
relay_nodes[node_id].switchpower(True)
ctx.reply(ctx.lang('done'))
def disable_handler(home_id: str, ctx: bot.Context) -> None:
mqtt_relay.set_power(home_id, False)
def disable_handler(node_id: str, ctx: bot.Context) -> None:
relay_nodes[node_id].switchpower(False)
ctx.reply(ctx.lang('done'))
@ -88,9 +90,13 @@ def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
if __name__ == '__main__':
devices = []
mqtt = MqttWrapper()
for device_id, data in config['relays'].items():
devices.append(MqttEspDevice(id=device_id,
secret=data['secret']))
mqtt_node = MqttNode(node_id=device_id, node_secret=data['secret'])
relay_nodes[device_id] = mqtt_node.load_module('relay')
mqtt_node.add_payload_callback(on_mqtt_message)
mqtt.add_node(mqtt_node)
labels = data['labels']
bot.lang.ru(**{device_id: labels['ru']})
bot.lang.en(**{device_id: labels['en']})
@ -103,12 +109,9 @@ if __name__ == '__main__':
messages.append(f'{type_emoji}{status_emoji[action.value]} {labels[_lang]}')
bot.handler(texts=messages)(partial(enable_handler if action == UserAction.ON else disable_handler, device_id))
mqtt_relay = MqttRelay(devices=devices)
mqtt_relay.set_message_callback(on_mqtt_message)
mqtt_relay.configure_tls()
mqtt_relay.connect_and_loop(loop_forever=False)
mqtt.configure_tls()
mqtt.connect_and_loop(loop_forever=False)
# bot.enable_logging(BotType.RELAY_MQTT)
bot.run(start_handler=start)
mqtt_relay.disconnect()
mqtt.disconnect()

View File

@ -1,16 +1,19 @@
#!/usr/bin/env python3
from home import http
from home.config import config
from home.mqtt import MqttRelay, MqttRelayState
from home.mqtt import MqttPayload
from home.mqtt import MqttPayload, MqttWrapper, MqttNode
from home.mqtt.module.relay import MqttRelayState, MqttRelayModule
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
from typing import Optional
mqtt_relay: Optional[MqttRelay] = None
mqtt: Optional[MqttWrapper] = None
mqtt_nodes: dict[str, MqttNode] = {}
relay_modules: dict[str, MqttRelayModule] = {}
relay_states: dict[str, MqttRelayState] = {}
def on_mqtt_message(device_id, message: MqttPayload):
def on_mqtt_message(node: MqttNode,
message: MqttPayload):
if isinstance(message, InitialDiagnosticsPayload) or isinstance(message, DiagnosticsPayload):
kwargs = dict(rssi=message.rssi, enabled=message.flags.state)
if device_id not in relay_states:
@ -28,17 +31,22 @@ class RelayMqttHttpProxy(http.HTTPServer):
async def _relay_on_off(self,
enable: Optional[bool],
req: http.Request):
device_id = req.match_info['id']
device_secret = req.query['secret']
node_id = req.match_info['id']
node_secret = req.query['secret']
node = mqtt_nodes[node_id]
relay_module = relay_modules[node_id]
if enable is None:
if device_id in relay_states and relay_states[device_id].ever_updated:
cur_state = relay_states[device_id].enabled
if node_id in relay_states and relay_states[node_id].ever_updated:
cur_state = relay_states[node_id].enabled
else:
cur_state = False
enable = not cur_state
mqtt_relay.set_power(device_id, enable, device_secret)
if not node.secret:
node.secret = node_secret
relay_module.switchpower(enable)
return self.ok()
async def relay_on(self, req: http.Request):
@ -54,13 +62,20 @@ class RelayMqttHttpProxy(http.HTTPServer):
if __name__ == '__main__':
config.load('relay_mqtt_http_proxy')
mqtt_relay = MqttRelay(devices=[MqttEspDevice(id=device_id) for device_id in config.get('relay.devices')])
mqtt_relay.configure_tls()
mqtt_relay.set_message_callback(on_mqtt_message)
mqtt_relay.connect_and_loop(loop_forever=False)
mqtt = MqttWrapper()
for device_id, data in config['relays'].items():
mqtt_node = MqttNode(node_id=device_id)
relay_modules[device_id] = mqtt_node.load_module('relay')
mqtt_nodes[device_id] = mqtt_node
mqtt_node.add_payload_callback(on_mqtt_message)
mqtt.add_node(mqtt_node)
mqtt_node.add_payload_callback(on_mqtt_message)
mqtt.configure_tls()
mqtt.connect_and_loop(loop_forever=False)
proxy = RelayMqttHttpProxy(config.get_addr('server.listen'))
try:
proxy.run()
except KeyboardInterrupt:
mqtt_relay.disconnect()
mqtt.disconnect()

View File

@ -1,58 +0,0 @@
#!/usr/bin/env python3
import time
import json
from home.util import parse_addr, MySimpleSocketClient
from home.mqtt import Mqtt, poll_tick
from home.mqtt.payload.sensors import Temperature
from home.config import config
class MqttClient(Mqtt):
def __init__(self):
super().__init__(self)
self._home_id = config['mqtt']['home_id']
def poll(self):
freq = int(config['mqtt']['sensors']['poll_freq'])
self._logger.debug(f'freq={freq}')
g = poll_tick(freq)
while True:
time.sleep(next(g))
for k, v in config['mqtt']['sensors']['si7021'].items():
host, port = parse_addr(v['addr'])
self.publish_si7021(host, port, k)
def publish_si7021(self, host: str, port: int, name: str):
self._logger.debug(f"publish_si7021/{name}: {host}:{port}")
try:
now = time.time()
socket = MySimpleSocketClient(host, port)
socket.write('read')
response = json.loads(socket.read().strip())
temp = response['temp']
humidity = response['humidity']
self._logger.debug(f'publish_si7021/{name}: temp={temp} humidity={humidity}')
pld = Temperature(time=round(now),
temp=temp,
rh=humidity)
self._client.publish(f'hk/{self._home_id}/si7021/{name}',
payload=pld.pack(),
qos=1)
except Exception as e:
self._logger.exception(e)
if __name__ == '__main__':
config.load('sensors_mqtt_sender')
client = MqttClient()
client.configure_tls()
client.connect_and_loop(loop_forever=False)
client.poll()

View File

@ -2,18 +2,8 @@
import paho.mqtt.client as mqtt
import re
from home.mqtt import Mqtt
from home.config import config
from home.mqtt.payload.sensors import Temperature
from home.api.types import TemperatureSensorLocation
from home.database import SensorsDatabase
def get_sensor_type(sensor: str) -> TemperatureSensorLocation:
for item in TemperatureSensorLocation:
if sensor == item.name.lower():
return item
raise ValueError(f'unexpected sensor value: {sensor}')
from home.mqtt import MqttWrapper, MqttNode
class MqttServer(Mqtt):
@ -47,7 +37,12 @@ class MqttServer(Mqtt):
if __name__ == '__main__':
config.load('sensors_mqtt_receiver')
config.load('temphum_mqtt_receiver')
server = MqttServer()
server.connect_and_loop()
mqtt = MqttWrapper(clean_session=False)
node = MqttNode(node_id='+')
node.load_module('temphum', write_to_database=True)
mqtt.add_node(node)
mqtt.configure_tls()
mqtt.connect_and_loop()

View File

@ -1,12 +1,12 @@
[Unit]
Description=sensors mqtt receiver
Description=temphum mqtt receiver
After=network.target
[Service]
User=user
Group=user
Restart=on-failure
ExecStart=python3 /home/user/home/src/sensors_mqtt_receiver.py
ExecStart=python3 /home/user/home/src/temphum_mqtt_receiver.py
WorkingDirectory=/home/user
[Install]

View File

@ -1,13 +0,0 @@
[Unit]
Description=Sensors MQTT sender
After=temphumd.service
[Service]
User=user
Group=user
Restart=on-failure
ExecStart=/home/user/homekit/src/sensors_mqtt_sender.py
WorkingDirectory=/home/user
[Install]
WantedBy=multi-user.target