mqtt changes

This commit is contained in:
Evgeny Zinoviev 2024-02-24 02:00:40 +03:00
parent 2a5c34b28d
commit c9b351a08e
5 changed files with 77 additions and 59 deletions

View File

@ -54,6 +54,8 @@ if __name__ == '__main__':
help='send relay state') help='send relay state')
parser.add_argument('--push-ota', type=str, metavar='OTA_FILENAME', parser.add_argument('--push-ota', type=str, metavar='OTA_FILENAME',
help='push OTA, receives path to firmware.bin (not .elf!)') help='push OTA, receives path to firmware.bin (not .elf!)')
parser.add_argument('--custom-ota-topic', type=str,
help='only needed for update very old devices')
parser.add_argument('--no-wait', action='store_true', parser.add_argument('--no-wait', action='store_true',
help='execute command and exit') help='execute command and exit')
@ -86,7 +88,10 @@ if __name__ == '__main__':
mqtt.add_node(mqtt_node) mqtt.add_node(mqtt_node)
# must-have modules # must-have modules
ota_module = mqtt_node.load_module('ota') ota_kwargs = {}
if arg.custom_ota_topic:
ota_kwargs['custom_ota_topic'] = arg.custom_ota_topic
ota_module = mqtt_node.load_module('ota', **ota_kwargs)
ota_val = arg.push_ota ota_val = arg.push_ota
mqtt_node.load_module('diagnostics') mqtt_node.load_module('diagnostics')

View File

@ -1,79 +1,91 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio
import json
import logging
import __py_include import __py_include
import asyncio
import logging
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from typing import Optional from typing import Optional
from argparse import ArgumentParser
from homekit.config import config from homekit.config import config
from homekit.mqtt import MqttNodesConfig, MqttNode, MqttWrapper
from homekit.mqtt.module.temphum import MqttTempHumModule, MqttTemphumDataPayload
from homekit.temphum import SensorType, BaseSensor from homekit.temphum import SensorType, BaseSensor
from homekit.temphum.i2c import create_sensor from homekit.temphum.i2c import create_sensor
logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
sensor: Optional[BaseSensor] = None _sensor: Optional[BaseSensor] = None
lock = asyncio.Lock() _lock = asyncio.Lock()
delay = 0.01 _mqtt: MqttWrapper
_mqtt_ndoe: MqttNode
_mqtt_temphum: MqttTempHumModule
_stopped = True
_scheduler = AsyncIOScheduler()
_sched_task_added = False
async def get_measurements(): async def get_measurements():
async with lock: async with _lock:
await asyncio.sleep(delay) temp = _sensor.temperature()
rh = _sensor.humidity()
temp = sensor.temperature()
rh = sensor.humidity()
return rh, temp return rh, temp
async def handle_client(reader, writer): def on_mqtt_connect():
request = None global _stopped, _sched_task_added
while request != 'quit': _stopped = False
try:
request = await reader.read(255)
if request == b'\x04':
break
request = request.decode('utf-8').strip()
except Exception:
break
if request == 'read': if not _sched_task_added:
try: _scheduler.add_job(on_sched_task, 'interval', seconds=60, next_run_time=datetime.now())
rh, temp = await asyncio.wait_for(get_measurements(), timeout=3) _scheduler.start()
data = dict(humidity=rh, temp=temp) _sched_task_added = True
except asyncio.TimeoutError as e:
logger.exception(e)
data = dict(error='i2c call timed out')
else:
data = dict(error='invalid request')
writer.write((json.dumps(data) + '\r\n').encode('utf-8'))
try:
await writer.drain()
except ConnectionResetError:
pass
writer.close()
async def run_server(host, port): def on_mqtt_disconnect():
server = await asyncio.start_server(handle_client, host, port) global _stopped
async with server: _stopped = True
logger.info('Server started.')
await server.serve_forever()
async def on_sched_task():
if _stopped:
return
rh, temp = await get_measurements()
payload = MqttTemphumDataPayload(temp=temp, rh=rh)
_mqtt_node.publish('data', payload.pack())
if __name__ == '__main__': if __name__ == '__main__':
config.load_app() parser = ArgumentParser()
parser.add_argument('--node-id',
type=str,
required=True,
choices=MqttNodesConfig().get_nodes(only_names=True),
help='node id must be defined in the config')
args = config.load_app(parser=parser)
if 'measure_delay' in config['sensor']: node_cfg = MqttNodesConfig()[args.node_id]
delay = float(config['sensor']['measure_delay']) _sensor = create_sensor(SensorType(node_cfg['temphum']['module']),
int(node_cfg['temphum']['i2c_bus']))
sensor = create_sensor(SensorType(config['sensor']['type']), _mqtt = MqttWrapper(client_id=args.node_id)
int(config['sensor']['bus'])) _mqtt.add_connect_callback(on_mqtt_connect)
_mqtt.add_disconnect_callback(on_mqtt_disconnect)
_mqtt_node = MqttNode(node_id=args.node_id,
node_secret=MqttNodesConfig.get_node(args.node_id)['password'])
_mqtt.add_node(_mqtt_node)
_mqtt_temphum = _mqtt_node.load_module('temphum')
try: try:
host, port = config.get_addr('server.listen') _mqtt.connect_and_loop(loop_forever=True)
asyncio.run(run_server(host, port))
except KeyboardInterrupt: except (KeyboardInterrupt, SystemExit):
logging.info('Exiting...') if _scheduler:
_scheduler.shutdown()
_logger.info('Exiting...')
finally:
_mqtt.disconnect()

View File

@ -44,7 +44,6 @@ class MqttWrapper(Mqtt):
except Exception as e: except Exception as e:
self._logger.exception(e) self._logger.exception(e)
def on_message(self, client: mqtt.Client, userdata, msg): def on_message(self, client: mqtt.Client, userdata, msg):
try: try:
topic = msg.topic topic = msg.topic

View File

@ -42,10 +42,12 @@ class OtaPayload(MqttPayload):
class MqttOtaModule(MqttModule): class MqttOtaModule(MqttModule):
_ota_request: Optional[tuple[str, int]] _ota_request: Optional[tuple[str, int]]
_custom_ota_topic: Optional[str]
def __init__(self, *args, **kwargs): def __init__(self, custom_ota_topic=None, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._ota_request = None self._ota_request = None
self._custom_ota_topic = custom_ota_topic
def on_connect(self, mqtt: MqttNode): def on_connect(self, mqtt: MqttNode):
super().on_connect(mqtt) super().on_connect(mqtt)
@ -64,7 +66,7 @@ class MqttOtaModule(MqttModule):
def do_push_ota(self, secret: str, filename: str, qos: int): def do_push_ota(self, secret: str, filename: str, qos: int):
payload = OtaPayload(secret=secret, filename=filename) payload = OtaPayload(secret=secret, filename=filename)
self._mqtt_node_ref.publish('ota', self._mqtt_node_ref.publish('ota' if not self._custom_ota_topic else self._custom_ota_topic,
payload=payload.pack(), payload=payload.pack(),
qos=qos) qos=qos)