ac charging program: improve user interaction, also report some errors

This commit is contained in:
Evgeny Zinoviev 2021-11-03 19:07:35 +03:00
parent ac84cda5bf
commit 3887262236
2 changed files with 182 additions and 92 deletions

View File

@ -13,7 +13,6 @@ from inverterd import Format, InverterError
from telegram import (
Update,
ParseMode,
KeyboardButton,
InlineKeyboardButton,
InlineKeyboardMarkup,
ReplyKeyboardMarkup
@ -28,7 +27,6 @@ from telegram.ext import (
)
from telegram.error import TimedOut
monitor: Optional[InverterMonitor] = None
updater: Optional[Updater] = None
notify_to: list[int] = []
@ -60,10 +58,12 @@ _strings = {
# monitor
'chrg_evt_started': 'Started charging from AC.',
'chrg_evt_finished': 'Finished charging from AC.',
'chrg_evt_disconnected': 'AC line disconnected.',
'chrg_evt_disconnected': 'AC disconnected.',
'chrg_evt_current_changed': 'AC charging current set to <b>%dA</b>.',
'chrg_evt_na_solar': 'AC line detected, but battery charging is unavailable due to active solar power line.',
'battery_level_changed': 'Battery level: <b>%s</b> (<b>%0.1f V</b> under <b>%d W</b> load)'
'chrg_evt_not_charging': 'AC connected but not charging.',
'chrg_evt_na_solar': 'AC connected, but battery won\'t be charged due to active solar power line.',
'battery_level_changed': 'Battery level: <b>%s</b> (<b>%0.1f V</b> under <b>%d W</b> load)',
'error_message': '<b>Error:</b> %s.'
}
logger = logging.getLogger(__name__)
@ -266,6 +266,7 @@ def on_set_ac_charging_thresholds(update: Update, context: CallbackContext) -> N
if 44 <= cv <= 51 and 48 <= dv <= 58:
response = inverter.exec('set-charging-thresholds', (cv, dv))
reply(update, 'OK' if response['result'] == 'ok' else 'ERROR')
monitor.set_battery_ac_charging_thresholds(cv, dv)
else:
raise ValueError('invalid values')
@ -372,20 +373,27 @@ def on_button(update: Update, context: CallbackContext) -> None:
query.answer('unexpected callback data')
#
# InverterMonitor event handlers
#
def monitor_charging_event_handler(event: ChargingEvent, **kwargs) -> None:
key = None
args = []
if event == ChargingEvent.AC_CHARGING_STARTED:
match event:
case ChargingEvent.AC_CHARGING_STARTED:
key = 'started'
elif event == ChargingEvent.AC_CHARGING_FINISHED:
case ChargingEvent.AC_CHARGING_FINISHED:
key = 'finished'
elif event == ChargingEvent.AC_DISCONNECTED:
case ChargingEvent.AC_DISCONNECTED:
key = 'disconnected'
elif event == ChargingEvent.AC_CURRENT_CHANGED:
case ChargingEvent.AC_NOT_CHARGING:
key = 'not_charging'
case ChargingEvent.AC_CURRENT_CHANGED:
key = 'current_changed'
args.append(kwargs['current'])
elif event == ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR:
case ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR:
key = 'na_solar'
if key is None:
@ -396,19 +404,24 @@ def monitor_charging_event_handler(event: ChargingEvent, **kwargs) -> None:
def monitor_battery_event_handler(state: BatteryState, v: float, load_watts: int) -> None:
if state == BatteryState.NORMAL:
match state:
case BatteryState.NORMAL:
label = '✅ Normal'
elif state == BatteryState.LOW:
case BatteryState.LOW:
label = '⚠️ Low'
elif state == BatteryState.CRITICAL:
case BatteryState.CRITICAL:
label = '‼️ Critical'
else:
case _:
logger.error('unknown battery state:', state)
return
notify_all(_('battery_level_changed', label, v, load_watts))
def monitor_error_handler(error: str) -> None:
notify_all(_('error_message', error))
if __name__ == '__main__':
# command-line arguments
parser = ArgumentParser()
@ -433,6 +446,7 @@ if __name__ == '__main__':
monitor = InverterMonitor(args.ac_current_range)
monitor.set_charging_event_handler(monitor_charging_event_handler)
monitor.set_battery_event_handler(monitor_battery_event_handler)
monitor.set_error_handler(monitor_error_handler)
monitor.start()
# configure logging

View File

@ -7,12 +7,18 @@ from typing import Union, List, Tuple, Callable, Optional
from inverter_wrapper import wrapper_instance as inverter
from inverterd import InverterError
_logger = logging.getLogger(__name__)
class BatteryPowerDirection(Enum):
DISCHARGING = auto()
CHARGING = auto()
DO_NOTHING = auto()
class ChargingEvent(Enum):
AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR = auto()
AC_NOT_CHARGING = auto()
AC_CHARGING_STARTED = auto()
AC_DISCONNECTED = auto()
AC_CURRENT_CHANGED = auto()
@ -22,6 +28,7 @@ class ChargingEvent(Enum):
class ChargingState(Enum):
NOT_CHARGING = auto()
AC_BUT_SOLAR = auto()
AC_WAITING = auto()
AC_OK = auto()
AC_DONE = auto()
@ -32,34 +39,60 @@ class BatteryState(Enum):
CRITICAL = auto()
def _pd_from_string(pd: str) -> BatteryPowerDirection:
match pd:
case 'Discharge':
return BatteryPowerDirection.DISCHARGING
case 'Charge':
return BatteryPowerDirection.CHARGING
case 'Do nothing':
return BatteryPowerDirection.DO_NOTHING
case _:
raise ValueError(f'invalid power direction: {pd}')
class InverterMonitor(Thread):
max_ac_current: Optional[int]
min_ac_current: Optional[int]
charging_thresholds: Optional[tuple[float, float]]
allowed_currents: list[int]
battery_under_voltage: Optional[float]
charging_event_handler: Optional[Callable]
battery_event_handler: Optional[Callable]
error_handler: Optional[Callable]
currents: list[int]
active_current: Optional[int]
interrupted: bool
battery_state: BatteryState
charging_state: ChargingState
def __init__(self, ac_current_range: Union[List, Tuple] = ()):
super().__init__()
# settings
self.max_ac_current = None
self.min_ac_current = None
self.charging_thresholds = None
self.allowed_currents = []
self.battery_under_voltage = None
# event handlers
self.charging_event_handler = None
self.battery_event_handler = None
self.error_handler = None
# variables related to active program
self.currents = []
self.active_current = None
self.interrupted = False
self.battery_state = BatteryState.NORMAL
self.charging_state = ChargingState.NOT_CHARGING
# other stuff
self.interrupted = False
self.set_ac_current_range(ac_current_range)
def set_ac_current_range(self, ac_current_range: Union[List, Tuple] = ()) -> None:
self.max_ac_current = ac_current_range[0]
self.min_ac_current = ac_current_range[1]
_logger.debug(f'setting AC current range to {ac_current_range[0]}..{ac_current_range[1]}')
def set_battery_under_voltage(self, v: float):
self.battery_under_voltage = v
_logger.debug(f'setting battery under voltage: {v}')
def run(self):
self.allowed_currents = list(inverter.exec('get-allowed-ac-charging-currents')['data'])
self.allowed_currents.sort()
@ -67,8 +100,13 @@ class InverterMonitor(Thread):
if self.max_ac_current not in self.allowed_currents or self.min_ac_current not in self.allowed_currents:
raise RuntimeError('invalid AC currents range')
# read config
cfg = inverter.exec('get-rated')['data']
self.set_battery_under_voltage(cfg['battery_under_voltage']['value'])
self.charging_thresholds = (
float(cfg['battery_recharge_voltage']['value']),
float(cfg['battery_redischarge_voltage']['value']),
)
while not self.interrupted:
try:
@ -82,14 +120,18 @@ class InverterMonitor(Thread):
solar = gs['pv1_input_power']['value'] > 0
v = float(gs['battery_voltage']['value'])
load_watts = int(gs['ac_output_active_power']['value'])
pd = _pd_from_string(gs['battery_power_direction'])
_logger.debug(f'got status: ac={ac}, solar={solar}, v={v}')
_logger.debug(f'got status: ac={ac}, solar={solar}, v={v}, pd={pd}')
self.ac_charging_program(ac, solar, v)
self.ac_charging_program(ac, solar, v, pd)
if not ac:
if not ac or pd != BatteryPowerDirection.CHARGING:
# if AC is disconnected or not charging, run the low voltage checking program
self.low_voltage_program(v, load_watts)
elif self.battery_state != BatteryState.NORMAL:
# AC is connected and charging the battery, assume its level is 'normal'
self.battery_state = BatteryState.NORMAL
except InverterError as e:
@ -97,36 +139,38 @@ class InverterMonitor(Thread):
sleep(2)
def ac_charging_program(self, ac: bool, solar: bool, v: float):
if self.charging_state == ChargingState.NOT_CHARGING:
def ac_charging_program(self, ac: bool, solar: bool, v: float, pd: BatteryPowerDirection):
match self.charging_state:
case ChargingState.NOT_CHARGING:
if ac and solar:
self.charging_state = ChargingState.AC_BUT_SOLAR
self.charging_event_handler(ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR)
_logger.info('entering charging AC_BUT_SOLAR state')
_logger.info('entering AC_BUT_SOLAR state')
elif ac:
self.ac_charging_start()
self.ac_charging_start(pd)
elif self.charging_state == ChargingState.AC_BUT_SOLAR:
case ChargingState.AC_BUT_SOLAR:
if not ac:
self.charging_state = ChargingState.NOT_CHARGING
self.charging_event_handler(ChargingEvent.AC_DISCONNECTED)
_logger.info('AC disconnected, entering NOT_CHARGING state')
self.ac_charging_stop(ChargingState.NOT_CHARGING)
elif not solar:
self.ac_charging_start()
self.ac_charging_start(pd)
elif self.charging_state == ChargingState.AC_OK:
case ChargingState.AC_OK | ChargingState.AC_WAITING:
if not ac:
self.charging_state = ChargingState.NOT_CHARGING
self.charging_event_handler(ChargingEvent.AC_DISCONNECTED)
_logger.info('AC disconnected, entering NOT_CHARGING state')
self.ac_charging_stop(ChargingState.NOT_CHARGING)
return
if solar:
self.charging_state = ChargingState.AC_BUT_SOLAR
self.charging_event_handler(ChargingEvent.AC_CHARGING_UNAVAILABLE_BECAUSE_SOLAR)
_logger.info('solar power connected, entering AC_BUT_SOLAR state')
_logger.info('solar power connected during charging, entering AC_BUT_SOLAR state')
state = ChargingState.AC_OK if pd == BatteryPowerDirection.CHARGING else ChargingState.AC_WAITING
if state != self.charging_state:
self.charging_state = state
evt = ChargingEvent.AC_CHARGING_STARTED if state == ChargingState.AC_OK else ChargingEvent.AC_NOT_CHARGING
self.charging_event_handler(evt)
# if currently charging, monitor battery voltage dynamics here
if self.active_current is not None:
@ -134,32 +178,48 @@ class InverterMonitor(Thread):
if v >= upper_bound:
self.ac_charging_next_current()
# TODO
# handle battery charging direction changes to do-nothing or discharging,
# as well as drops to 0A current
elif self.charging_state == ChargingState.AC_DONE:
case ChargingState.AC_DONE:
if not ac:
self.charging_state = ChargingState.NOT_CHARGING
self.charging_event_handler(ChargingEvent.AC_DISCONNECTED)
_logger.info('AC disconnected, charging is done, entering NOT_CHARGING state')
self.ac_charging_stop(ChargingState.NOT_CHARGING)
def ac_charging_start(self):
def ac_charging_start(self, pd: BatteryPowerDirection):
if pd == BatteryPowerDirection.CHARGING:
self.charging_state = ChargingState.AC_OK
self.charging_event_handler(ChargingEvent.AC_CHARGING_STARTED)
_logger.info('AC line connected, entering AC_OK state')
_logger.info('AC line connected and charging, entering AC_OK state')
else:
self.charging_state = ChargingState.AC_WAITING
self.charging_event_handler(ChargingEvent.AC_NOT_CHARGING)
_logger.info('AC line connected but not charging yet, entering AC_WAITING state')
# set the current even if charging has not been started yet
# this path must be entered only once per charging cycle,
# and self.currents array is used to guarantee that
if not self.currents:
index_min = self.allowed_currents.index(self.min_ac_current)
index_max = self.allowed_currents.index(self.max_ac_current)
self.currents = self.allowed_currents[index_min:index_max + 1]
self.ac_charging_next_current()
def ac_charging_stop(self):
self.charging_state = ChargingState.AC_DONE
self.charging_event_handler(ChargingEvent.AC_CHARGING_FINISHED)
_logger.info('charging is finished, entering AC_DONE state')
def ac_charging_stop(self, reason: ChargingState):
self.charging_state = reason
match reason:
case ChargingState.AC_DONE:
event = ChargingEvent.AC_CHARGING_FINISHED
case ChargingState.NOT_CHARGING:
event = ChargingEvent.AC_DISCONNECTED
case _:
raise ValueError(f'ac_charging_stop: unexpected reason {reason}')
_logger.info(f'charging is finished, entering {reason} state')
self.charging_event_handler(event)
if self.currents:
self.currents = []
self.active_current = None
def ac_charging_next_current(self):
try:
@ -168,18 +228,19 @@ class InverterMonitor(Thread):
self.active_current = current
except IndexError:
_logger.debug('was going to change charging current, but no currents left; finishing charging program')
self.ac_charging_stop()
self.ac_charging_stop(ChargingState.AC_DONE)
return
try:
response = inverter.exec('set-max-ac-charging-current', (0, current))
if response['result'] != 'ok':
_logger.error(f'failed to change AC charging current to {current}A')
_logger.error(f'failed to change AC charging current to {current} A')
raise InverterError('set-max-ac-charging-current: inverterd reported error')
else:
self.charging_event_handler(ChargingEvent.AC_CURRENT_CHANGED, current=current)
_logger.info(f'changed AC charging current to {current}A')
_logger.info(f'changed AC charging current to {current} A')
except InverterError as e:
self.error_handler(f'failed to set charging current to {current} A (caught InverterError)')
_logger.exception(e)
def low_voltage_program(self, v: float, load_watts: int):
@ -200,5 +261,20 @@ class InverterMonitor(Thread):
def set_battery_event_handler(self, handler: Callable):
self.battery_event_handler = handler
def set_error_handler(self, handler: Callable):
self.error_handler = handler
def set_ac_current_range(self, ac_current_range: Union[List, Tuple] = ()) -> None:
self.max_ac_current = ac_current_range[0]
self.min_ac_current = ac_current_range[1]
_logger.debug(f'setting AC current range to {ac_current_range[0]} A .. {ac_current_range[1]} A')
def set_battery_under_voltage(self, v: float):
self.battery_under_voltage = v
_logger.debug(f'setting battery under voltage: {v}')
def set_battery_ac_charging_thresholds(self, cv: float, dv: float):
self.charging_thresholds = (cv, dv)
def stop(self):
self.interrupted = True