This commit is contained in:
Evgeny Zinoviev 2024-03-03 11:17:04 +03:00
parent 7092c79b56
commit 63ff5dd151

87
bin/mqtt_sensors_listener.py Executable file
View File

@ -0,0 +1,87 @@
#!/usr/bin/env python3
import include_homekit
import sys
import asyncio
import logging
from enum import Enum
from typing import Optional, Union
from telegram import ReplyKeyboardMarkup, User
from time import time
from datetime import datetime
from aiohttp import web
from homekit.config import config, is_development_mode, AppConfigUnit
from homekit.telegram import bot
from homekit.telegram.config import TelegramBotConfig, TelegramUserListType
from homekit.telegram._botutil import user_any_name
from homekit.relay.sunxi_h3_client import RelayClient
from homekit import http
from homekit.mqtt import MqttNode, MqttWrapper, MqttPayload, MqttNodesConfig, MqttModule
from homekit.mqtt.module.relay import MqttPowerStatusPayload, MqttRelayModule
from homekit.mqtt.module.temphum import MqttTemphumDataPayload
from homekit.mqtt.module.diagnostics import InitialDiagnosticsPayload, DiagnosticsPayload
class MqttSensorsListenerConfig(AppConfigUnit):
NAME = 'mqtt_sensors_listeners'
@classmethod
def schema(cls) -> Optional[dict]:
return {
'http_listen': cls._addr_schema(required=True),
'nodes': {
'type': 'list',
'required': True,
'empty': False,
'schema': {
'type': 'string',
'allowed': MqttNodesConfig.get_nodes(only_names=True)
}
}
}
# config.load_app(MqttSensorsListenerConfig)
routes = web.RouteTableDef()
logger = logging.getLogger(__name__)
mqtt: MqttWrapper
mqtt_node: MqttNode
mqtt_relay_module: Union[MqttRelayModule, MqttModule]
time_format = '%d.%m.%Y, %H:%M:%S'
class SensorStatus:
last_time: int = 0
last_boot_time: int = 0
class TemphumStatus(SensorStatus):
temp: float = 0.0
rh: float = 0.0
class RelayStatus(SensorStatus):
opened = False
class WateringMcuStatus(RelayStatus):
ambient_temp: float = 0.0
ambient_rh: float = 0.0
@routes.get('/sensors')
async def http_sensors_get(req: web.Request):
return await http.ajax_ok({
'hello': 'world'
})
if __name__ == '__main__':
mqtt = MqttWrapper(client_id='mqtt_sensors_listener',
randomize_client_id=is_development_mode())
http.serve(addr=config.app_config['http_addr'],
routes=routes)