homekit/src/polaris_kettle_util.py
Evgeny Zinoviev 8f20c9b825 polaris pwk 1725cgld full support
- significant improvements, correctnesses and stability fixes in
  protocol implementation
- correct handling of device appearances and disappearances
- flawlessly functioning telegram bot that re-renders kettle's state
  (temperature and other) in real time
2022-06-30 03:47:49 +03:00

114 lines
3.2 KiB
Python
Executable File

#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
import logging
import sys
import paho.mqtt.client as mqtt
from typing import Optional
from argparse import ArgumentParser
from queue import SimpleQueue
from home.mqtt import MQTTBase
from home.config import config
from polaris import (
Kettle,
PowerType,
protocol as kettle_proto
)
k: Optional[Kettle] = None
logger = logging.getLogger(__name__)
control_tasks = SimpleQueue()
class MQTTServer(MQTTBase):
def __init__(self):
super().__init__(clean_session=False)
def on_connect(self, client: mqtt.Client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
logger.info("subscribing to #")
client.subscribe('#', qos=1)
def on_message(self, client: mqtt.Client, userdata, msg):
try:
print(msg.topic, msg.payload)
except Exception as e:
logger.exception(str(e))
def kettle_connection_established(response: kettle_proto.MessageResponse):
try:
assert isinstance(response, kettle_proto.AckMessage), f'ACK expected, but received: {response}'
except AssertionError:
k.stop_all()
return
def next_task(response: kettle_proto.MessageResponse):
try:
assert response is not False, 'server error'
except AssertionError:
k.stop_all()
return
if not control_tasks.empty():
task = control_tasks.get()
f, args = task(k)
args.append(next_task)
f(*args)
else:
k.stop_all()
next_task(response)
def main():
tempmin = 30
tempmax = 100
tempstep = 5
parser = ArgumentParser()
parser.add_argument('-m', dest='mode', required=True, type=str, choices=('mqtt', 'control'))
parser.add_argument('--on', action='store_true')
parser.add_argument('--off', action='store_true')
parser.add_argument('-t', '--temperature', dest='temp', type=int, default=tempmax,
choices=range(tempmin, tempmax+tempstep, tempstep))
arg = config.load('polaris_kettle_util', use_cli=True, parser=parser)
if arg.mode == 'mqtt':
server = MQTTServer()
try:
server.connect_and_loop(loop_forever=True)
except KeyboardInterrupt:
pass
elif arg.mode == 'control':
if arg.on and arg.off:
raise RuntimeError('--on and --off are mutually exclusive')
if arg.off:
control_tasks.put(lambda k: (k.set_power, [PowerType.OFF]))
else:
if arg.temp == tempmax:
control_tasks.put(lambda k: (k.set_power, [PowerType.ON]))
else:
control_tasks.put(lambda k: (k.set_target_temperature, [arg.temp]))
control_tasks.put(lambda k: (k.set_power, [PowerType.CUSTOM]))
k = Kettle(mac=config['kettle']['mac'], device_token=config['kettle']['token'])
info = k.discover()
if not info:
print('no device found.')
return 1
print('found service:', info)
k.start_server_if_needed(kettle_connection_established)
return 0
if __name__ == '__main__':
sys.exit(main())