Merge branch 'mqtt-refactoring' of ch1p.io:homekit into mqtt-refactoring

This commit is contained in:
Evgeny Zinoviev 2023-06-07 22:41:22 +03:00
commit 5aad97192d
7 changed files with 181 additions and 156 deletions

View File

@ -1,13 +1,31 @@
import struct import time
import json
import datetime
try:
import inverterd
except:
pass
from typing import Optional
from .._module import MqttModule
from .._node import MqttNode from .._node import MqttNode
from .._payload import MqttPayload, bit_field from .._payload import MqttPayload, bit_field
try:
from home.database import InverterDatabase
except:
pass
_mult_10 = lambda n: int(n*10) _mult_10 = lambda n: int(n*10)
_div_10 = lambda n: n/10 _div_10 = lambda n: n/10
class Status(MqttPayload): MODULE_NAME = 'MqttInverterModule'
STATUS_TOPIC = 'status'
GENERATION_TOPIC = 'generation'
class MqttInverterStatusPayload(MqttPayload):
# 46 bytes # 46 bytes
FORMAT = 'IHHHHHHBHHHHHBHHHHHHHH' FORMAT = 'IHHHHHHBHHHHHBHHHHHHHH'
@ -65,7 +83,7 @@ class Status(MqttPayload):
load_connected: bit_field(0, 16, 1) load_connected: bit_field(0, 16, 1)
class Generation(MqttPayload): class MqttInverterGenerationPayload(MqttPayload):
# 8 bytes # 8 bytes
FORMAT = 'II' FORMAT = 'II'
@ -73,5 +91,105 @@ class Generation(MqttPayload):
wh: int wh: int
class MqttInverterModule(MqttNode): class MqttInverterModule(MqttModule):
pass _status_poll_freq: int
_generation_poll_freq: int
_inverter: Optional[inverterd.Client]
_database: Optional[InverterDatabase]
_gen_prev: float
def __init__(self, status_poll_freq=0, generation_poll_freq=0):
super().__init__(tick_interval=status_poll_freq)
self._status_poll_freq = status_poll_freq
self._generation_poll_freq = generation_poll_freq
# this defines whether this is a publisher or a subscriber
if status_poll_freq > 0:
self._inverter = inverterd.Client()
self._inverter.connect()
self._inverter.format(inverterd.Format.SIMPLE_JSON)
self._database = None
else:
self._inverter = None
self._database = InverterDatabase()
self._gen_prev = 0
def on_connect(self, mqtt: MqttNode):
super().on_connect(mqtt)
if not self._inverter:
mqtt.subscribe_module(STATUS_TOPIC, self)
mqtt.subscribe_module(GENERATION_TOPIC, self)
def tick(self):
if not self._inverter:
return
# read status
now = time.time()
try:
raw = self._inverter.exec('get-status')
except inverterd.InverterError as e:
self._logger.error(f'inverter error: {str(e)}')
# TODO send to server
return
data = json.loads(raw)['data']
status = MqttInverterStatusPayload(time=round(now), **data)
self._mqtt_node_ref.publish(STATUS_TOPIC, status.pack())
# read today's generation stat
now = time.time()
if self._gen_prev == 0 or now - self._gen_prev >= self._generation_poll_freq:
self._gen_prev = now
today = datetime.date.today()
try:
raw = self._inverter.exec('get-day-generated', (today.year, today.month, today.day))
except inverterd.InverterError as e:
self._logger.error(f'inverter error: {str(e)}')
# TODO send to server
return
data = json.loads(raw)['data']
gen = MqttInverterGenerationPayload(time=round(now), wh=data['wh'])
self._mqtt_node_ref.publish(GENERATION_TOPIC, gen.pack())
def handle_payload(self, mqtt: MqttNode, topic: str, payload: bytes) -> Optional[MqttPayload]:
home_id = 1 # legacy compat
if topic == STATUS_TOPIC:
s = MqttInverterStatusPayload.unpack(payload)
self._database.add_status(home_id=home_id,
client_time=s.time,
grid_voltage=int(s.grid_voltage*10),
grid_freq=int(s.grid_freq * 10),
ac_output_voltage=int(s.ac_output_voltage * 10),
ac_output_freq=int(s.ac_output_freq * 10),
ac_output_apparent_power=s.ac_output_apparent_power,
ac_output_active_power=s.ac_output_active_power,
output_load_percent=s.output_load_percent,
battery_voltage=int(s.battery_voltage * 10),
battery_voltage_scc=int(s.battery_voltage_scc * 10),
battery_voltage_scc2=int(s.battery_voltage_scc2 * 10),
battery_discharge_current=s.battery_discharge_current,
battery_charge_current=s.battery_charge_current,
battery_capacity=s.battery_capacity,
inverter_heat_sink_temp=s.inverter_heat_sink_temp,
mppt1_charger_temp=s.mppt1_charger_temp,
mppt2_charger_temp=s.mppt2_charger_temp,
pv1_input_power=s.pv1_input_power,
pv2_input_power=s.pv2_input_power,
pv1_input_voltage=int(s.pv1_input_voltage * 10),
pv2_input_voltage=int(s.pv2_input_voltage * 10),
mppt1_charger_status=s.mppt1_charger_status,
mppt2_charger_status=s.mppt2_charger_status,
battery_power_direction=s.battery_power_direction,
dc_ac_power_direction=s.dc_ac_power_direction,
line_power_direction=s.line_power_direction,
load_connected=s.load_connected)
return s
elif topic == GENERATION_TOPIC:
gen = MqttInverterGenerationPayload.unpack(payload)
self._database.add_generation(home_id, gen.time, gen.wh)
return gen

View File

@ -9,6 +9,7 @@ from ...temphum import BaseSensor
two_digits_precision = lambda x: round(x, 2) two_digits_precision = lambda x: round(x, 2)
MODULE_NAME = 'MqttTempHumModule' MODULE_NAME = 'MqttTempHumModule'
DATA_TOPIC = 'temphum/data'
class MqttTemphumDataPayload(MqttPayload): class MqttTemphumDataPayload(MqttPayload):
@ -45,22 +46,38 @@ class MqttTemphumDataPayload(MqttPayload):
class MqttTempHumModule(MqttModule): class MqttTempHumModule(MqttModule):
def __init__(self, sensor: Optional[BaseSensor] = None, *args, **kwargs): def __init__(self,
sensor: Optional[BaseSensor] = None,
*args, **kwargs):
if sensor is not None:
kwargs['tick_interval'] = 10
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._sensor = sensor self._sensor = sensor
def on_connect(self, mqtt: MqttNode): def on_connect(self, mqtt: MqttNode):
super().on_connect(mqtt) super().on_connect(mqtt)
mqtt.subscribe_module('temphum/data', self) mqtt.subscribe_module(DATA_TOPIC, self)
def tick(self): def tick(self):
pass if not self._sensor:
return
error = 0
temp = 0
rh = 0
try:
temp = self._sensor.temperature()
rh = self._sensor.humidity()
except:
error = 1
pld = MqttTemphumDataPayload(temp=temp, rh=rh, error=error)
self._mqtt_node_ref.publish(DATA_TOPIC, pld.pack())
def handle_payload(self, def handle_payload(self,
mqtt: MqttNode, mqtt: MqttNode,
topic: str, topic: str,
payload: bytes) -> Optional[MqttPayload]: payload: bytes) -> Optional[MqttPayload]:
if topic == 'temphum/data': if topic == DATA_TOPIC:
message = MqttTemphumDataPayload.unpack(payload) message = MqttTemphumDataPayload.unpack(payload)
self._logger.debug(message) self._logger.debug(message)
return message return message

View File

@ -1,74 +0,0 @@
#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import re
from home.mqtt import Mqtt
from home.mqtt.payload.inverter import Status, Generation
from home.database import InverterDatabase
from home.config import config
class MqttReceiver(Mqtt):
def __init__(self):
super().__init__(clean_session=False)
self.database = InverterDatabase()
def on_connect(self, client: mqtt.Client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
self._logger.info("subscribing to hk/#")
client.subscribe('hk/#', qos=1)
def on_message(self, client: mqtt.Client, userdata, msg):
super().on_message(client, userdata, msg)
try:
match = re.match(r'(?:home|hk)/(\d+)/(status|gen)', msg.topic)
if not match:
return
# FIXME string home_id must be supported
home_id, what = int(match.group(1)), match.group(2)
if what == 'gen':
gen = Generation.unpack(msg.payload)
self.database.add_generation(home_id, gen.time, gen.wh)
elif what == 'status':
s = Status.unpack(msg.payload)
self.database.add_status(home_id,
client_time=s.time,
grid_voltage=int(s.grid_voltage*10),
grid_freq=int(s.grid_freq * 10),
ac_output_voltage=int(s.ac_output_voltage * 10),
ac_output_freq=int(s.ac_output_freq * 10),
ac_output_apparent_power=s.ac_output_apparent_power,
ac_output_active_power=s.ac_output_active_power,
output_load_percent=s.output_load_percent,
battery_voltage=int(s.battery_voltage * 10),
battery_voltage_scc=int(s.battery_voltage_scc * 10),
battery_voltage_scc2=int(s.battery_voltage_scc2 * 10),
battery_discharge_current=s.battery_discharge_current,
battery_charge_current=s.battery_charge_current,
battery_capacity=s.battery_capacity,
inverter_heat_sink_temp=s.inverter_heat_sink_temp,
mppt1_charger_temp=s.mppt1_charger_temp,
mppt2_charger_temp=s.mppt2_charger_temp,
pv1_input_power=s.pv1_input_power,
pv2_input_power=s.pv2_input_power,
pv1_input_voltage=int(s.pv1_input_voltage * 10),
pv2_input_voltage=int(s.pv2_input_voltage * 10),
mppt1_charger_status=s.mppt1_charger_status,
mppt2_charger_status=s.mppt2_charger_status,
battery_power_direction=s.battery_power_direction,
dc_ac_power_direction=s.dc_ac_power_direction,
line_power_direction=s.line_power_direction,
load_connected=s.load_connected)
except Exception as e:
self._logger.exception(str(e))
if __name__ == '__main__':
config.load('inverter_mqtt_receiver')
server = MqttReceiver()
server.connect_and_loop()

View File

@ -1,72 +0,0 @@
#!/usr/bin/env python3
import time
import datetime
import json
import inverterd
from home.config import config
from home.mqtt import Mqtt, poll_tick
from home.mqtt.payload.inverter import Status, Generation
class MqttClient(Mqtt):
def __init__(self):
super().__init__()
self._home_id = config['mqtt']['home_id']
self._inverter = inverterd.Client()
self._inverter.connect()
self._inverter.format(inverterd.Format.SIMPLE_JSON)
def poll_inverter(self):
freq = int(config['mqtt']['inverter']['poll_freq'])
gen_freq = int(config['mqtt']['inverter']['generation_poll_freq'])
g = poll_tick(freq)
gen_prev = 0
while True:
time.sleep(next(g))
# read status
now = time.time()
try:
raw = self._inverter.exec('get-status')
except inverterd.InverterError as e:
self._logger.error(f'inverter error: {str(e)}')
# TODO send to server
continue
data = json.loads(raw)['data']
status = Status(time=round(now), **data) # FIXME this will crash with 99% probability
self._client.publish(f'hk/{self._home_id}/status',
payload=status.pack(),
qos=1)
# read today's generation stat
now = time.time()
if gen_prev == 0 or now - gen_prev >= gen_freq:
gen_prev = now
today = datetime.date.today()
try:
raw = self._inverter.exec('get-day-generated', (today.year, today.month, today.day))
except inverterd.InverterError as e:
self._logger.error(f'inverter error: {str(e)}')
# TODO send to server
continue
data = json.loads(raw)['data']
gen = Generation(time=round(now), wh=data['wh'])
self._client.publish(f'hk/{self._home_id}/gen',
payload=gen.pack(),
qos=1)
if __name__ == '__main__':
config.load('inverter_mqtt_sender')
client = MqttClient()
client.configure_tls()
client.connect_and_loop(loop_forever=False)
client.poll_inverter()

23
src/inverter_mqtt_util.py Executable file
View File

@ -0,0 +1,23 @@
#!/usr/bin/env python3
from argparse import ArgumentParser
from home.config import config
from home.mqtt import MqttWrapper, MqttNode
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('mode', type=str, choices=('sender', 'receiver'), nargs=1)
config.load('inverter_mqtt_util', parser=parser)
arg = parser.parse_args()
mqtt = MqttWrapper()
node = MqttNode(node_id='inverter')
module_kwargs = {}
if arg.mode[0] == 'sender':
module_kwargs['status_poll_freq'] = int(config['mqtt']['inverter']['poll_freq'])
module_kwargs['generation_poll_freq'] = int(config['mqtt']['inverter']['generation_poll_freq'])
node.load_module('inverter', **module_kwargs)
mqtt.add_node(node)
mqtt.connect_and_loop()

View File

@ -0,0 +1,13 @@
[Unit]
Description=Inverter MQTT receiver
After=clickhouse-server.service
[Service]
User=user
Group=user
Restart=on-failure
ExecStart=/home/user/homekit/src/inverter_mqtt_util.py receiver
WorkingDirectory=/home/user
[Install]
WantedBy=multi-user.target

View File

@ -6,7 +6,7 @@ After=inverterd.service
User=user User=user
Group=user Group=user
Restart=on-failure Restart=on-failure
ExecStart=/home/user/homekit/src/inverter_mqtt_sender.py ExecStart=/home/user/homekit/src/inverter_mqtt_util.py sender
WorkingDirectory=/home/user WorkingDirectory=/home/user
[Install] [Install]