This commit is contained in:
Evgeny Zinoviev 2023-05-31 23:12:05 +03:00
parent 357d3ac030
commit 21b39f245c
7 changed files with 107 additions and 27 deletions

View File

@ -3,9 +3,10 @@ from __future__ import annotations
import abc import abc
import logging import logging
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING: if TYPE_CHECKING:
from ._node import MqttNode from ._node import MqttNode
from ._payload import MqttPayload
class MqttModule(abc.ABC): class MqttModule(abc.ABC):
@ -29,5 +30,5 @@ class MqttModule(abc.ABC):
def tick(self, mqtt: MqttNode): def tick(self, mqtt: MqttNode):
pass pass
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes): def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
pass pass

View File

@ -3,30 +3,32 @@ import paho.mqtt.client as mqtt
from .mqtt import MqttBase from .mqtt import MqttBase
from typing import List from typing import List
from ._module import MqttModule from ._module import MqttModule
from ._payload import MqttPayload
class MqttNode(MqttBase): class MqttNode(MqttBase):
_modules: List[MqttModule] _modules: List[MqttModule]
_module_subscriptions: dict[str, MqttModule] _module_subscriptions: dict[str, MqttModule]
_node_id: str _node_id: str
_payload_callbacks: list[callable]
# _devices: list[MqttEspDevice] # _devices: list[MqttEspDevice]
# _message_callback: Optional[callable] # _message_callback: Optional[callable]
# _ota_publish_callback: Optional[callable] # _ota_publish_callback: Optional[callable]
def __init__(self, def __init__(self,
node_id: str, node_id: str,
# devices: Union[MqttEspDevice, list[MqttEspDevice]], # devices: Union[MqttEspDevice, list[MqttEspDevice]]
subscribe_to_updates=True): ):
super().__init__(clean_session=True) super().__init__(clean_session=True)
self._modules = [] self._modules = []
self._module_subscriptions = {} self._module_subscriptions = {}
self._node_id = node_id self._node_id = node_id
self._payload_callbacks = []
# if not isinstance(devices, list): # if not isinstance(devices, list):
# devices = [devices] # devices = [devices]
# self._devices = devices # self._devices = devices
# self._message_callback = None # self._message_callback = None
# self._ota_publish_callback = None # self._ota_publish_callback = None
# self._subscribe_to_updates = subscribe_to_updates
# self._ota_mid = None # self._ota_mid = None
def on_connect(self, client: mqtt.Client, userdata, flags, rc): def on_connect(self, client: mqtt.Client, userdata, flags, rc):
@ -47,7 +49,10 @@ class MqttNode(MqttBase):
actual_topic = topic[len(f'hk/{self._node_id}/'):] actual_topic = topic[len(f'hk/{self._node_id}/'):]
if actual_topic in self._module_subscriptions: if actual_topic in self._module_subscriptions:
self._module_subscriptions[actual_topic].handle_payload(self, actual_topic, msg.payload) payload = self._module_subscriptions[actual_topic].handle_payload(self, actual_topic, msg.payload)
if isinstance(payload, MqttPayload):
for f in self._payload_callbacks:
f(payload)
except Exception as e: except Exception as e:
self._logger.exception(str(e)) self._logger.exception(str(e))
@ -85,3 +90,6 @@ class MqttNode(MqttBase):
def publish(self, topic: str, payload: bytes, qos: int = 1): def publish(self, topic: str, payload: bytes, qos: int = 1):
self._client.publish(f'hk/{self._node_id}/{topic}', payload, qos) self._client.publish(f'hk/{self._node_id}/{topic}', payload, qos)
self._client.loop_write() self._client.loop_write()
def add_payload_callback(self, callback: callable):
self._payload_callbacks.append(callback)

View File

@ -1,5 +1,6 @@
from ..mqtt import MqttPayload, MqttPayloadCustomField from ..mqtt import MqttPayload, MqttPayloadCustomField
from .._node import MqttNode, MqttModule from .._node import MqttNode, MqttModule
from typing import Optional
MODULE_NAME = 'MqttDiagnosticsModule' MODULE_NAME = 'MqttDiagnosticsModule'
@ -51,9 +52,10 @@ class MqttDiagnosticsModule(MqttModule):
for topic in ('diag', 'd1ag', 'stat', 'stat1'): for topic in ('diag', 'd1ag', 'stat', 'stat1'):
mqtt.subscribe_module(topic, self) mqtt.subscribe_module(topic, self)
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes): def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
if topic in ('stat', 'diag'): if topic in ('stat', 'diag'):
message = DiagnosticsPayload.unpack(payload) message = DiagnosticsPayload.unpack(payload)
elif topic in ('stat1', 'd1ag'): elif topic in ('stat1', 'd1ag'):
message = InitialDiagnosticsPayload.unpack(payload) message = InitialDiagnosticsPayload.unpack(payload)
self._logger.debug(message) self._logger.debug(message)
return message

View File

@ -1,5 +1,6 @@
import hashlib import hashlib
from typing import Optional
from ..mqtt import MqttPayload from ..mqtt import MqttPayload
from .._node import MqttModule, MqttNode from .._node import MqttModule, MqttNode
@ -43,10 +44,11 @@ class MqttOtaModule(MqttModule):
def init(self, mqtt: MqttNode): def init(self, mqtt: MqttNode):
mqtt.subscribe_module("otares", self) mqtt.subscribe_module("otares", self)
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes): def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
if topic == 'otares': if topic == 'otares':
message = OtaResultPayload.unpack(payload) message = OtaResultPayload.unpack(payload)
self._logger.debug(message) self._logger.debug(message)
return message
# def push_ota(self, # def push_ota(self,
# node_id, # node_id,

View File

@ -1,7 +1,6 @@
import paho.mqtt.client as mqtt
import re
import datetime import datetime
from typing import Optional
from .. import MqttModule, MqttPayload, MqttNode from .. import MqttModule, MqttPayload, MqttNode
MODULE_NAME = 'MqttRelayModule' MODULE_NAME = 'MqttRelayModule'
@ -22,6 +21,18 @@ class MqttPowerSwitchPayload(MqttPayload):
state: bool state: bool
class MqttPowerStatusPayload(MqttPayload):
FORMAT = '=B'
PACKER = {
'opened': lambda n: int(n),
}
UNPACKER = {
'opened': lambda n: bool(n),
}
opened: bool
class MqttRelayState: class MqttRelayState:
enabled: bool enabled: bool
update_time: datetime.datetime update_time: datetime.datetime
@ -57,9 +68,11 @@ class MqttRelayModule(MqttModule):
payload = MqttPowerSwitchPayload(secret=secret, state=enable) payload = MqttPowerSwitchPayload(secret=secret, state=enable)
mqtt.publish('relay/switch', payload=payload.pack()) mqtt.publish('relay/switch', payload=payload.pack())
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes): def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
if topic != 'relay/switch': if topic != 'relay/switch':
return return
message = MqttPowerSwitchPayload.unpack(payload) message = MqttPowerSwitchPayload.unpack(payload)
self._logger.debug(message) self._logger.debug(message)
return message

View File

@ -3,13 +3,14 @@ from .._node import MqttNode
from .._module import MqttModule from .._module import MqttModule
from .._payload import MqttPayload from .._payload import MqttPayload
from ...util import HashableEnum from ...util import HashableEnum
from typing import Optional
two_digits_precision = lambda x: round(x, 2) two_digits_precision = lambda x: round(x, 2)
MODULE_NAME = 'MqttTempHumModule' MODULE_NAME = 'MqttTempHumModule'
class TempHumDataPayload(MqttPayload): class MqttTemphumDataPayload(MqttPayload):
FORMAT = '=ddb' FORMAT = '=ddb'
UNPACKER = { UNPACKER = {
'temp': two_digits_precision, 'temp': two_digits_precision,
@ -49,7 +50,8 @@ class MqttTempHumModule(MqttModule):
def handle_payload(self, def handle_payload(self,
mqtt: MqttNode, mqtt: MqttNode,
topic: str, topic: str,
payload: bytes): payload: bytes) -> Optional[MqttPayload]:
if topic == 'temphum/data': if topic == 'temphum/data':
message = TempHumDataPayload.unpack(payload) message = MqttTemphumDataPayload.unpack(payload)
self._logger.debug(message) self._logger.debug(message)
return message

View File

@ -2,18 +2,33 @@
from enum import Enum from enum import Enum
from typing import Optional from typing import Optional
from telegram import ReplyKeyboardMarkup, User from telegram import ReplyKeyboardMarkup, User
from time import time
from datetime import datetime
from home.config import config, is_development_mode from home.config import config, is_development_mode
from home.telegram import bot from home.telegram import bot
from home.telegram._botutil import user_any_name from home.telegram._botutil import user_any_name
from home.relay.sunxi_h3_client import RelayClient from home.relay.sunxi_h3_client import RelayClient
from home.api.types import BotType from home.api.types import BotType
from home.mqtt import MqttNode, MqttModule, add_mqtt_module from home.mqtt import MqttNode, MqttModule, MqttPayload, add_mqtt_module
from home.mqtt.module.relay import MqttPowerStatusPayload
from home.mqtt.module.temphum import MqttTemphumDataPayload
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload
config.load('pump_bot') config.load('pump_bot')
mqtt: Optional[MqttNode] = None mqtt: Optional[MqttNode] = None
mqtt_relay_module: Optional[MqttModule] = None mqtt_relay_module: Optional[MqttModule] = None
time_format = '%d.%m.%Y, %H:%M:%S'
watering_mcu_status = {
'last_time': 0,
'last_boot_time': 0,
'relay_opened': False,
'ambient_temp': 0.0,
'ambient_rh': 0.0,
}
bot.initialize() bot.initialize()
bot.lang.ru( bot.lang.ru(
@ -31,7 +46,9 @@ bot.lang.ru(
start_watering="Включить полив", start_watering="Включить полив",
stop_watering="Отключить полив", stop_watering="Отключить полив",
status="Статус", status="Статус насоса",
watering_status="Статус полива",
done="Готово 👌", done="Готово 👌",
user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.', user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.',
user_watering_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> полив.', user_watering_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> полив.',
@ -55,7 +72,9 @@ bot.lang.en(
start_watering="Start watering", start_watering="Start watering",
stop_watering="Stop watering", stop_watering="Stop watering",
status="Status", status="Pump status",
watering_status="Watering status",
done="Done 👌", done="Done 👌",
user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.', user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.',
user_watering_notification='User <a href="tg://user?id=%d">%s</a> <b>%s</b> the watering.', user_watering_notification='User <a href="tg://user?id=%d">%s</a> <b>%s</b> the watering.',
@ -153,27 +172,60 @@ def status(ctx: bot.Context) -> None:
) )
def _get_timestamp_as_string(timestamp: int) -> str:
if timestamp != 0:
return datetime.fromtimestamp(timestamp).strftime(time_format)
else:
return 'unknown'
@bot.handler(message='watering_status')
def watering_status(ctx: bot.Context) -> None:
buf = f'last report time: <b>{_get_timestamp_as_string(watering_mcu_status["last_time"])}</b>\n'
if watering_mcu_status["last_boot_time"] != 0:
buf += f'boot time: <b>{_get_timestamp_as_string(watering_mcu_status["last_boot_time"])}</b>\n'
buf += 'relay opened: <b>' + ('yes' if watering_mcu_status['relay_opened'] else 'no') + '</b>\n'
buf += f'ambient temp & humidity: <b>{watering_mcu_status["ambient_temp"]} C, {watering_mcu_status["ambient_rh"]}%</b>'
ctx.reply(buf)
@bot.defaultreplymarkup @bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]: def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = [ buttons = []
[ctx.lang('enable'), ctx.lang('disable')],
]
if ctx.user_id in config['bot']['silent_users']: if ctx.user_id in config['bot']['silent_users']:
buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')]) buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')])
buttons.append([ctx.lang('enable'), ctx.lang('disable'), ctx.lang('status')],)
buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering')]) buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering'), ctx.lang('watering_status')])
buttons.append([ctx.lang('status')])
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False) return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
def mqtt_payload_callback(payload: MqttPayload):
global watering_mcu_status
watering_mcu_status['last_time'] = int(time())
if isinstance(payload, InitialDiagnosticsPayload):
watering_mcu_status['last_boot_time'] = int(time())
elif isinstance(payload, MqttTemphumDataPayload):
watering_mcu_status['ambient_temp'] = payload.temp
watering_mcu_status['ambient_rh'] = payload.rh
elif isinstance(payload, MqttPowerStatusPayload):
watering_mcu_status['relay_opened'] = payload.opened
if __name__ == '__main__': if __name__ == '__main__':
mqtt = MqttNode(node_id=config.get('mqtt_water_relay.node_id')) mqtt = MqttNode(node_id=config.get('mqtt_water_relay.node_id'))
if is_development_mode(): if is_development_mode():
add_mqtt_module(mqtt, 'diagnostics') add_mqtt_module(mqtt, 'diagnostics')
mqtt_relay_module = add_mqtt_module(mqtt, 'temphum')
mqtt_relay_module = add_mqtt_module(mqtt, 'relay') mqtt_relay_module = add_mqtt_module(mqtt, 'relay')
mqtt.add_payload_callback(mqtt_payload_callback)
mqtt.configure_tls() mqtt.configure_tls()
mqtt.connect_and_loop(loop_forever=False) mqtt.connect_and_loop(loop_forever=False)