implement AC charging program

This commit is contained in:
Evgeny Zinoviev 2021-11-02 21:29:25 +03:00
parent 3346b29044
commit eb97084457
5 changed files with 333 additions and 59 deletions

View File

@ -20,8 +20,12 @@ The bot accepts following parameters:
* ``--token`` — your telegram bot token (required)
* ``--users-whitelist`` — space-separated list of IDs of users who are allowed
to use the bot (required)
* ``--notify-to`` — space-separated list of IDs of users who need to be notified of
monitoring programs events
* ``--ac-current-range`` (default is `(10, 30)`)
* ``--inverterd-host`` (default is `127.0.0.1`)
* ``--inverterd-port`` (default is `8305`)
* ``--verbose``
## Launching with systemd
@ -31,6 +35,7 @@ Create environment configuration file `/etc/default/inverter-bot`:
```
TOKEN="YOUR_TOKEN"
USERS="ID ID ID ..."
NOTIFY_USERS="ID ID ID..."
PARAMS="" # here you can pass other options such as --inverterd-host
```
@ -46,7 +51,7 @@ EnvironmentFile=/etc/default/inverter-bot
User=user
Group=user
Restart=on-failure
ExecStart=python3 /home/user/inverter-bot/inverter-bot --token $TOKEN --users-whitelist $USERS $PARAMS
ExecStart=python3 /home/user/inverter-bot/inverter-bot --token $TOKEN --users-whitelist $USERS --notify-to $NOTIFY_USERS $PARAMS
WorkingDirectory=/home/user/inverter-bot
[Install]

View File

@ -1,13 +1,15 @@
#!/usr/bin/env python3
import logging, re, datetime, json
import logging
import re
import datetime
import json
from inverterd import Format, Client as InverterClient, InverterError
from typing import Optional, Tuple
from argparse import ArgumentParser
from html import escape
# from pprint import pprint
# from time import sleep
from strings import lang as _
from inverter_wrapper import InverterClientWrapper, wrapper_instance as inverter
from monitor import InverterMonitor, ChargingEvent
from inverterd import Format, InverterError
from telegram import (
Update,
ParseMode,
@ -27,6 +29,9 @@ from telegram.ext import (
from telegram.error import TimedOut
monitor: Optional[InverterMonitor] = None
updater: Optional[Updater] = None
notify_to: list[int] = []
LT = escape('<=')
flags_map = {
'buzzer': 'BUZZ',
@ -38,45 +43,38 @@ flags_map = {
'alarm_on_on_primary_source_interrupt': 'ALRM',
'fault_code_record': 'FTCR',
}
_strings = {
'status': 'Status',
'generation': 'Generation',
# flags
'flag_buzzer': 'Buzzer',
'flag_overload_bypass': 'Overload bypass',
'flag_escape_to_default_screen_after_1min_timeout': 'Reset to default LCD page after 1min timeout',
'flag_overload_restart': 'Restart on overload',
'flag_over_temp_restart': 'Restart on overtemp',
'flag_backlight_on': 'LCD backlight',
'flag_alarm_on_on_primary_source_interrupt': 'Beep on primary source interruption',
'flag_fault_code_record': 'Fault code recording',
class InverterClientWrapper:
def __init__(self, host: str, port: str):
self._host = host
self._port = port
self._inverter = None
self.create()
def create(self):
self._inverter = InverterClient(host=self._host, port=self._port)
self._inverter.connect()
def exec(self, command: str, arguments: tuple = (), format=Format.JSON):
try:
self._inverter.format(format)
response = self._inverter.exec(command, arguments)
if format == Format.JSON:
response = json.loads(response)
return response
except InverterError as e:
raise e
except Exception as e:
# silently try to reconnect
try:
self.create()
except Exception:
pass
raise e
inverter: Optional[InverterClientWrapper] = None
# monitor
'chrg_evt_started': 'Started charging from AC.',
'chrg_evt_finished': 'Finished charging from AC.',
'chrg_evt_disconnected': 'AC line disconnected.',
'chrg_evt_current_changed': 'AC charging current set to <b>%dA</b>.',
'chrg_evt_na_solar': 'AC line detected, but battery charging is unavailable due to active solar power line.'
}
logger = logging.getLogger(__name__)
#
# helpers
#
def _(key, *args):
global _strings
return (_strings[key] if key in _strings else f'{{{key}}}') % args
def get_usage(command: str, arguments: dict) -> str:
blocks = []
@ -277,6 +275,7 @@ def on_set_battery_under_voltage(update: Update, context: CallbackContext) -> No
if 40.0 <= v <= 48.0:
response = inverter.exec('set-battery-cut-off-voltage', (v,))
reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
monitor.set_battery_under_voltage(v)
else:
raise ValueError('invalid voltage')
@ -329,6 +328,10 @@ def on_errors(update: Update, context: CallbackContext) -> None:
handle_exc(update, e)
def on_test(update: Update, context: CallbackContext) -> None:
monitor_charging_event_handler(ChargingEvent.AC_CURRENT_CHANGED, current=20)
def on_button(update: Update, context: CallbackContext) -> None:
query = update.callback_query
@ -364,6 +367,39 @@ def on_button(update: Update, context: CallbackContext) -> None:
query.answer('unexpected callback data')
def monitor_charging_event_handler(event: ChargingEvent, **kwargs):
key = None
args = []
if event == ChargingEvent.AC_CHARGING_STARTED:
key = 'started'
elif event == ChargingEvent.AC_CHARGING_FINISHED:
key = 'finished'
elif event == ChargingEvent.AC_DISCONNECTED:
key = 'disconnected'
elif event == ChargingEvent.AC_CURRENT_CHANGED:
key = 'current_changed'
args.append(kwargs['current'])
elif event == ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR:
key = 'na_solar'
if key is None:
logger.error('unknown charging event:', event)
return
text = _(f'chrg_evt_{key}', *args)
for chat_id in notify_to:
updater.bot.send_message(chat_id=chat_id,
text=text,
parse_mode='HTML',
reply_markup=get_markup())
def monitor_battery_event_handler(event):
pass
if __name__ == '__main__':
# command-line arguments
parser = ArgumentParser()
@ -371,18 +407,29 @@ if __name__ == '__main__':
help='Telegram bot token')
parser.add_argument('--users-whitelist', nargs='+',
help='ID of users allowed to use the bot')
parser.add_argument('--notify-to', nargs='+')
parser.add_argument('--ac-current-range', nargs='+', default=(10, 30))
parser.add_argument('--inverterd-host', default='127.0.0.1', type=str)
parser.add_argument('--inverterd-port', default=8305, type=int)
parser.add_argument('--verbose', action='store_true')
args = parser.parse_args()
whitelist = list(map(lambda x: int(x), args.users_whitelist))
notify_to = list(map(lambda x: int(x), args.notify_to)) if args.notify_to is not None else []
# connect to inverterd
inverter = InverterClientWrapper(host=args.inverterd_host, port=args.inverterd_port)
inverter.init(host=args.inverterd_host, port=args.inverterd_port)
# start monitoring
monitor = InverterMonitor(args.ac_current_range)
monitor.set_charging_event_handler(monitor_charging_event_handler)
monitor.set_battery_event_handler(monitor_battery_event_handler)
monitor.start()
# configure logging
logging_level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
level=logging_level)
# configure bot
updater = Updater(args.token, request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
@ -403,7 +450,7 @@ if __name__ == '__main__':
dispatcher.add_handler(CommandHandler('status', on_status))
dispatcher.add_handler(CommandHandler('config', on_config))
dispatcher.add_handler(CommandHandler('errors', on_errors))
dispatcher.add_handler(CommandHandler('test', on_test))
dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all))
@ -412,3 +459,5 @@ if __name__ == '__main__':
# run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT
updater.idle()
monitor.stop()

47
src/inverter_wrapper.py Normal file
View File

@ -0,0 +1,47 @@
import json
from threading import Lock
from inverterd import (
Format,
Client as InverterClient,
InverterError
)
_lock = Lock()
class InverterClientWrapper:
def __init__(self):
self._inverter = None
self._host = None
self._port = None
def init(self, host: str, port: int):
self._host = host
self._port = port
self.create()
def create(self):
self._inverter = InverterClient(host=self._host, port=self._port)
self._inverter.connect()
def exec(self, command: str, arguments: tuple = (), format=Format.JSON):
with _lock:
try:
self._inverter.format(format)
response = self._inverter.exec(command, arguments)
if format == Format.JSON:
response = json.loads(response)
return response
except InverterError as e:
raise e
except Exception as e:
# silently try to reconnect
try:
self.create()
except Exception:
pass
raise e
wrapper_instance = InverterClientWrapper()

192
src/monitor.py Normal file
View File

@ -0,0 +1,192 @@
import logging
from enum import Enum, auto
from time import sleep
from threading import Thread
from typing import Union, List, Tuple, Callable, Optional
from inverter_wrapper import wrapper_instance as inverter
from inverterd import InverterError
_logger = logging.getLogger(__name__)
class ChargingEvent(Enum):
AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR = auto()
AC_CHARGING_STARTED = auto()
AC_DISCONNECTED = auto()
AC_CURRENT_CHANGED = auto()
AC_CHARGING_FINISHED = auto()
class ChargingState(Enum):
NOT_CHARGING = auto()
AC_BUT_SOLAR = auto()
AC_OK = auto()
AC_DONE = auto()
class BatteryState(Enum):
NORMAL = auto()
WARNING = auto()
CRITICAL = auto()
class InverterMonitor(Thread):
def __init__(self, ac_current_range: Union[List, Tuple] = ()):
super().__init__()
self.max_ac_current = None
self.min_ac_current = None
self.allowed_currents = []
self.battery_under_voltage = None
self.charging_event_handler = None
self.battery_voltage_handler = None
self.currents = []
self.active_current = None
self.interrupted = False
self.battery_state = BatteryState.NORMAL
self.charging_state = ChargingState.NOT_CHARGING
self.set_ac_current_range(ac_current_range)
def set_ac_current_range(self, ac_current_range: Union[List, Tuple] = ()) -> None:
self.max_ac_current = ac_current_range[0]
self.min_ac_current = ac_current_range[1]
_logger.info(f'setting AC current range to {ac_current_range[0]}..{ac_current_range[1]}')
def set_battery_under_voltage(self, v: float):
self.battery_under_voltage = v
_logger.info(f'setting battery under voltage: {v}')
def run(self):
self.allowed_currents = list(inverter.exec('get-allowed-ac-charging-currents')['data'])
self.allowed_currents.sort()
if self.max_ac_current not in self.allowed_currents or self.min_ac_current not in self.allowed_currents:
raise RuntimeError('invalid AC currents range')
cfg = inverter.exec('get-rated')['data']
self.set_battery_under_voltage(cfg['battery_under_voltage']['value'])
while not self.interrupted:
try:
response = inverter.exec('get-status')
if response['result'] != 'ok':
_logger.error('get-status failed:', response)
else:
gs = response['data']
ac = gs['grid_voltage']['value'] > 0 or gs['grid_freq']['value'] > 0
solar = gs['pv1_input_power']['value'] > 0
v = float(gs['battery_voltage']['value'])
_logger.debug(f'got status: ac={ac}, solar={solar}, v={v}')
self.ac_charging_program(ac, solar, v)
if not ac:
self.low_voltage_program(v)
except InverterError as e:
_logger.exception(e)
sleep(2)
def ac_charging_program(self, ac: bool, solar: bool, v: float):
if self.charging_state == ChargingState.NOT_CHARGING:
if ac and solar:
self.charging_state = ChargingState.AC_BUT_SOLAR
self.charging_event_handler(ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR)
_logger.info('entering charging AC_BUT_SOLAR state')
elif ac:
self.ac_charging_start()
elif self.charging_state == ChargingState.AC_BUT_SOLAR:
if not ac:
self.charging_state = ChargingState.NOT_CHARGING
self.charging_event_handler(ChargingEvent.AC_DISCONNECTED)
_logger.info('AC disconnected, entering NOT_CHARGING state')
elif not solar:
self.ac_charging_start()
elif self.charging_state == ChargingState.AC_OK:
if not ac:
self.charging_state = ChargingState.NOT_CHARGING
self.charging_event_handler(ChargingEvent.AC_DISCONNECTED)
_logger.info('AC disconnected, entering NOT_CHARGING state')
return
if solar:
self.charging_state = ChargingState.AC_BUT_SOLAR
self.charging_event_handler(ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR)
_logger.info('solar power connected, entering AC_BUT_SOLAR state')
# if currently charging, monitor battery voltage dynamics here
if self.active_current is not None:
upper_bound = 56.6 if self.active_current > 10 else 54
if v >= upper_bound:
self.ac_charging_next_current()
# TODO
# handle battery charging direction changes to do-nothing or discharging,
# as well as drops to 0A current
elif self.charging_state == ChargingState.AC_DONE:
if not ac:
self.charging_state = ChargingState.NOT_CHARGING
self.charging_event_handler(ChargingEvent.AC_DISCONNECTED)
_logger.info('AC disconnected, charging is done, entering NOT_CHARGING state')
def ac_charging_start(self):
self.charging_state = ChargingState.AC_OK
self.charging_event_handler(ChargingEvent.AC_CHARGING_STARTED)
_logger.info('AC line connected, entering AC_OK state')
index_min = self.allowed_currents.index(self.min_ac_current)
index_max = self.allowed_currents.index(self.max_ac_current)
self.currents = self.allowed_currents[index_min:index_max + 1]
self.ac_charging_next_current()
def ac_charging_stop(self):
self.charging_state = ChargingState.AC_DONE
self.charging_event_handler(ChargingEvent.AC_CHARGING_FINISHED)
_logger.info('charging is finished, entering AC_DONE state')
def ac_charging_next_current(self):
try:
current = self.currents.pop()
_logger.debug(f'ready to change charging current to {current}A')
self.active_current = current
except IndexError:
_logger.debug('was going to change charging current, but no currents left; finishing charging program')
self.ac_charging_stop()
return
try:
response = inverter.exec('set-max-ac-charging-current', (0, current))
if response['result'] != 'ok':
_logger.error(f'failed to change AC charging current to {current}A')
raise InverterError('set-max-ac-charging-current: inverterd reported error')
else:
self.charging_event_handler(ChargingEvent.AC_CURRENT_CHANGED, current=current)
_logger.info(f'changed AC charging current to {current}A')
except InverterError as e:
_logger.exception(e)
def low_voltage_program(self, v: float):
pass
def set_charging_event_handler(self, handler: Callable):
self.charging_event_handler = handler
def set_battery_event_handler(self, handler: Callable):
self.battery_voltage_handler = handler
def stop(self):
self.interrupted = True

View File

@ -1,19 +0,0 @@
__strings = {
'status': 'Status',
'generation': 'Generation',
# flags
'flag_buzzer': 'Buzzer',
'flag_overload_bypass': 'Overload bypass',
'flag_escape_to_default_screen_after_1min_timeout': 'Reset to default LCD page after 1min timeout',
'flag_overload_restart': 'Restart on overload',
'flag_over_temp_restart': 'Restart on overtemp',
'flag_backlight_on': 'LCD backlight',
'flag_alarm_on_on_primary_source_interrupt': 'Beep on primary source interruption',
'flag_fault_code_record': 'Fault code recording',
}
def lang(key):
global __strings
return __strings[key] if key in __strings else f'{{{key}}}'