This commit is contained in:
Evgeny Zinoviev 2023-05-31 09:22:00 +03:00
parent 3e3753d726
commit 0f0a5fd448
34 changed files with 760 additions and 397 deletions

View File

@ -6,7 +6,12 @@
namespace homekit::main {
#ifndef CONFIG_TARGET_ESP01
#ifndef CONFIG_NO_RECOVERY
enum WorkingMode working_mode = WorkingMode::NORMAL;
#endif
#endif
static const uint16_t recovery_boot_detection_ms = 2000;
static const uint8_t recovery_boot_delay_ms = 100;
@ -22,8 +27,10 @@ static StopWatch blinkStopWatch;
#endif
#ifndef CONFIG_TARGET_ESP01
#ifndef CONFIG_NO_RECOVERY
static DNSServer* dnsServer = nullptr;
#endif
#endif
static void onWifiConnected(const WiFiEventStationModeGotIP& event);
static void onWifiDisconnected(const WiFiEventStationModeDisconnected& event);
@ -45,6 +52,7 @@ static void wifiConnect() {
}
#ifndef CONFIG_TARGET_ESP01
#ifndef CONFIG_NO_RECOVERY
static void wifiHotspot() {
led::mcu_led->on();
@ -71,13 +79,16 @@ static void waitForRecoveryPress() {
}
}
#endif
#endif
void setup() {
WiFi.disconnect();
#ifndef CONFIG_NO_RECOVERY
#ifndef CONFIG_TARGET_ESP01
homekit::main::waitForRecoveryPress();
#endif
#endif
#ifdef DEBUG
Serial.begin(115200);
@ -95,25 +106,31 @@ void setup() {
}
#ifndef CONFIG_TARGET_ESP01
#ifndef CONFIG_NO_RECOVERY
switch (working_mode) {
case WorkingMode::RECOVERY:
wifiHotspot();
break;
case WorkingMode::NORMAL:
#endif
#endif
wifiConnectHandler = WiFi.onStationModeGotIP(onWifiConnected);
wifiDisconnectHandler = WiFi.onStationModeDisconnected(onWifiDisconnected);
wifiConnect();
#ifndef CONFIG_NO_RECOVERY
#ifndef CONFIG_TARGET_ESP01
break;
}
#endif
#endif
}
void loop(LoopConfig* config) {
#ifndef CONFIG_NO_RECOVERY
#ifndef CONFIG_TARGET_ESP01
if (working_mode == WorkingMode::NORMAL) {
#endif
#endif
if (wifi_state == WiFiConnectionState::WAITING) {
PRINT(".");
@ -166,6 +183,7 @@ void loop(LoopConfig* config) {
}
#endif
}
#ifndef CONFIG_NO_RECOVERY
#ifndef CONFIG_TARGET_ESP01
} else {
if (dnsServer != nullptr)
@ -176,6 +194,7 @@ void loop(LoopConfig* config) {
httpServer->loop();
}
#endif
#endif
}
static void onWifiConnected(const WiFiEventStationModeGotIP& event) {
@ -191,4 +210,4 @@ static void onWifiDisconnected(const WiFiEventStationModeDisconnected& event) {
wifiTimer.once(2, wifiConnect);
}
}
}

View File

@ -10,8 +10,10 @@
#include <homekit/config.h>
#include <homekit/logging.h>
#ifndef CONFIG_TARGET_ESP01
#ifndef CONFIG_NO_RECOVERY
#include <homekit/http_server.h>
#endif
#endif
#include <homekit/wifi.h>
#include <homekit/mqtt/mqtt.h>
@ -20,6 +22,7 @@
namespace homekit::main {
#ifndef CONFIG_TARGET_ESP01
#ifndef CONFIG_NO_RECOVERY
enum class WorkingMode {
RECOVERY, // AP mode, http server with configuration
NORMAL, // MQTT client
@ -27,6 +30,7 @@ enum class WorkingMode {
extern enum WorkingMode working_mode;
#endif
#endif
enum class WiFiConnectionState {
WAITING = 0,

View File

@ -1,6 +1,6 @@
{
"name": "homekit_main",
"version": "1.0.8",
"version": "1.0.10",
"build": {
"flags": "-I../../include"
},

View File

@ -55,13 +55,9 @@ Mqtt::Mqtt() {
}
}
// if (ota.readyToRestart) {
// restartTimer.once(1, restart);
// } else {
reconnectTimer.once(2, [&]() {
reconnect();
});
// }
reconnectTimer.once(2, [&]() {
reconnect();
});
});
client.onSubscribe([&](uint16_t packetId, const SubscribeReturncode* returncodes, size_t len) {
@ -79,7 +75,7 @@ Mqtt::Mqtt() {
PRINTF("mqtt: message received, topic=%s, qos=%d, dup=%d, retain=%d, len=%ul, index=%ul, total=%ul\n",
topic, properties.qos, (int)properties.dup, (int)properties.retain, len, index, total);
const char *ptr = topic + nodeId.length() + 10;
const char *ptr = topic + nodeId.length() + 4;
String relevantTopic(ptr);
auto it = moduleSubscriptions.find(relevantTopic);
@ -87,7 +83,7 @@ Mqtt::Mqtt() {
auto module = it->second;
module->handlePayload(*this, relevantTopic, properties.packetId, payload, len, index, total);
} else {
PRINTF("error: module subscription for topic %s not found\n", topic);
PRINTF("error: module subscription for topic %s not found\n", relevantTopic.c_str());
}
});

View File

@ -10,6 +10,10 @@ struct MqttRelaySwitchPayload {
uint8_t state;
} __attribute__((packed));
struct MqttRelayStatusPayload {
uint8_t opened;
} __attribute__((packed));
class MqttRelayModule : public MqttModule {
public:
MqttRelayModule() : MqttModule(0) {}
@ -21,3 +25,4 @@ public:
}
#endif //HOMEKIT_LIB_MQTT_MODULE_RELAY_H

View File

@ -1,6 +1,6 @@
{
"name": "homekit_mqtt_module_relay",
"version": "1.0.3",
"version": "1.0.4",
"build": {
"flags": "-I../../include"
},

View File

@ -1,42 +0,0 @@
#!/usr/bin/env python3
from typing import Optional
from argparse import ArgumentParser
from enum import Enum
from home.config import config
from home.mqtt import MqttRelay
from home.mqtt.esp import MqttEspBase
from home.mqtt.temphum import MqttTempHum
from home.mqtt.esp import MqttEspDevice
mqtt_client: Optional[MqttEspBase] = None
class NodeType(Enum):
RELAY = 'relay'
TEMPHUM = 'temphum'
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('--device-id', type=str, required=True)
parser.add_argument('--type', type=str, required=True,
choices=[i.name.lower() for i in NodeType])
config.load('mqtt_util', parser=parser)
arg = parser.parse_args()
mqtt_node_type = NodeType(arg.type)
devices = MqttEspDevice(id=arg.device_id)
if mqtt_node_type == NodeType.RELAY:
mqtt_client = MqttRelay(devices=devices)
elif mqtt_node_type == NodeType.TEMPHUM:
mqtt_client = MqttTempHum(devices=devices)
mqtt_client.set_message_callback(lambda device_id, payload: print(payload))
mqtt_client.configure_tls()
try:
mqtt_client.connect_and_loop()
except KeyboardInterrupt:
mqtt_client.disconnect()

View File

@ -12,6 +12,7 @@ __map__ = {
__all__ = list(itertools.chain(*__map__.values()))
def __getattr__(name):
if name in __all__:
for file, names in __map__.items():

View File

@ -1,4 +1,9 @@
from .mqtt import MqttBase
from .util import poll_tick
from .relay import MqttRelay, MqttRelayState
from .temphum import MqttTempHum
from .mqtt import MqttBase, MqttPayload, MqttPayloadCustomField
from ._node import MqttNode
from ._module import MqttModule
from .util import (
poll_tick,
get_modules as get_mqtt_modules,
import_module as import_mqtt_module,
add_module as add_mqtt_module
)

34
src/home/mqtt/_module.py Normal file
View File

@ -0,0 +1,34 @@
from __future__ import annotations
import abc
import logging
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from ._node import MqttNode
from ._payload import MqttPayload
class MqttModule(abc.ABC):
tick_interval: int
_initialized: bool
def __init__(self, tick_interval=0):
self.tick_interval = tick_interval
self._initialized = False
self._logger = logging.getLogger(self.__class__.__name__)
def init(self, mqtt: MqttNode):
pass
def is_initialized(self):
return self._initialized
def set_initialized(self):
self._initialized = True
def tick(self, mqtt: MqttNode):
pass
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
pass

95
src/home/mqtt/_node.py Normal file
View File

@ -0,0 +1,95 @@
import paho.mqtt.client as mqtt
from .mqtt import MqttBase
from typing import List
from ._module import MqttModule
from ._payload import MqttPayload
class MqttNode(MqttBase):
_modules: List[MqttModule]
_module_subscriptions: dict[str, MqttModule]
_node_id: str
_payload_callbacks: list[callable]
# _devices: list[MqttEspDevice]
# _message_callback: Optional[callable]
# _ota_publish_callback: Optional[callable]
def __init__(self,
node_id: str,
# devices: Union[MqttEspDevice, list[MqttEspDevice]]
):
super().__init__(clean_session=True)
self._modules = []
self._module_subscriptions = {}
self._node_id = node_id
self._payload_callbacks = []
# if not isinstance(devices, list):
# devices = [devices]
# self._devices = devices
# self._message_callback = None
# self._ota_publish_callback = None
# self._ota_mid = None
def on_connect(self, client: mqtt.Client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
for module in self._modules:
if not module.is_initialized():
module.init(self)
module.set_initialized()
def on_publish(self, client: mqtt.Client, userdata, mid):
pass # FIXME
# if self._ota_mid is not None and mid == self._ota_mid and self._ota_publish_callback:
# self._ota_publish_callback()
def on_message(self, client: mqtt.Client, userdata, msg):
try:
topic = msg.topic
actual_topic = topic[len(f'hk/{self._node_id}/'):]
if actual_topic in self._module_subscriptions:
payload = self._module_subscriptions[actual_topic].handle_payload(self, actual_topic, msg.payload)
if isinstance(payload, MqttPayload):
for f in self._payload_callbacks:
f(payload)
except Exception as e:
self._logger.exception(str(e))
# def push_ota(self,
# device_id,
# filename: str,
# publish_callback: callable,
# qos: int):
# device = next(d for d in self._devices if d.id == device_id)
# assert device.secret is not None, 'device secret not specified'
#
# self._ota_publish_callback = publish_callback
# payload = OtaPayload(secret=device.secret, filename=filename)
# publish_result = self._client.publish(f'hk/{device.id}/{self.TOPIC_LEAF}/admin/ota',
# payload=payload.pack(),
# qos=qos)
# self._ota_mid = publish_result.mid
# self._client.loop_write()
#
# @classmethod
# def get_mqtt_topics(cls, additional_topics: Optional[list[str]] = None):
# return rf'^hk/(.*?)/{cls.TOPIC_LEAF}/(stat|stat1|otares'+('|'+('|'.join(additional_topics)) if additional_topics else '')+')$'
def add_module(self, module: MqttModule):
self._modules.append(module)
if self._connected:
module.init(self)
module.set_initialized()
def subscribe_module(self, topic: str, module: MqttModule, qos: int = 1):
self._module_subscriptions[topic] = module
self._client.subscribe(f'hk/{self._node_id}/{topic}', qos)
def publish(self, topic: str, payload: bytes, qos: int = 1):
self._client.publish(f'hk/{self._node_id}/{topic}', payload, qos)
self._client.loop_write()
def add_payload_callback(self, callback: callable):
self._payload_callbacks.append(callback)

View File

@ -1,5 +1,5 @@
import abc
import struct
import abc
import re
from typing import Optional, Tuple
@ -142,4 +142,4 @@ def _bit_field_params(cl) -> Optional[Tuple[int, ...]]:
match = re.match(r'MQTTPayloadBitField_(\d+)_(\d+)_(\d)$', cl.__name__)
if match is not None:
return tuple([int(match.group(i)) for i in range(1, 4)])
return None
return None

View File

@ -1,106 +0,0 @@
import re
import paho.mqtt.client as mqtt
from .mqtt import MqttBase
from typing import Optional, Union
from .payload.esp import (
OTAPayload,
OTAResultPayload,
DiagnosticsPayload,
InitialDiagnosticsPayload
)
class MqttEspDevice:
id: str
secret: Optional[str]
def __init__(self, id: str, secret: Optional[str] = None):
self.id = id
self.secret = secret
class MqttEspBase(MqttBase):
_devices: list[MqttEspDevice]
_message_callback: Optional[callable]
_ota_publish_callback: Optional[callable]
TOPIC_LEAF = 'esp'
def __init__(self,
devices: Union[MqttEspDevice, list[MqttEspDevice]],
subscribe_to_updates=True):
super().__init__(clean_session=True)
if not isinstance(devices, list):
devices = [devices]
self._devices = devices
self._message_callback = None
self._ota_publish_callback = None
self._subscribe_to_updates = subscribe_to_updates
self._ota_mid = None
def on_connect(self, client: mqtt.Client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
if self._subscribe_to_updates:
for device in self._devices:
topic = f'hk/{device.id}/{self.TOPIC_LEAF}/#'
self._logger.debug(f"subscribing to {topic}")
client.subscribe(topic, qos=1)
def on_publish(self, client: mqtt.Client, userdata, mid):
if self._ota_mid is not None and mid == self._ota_mid and self._ota_publish_callback:
self._ota_publish_callback()
def set_message_callback(self, callback: callable):
self._message_callback = callback
def on_message(self, client: mqtt.Client, userdata, msg):
try:
match = re.match(self.get_mqtt_topics(), msg.topic)
self._logger.debug(f'topic: {msg.topic}')
if not match:
return
device_id = match.group(1)
subtopic = match.group(2)
# try:
next(d for d in self._devices if d.id == device_id)
# except StopIteration:h
# return
message = None
if subtopic == 'stat':
message = DiagnosticsPayload.unpack(msg.payload)
elif subtopic == 'stat1':
message = InitialDiagnosticsPayload.unpack(msg.payload)
elif subtopic == 'otares':
message = OTAResultPayload.unpack(msg.payload)
if message and self._message_callback:
self._message_callback(device_id, message)
return True
except Exception as e:
self._logger.exception(str(e))
def push_ota(self,
device_id,
filename: str,
publish_callback: callable,
qos: int):
device = next(d for d in self._devices if d.id == device_id)
assert device.secret is not None, 'device secret not specified'
self._ota_publish_callback = publish_callback
payload = OTAPayload(secret=device.secret, filename=filename)
publish_result = self._client.publish(f'hk/{device.id}/{self.TOPIC_LEAF}/admin/ota',
payload=payload.pack(),
qos=qos)
self._ota_mid = publish_result.mid
self._client.loop_write()
@classmethod
def get_mqtt_topics(cls, additional_topics: Optional[list[str]] = None):
return rf'^hk/(.*?)/{cls.TOPIC_LEAF}/(stat|stat1|otares'+('|'+('|'.join(additional_topics)) if additional_topics else '')+')$'

View File

@ -1,39 +1,8 @@
import hashlib
from ..mqtt import MqttPayload, MqttPayloadCustomField
from .._node import MqttNode, MqttModule
from typing import Optional
from .base_payload import MqttPayload, MqttPayloadCustomField
class OTAResultPayload(MqttPayload):
FORMAT = '=BB'
result: int
error_code: int
class OTAPayload(MqttPayload):
secret: str
filename: str
# structure of returned data:
#
# uint8_t[len(secret)] secret;
# uint8_t[16] md5;
# *uint8_t data
def pack(self):
buf = bytearray(self.secret.encode())
m = hashlib.md5()
with open(self.filename, 'rb') as fd:
content = fd.read()
m.update(content)
buf.extend(m.digest())
buf.extend(content)
return buf
def unpack(cls, buf: bytes):
raise RuntimeError(f'{cls.__class__.__name__}.unpack: not implemented')
# secret = buf[:12].decode()
# filename = buf[12:].decode()
# return OTAPayload(secret=secret, filename=filename)
MODULE_NAME = 'MqttDiagnosticsModule'
class DiagnosticsFlags(MqttPayloadCustomField):
@ -76,3 +45,17 @@ class DiagnosticsPayload(MqttPayload):
rssi: int
free_heap: int
flags: DiagnosticsFlags
class MqttDiagnosticsModule(MqttModule):
def init(self, mqtt: MqttNode):
for topic in ('diag', 'd1ag', 'stat', 'stat1'):
mqtt.subscribe_module(topic, self)
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
if topic in ('stat', 'diag'):
message = DiagnosticsPayload.unpack(payload)
elif topic in ('stat1', 'd1ag'):
message = InitialDiagnosticsPayload.unpack(payload)
self._logger.debug(message)
return message

View File

@ -1,7 +1,7 @@
import struct
from .base_payload import MqttPayload, bit_field
from typing import Tuple
from .._node import MqttNode
from .._payload import MqttPayload, bit_field
_mult_10 = lambda n: int(n*10)
_div_10 = lambda n: n/10
@ -71,3 +71,7 @@ class Generation(MqttPayload):
time: int
wh: int
class MqttInverterModule(MqttNode):
pass

View File

@ -0,0 +1,67 @@
import hashlib
from typing import Optional
from ..mqtt import MqttPayload
from .._node import MqttModule, MqttNode
MODULE_NAME = 'MqttOtaModule'
class OtaResultPayload(MqttPayload):
FORMAT = '=BB'
result: int
error_code: int
class OtaPayload(MqttPayload):
secret: str
filename: str
# structure of returned data:
#
# uint8_t[len(secret)] secret;
# uint8_t[16] md5;
# *uint8_t data
def pack(self):
buf = bytearray(self.secret.encode())
m = hashlib.md5()
with open(self.filename, 'rb') as fd:
content = fd.read()
m.update(content)
buf.extend(m.digest())
buf.extend(content)
return buf
def unpack(cls, buf: bytes):
raise RuntimeError(f'{cls.__class__.__name__}.unpack: not implemented')
# secret = buf[:12].decode()
# filename = buf[12:].decode()
# return OTAPayload(secret=secret, filename=filename)
class MqttOtaModule(MqttModule):
def init(self, mqtt: MqttNode):
mqtt.subscribe_module("otares", self)
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
if topic == 'otares':
message = OtaResultPayload.unpack(payload)
self._logger.debug(message)
return message
# def push_ota(self,
# node_id,
# filename: str,
# publish_callback: callable,
# qos: int):
# device = next(d for d in self._devices if d.id == device_id)
# assert device.secret is not None, 'device secret not specified'
#
# self._ota_publish_callback = publish_callback
# payload = OtaPayload(secret=device.secret, filename=filename)
# publish_result = self._client.publish(f'hk/{device.id}/{self.TOPIC_LEAF}/admin/ota',
# payload=payload.pack(),
# qos=qos)
# self._ota_mid = publish_result.mid
# self._client.loop_write()

View File

@ -0,0 +1,82 @@
import datetime
from typing import Optional
from .. import MqttModule, MqttPayload, MqttNode
MODULE_NAME = 'MqttRelayModule'
class MqttPowerSwitchPayload(MqttPayload):
FORMAT = '=12sB'
PACKER = {
'state': lambda n: int(n),
'secret': lambda s: s.encode('utf-8')
}
UNPACKER = {
'state': lambda n: bool(n),
'secret': lambda s: s.decode('utf-8')
}
secret: str
state: bool
class MqttPowerStatusPayload(MqttPayload):
FORMAT = '=B'
PACKER = {
'opened': lambda n: int(n),
}
UNPACKER = {
'opened': lambda n: bool(n),
}
opened: bool
class MqttRelayState:
enabled: bool
update_time: datetime.datetime
rssi: int
fw_version: int
ever_updated: bool
def __init__(self):
self.ever_updated = False
self.enabled = False
self.rssi = 0
def update(self,
enabled: bool,
rssi: int,
fw_version=None):
self.ever_updated = True
self.enabled = enabled
self.rssi = rssi
self.update_time = datetime.datetime.now()
if fw_version:
self.fw_version = fw_version
class MqttRelayModule(MqttModule):
def init(self, mqtt: MqttNode):
mqtt.subscribe_module('relay/switch', self)
mqtt.subscribe_module('relay/status', self)
@staticmethod
def switchpower(mqtt: MqttNode,
enable: bool,
secret: str):
payload = MqttPowerSwitchPayload(secret=secret, state=enable)
mqtt.publish('relay/switch', payload=payload.pack())
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
message = None
if topic == 'relay/switch':
message = MqttPowerSwitchPayload.unpack(payload)
elif topic == 'relay/status':
message = MqttPowerStatusPayload.unpack(payload)
if message is not None:
self._logger.debug(message)
return message

View File

@ -0,0 +1,57 @@
from enum import auto
from .._node import MqttNode
from .._module import MqttModule
from .._payload import MqttPayload
from ...util import HashableEnum
from typing import Optional
two_digits_precision = lambda x: round(x, 2)
MODULE_NAME = 'MqttTempHumModule'
class MqttTemphumDataPayload(MqttPayload):
FORMAT = '=ddb'
UNPACKER = {
'temp': two_digits_precision,
'rh': two_digits_precision
}
temp: float
rh: float
error: int
class MqttTempHumNodes(HashableEnum):
KBN_SH_HALL = auto()
KBN_SH_BATHROOM = auto()
KBN_SH_LIVINGROOM = auto()
KBN_SH_BEDROOM = auto()
KBN_BH_2FL = auto()
KBN_BH_2FL_STREET = auto()
KBN_BH_1FL_LIVINGROOM = auto()
KBN_BH_1FL_BEDROOM = auto()
KBN_BH_1FL_BATHROOM = auto()
KBN_NH_1FL_INV = auto()
KBN_NH_1FL_CENTER = auto()
KBN_NH_1LF_KT = auto()
KBN_NH_1FL_DS = auto()
KBN_NH_1FS_EZ = auto()
SPB_FLAT120_CABINET = auto()
class MqttTempHumModule(MqttModule):
def init(self, mqtt: MqttNode):
mqtt.subscribe_module('temphum/data', self)
def handle_payload(self,
mqtt: MqttNode,
topic: str,
payload: bytes) -> Optional[MqttPayload]:
if topic == 'temphum/data':
message = MqttTemphumDataPayload.unpack(payload)
self._logger.debug(message)
return message

View File

@ -3,8 +3,8 @@ import paho.mqtt.client as mqtt
import ssl
import logging
from typing import Tuple
from ..config import config
from ._payload import *
def username_and_password() -> Tuple[str, str]:
@ -14,6 +14,8 @@ def username_and_password() -> Tuple[str, str]:
class MqttBase:
_connected: bool
def __init__(self, clean_session=True):
self._client = mqtt.Client(client_id=config['mqtt']['client_id'],
protocol=mqtt.MQTTv311,
@ -24,6 +26,7 @@ class MqttBase:
self._client.on_log = self.on_log
self._client.on_publish = self.on_publish
self._loop_started = False
self._connected = False
self._logger = logging.getLogger(self.__class__.__name__)
@ -41,7 +44,9 @@ class MqttBase:
'assets',
'mqtt_ca.crt'
))
self._client.tls_set(ca_certs=ca_certs, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2)
self._client.tls_set(ca_certs=ca_certs,
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2)
def connect_and_loop(self, loop_forever=True):
host = config['mqtt']['host']
@ -61,9 +66,11 @@ class MqttBase:
def on_connect(self, client: mqtt.Client, userdata, flags, rc):
self._logger.info("Connected with result code " + str(rc))
self._connected = True
def on_disconnect(self, client: mqtt.Client, userdata, rc):
self._logger.info("Disconnected with result code " + str(rc))
self._connected = False
def on_log(self, client: mqtt.Client, userdata, level, buf):
level = mqtt.LOGGING_LEVEL[level] if level in mqtt.LOGGING_LEVEL else logging.INFO
@ -73,4 +80,15 @@ class MqttBase:
self._logger.debug(msg.topic + ": " + str(msg.payload))
def on_publish(self, client: mqtt.Client, userdata, mid):
self._logger.debug(f'publish done, mid={mid}')
self._logger.debug(f'publish done, mid={mid}')
class MqttEspDevice:
id: str
secret: Optional[str]
def __init__(self,
node_id: str,
secret: Optional[str] = None):
self.id = node_id
self.secret = secret

View File

@ -1 +0,0 @@
from .base_payload import MqttPayload

View File

@ -1,22 +0,0 @@
from .base_payload import MqttPayload
from .esp import (
OTAResultPayload,
OTAPayload,
InitialDiagnosticsPayload,
DiagnosticsPayload
)
class PowerPayload(MqttPayload):
FORMAT = '=12sB'
PACKER = {
'state': lambda n: int(n),
'secret': lambda s: s.encode('utf-8')
}
UNPACKER = {
'state': lambda n: bool(n),
'secret': lambda s: s.decode('utf-8')
}
secret: str
state: bool

View File

@ -1,20 +0,0 @@
from .base_payload import MqttPayload
_mult_100 = lambda n: int(n*100)
_div_100 = lambda n: n/100
class Temperature(MqttPayload):
FORMAT = 'IhH'
PACKER = {
'temp': _mult_100,
'rh': _mult_100,
}
UNPACKER = {
'temp': _div_100,
'rh': _div_100,
}
time: int
temp: float
rh: float

View File

@ -1,15 +0,0 @@
from .base_payload import MqttPayload
two_digits_precision = lambda x: round(x, 2)
class TempHumDataPayload(MqttPayload):
FORMAT = '=ddb'
UNPACKER = {
'temp': two_digits_precision,
'rh': two_digits_precision
}
temp: float
rh: float
error: int

View File

@ -1,71 +1,59 @@
#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import re
import datetime
import logging
from .payload.relay import (
PowerPayload,
)
from .esp import MqttEspBase
from .mqtt import MQTTBase
class MqttRelay(MqttEspBase):
TOPIC_LEAF = 'relay'
class MQTTRelayClient(MQTTBase):
_home_id: str
def set_power(self, device_id, enable: bool, secret=None):
device = next(d for d in self._devices if d.id == device_id)
secret = secret if secret else device.secret
def __init__(self, home_id: str):
super().__init__(clean_session=True)
self._home_id = home_id
assert secret is not None, 'device secret not specified'
def on_connect(self, client: mqtt.Client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
payload = PowerPayload(secret=secret,
state=enable)
self._client.publish(f'hk/{device.id}/{self.TOPIC_LEAF}/power',
payload=payload.pack(),
qos=1)
self._client.loop_write()
topic = f'home/{self._home_id}/#'
self._logger.info(f"subscribing to {topic}")
client.subscribe(topic, qos=1)
def on_message(self, client: mqtt.Client, userdata, msg):
if super().on_message(client, userdata, msg):
return
try:
match = re.match(self.get_mqtt_topics(['power']), msg.topic)
match = re.match(r'^home/(.*?)/relay/(stat|power)(?:/(.+))?$', msg.topic)
self._logger.info(f'topic: {msg.topic}')
if not match:
return
device_id = match.group(1)
name = match.group(1)
subtopic = match.group(2)
message = None
if subtopic == 'power':
message = PowerPayload.unpack(msg.payload)
if name != self._home_id:
return
if message and self._message_callback:
self._message_callback(device_id, message)
if subtopic == 'stat':
stat_name, stat_value = match.group(3).split('/')
self._logger.info(f'stat: {stat_name} = {stat_value}')
except Exception as e:
self._logger.exception(str(e))
class MqttRelayState:
enabled: bool
update_time: datetime.datetime
rssi: int
fw_version: int
ever_updated: bool
class MQTTRelayController(MQTTBase):
_home_id: str
def __init__(self):
self.ever_updated = False
self.enabled = False
self.rssi = 0
def __init__(self, home_id: str):
super().__init__(clean_session=True)
self._home_id = home_id
def update(self,
enabled: bool,
rssi: int,
fw_version=None):
self.ever_updated = True
self.enabled = enabled
self.rssi = rssi
self.update_time = datetime.datetime.now()
if fw_version:
self.fw_version = fw_version
def set_power(self, enable: bool):
self._client.publish(f'home/{self._home_id}/relay/power',
payload=int(enable),
qos=1)
self._client.loop_write()
def send_stat(self, stat: dict):
pass

View File

@ -1,54 +0,0 @@
import paho.mqtt.client as mqtt
import re
from enum import auto
from .payload.temphum import TempHumDataPayload
from .esp import MqttEspBase
from ..util import HashableEnum
class MqttTempHumNodes(HashableEnum):
KBN_SH_HALL = auto()
KBN_SH_BATHROOM = auto()
KBN_SH_LIVINGROOM = auto()
KBN_SH_BEDROOM = auto()
KBN_BH_2FL = auto()
KBN_BH_2FL_STREET = auto()
KBN_BH_1FL_LIVINGROOM = auto()
KBN_BH_1FL_BEDROOM = auto()
KBN_BH_1FL_BATHROOM = auto()
KBN_NH_1FL_INV = auto()
KBN_NH_1FL_CENTER = auto()
KBN_NH_1LF_KT = auto()
KBN_NH_1FL_DS = auto()
KBN_NH_1FS_EZ = auto()
SPB_FLAT120_CABINET = auto()
class MqttTempHum(MqttEspBase):
TOPIC_LEAF = 'temphum'
def on_message(self, client: mqtt.Client, userdata, msg):
if super().on_message(client, userdata, msg):
return
try:
match = re.match(self.get_mqtt_topics(['data']), msg.topic)
if not match:
return
device_id = match.group(1)
subtopic = match.group(2)
message = None
if subtopic == 'data':
message = TempHumDataPayload.unpack(msg.payload)
if message and self._message_callback:
self._message_callback(device_id, message)
except Exception as e:
self._logger.exception(str(e))

View File

@ -1,4 +1,11 @@
import time
import os
import re
import importlib
from ._node import MqttNode
from . import MqttModule
from typing import List
def poll_tick(freq):
@ -6,3 +13,26 @@ def poll_tick(freq):
while True:
t += freq
yield max(t - time.time(), 0)
def get_modules() -> List[str]:
modules = []
for name in os.listdir(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'module')):
name = re.sub(r'\.py$', '', name)
modules.append(name)
return modules
def import_module(module: str):
return importlib.import_module(
f'..module.{module}', __name__)
def add_module(mqtt_node: MqttNode, module: str) -> MqttModule:
module = import_module(module)
if not hasattr(module, 'MODULE_NAME'):
raise RuntimeError(f'MODULE_NAME not found in module {module}')
cl = getattr(module, getattr(module, 'MODULE_NAME'))
instance = cl()
mqtt_node.add_module(instance)
return instance

View File

@ -16,10 +16,6 @@ _products_dir = os.path.join(
def get_products():
products = []
for f in os.listdir(_products_dir):
# temp hack
if f.endswith('-esp01'):
continue
# skip the common dir
if f in ('common',):
continue

58
src/mqtt_node_util.py Executable file
View File

@ -0,0 +1,58 @@
#!/usr/bin/env python3
from typing import Optional
from argparse import ArgumentParser, ArgumentError
from home.config import config
from home.mqtt import MqttNode, get_mqtt_modules, import_mqtt_module, MqttModule
mqtt: Optional[MqttNode] = None
def add_module(module: str) -> MqttModule:
module = import_mqtt_module(module)
if not hasattr(module, 'MODULE_NAME'):
raise RuntimeError(f'MODULE_NAME not found in module {m}')
cl = getattr(module, getattr(module, 'MODULE_NAME'))
instance = cl()
mqtt.add_module(instance)
return instance
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('--node-id', type=str, required=True)
parser.add_argument('--modules', type=str, choices=get_mqtt_modules(), nargs='*',
help='mqtt modules to include')
parser.add_argument('--switch-relay', choices=[0, 1], type=int,
help='send relay state')
parser.add_argument('--switch-relay-secret', type=str,
help='secret password to switch relay')
config.load('mqtt_util', parser=parser)
arg = parser.parse_args()
if (arg.switch_relay is not None or arg.switch_relay_secret is not None) and 'relay' not in arg.modules:
raise ArgumentError(None, '--relay is only allowed when \'relay\' module included in --modules')
if (arg.switch_relay is not None and arg.switch_relay_secret is None) or (arg.switch_relay is None and arg.switch_relay_secret is not None):
raise ArgumentError(None, 'both --switch-relay and --switch-relay-secret are required')
mqtt = MqttNode(node_id=arg.node_id)
# must-have modules
add_module('ota')
add_module('diagnostics')
if arg.modules:
for m in arg.modules:
module_instance = add_module(m)
if m == 'relay' and arg.switch_relay is not None:
module_instance.switchpower(mqtt,
arg.switch_relay == 1,
arg.switch_relay_secret)
mqtt.configure_tls()
try:
mqtt.connect_and_loop()
except KeyboardInterrupt:
mqtt.disconnect()

View File

@ -54,12 +54,17 @@ def bsd_parser(product_config: dict,
arg_kwargs['type'] = int
elif kwargs['type'] == 'int':
arg_kwargs['type'] = int
elif kwargs['type'] == 'bool':
arg_kwargs['action'] = 'store_true'
arg_kwargs['required'] = False
else:
raise TypeError(f'unsupported type {kwargs["type"]} for define {define_name}')
else:
arg_kwargs['action'] = 'store_true'
parser.add_argument(f'--{define_name}', required=True, **arg_kwargs)
if 'required' not in arg_kwargs:
arg_kwargs['required'] = True
parser.add_argument(f'--{define_name}', **arg_kwargs)
bsd_walk(product_config, f)
@ -76,6 +81,9 @@ def bsd_get(product_config: dict,
enums.append(f'CONFIG_{define_name}')
defines[f'CONFIG_{define_name}'] = f'HOMEKIT_{attr_value.upper()}'
return
if kwargs['type'] == 'bool':
defines[f'CONFIG_{define_name}'] = True
return
defines[f'CONFIG_{define_name}'] = str(attr_value)
bsd_walk(product_config, f)
return defines, enums

View File

@ -2,15 +2,34 @@
from enum import Enum
from typing import Optional
from telegram import ReplyKeyboardMarkup, User
from time import time
from datetime import datetime
from home.config import config
from home.config import config, is_development_mode
from home.telegram import bot
from home.telegram._botutil import user_any_name
from home.relay.sunxi_h3_client import RelayClient
from home.api.types import BotType
from home.mqtt import MqttNode, MqttModule, MqttPayload, add_mqtt_module
from home.mqtt.module.relay import MqttPowerStatusPayload
from home.mqtt.module.temphum import MqttTemphumDataPayload
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload
config.load('pump_bot')
mqtt: Optional[MqttNode] = None
mqtt_relay_module: Optional[MqttModule] = None
time_format = '%d.%m.%Y, %H:%M:%S'
watering_mcu_status = {
'last_time': 0,
'last_boot_time': 0,
'relay_opened': False,
'ambient_temp': 0.0,
'ambient_rh': 0.0,
}
bot.initialize()
bot.lang.ru(
start_message="Выберите команду на клавиатуре",
@ -18,17 +37,27 @@ bot.lang.ru(
enable="Включить",
enable_silently="Включить тихо",
enabled="Включен ✅",
enabled="Насос включен ✅",
disable="Выключить",
disable_silently="Выключить тихо",
disabled="Выключен ❌",
disabled="Насос выключен ❌",
start_watering="Включить полив",
stop_watering="Отключить полив",
status="Статус насоса",
watering_status="Статус полива",
status="Статус",
done="Готово 👌",
sent="Команда отправлена",
user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> насос.',
user_watering_notification='Пользователь <a href="tg://user?id=%d">%s</a> <b>%s</b> полив.',
user_action_on="включил",
user_action_off="выключил",
user_action_watering_on="включил",
user_action_watering_off="выключил",
)
bot.lang.en(
start_message="Select command on the keyboard",
@ -36,23 +65,35 @@ bot.lang.en(
enable="Turn ON",
enable_silently="Turn ON silently",
enabled="Turned ON ✅",
enabled="The pump is turned ON ✅",
disable="Turn OFF",
disable_silently="Turn OFF silently",
disabled="Turned OFF ❌",
disabled="The pump is turned OFF ❌",
start_watering="Start watering",
stop_watering="Stop watering",
status="Pump status",
watering_status="Watering status",
status="Status",
done="Done 👌",
sent="Request sent",
user_action_notification='User <a href="tg://user?id=%d">%s</a> turned the pump <b>%s</b>.',
user_watering_notification='User <a href="tg://user?id=%d">%s</a> <b>%s</b> the watering.',
user_action_on="ON",
user_action_off="OFF",
user_action_watering_on="started",
user_action_watering_off="stopped",
)
class UserAction(Enum):
ON = 'on'
OFF = 'off'
WATERING_ON = 'watering_on'
WATERING_OFF = 'watering_off'
def get_relay() -> RelayClient:
@ -75,11 +116,24 @@ def off(ctx: bot.Context, silent=False) -> None:
notify(ctx.user, UserAction.OFF)
def watering_on(ctx: bot.Context) -> None:
mqtt_relay_module.switchpower(mqtt, True, config.get('mqtt_water_relay.secret'))
ctx.reply(ctx.lang('sent'))
notify(ctx.user, UserAction.WATERING_ON)
def watering_off(ctx: bot.Context) -> None:
mqtt_relay_module.switchpower(mqtt, False, config.get('mqtt_water_relay.secret'))
ctx.reply(ctx.lang('sent'))
notify(ctx.user, UserAction.WATERING_OFF)
def notify(user: User, action: UserAction) -> None:
notification_key = 'user_watering_notification' if action in (UserAction.WATERING_ON, UserAction.WATERING_OFF) else 'user_action_notification'
def text_getter(lang: str):
action_name = bot.lang.get(f'user_action_{action.value}', lang)
user_name = user_any_name(user)
return ' ' + bot.lang.get('user_action_notification', lang,
return ' ' + bot.lang.get(notification_key, lang,
user.id, user_name, action_name)
bot.notify_all(text_getter, exclude=(user.id,))
@ -100,6 +154,16 @@ def disable_handler(ctx: bot.Context) -> None:
off(ctx)
@bot.handler(message='start_watering')
def start_watering(ctx: bot.Context) -> None:
watering_on(ctx)
@bot.handler(message='stop_watering')
def stop_watering(ctx: bot.Context) -> None:
watering_off(ctx)
@bot.handler(message='disable_silently')
def disable_s_handler(ctx: bot.Context) -> None:
off(ctx, True)
@ -112,20 +176,71 @@ def status(ctx: bot.Context) -> None:
)
def _get_timestamp_as_string(timestamp: int) -> str:
if timestamp != 0:
return datetime.fromtimestamp(timestamp).strftime(time_format)
else:
return 'unknown'
@bot.handler(message='watering_status')
def watering_status(ctx: bot.Context) -> None:
buf = ''
if 0 < watering_mcu_status["last_time"] < time()-1800:
buf += '<b>WARNING! long time no reports from mcu! maybe something\'s wrong</b>\n'
buf += f'last report time: <b>{_get_timestamp_as_string(watering_mcu_status["last_time"])}</b>\n'
if watering_mcu_status["last_boot_time"] != 0:
buf += f'boot time: <b>{_get_timestamp_as_string(watering_mcu_status["last_boot_time"])}</b>\n'
buf += 'relay opened: <b>' + ('yes' if watering_mcu_status['relay_opened'] else 'no') + '</b>\n'
buf += f'ambient temp & humidity: <b>{watering_mcu_status["ambient_temp"]} °C, {watering_mcu_status["ambient_rh"]}%</b>'
ctx.reply(buf)
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = [
[ctx.lang('enable'), ctx.lang('disable')],
]
buttons = []
if ctx.user_id in config['bot']['silent_users']:
buttons.append([ctx.lang('enable_silently'), ctx.lang('disable_silently')])
buttons.append([ctx.lang('status')])
buttons.append([ctx.lang('enable'), ctx.lang('disable'), ctx.lang('status')],)
buttons.append([ctx.lang('start_watering'), ctx.lang('stop_watering'), ctx.lang('watering_status')])
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
def mqtt_payload_callback(payload: MqttPayload):
global watering_mcu_status
watering_mcu_status['last_time'] = int(time())
if isinstance(payload, InitialDiagnosticsPayload):
watering_mcu_status['last_boot_time'] = int(time())
elif isinstance(payload, MqttTemphumDataPayload):
watering_mcu_status['ambient_temp'] = payload.temp
watering_mcu_status['ambient_rh'] = payload.rh
elif isinstance(payload, MqttPowerStatusPayload):
watering_mcu_status['relay_opened'] = payload.opened
if __name__ == '__main__':
mqtt = MqttNode(node_id=config.get('mqtt_water_relay.node_id'))
if is_development_mode():
add_mqtt_module(mqtt, 'diagnostics')
mqtt_relay_module = add_mqtt_module(mqtt, 'temphum')
mqtt_relay_module = add_mqtt_module(mqtt, 'relay')
mqtt.add_payload_callback(mqtt_payload_callback)
mqtt.configure_tls()
mqtt.connect_and_loop(loop_forever=False)
bot.enable_logging(BotType.PUMP)
bot.run()
try:
mqtt.disconnect()
except:
pass

View File

@ -8,10 +8,9 @@ from telegram import ReplyKeyboardMarkup, User
from home.config import config
from home.telegram import bot
from home.telegram._botutil import user_any_name
from home.mqtt.esp import MqttEspDevice
from home.mqtt import MqttRelay, MqttRelayState
from home.mqtt.payload import MqttPayload
from home.mqtt.payload.relay import InitialDiagnosticsPayload, DiagnosticsPayload
from home.mqtt import MqttNode, MqttPayload
from home.mqtt.module.relay import MqttRelayState
from home.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
config.load('pump_mqtt_bot')
@ -70,7 +69,7 @@ bot.lang.en(
)
mqtt_relay: Optional[MqttRelay] = None
mqtt: Optional[MqttNode] = None
relay_state = MqttRelayState()
@ -99,14 +98,14 @@ def notify(user: User, action: UserAction) -> None:
@bot.handler(message='enable')
def enable_handler(ctx: bot.Context) -> None:
mqtt_relay.set_power(config['mqtt']['home_id'], True)
mqtt.set_power(config['mqtt']['home_id'], True)
ctx.reply(ctx.lang('done'))
notify(ctx.user, UserAction.ON)
@bot.handler(message='disable')
def disable_handler(ctx: bot.Context) -> None:
mqtt_relay.set_power(config['mqtt']['home_id'], False)
mqtt.set_power(config['mqtt']['home_id'], False)
ctx.reply(ctx.lang('done'))
notify(ctx.user, UserAction.OFF)
@ -157,13 +156,13 @@ def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
if __name__ == '__main__':
mqtt_relay = MqttRelay(devices=MqttEspDevice(id=config['mqtt']['home_id'],
secret=config['mqtt']['home_secret']))
mqtt_relay.set_message_callback(on_mqtt_message)
mqtt_relay.configure_tls()
mqtt_relay.connect_and_loop(loop_forever=False)
mqtt = MqttRelay(devices=MqttEspDevice(id=config['mqtt']['home_id'],
secret=config['mqtt']['home_secret']))
mqtt.set_message_callback(on_mqtt_message)
mqtt.configure_tls()
mqtt.connect_and_loop(loop_forever=False)
# bot.enable_logging(BotType.PUMP_MQTT)
bot.run(start_handler=start)
mqtt_relay.disconnect()
mqtt.disconnect()

View File

@ -9,6 +9,8 @@ User=user
Group=user
EnvironmentFile=/etc/ipcam_rtsp2hls.conf.d/%i.conf
ExecStart=/home/user/homekit/tools/ipcam_rtsp2hls.sh --name %i --user $USER --password $PASSWORD --ip $IP --port $PORT $ARGS
Restart=on-failure
RestartSec=3
[Install]
WantedBy=multi-user.target

18
test/mqtt_relay_server_util.py Executable file
View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
import sys
import os.path
sys.path.extend([
os.path.realpath(
os.path.join(os.path.dirname(os.path.join(__file__)), '..')
)
])
from src.home.config import config
from src.home.mqtt.relay import MQTTRelayClient
if __name__ == '__main__':
config.load('test_mqtt_relay_server')
relay = MQTTRelayClient('test')
relay.configure_tls()
relay.connect_and_loop()

39
test/mqtt_relay_util.py Executable file
View File

@ -0,0 +1,39 @@
#!/usr/bin/env python3
import sys
import os.path
sys.path.extend([
os.path.realpath(
os.path.join(os.path.dirname(os.path.join(__file__)), '..')
)
])
from argparse import ArgumentParser
from src.home.config import config
from src.home.mqtt.relay import MQTTRelayController
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('--on', action='store_true')
parser.add_argument('--off', action='store_true')
parser.add_argument('--stat', action='store_true')
config.load('test_mqtt_relay', parser=parser)
arg = parser.parse_args()
relay = MQTTRelayController('test')
relay.configure_tls()
relay.connect_and_loop(loop_forever=False)
if arg.on:
relay.set_power(True)
elif arg.off:
relay.set_power(False)
elif arg.stat:
relay.send_stat(dict(
state=False,
signal=-59,
fw_v=1.0
))