inverter_bot: refactor and introduce new functions

This commit is contained in:
Evgeny Zinoviev 2022-11-06 20:40:42 +03:00
parent 28c67c4510
commit 75ee161b6e
19 changed files with 2092 additions and 1133 deletions

View File

@ -1,3 +1,4 @@
# Inverter Bot
### Bot configuration
@ -76,11 +77,6 @@ errors - Show errors
flags - Toggle flags
calcw - Calculate daily watts usage
calcwadv - Advanced watts usage calculator
setbatuv - Set battery under voltage
setgencc - Set AC charging current
setgenct - Set AC charging thresholds
setacmode - Set AC input mode
setosp - Set output source priority
monstatus - Monitor: dump state
monsetcur - Monitor: set charging currents
```

View File

@ -1,35 +0,0 @@
#!/usr/bin/env python3
from typing import Optional
from telegram import ReplyKeyboardMarkup
from telegram.ext import MessageHandler
from home.config import config
from home.bot import Wrapper, Context, text_filter
def get_latest_logs(ctx: Context):
u = ctx.user
ctx.reply(ctx.lang('blbla'))
class AdminBot(Wrapper):
def __init__(self):
super().__init__()
self.lang.ru(get_latest_logs="Смотреть последние логи")
self.lang.en(get_latest_logs="Get latest logs")
self.add_handler(MessageHandler(text_filter(self.lang('get_latest_logs')), self.wrap(get_latest_logs)))
def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = [
[self.lang('get_latest_logs')]
]
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
if __name__ == '__main__':
config.load('admin_bot')
bot = AdminBot()
# bot.enable_logging(BotType.ADMIN)
bot.run()

View File

@ -1,6 +0,0 @@
from .reporting import ReportingHelper
from .lang import LangPack
from .wrapper import Wrapper, Context, text_filter, handlermethod, IgnoreMarkup
from .store import Store
from .errors import *
from .util import command_usage, user_any_name

View File

@ -1,2 +0,0 @@
class StoreNotEnabledError(Exception):
pass

View File

@ -1,22 +0,0 @@
import logging
from telegram import Message
from ..api import WebAPIClient as APIClient
from ..api.errors import ApiResponseError
from ..api.types import BotType
logger = logging.getLogger(__name__)
class ReportingHelper:
def __init__(self, client: APIClient, bot_type: BotType):
self.client = client
self.bot_type = bot_type
def report(self, message, text: str = None) -> None:
if text is None:
text = message.text
try:
self.client.log_bot_request(self.bot_type, message.chat_id, text)
except ApiResponseError as error:
logger.exception(error)

View File

@ -1,57 +0,0 @@
from telegram import User
from .lang import LangStrings
_strings = {
'en': LangStrings(
usage='Usage',
arguments='Arguments'
),
'ru': LangStrings(
usage='Использование',
arguments='Аргументы'
)
}
def command_usage(command: str, arguments: dict, language='en') -> str:
if language not in _strings:
raise ValueError('unsupported language')
blocks = []
argument_names = []
argument_lines = []
for k, v in arguments.items():
argument_names.append(k)
argument_lines.append(
f'<code>{k}</code>: {v}'
)
command = f'/{command}'
if argument_names:
command += ' ' + ' '.join(argument_names)
blocks.append(
f'<b>{_strings[language]["usage"]}</b>\n'
f'<code>{command}</code>'
)
if argument_lines:
blocks.append(
f'<b>{_strings[language]["arguments"]}</b>\n' + '\n'.join(argument_lines)
)
return '\n\n'.join(blocks)
def user_any_name(user: User) -> str:
name = [user.first_name, user.last_name]
name = list(filter(lambda s: s is not None, name))
name = ' '.join(name).strip()
if not name:
name = user.username
if not name:
name = str(user.id)
return name

View File

@ -1,369 +0,0 @@
import logging
import traceback
from html import escape
from telegram import (
Update,
ParseMode,
ReplyKeyboardMarkup,
CallbackQuery,
User,
Message,
)
from telegram.ext import (
Updater,
Filters,
BaseFilter,
Handler,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
CallbackContext,
ConversationHandler
)
from telegram.error import TimedOut
from ..config import config
from typing import Optional, Union, List, Tuple
from .store import Store
from .lang import LangPack
from ..api.types import BotType
from ..api import WebAPIClient
from .reporting import ReportingHelper
logger = logging.getLogger(__name__)
languages = {
'en': 'English',
'ru': 'Русский'
}
LANG_STARTED, = range(1)
user_filter: Optional[BaseFilter] = None
def default_langpack() -> LangPack:
lang = LangPack()
lang.en(
start_message="Select command on the keyboard.",
unknown_message="Unknown message",
cancel="Cancel",
select_language="Select language on the keyboard.",
invalid_language="Invalid language. Please try again.",
saved='Saved.',
)
lang.ru(
start_message="Выберите команду на клавиатуре.",
unknown_message="Неизвестная команда",
cancel="Отмена",
select_language="Выберите язык на клавиатуре.",
invalid_language="Неверный язык. Пожалуйста, попробуйте снова",
saved="Настройки сохранены."
)
return lang
def init_user_filter():
global user_filter
if user_filter is None:
if 'users' in config['bot']:
logger.info('allowed users: ' + str(config['bot']['users']))
user_filter = Filters.user(config['bot']['users'])
else:
user_filter = Filters.all # not sure if this is correct
def text_filter(*args):
init_user_filter()
return Filters.text(args[0] if isinstance(args[0], list) else [*args]) & user_filter
def exc2text(e: Exception) -> str:
tb = ''.join(traceback.format_tb(e.__traceback__))
return f'{e.__class__.__name__}: ' + escape(str(e)) + "\n\n" + escape(tb)
class IgnoreMarkup:
pass
class Context:
_update: Optional[Update]
_callback_context: Optional[CallbackContext]
_markup_getter: callable
_lang: LangPack
_store: Optional[Store]
_user_lang: Optional[str]
def __init__(self,
update: Optional[Update],
callback_context: Optional[CallbackContext],
markup_getter: callable,
lang: LangPack,
store: Optional[Store]):
self._update = update
self._callback_context = callback_context
self._markup_getter = markup_getter
self._lang = lang
self._store = store
self._user_lang = None
def reply(self, text, markup=None):
if markup is None:
markup = self._markup_getter(self)
kwargs = dict(parse_mode=ParseMode.HTML)
if not isinstance(markup, IgnoreMarkup):
kwargs['reply_markup'] = markup
return self._update.message.reply_text(text, **kwargs)
def reply_exc(self, e: Exception) -> None:
self.reply(exc2text(e))
def answer(self, text: str = None):
self.callback_query.answer(text)
def edit(self, text, markup=None):
kwargs = dict(parse_mode=ParseMode.HTML)
if not isinstance(markup, IgnoreMarkup):
kwargs['reply_markup'] = markup
self.callback_query.edit_message_text(text, **kwargs)
@property
def text(self) -> str:
return self._update.message.text
@property
def callback_query(self) -> CallbackQuery:
return self._update.callback_query
@property
def args(self) -> Optional[List[str]]:
return self._callback_context.args
@property
def user_id(self) -> int:
return self.user.id
@property
def user(self) -> User:
return self._update.effective_user
@property
def user_lang(self) -> str:
if self._user_lang is None:
self._user_lang = self._store.get_user_lang(self.user_id)
return self._user_lang
def lang(self, key: str, *args) -> str:
return self._lang.get(key, self.user_lang, *args)
def is_callback_context(self) -> bool:
return self._update.callback_query and self._update.callback_query.data and self._update.callback_query.data != ''
def handlermethod(f: callable):
def _handler(self, update: Update, context: CallbackContext, *args, **kwargs):
ctx = Context(update,
callback_context=context,
markup_getter=self.markup,
lang=self.lang,
store=self.store)
try:
return f(self, ctx, *args, **kwargs)
except Exception as e:
if not self.exception_handler(e, ctx) and not isinstance(e, TimedOut):
logger.exception(e)
if not ctx.is_callback_context():
ctx.reply_exc(e)
else:
self.notify_user(ctx.user_id, exc2text(e))
return _handler
class Wrapper:
store: Optional[Store]
updater: Updater
lang: LangPack
reporting: Optional[ReportingHelper]
def __init__(self,
store: Optional[Store] = None):
self.updater = Updater(config['bot']['token'],
request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
self.lang = default_langpack()
self.store = store if store else Store()
self.reporting = None
init_user_filter()
dispatcher = self.updater.dispatcher
dispatcher.add_handler(CommandHandler('start', self.wrap(self.start), user_filter))
# transparently log all messages
self.add_handler(MessageHandler(Filters.all & user_filter, self.logging_message_handler), group=10)
self.add_handler(CallbackQueryHandler(self.logging_callback_handler), group=10)
def run(self):
self._lang_setup()
self.updater.dispatcher.add_handler(
MessageHandler(Filters.all & user_filter, self.wrap(self.any))
)
# start the bot
self.updater.start_polling()
# run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT
self.updater.idle()
def enable_logging(self, bot_type: BotType):
api = WebAPIClient(timeout=3)
api.enable_async()
self.reporting = ReportingHelper(api, bot_type)
def logging_message_handler(self, update: Update, context: CallbackContext):
if self.reporting is None:
return
self.reporting.report(update.message)
def logging_callback_handler(self, update: Update, context: CallbackContext):
if self.reporting is None:
return
self.reporting.report(update.callback_query.message, text=update.callback_query.data)
def wrap(self, f: callable):
def handler(update: Update, context: CallbackContext):
ctx = Context(update,
callback_context=context,
markup_getter=self.markup,
lang=self.lang,
store=self.store)
try:
return f(ctx)
except Exception as e:
if not self.exception_handler(e, ctx) and not isinstance(e, TimedOut):
logger.exception(e)
if not ctx.is_callback_context():
ctx.reply_exc(e)
else:
self.notify_user(ctx.user_id, exc2text(e))
return handler
def add_handler(self, handler: Handler, group=0):
self.updater.dispatcher.add_handler(handler, group=group)
def start(self, ctx: Context):
if 'start_message' not in self.lang:
ctx.reply('Please define start_message or override start()')
return
ctx.reply(ctx.lang('start_message'))
def any(self, ctx: Context):
if 'invalid_command' not in self.lang:
ctx.reply('Please define invalid_command or override any()')
return
ctx.reply(ctx.lang('invalid_command'))
def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]:
return None
def exception_handler(self, e: Exception, ctx: Context) -> Optional[bool]:
pass
def notify_all(self, text_getter: callable, exclude: Tuple[int] = ()) -> None:
if 'notify_users' not in config['bot']:
logger.error('notify_all() called but no notify_users directive found in the config')
return
for user_id in config['bot']['notify_users']:
if user_id in exclude:
continue
text = text_getter(self.store.get_user_lang(user_id))
self.updater.bot.send_message(chat_id=user_id,
text=text,
parse_mode='HTML')
def notify_user(self, user_id: int, text: Union[str, Exception], **kwargs) -> None:
if isinstance(text, Exception):
text = exc2text(text)
self.updater.bot.send_message(chat_id=user_id, text=text, parse_mode='HTML', **kwargs)
def send_photo(self, user_id, **kwargs):
self.updater.bot.send_photo(chat_id=user_id, **kwargs)
def send_audio(self, user_id, **kwargs):
self.updater.bot.send_audio(chat_id=user_id, **kwargs)
def send_file(self, user_id, **kwargs):
self.updater.bot.send_document(chat_id=user_id, **kwargs)
def edit_message_text(self, user_id, message_id, *args, **kwargs):
self.updater.bot.edit_message_text(chat_id=user_id, message_id=message_id, parse_mode='HTML', *args, **kwargs)
def delete_message(self, user_id, message_id):
self.updater.bot.delete_message(chat_id=user_id, message_id=message_id)
#
# Language Selection
#
def _lang_setup(self):
supported = self.lang.languages
if len(supported) > 1:
cancel_filter = Filters.text(self.lang.all('cancel'))
self.add_handler(ConversationHandler(
entry_points=[CommandHandler('lang', self.wrap(self._lang_command), user_filter)],
states={
LANG_STARTED: [
*list(map(lambda key: MessageHandler(text_filter(languages[key]),
self.wrap(self._lang_input)), supported)),
MessageHandler(user_filter & ~cancel_filter, self.wrap(self._lang_invalid_input))
]
},
fallbacks=[MessageHandler(user_filter & cancel_filter, self.wrap(self._lang_cancel_input))]
))
def _lang_command(self, ctx: Context):
logger.debug(f'current language: {ctx.user_lang}')
buttons = []
for name in languages.values():
buttons.append(name)
markup = ReplyKeyboardMarkup([buttons, [ctx.lang('cancel')]], one_time_keyboard=False)
ctx.reply(ctx.lang('select_language'), markup=markup)
return LANG_STARTED
def _lang_input(self, ctx: Context):
lang = None
for key, value in languages.items():
if value == ctx.text:
lang = key
break
if lang is None:
raise ValueError('could not find the language')
self.store.set_user_lang(ctx.user_id, lang)
ctx.reply(ctx.lang('saved'), markup=IgnoreMarkup())
self.start(ctx)
return ConversationHandler.END
def _lang_invalid_input(self, ctx: Context):
ctx.reply(self.lang('invalid_language'), markup=IgnoreMarkup())
return LANG_STARTED
def _lang_cancel_input(self, ctx: Context):
self.start(ctx)
return ConversationHandler.END
@property
def user_filter(self):
return user_filter

View File

@ -105,6 +105,19 @@ class ConfigStore:
def __contains__(self, key):
return key in self.data
def get(self, key: str, default=None):
cur = self.data
pts = key.split('.')
for i in range(len(pts)):
k = pts[i]
if i < len(pts)-1:
if k not in cur:
raise KeyError(f'key {k} not found')
else:
return cur[k] if k in cur else default
cur = self.data[k]
raise KeyError(f'option {key} not found')
def items(self):
return self.data.items()

View File

@ -0,0 +1,556 @@
import asyncio
import logging
from inverterd import Format
from typing import Union
from enum import Enum
from ..util import Addr, stringify
class InverterEnum(Enum):
def as_text(self) -> str:
raise RuntimeError('abstract method')
class BatteryType(InverterEnum):
AGM = 0
Flooded = 1
User = 2
def as_text(self) -> str:
return ('AGM', 'Flooded', 'User')[self.value]
class InputVoltageRange(InverterEnum):
Appliance = 0
USP = 1
def as_text(self) -> str:
return ('Appliance', 'USP')[self.value]
class OutputSourcePriority(InverterEnum):
SolarUtilityBattery = 0
SolarBatteryUtility = 1
def as_text(self) -> str:
return ('Solar-Utility-Battery', 'Solar-Battery-Utility')[self.value]
class ChargeSourcePriority(InverterEnum):
SolarFirst = 0
SolarAndUtility = 1
SolarOnly = 2
def as_text(self) -> str:
return ('Solar-First', 'Solar-and-Utility', 'Solar-only')[self.value]
class MachineType(InverterEnum):
OffGridTie = 0
GridTie = 1
def as_text(self) -> str:
return ('Off-Grid-Tie', 'Grid-Tie')[self.value]
class Topology(InverterEnum):
TransformerLess = 0
Transformer = 1
def as_text(self) -> str:
return ('Transformer-less', 'Transformer')[self.value]
class OutputMode(InverterEnum):
SingleOutput = 0
ParallelOutput = 1
Phase_1_of_3 = 2
Phase_2_of_3 = 3
Phase_3_of_3 = 4
def as_text(self) -> str:
return (
'Single output',
'Parallel output',
'Phase 1 of 3-phase output',
'Phase 2 of 3-phase output',
'Phase 3 of 3-phase'
)[self.value]
class SolarPowerPriority(InverterEnum):
BatteryLoadUtility = 0
LoadBatteryUtility = 1
def as_text(self) -> str:
return ('Battery-Load-Utility', 'Load-Battery-Utility')[self.value]
class MPPTChargerStatus(InverterEnum):
Abnormal = 0
NotCharging = 1
Charging = 2
def as_text(self) -> str:
return ('Abnormal', 'Not charging', 'Charging')[self.value]
class BatteryPowerDirection(InverterEnum):
DoNothing = 0
Charge = 1
Discharge = 2
def as_text(self) -> str:
return ('Do nothing', 'Charge', 'Discharge')[self.value]
class DC_AC_PowerDirection(InverterEnum):
DoNothing = 0
AC_DC = 1
DC_AC = 2
def as_text(self) -> str:
return ('Do nothing', 'AC/DC', 'DC/AC')[self.value]
class LinePowerDirection(InverterEnum):
DoNothing = 0
Input = 1
Output = 2
def as_text(self) -> str:
return ('Do nothing', 'Input', 'Output')[self.value]
class WorkingMode(InverterEnum):
PowerOnMode = 0
StandbyMode = 1
BypassMode = 2
BatteryMode = 3
FaultMode = 4
HybridMode = 5
def as_text(self) -> str:
return (
'Power on mode',
'Standby mode',
'Bypass mode',
'Battery mode',
'Fault mode',
'Hybrid mode'
)[self.value]
class ParallelConnectionStatus(InverterEnum):
NotExistent = 0
Existent = 1
def as_text(self) -> str:
return ('Non-existent', 'Existent')[self.value]
class LoadConnectionStatus(InverterEnum):
Disconnected = 0
Connected = 1
def as_text(self) -> str:
return ('Disconnected', 'Connected')[self.value]
class ConfigurationStatus(InverterEnum):
Default = 0
Changed = 1
def as_text(self) -> str:
return ('Default', 'Changed')[self.value]
_g_human_readable = {"grid_voltage": "Grid voltage",
"grid_freq": "Grid frequency",
"ac_output_voltage": "AC output voltage",
"ac_output_freq": "AC output frequency",
"ac_output_apparent_power": "AC output apparent power",
"ac_output_active_power": "AC output active power",
"output_load_percent": "Output load percent",
"battery_voltage": "Battery voltage",
"battery_voltage_scc": "Battery voltage from SCC",
"battery_voltage_scc2": "Battery voltage from SCC2",
"battery_discharge_current": "Battery discharge current",
"battery_charge_current": "Battery charge current",
"battery_capacity": "Battery capacity",
"inverter_heat_sink_temp": "Inverter heat sink temperature",
"mppt1_charger_temp": "MPPT1 charger temperature",
"mppt2_charger_temp": "MPPT2 charger temperature",
"pv1_input_power": "PV1 input power",
"pv2_input_power": "PV2 input power",
"pv1_input_voltage": "PV1 input voltage",
"pv2_input_voltage": "PV2 input voltage",
"configuration_status": "Configuration state",
"mppt1_charger_status": "MPPT1 charger status",
"mppt2_charger_status": "MPPT2 charger status",
"load_connected": "Load connection",
"battery_power_direction": "Battery power direction",
"dc_ac_power_direction": "DC/AC power direction",
"line_power_direction": "Line power direction",
"local_parallel_id": "Local parallel ID",
"ac_input_rating_voltage": "AC input rating voltage",
"ac_input_rating_current": "AC input rating current",
"ac_output_rating_voltage": "AC output rating voltage",
"ac_output_rating_freq": "AC output rating frequency",
"ac_output_rating_current": "AC output rating current",
"ac_output_rating_apparent_power": "AC output rating apparent power",
"ac_output_rating_active_power": "AC output rating active power",
"battery_rating_voltage": "Battery rating voltage",
"battery_recharge_voltage": "Battery re-charge voltage",
"battery_redischarge_voltage": "Battery re-discharge voltage",
"battery_under_voltage": "Battery under voltage",
"battery_bulk_voltage": "Battery bulk voltage",
"battery_float_voltage": "Battery float voltage",
"battery_type": "Battery type",
"max_charge_current": "Max charge current",
"max_ac_charge_current": "Max AC charge current",
"input_voltage_range": "Input voltage range",
"output_source_priority": "Output source priority",
"charge_source_priority": "Charge source priority",
"parallel_max_num": "Parallel max num",
"machine_type": "Machine type",
"topology": "Topology",
"output_mode": "Output mode",
"solar_power_priority": "Solar power priority",
"mppt": "MPPT string",
"fault_code": "Fault code",
"line_fail": "Line fail",
"output_circuit_short": "Output circuit short",
"inverter_over_temperature": "Inverter over temperature",
"fan_lock": "Fan lock",
"battery_voltage_high": "Battery voltage high",
"battery_low": "Battery low",
"battery_under": "Battery under",
"over_load": "Over load",
"eeprom_fail": "EEPROM fail",
"power_limit": "Power limit",
"pv1_voltage_high": "PV1 voltage high",
"pv2_voltage_high": "PV2 voltage high",
"mppt1_overload_warning": "MPPT1 overload warning",
"mppt2_overload_warning": "MPPT2 overload warning",
"battery_too_low_to_charge_for_scc1": "Battery too low to charge for SCC1",
"battery_too_low_to_charge_for_scc2": "Battery too low to charge for SCC2",
"buzzer": "Buzzer",
"overload_bypass": "Overload bypass function",
"escape_to_default_screen_after_1min_timeout": "Escape to default screen after 1min timeout",
"overload_restart": "Overload restart",
"over_temp_restart": "Over temperature restart",
"backlight_on": "Backlight on",
"alarm_on_on_primary_source_interrupt": "Alarm on on primary source interrupt",
"fault_code_record": "Fault code record",
"wh": "Wh"}
class InverterEmulator:
def __init__(self, addr: Addr, wait=True):
self.status = {"grid_voltage": {"unit": "V", "value": 236.3},
"grid_freq": {"unit": "Hz", "value": 50.0},
"ac_output_voltage": {"unit": "V", "value": 229.9},
"ac_output_freq": {"unit": "Hz", "value": 50.0},
"ac_output_apparent_power": {"unit": "VA", "value": 207},
"ac_output_active_power": {"unit": "Wh", "value": 146},
"output_load_percent": {"unit": "%", "value": 4},
"battery_voltage": {"unit": "V", "value": 49.1},
"battery_voltage_scc": {"unit": "V", "value": 0.0},
"battery_voltage_scc2": {"unit": "V", "value": 0.0},
"battery_discharge_current": {"unit": "A", "value": 3},
"battery_charge_current": {"unit": "A", "value": 0},
"battery_capacity": {"unit": "%", "value": 69},
"inverter_heat_sink_temp": {"unit": "°C", "value": 17},
"mppt1_charger_temp": {"unit": "°C", "value": 0},
"mppt2_charger_temp": {"unit": "°C", "value": 0},
"pv1_input_power": {"unit": "Wh", "value": 0},
"pv2_input_power": {"unit": "Wh", "value": 0},
"pv1_input_voltage": {"unit": "V", "value": 0.0},
"pv2_input_voltage": {"unit": "V", "value": 0.0},
"configuration_status": ConfigurationStatus.Default,
"mppt1_charger_status": MPPTChargerStatus.Abnormal,
"mppt2_charger_status": MPPTChargerStatus.Abnormal,
"load_connected": LoadConnectionStatus.Connected,
"battery_power_direction": BatteryPowerDirection.Discharge,
"dc_ac_power_direction": DC_AC_PowerDirection.DC_AC,
"line_power_direction": LinePowerDirection.DoNothing,
"local_parallel_id": 0}
self.rated = {"ac_input_rating_voltage": {"unit": "V", "value": 230.0},
"ac_input_rating_current": {"unit": "A", "value": 21.7},
"ac_output_rating_voltage": {"unit": "V", "value": 230.0},
"ac_output_rating_freq": {"unit": "Hz", "value": 50.0},
"ac_output_rating_current": {"unit": "A", "value": 21.7},
"ac_output_rating_apparent_power": {"unit": "VA", "value": 5000},
"ac_output_rating_active_power": {"unit": "Wh", "value": 5000},
"battery_rating_voltage": {"unit": "V", "value": 48.0},
"battery_recharge_voltage": {"unit": "V", "value": 48.0},
"battery_redischarge_voltage": {"unit": "V", "value": 55.0},
"battery_under_voltage": {"unit": "V", "value": 42.0},
"battery_bulk_voltage": {"unit": "V", "value": 57.6},
"battery_float_voltage": {"unit": "V", "value": 54.0},
"battery_type": BatteryType.User,
"max_charge_current": {"unit": "A", "value": 60},
"max_ac_charge_current": {"unit": "A", "value": 30},
"input_voltage_range": InputVoltageRange.Appliance,
"output_source_priority": OutputSourcePriority.SolarBatteryUtility,
"charge_source_priority": ChargeSourcePriority.SolarAndUtility,
"parallel_max_num": 6,
"machine_type": MachineType.OffGridTie,
"topology": Topology.TransformerLess,
"output_mode": OutputMode.SingleOutput,
"solar_power_priority": SolarPowerPriority.LoadBatteryUtility,
"mppt": "2"}
self.errors = {"fault_code": 0,
"line_fail": False,
"output_circuit_short": False,
"inverter_over_temperature": False,
"fan_lock": False,
"battery_voltage_high": False,
"battery_low": False,
"battery_under": False,
"over_load": False,
"eeprom_fail": False,
"power_limit": False,
"pv1_voltage_high": False,
"pv2_voltage_high": False,
"mppt1_overload_warning": False,
"mppt2_overload_warning": False,
"battery_too_low_to_charge_for_scc1": False,
"battery_too_low_to_charge_for_scc2": False}
self.flags = {"buzzer": False,
"overload_bypass": True,
"escape_to_default_screen_after_1min_timeout": False,
"overload_restart": True,
"over_temp_restart": True,
"backlight_on": False,
"alarm_on_on_primary_source_interrupt": True,
"fault_code_record": False}
self.day_generated = 1000
self.logger = logging.getLogger(self.__class__.__name__)
host, port = addr
asyncio.run(self.run_server(host, port, wait))
# self.max_ac_charge_current = 30
# self.max_charge_current = 60
# self.charge_thresholds = [48, 54]
async def run_server(self, host, port, wait: bool):
server = await asyncio.start_server(self.client_handler, host, port)
async with server:
self.logger.info(f'listening on {host}:{port}')
if wait:
await server.serve_forever()
else:
asyncio.ensure_future(server.serve_forever())
async def client_handler(self, reader, writer):
client_fmt = Format.JSON
def w(s: str):
writer.write(s.encode('utf-8'))
def return_error(message=None):
w('err\r\n')
if message:
if client_fmt in (Format.JSON, Format.SIMPLE_JSON):
w(stringify({
'result': 'error',
'message': message
}))
elif client_fmt in (Format.TABLE, Format.SIMPLE_TABLE):
w(f'error: {message}')
w('\r\n')
w('\r\n')
def return_ok(data=None):
w('ok\r\n')
if client_fmt in (Format.JSON, Format.SIMPLE_JSON):
jdata = {
'result': 'ok'
}
if data:
jdata['data'] = data
w(stringify(jdata))
w('\r\n')
elif data:
w(data)
w('\r\n')
w('\r\n')
request = None
while request != 'quit':
try:
request = await reader.read(255)
if request == b'\x04':
break
request = request.decode('utf-8').strip()
except Exception:
break
if request.startswith('format '):
requested_format = request[7:]
try:
client_fmt = Format(requested_format)
except ValueError:
return_error('invalid format')
return_ok()
elif request.startswith('exec '):
buf = request[5:].split(' ')
command = buf[0]
args = buf[1:]
try:
return_ok(self.process_command(client_fmt, command, *args))
except ValueError as e:
return_error(str(e))
else:
return_error(f'invalid token: {request}')
try:
await writer.drain()
except ConnectionResetError as e:
# self.logger.exception(e)
pass
writer.close()
def process_command(self, fmt: Format, c: str, *args) -> Union[dict, str, list[int], None]:
ac_charge_currents = [2, 10, 20, 30, 40, 50, 60]
if c == 'get-status':
return self.format_dict(self.status, fmt)
elif c == 'get-rated':
return self.format_dict(self.rated, fmt)
elif c == 'get-errors':
return self.format_dict(self.errors, fmt)
elif c == 'get-flags':
return self.format_dict(self.flags, fmt)
elif c == 'get-day-generated':
return self.format_dict({'wh': 1000}, fmt)
elif c == 'get-allowed-ac-charge-currents':
return self.format_list(ac_charge_currents, fmt)
elif c == 'set-max-ac-charge-current':
if int(args[0]) != 0:
raise ValueError(f'invalid machine id: {args[0]}')
amps = int(args[1])
if amps not in ac_charge_currents:
raise ValueError(f'invalid value: {amps}')
self.rated['max_ac_charge_current']['value'] = amps
elif c == 'set-charge-thresholds':
self.rated['battery_recharge_voltage']['value'] = float(args[0])
self.rated['battery_redischarge_voltage']['value'] = float(args[1])
elif c == 'set-output-source-priority':
self.rated['output_source_priority'] = OutputSourcePriority.SolarBatteryUtility if args[0] == 'SBU' else OutputSourcePriority.SolarUtilityBattery
elif c == 'set-battery-cutoff-voltage':
self.rated['battery_under_voltage']['value'] = float(args[0])
elif c == 'set-flag':
flag = args[0]
val = bool(int(args[1]))
if flag == 'BUZZ':
k = 'buzzer'
elif flag == 'OLBP':
k = 'overload_bypass'
elif flag == 'LCDE':
k = 'escape_to_default_screen_after_1min_timeout'
elif flag == 'OLRS':
k = 'overload_restart'
elif flag == 'OTRS':
k = 'over_temp_restart'
elif flag == 'BLON':
k = 'backlight_on'
elif flag == 'ALRM':
k = 'alarm_on_on_primary_source_interrupt'
elif flag == 'FTCR':
k = 'fault_code_record'
else:
raise ValueError('invalid flag')
self.flags[k] = val
else:
raise ValueError(f'{c}: unsupported command')
@staticmethod
def format_list(values: list, fmt: Format) -> Union[str, list]:
if fmt in (Format.JSON, Format.SIMPLE_JSON):
return values
return '\n'.join(map(lambda v: str(v), values))
@staticmethod
def format_dict(data: dict, fmt: Format) -> Union[str, dict]:
new_data = {}
for k, v in data.items():
new_val = None
if fmt in (Format.JSON, Format.TABLE, Format.SIMPLE_TABLE):
if isinstance(v, dict):
new_val = v
elif isinstance(v, InverterEnum):
new_val = v.as_text()
else:
new_val = v
elif fmt == Format.SIMPLE_JSON:
if isinstance(v, dict):
new_val = v['value']
elif isinstance(v, InverterEnum):
new_val = v.value
else:
new_val = str(v)
new_data[k] = new_val
if fmt in (Format.JSON, Format.SIMPLE_JSON):
return new_data
lines = []
if fmt == Format.SIMPLE_TABLE:
for k, v in new_data.items():
buf = k
if isinstance(v, dict):
buf += ' ' + str(v['value']) + ' ' + v['unit']
elif isinstance(v, InverterEnum):
buf += ' ' + v.as_text()
else:
buf += ' ' + str(v)
lines.append(buf)
elif fmt == Format.TABLE:
max_k_len = 0
for k in new_data.keys():
if len(_g_human_readable[k]) > max_k_len:
max_k_len = len(_g_human_readable[k])
for k, v in new_data.items():
buf = _g_human_readable[k] + ':'
buf += ' ' * (max_k_len - len(_g_human_readable[k]) + 1)
if isinstance(v, dict):
buf += str(v['value']) + ' ' + v['unit']
elif isinstance(v, InverterEnum):
buf += v.as_text()
elif isinstance(v, bool):
buf += str(int(v))
else:
buf += str(v)
lines.append(buf)
return '\n'.join(lines)

View File

@ -0,0 +1,85 @@
from typing import Optional, List
from telegram import Update, ParseMode, User, CallbackQuery
from telegram.ext import CallbackContext
from ._botdb import BotDatabase
from ._botlang import lang
from ._botutil import IgnoreMarkup, exc2text
class Context:
_update: Optional[Update]
_callback_context: Optional[CallbackContext]
_markup_getter: callable
db: Optional[BotDatabase]
_user_lang: Optional[str]
def __init__(self,
update: Optional[Update],
callback_context: Optional[CallbackContext],
markup_getter: callable,
store: Optional[BotDatabase]):
self._update = update
self._callback_context = callback_context
self._markup_getter = markup_getter
self._store = store
self._user_lang = None
def reply(self, text, markup=None):
if markup is None:
markup = self._markup_getter(self)
kwargs = dict(parse_mode=ParseMode.HTML)
if not isinstance(markup, IgnoreMarkup):
kwargs['reply_markup'] = markup
return self._update.message.reply_text(text, **kwargs)
def reply_exc(self, e: Exception) -> None:
self.reply(exc2text(e), markup=IgnoreMarkup())
def answer(self, text: str = None):
self.callback_query.answer(text)
def edit(self, text, markup=None):
kwargs = dict(parse_mode=ParseMode.HTML)
if not isinstance(markup, IgnoreMarkup):
kwargs['reply_markup'] = markup
self.callback_query.edit_message_text(text, **kwargs)
@property
def text(self) -> str:
return self._update.message.text
@property
def callback_query(self) -> CallbackQuery:
return self._update.callback_query
@property
def args(self) -> Optional[List[str]]:
return self._callback_context.args
@property
def user_id(self) -> int:
return self.user.id
@property
def user_data(self):
return self._callback_context.user_data
@property
def user(self) -> User:
return self._update.effective_user
@property
def user_lang(self) -> str:
if self._user_lang is None:
self._user_lang = self._store.get_user_lang(self.user_id)
return self._user_lang
def lang(self, key: str, *args) -> str:
return lang.get(key, self.user_lang, *args)
def is_callback_context(self) -> bool:
return self._update.callback_query \
and self._update.callback_query.data \
and self._update.callback_query.data != ''

View File

@ -1,7 +1,7 @@
from ..database.sqlite import SQLiteBase
from home.database.sqlite import SQLiteBase
class Store(SQLiteBase):
class BotDatabase(SQLiteBase):
def __init__(self):
super().__init__()

View File

@ -1,10 +1,8 @@
from __future__ import annotations
import logging
from typing import Union, Optional, List, Dict
from typing import Optional, Dict, List, Union
logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)
class LangStrings(dict):
@ -18,7 +16,7 @@ class LangStrings(dict):
self._lang = lang
def __missing__(self, key):
logger.warning(f'key {key} is missing in language {self._lang}')
_logger.warning(f'key {key} is missing in language {self._lang}')
return '{%s}' % key
def __setitem__(self, key, value):
@ -79,3 +77,41 @@ class LangPack:
def __contains__(self, key):
return key in self.strings[self.default_lang]
@staticmethod
def pfx(prefix: str, l: list) -> list:
return list(map(lambda s: f'{prefix}{s}', l))
languages = {
'en': 'English',
'ru': 'Русский'
}
lang = LangPack()
lang.en(
en='English',
ru='Russian',
start_message="Select command on the keyboard.",
unknown_message="Unknown message",
cancel="🚫 Cancel",
back='🔙 Back',
select_language="Select language on the keyboard.",
invalid_language="Invalid language. Please try again.",
saved='Saved.',
please_wait="⏳ Please wait..."
)
lang.ru(
en='Английский',
ru='Русский',
start_message="Выберите команду на клавиатуре.",
unknown_message="Неизвестная команда",
cancel="🚫 Отмена",
back='🔙 Назад',
select_language="Выберите язык на клавиатуре.",
invalid_language="Неверный язык. Пожалуйста, попробуйте снова",
saved="Настройки сохранены.",
please_wait="⏳ Ожидайте..."
)

View File

@ -0,0 +1,47 @@
import logging
import traceback
from html import escape
from telegram import User
from home.api import WebAPIClient as APIClient
from home.api.types import BotType
from home.api.errors import ApiResponseError
_logger = logging.getLogger(__name__)
def user_any_name(user: User) -> str:
name = [user.first_name, user.last_name]
name = list(filter(lambda s: s is not None, name))
name = ' '.join(name).strip()
if not name:
name = user.username
if not name:
name = str(user.id)
return name
class ReportingHelper:
def __init__(self, client: APIClient, bot_type: BotType):
self.client = client
self.bot_type = bot_type
def report(self, message, text: str = None) -> None:
if text is None:
text = message.text
try:
self.client.log_bot_request(self.bot_type, message.chat_id, text)
except ApiResponseError as error:
_logger.exception(error)
def exc2text(e: Exception) -> str:
tb = ''.join(traceback.format_tb(e.__traceback__))
return f'{e.__class__.__name__}: ' + escape(str(e)) + "\n\n" + escape(tb)
class IgnoreMarkup:
pass

542
src/home/telegram/bot.py Normal file
View File

@ -0,0 +1,542 @@
from __future__ import annotations
import logging
from enum import Enum, auto
from functools import wraps
from typing import Optional, Union, List, Tuple, Dict
from telegram import (
Update,
ParseMode,
ReplyKeyboardMarkup,
CallbackQuery,
User,
Message,
)
from telegram.ext import (
Updater,
Filters,
BaseFilter,
Handler,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
CallbackContext,
ConversationHandler
)
from telegram.error import TimedOut
from home.config import config
from home.api import WebAPIClient
from home.api.types import BotType
from home.api.errors import ApiResponseError
from ._botlang import lang, languages
from ._botdb import BotDatabase
from ._botutil import ReportingHelper, exc2text, IgnoreMarkup
from ._botcontext import Context
# LANG_STARTED, = range(1)
user_filter: Optional[BaseFilter] = None
cancel_filter = Filters.text(lang.all('cancel'))
back_filter = Filters.text(lang.all('back'))
cancel_and_back_filter = Filters.text(lang.all('back') + lang.all('cancel'))
db: Optional[BotDatabase] = None
_logger = logging.getLogger(__name__)
_updater: Optional[Updater] = None
_reporting: Optional[ReportingHelper] = None
_exception_handler: Optional[callable] = None
_dispatcher = None
_markup_getter: Optional[callable] = None
_start_handler_ref: Optional[callable] = None
def text_filter(*args):
if not user_filter:
raise RuntimeError('user_filter is not initialized')
return Filters.text(args[0] if isinstance(args[0], list) else [*args]) & user_filter
def _handler_of_handler(*args, **kwargs):
self = None
context = None
update = None
_args = list(args)
while len(_args):
v = _args[0]
if isinstance(v, conversation):
self = v
_args.pop(0)
elif isinstance(v, Update):
update = v
_args.pop(0)
elif isinstance(v, CallbackContext):
context = v
_args.pop(0)
break
ctx = Context(update,
callback_context=context,
markup_getter=lambda _ctx: None if not _markup_getter else _markup_getter(_ctx),
store=db)
try:
_args.insert(0, ctx)
if self:
_args.insert(0, self)
f = kwargs['f']
del kwargs['f']
if 'return_with_context' in kwargs:
return_with_context = True
del kwargs['return_with_context']
else:
return_with_context = False
result = f(*_args, **kwargs)
return result if not return_with_context else (result, ctx)
except Exception as e:
if _exception_handler:
if not _exception_handler(e, ctx) and not isinstance(e, TimedOut):
_logger.exception(e)
if not ctx.is_callback_context():
ctx.reply_exc(e)
else:
notify_user(ctx.user_id, exc2text(e))
def handler(**kwargs):
def inner(f):
@wraps(f)
def _handler(*args, **kwargs):
return _handler_of_handler(f=f, *args, **kwargs)
if 'message' in kwargs:
_updater.dispatcher.add_handler(MessageHandler(text_filter(lang.all(kwargs['message'])), _handler), group=0)
elif 'command' in kwargs:
_updater.dispatcher.add_handler(CommandHandler(kwargs['command'], _handler), group=0)
elif 'callback' in kwargs:
_updater.dispatcher.add_handler(CallbackQueryHandler(_handler), group=0)
return _handler
return inner
def simplehandler(f: callable):
@wraps(f)
def _handler(*args, **kwargs):
return _handler_of_handler(f=f, *args, **kwargs)
return _handler
def callbackhandler(f: callable):
@wraps(f)
def _handler(*args, **kwargs):
return _handler_of_handler(f=f, *args, **kwargs)
_updater.dispatcher.add_handler(CallbackQueryHandler(_handler), group=0)
return _handler
def exceptionhandler(f: callable):
global _exception_handler
if _exception_handler:
_logger.warning('exception handler already set, we will overwrite it')
_exception_handler = f
def defaultreplymarkup(f: callable):
global _markup_getter
_markup_getter = f
def convinput(state, is_enter=False, **kwargs):
def inner(f):
f.__dict__['_conv_data'] = dict(
orig_f=f,
enter=is_enter,
type=ConversationMethodType.ENTRY if is_enter and state == 0 else ConversationMethodType.STATE_HANDLER,
state=state,
**kwargs
)
@wraps(f)
def _impl(*args, **kwargs):
result, ctx = _handler_of_handler(f=f, *args, **kwargs, return_with_context=True)
if result == conversation.END:
start(ctx)
return result
return _impl
return inner
def conventer(state, **kwargs):
return convinput(state, is_enter=True, **kwargs)
class ConversationMethodType(Enum):
ENTRY = auto()
STATE_HANDLER = auto()
class conversation:
END = ConversationHandler.END
STATE_SEQS = []
def __init__(self, enable_back=False):
self._logger = logging.getLogger(self.__class__.__name__)
self._user_state_cache = {}
self._back_enabled = enable_back
def make_handlers(self, f: callable, **kwargs) -> list:
messages = {}
handlers = []
if 'messages' in kwargs:
if isinstance(kwargs['messages'], dict):
messages = kwargs['messages']
else:
for m in kwargs['messages']:
messages[m] = None
if 'message' in kwargs:
if isinstance(kwargs['message'], str):
messages[kwargs['message']] = None
else:
AttributeError('invalid message type: ' + type(kwargs['message']))
if messages:
for message, target_state in messages.items():
if not target_state:
handlers.append(MessageHandler(text_filter(lang.all(message) if 'messages_lang_completed' not in kwargs else message), f))
else:
handlers.append(MessageHandler(text_filter(lang.all(message) if 'messages_lang_completed' not in kwargs else message), self.make_invoker(target_state)))
if 'regex' in kwargs:
handlers.append(MessageHandler(Filters.regex(kwargs['regex']) & user_filter, f))
if 'command' in kwargs:
handlers.append(CommandHandler(kwargs['command'], f, user_filter))
return handlers
def make_invoker(self, state):
def _invoke(update: Update, context: CallbackContext):
ctx = Context(update,
callback_context=context,
markup_getter=lambda _ctx: None if not _markup_getter else _markup_getter(_ctx),
store=db)
return self.invoke(state, ctx)
return _invoke
def invoke(self, state, ctx: Context):
self._logger.debug(f'invoke, state={state}')
for item in dir(self):
f = getattr(self, item)
if not callable(f) or item.startswith('_') or '_conv_data' not in f.__dict__:
continue
cd = f.__dict__['_conv_data']
if cd['enter'] and cd['state'] == state:
return cd['orig_f'](self, ctx)
raise RuntimeError(f'invoke: failed to find method for state {state}')
def get_handler(self) -> ConversationHandler:
entry_points = []
states = {}
l_cancel_filter = cancel_filter if not self._back_enabled else cancel_and_back_filter
for item in dir(self):
f = getattr(self, item)
if not callable(f) or item.startswith('_') or '_conv_data' not in f.__dict__:
continue
cd = f.__dict__['_conv_data']
if cd['type'] == ConversationMethodType.ENTRY:
entry_points = self.make_handlers(f, **cd)
elif cd['type'] == ConversationMethodType.STATE_HANDLER:
states[cd['state']] = self.make_handlers(f, **cd)
states[cd['state']].append(
MessageHandler(user_filter & ~l_cancel_filter, conversation.invalid)
)
fallbacks = [MessageHandler(user_filter & cancel_filter, self.cancel)]
if self._back_enabled:
fallbacks.append(MessageHandler(user_filter & back_filter, self.back))
return ConversationHandler(
entry_points=entry_points,
states=states,
fallbacks=fallbacks
)
def get_user_state(self, user_id: int) -> Optional[int]:
if user_id not in self._user_state_cache:
return None
return self._user_state_cache[user_id]
# TODO store in ctx.user_state
def set_user_state(self, user_id: int, state: Union[int, None]):
if not self._back_enabled:
return
if state is not None:
self._user_state_cache[user_id] = state
else:
del self._user_state_cache[user_id]
@staticmethod
@simplehandler
def invalid(ctx: Context):
ctx.reply(ctx.lang('invalid_input'), markup=IgnoreMarkup())
# return 0 # FIXME is this needed
@simplehandler
def cancel(self, ctx: Context):
start(ctx)
self.set_user_state(ctx.user_id, None)
return conversation.END
@simplehandler
def back(self, ctx: Context):
cur_state = self.get_user_state(ctx.user_id)
if cur_state is None:
start(ctx)
self.set_user_state(ctx.user_id, None)
return conversation.END
new_state = None
for seq in self.STATE_SEQS:
if cur_state in seq:
idx = seq.index(cur_state)
if idx > 0:
return self.invoke(seq[idx-1], ctx)
if new_state is None:
raise RuntimeError('failed to determine state to go back to')
@classmethod
def add_cancel_button(cls, ctx: Context, buttons):
buttons.append([ctx.lang('cancel')])
@classmethod
def add_back_button(cls, ctx: Context, buttons):
# buttons.insert(0, [ctx.lang('back')])
buttons.append([ctx.lang('back')])
def reply(self,
ctx: Context,
state: Union[int, Enum],
text: str,
buttons: Optional[list],
with_cancel=False,
with_back=False,
buttons_lang_completed=False):
if buttons:
new_buttons = []
if not buttons_lang_completed:
for item in buttons:
if isinstance(item, list):
item = map(lambda s: ctx.lang(s), item)
new_buttons.append(list(item))
elif isinstance(item, str):
new_buttons.append([ctx.lang(item)])
else:
raise ValueError('invalid type: ' + type(item))
else:
new_buttons = list(buttons)
buttons = None
else:
if with_cancel or with_back:
new_buttons = []
else:
new_buttons = None
if with_cancel:
self.add_cancel_button(ctx, new_buttons)
if with_back:
if not self._back_enabled:
raise AttributeError(f'back is not enabled for this conversation ({self.__class__.__name__})')
self.add_back_button(ctx, new_buttons)
markup = ReplyKeyboardMarkup(new_buttons, one_time_keyboard=True) if new_buttons else IgnoreMarkup()
ctx.reply(text, markup=markup)
self.set_user_state(ctx.user_id, state)
return state
class LangConversation(conversation):
START, = range(1)
@conventer(START, command='lang')
def entry(self, ctx: Context):
self._logger.debug(f'current language: {ctx.user_lang}')
buttons = []
for name in languages.values():
buttons.append(name)
markup = ReplyKeyboardMarkup([buttons, [ctx.lang('cancel')]], one_time_keyboard=False)
ctx.reply(ctx.lang('select_language'), markup=markup)
return self.START
@convinput(START, messages=lang.languages)
def input(self, ctx: Context):
selected_lang = None
for key, value in languages.items():
if value == ctx.text:
selected_lang = key
break
if selected_lang is None:
raise ValueError('could not find the language')
db.set_user_lang(ctx.user_id, selected_lang)
ctx.reply(ctx.lang('saved'), markup=IgnoreMarkup())
return self.END
def initialize():
global user_filter
global _updater
global _dispatcher
# init user_filter
if 'users' in config['bot']:
_logger.info('allowed users: ' + str(config['bot']['users']))
user_filter = Filters.user(config['bot']['users'])
else:
user_filter = Filters.all # not sure if this is correct
# init updater
_updater = Updater(config['bot']['token'],
request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
# transparently log all messages
_updater.dispatcher.add_handler(MessageHandler(Filters.all & user_filter, _logging_message_handler), group=10)
_updater.dispatcher.add_handler(CallbackQueryHandler(_logging_callback_handler), group=10)
def run(start_handler=None, any_handler=None):
global db
global _start_handler_ref
if not start_handler:
start_handler = _default_start_handler
if not any_handler:
any_handler = _default_any_handler
if not db:
db = BotDatabase()
_start_handler_ref = start_handler
_updater.dispatcher.add_handler(LangConversation().get_handler(), group=0)
_updater.dispatcher.add_handler(CommandHandler('start', simplehandler(start_handler), user_filter))
_updater.dispatcher.add_handler(MessageHandler(Filters.all & user_filter, any_handler))
_updater.start_polling()
_updater.idle()
def add_conversation(conv: conversation) -> None:
_updater.dispatcher.add_handler(conv.get_handler(), group=0)
def start(ctx: Context):
return _start_handler_ref(ctx)
def _default_start_handler(ctx: Context):
if 'start_message' not in lang:
return ctx.reply('Please define start_message or override start()')
ctx.reply(ctx.lang('start_message'))
@simplehandler
def _default_any_handler(ctx: Context):
if 'invalid_command' not in lang:
return ctx.reply('Please define invalid_command or override any()')
ctx.reply(ctx.lang('invalid_command'))
def _logging_message_handler(update: Update, context: CallbackContext):
if _reporting:
_reporting.report(update.message)
def _logging_callback_handler(update: Update, context: CallbackContext):
if _reporting:
_reporting.report(update.callback_query.message, text=update.callback_query.data)
def enable_logging(bot_type: BotType):
api = WebAPIClient(timeout=3)
api.enable_async()
global _reporting
_reporting = ReportingHelper(api, bot_type)
def notify_all(text_getter: callable,
exclude: Tuple[int] = ()) -> None:
if 'notify_users' not in config['bot']:
_logger.error('notify_all() called but no notify_users directive found in the config')
return
for user_id in config['bot']['notify_users']:
if user_id in exclude:
continue
text = text_getter(db.get_user_lang(user_id))
_updater.bot.send_message(chat_id=user_id,
text=text,
parse_mode='HTML')
def notify_user(user_id: int, text: Union[str, Exception], **kwargs) -> None:
if isinstance(text, Exception):
text = exc2text(text)
_updater.bot.send_message(chat_id=user_id,
text=text,
parse_mode='HTML',
**kwargs)
def send_photo(user_id, **kwargs):
_updater.bot.send_photo(chat_id=user_id, **kwargs)
def send_audio(user_id, **kwargs):
_updater.bot.send_audio(chat_id=user_id, **kwargs)
def send_file(user_id, **kwargs):
_updater.bot.send_document(chat_id=user_id, **kwargs)
def edit_message_text(user_id, message_id, *args, **kwargs):
_updater.bot.edit_message_text(chat_id=user_id,
message_id=message_id,
parse_mode='HTML',
*args, **kwargs)
def delete_message(user_id, message_id):
_updater.bot.delete_message(chat_id=user_id, message_id=message_id)
def set_database(_db: BotDatabase):
global db
db = _db

File diff suppressed because it is too large Load Diff

9
src/inverterd_emulator.py Executable file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env python3
import logging
from home.inverter.emulator import InverterEmulator
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
InverterEmulator(addr=('127.0.0.1', 8305))

View File

@ -1,63 +1,18 @@
#!/usr/bin/env python3
from enum import Enum
from typing import Optional
from telegram import ReplyKeyboardMarkup, User
from home.config import config
from home.bot import Wrapper, Context, text_filter, user_any_name
from home.telegram import bot
from home.telegram._botutil import user_any_name
from home.relay import RelayClient
from home.api.types import BotType
from telegram import ReplyKeyboardMarkup, User
from telegram.ext import MessageHandler
from enum import Enum
from functools import partial
bot: Optional[Wrapper] = None
config.load('pump_bot')
class UserAction(Enum):
ON = 'on'
OFF = 'off'
def get_relay() -> RelayClient:
relay = RelayClient(host=config['relay']['ip'], port=config['relay']['port'])
relay.connect()
return relay
def on(silent: bool, ctx: Context) -> None:
get_relay().on()
ctx.reply(ctx.lang('done'))
if not silent:
notify(ctx.user, UserAction.ON)
def off(silent: bool, ctx: Context) -> None:
get_relay().off()
ctx.reply(ctx.lang('done'))
if not silent:
notify(ctx.user, UserAction.OFF)
def status(ctx: Context) -> None:
ctx.reply(
ctx.lang('enabled') if get_relay().status() == 'on' else ctx.lang('disabled')
)
def notify(user: User, action: UserAction) -> None:
def text_getter(lang: str):
action_name = bot.lang.get(f'user_action_{action.value}', lang)
user_name = user_any_name(user)
return ' ' + bot.lang.get('user_action_notification', lang,
user.id, user_name, action_name)
bot.notify_all(text_getter, exclude=(user.id,))
class PumpBot(Wrapper):
def __init__(self):
super().__init__()
self.lang.ru(
bot.initialize()
bot.lang.ru(
start_message="Выберите команду на клавиатуре",
unknown_command="Неизвестная команда",
@ -75,8 +30,7 @@ class PumpBot(Wrapper):
user_action_on="включил",
user_action_off="выключил",
)
self.lang.en(
bot.lang.en(
start_message="Select command on the keyboard",
unknown_command="Unknown command",
@ -95,15 +49,71 @@ class PumpBot(Wrapper):
user_action_off="OFF",
)
self.add_handler(MessageHandler(text_filter(self.lang.all('enable')), self.wrap(partial(on, False))))
self.add_handler(MessageHandler(text_filter(self.lang.all('disable')), self.wrap(partial(off, False))))
self.add_handler(MessageHandler(text_filter(self.lang.all('enable_silently')), self.wrap(partial(on, True))))
self.add_handler(MessageHandler(text_filter(self.lang.all('disable_silently')), self.wrap(partial(off, True))))
class UserAction(Enum):
ON = 'on'
OFF = 'off'
self.add_handler(MessageHandler(text_filter(self.lang.all('status')), self.wrap(status)))
def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]:
def get_relay() -> RelayClient:
relay = RelayClient(host=config['relay']['ip'], port=config['relay']['port'])
relay.connect()
return relay
def on(ctx: bot.Context, silent=False) -> None:
get_relay().on()
ctx.reply(ctx.lang('done'))
if not silent:
notify(ctx.user, UserAction.ON)
def off(ctx: bot.Context, silent=False) -> None:
get_relay().off()
ctx.reply(ctx.lang('done'))
if not silent:
notify(ctx.user, UserAction.OFF)
def notify(user: User, action: UserAction) -> None:
def text_getter(lang: str):
action_name = bot.lang.get(f'user_action_{action.value}', lang)
user_name = user_any_name(user)
return ' ' + bot.lang.get('user_action_notification', lang,
user.id, user_name, action_name)
bot.notify_all(text_getter, exclude=(user.id,))
@bot.handler(message='enable')
def enable_handler(ctx: bot.Context) -> None:
on(ctx)
@bot.handler(message='enable_silently')
def enable_s_handler(ctx: bot.Context) -> None:
on(ctx, True)
@bot.handler(message='disable')
def disable_handler(ctx: bot.Context) -> None:
off(ctx)
@bot.handler(message='disable_silently')
def disable_s_handler(ctx: bot.Context) -> None:
off(ctx, True)
@bot.handler(message='status')
def status(ctx: bot.Context) -> None:
ctx.reply(
ctx.lang('enabled') if get_relay().status() == 'on' else ctx.lang('disabled')
)
@bot.defaultreplymarkup
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = [
[ctx.lang('enable'), ctx.lang('disable')],
]
@ -117,8 +127,5 @@ class PumpBot(Wrapper):
if __name__ == '__main__':
config.load('pump_bot')
bot = PumpBot()
bot.enable_logging(BotType.PUMP)
bot.run()

View File

@ -7,17 +7,15 @@ import gc
from io import BytesIO
from typing import Optional
from functools import partial
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.ticker as mticker
from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import MessageHandler, CallbackQueryHandler
from home.config import config
from home.bot import Wrapper, Context, text_filter
from home.telegram import bot
from home.util import chunks, MySimpleSocketClient
from home.api import WebAPIClient
from home.api.types import (
@ -25,13 +23,50 @@ from home.api.types import (
TemperatureSensorLocation
)
bot: Optional[Wrapper] = None
config.load('sensors_bot')
bot.initialize()
_sensor_names = []
for k, v in config['sensors'].items():
_sensor_names.append(k)
bot.lang.set({k: v['label_ru']}, 'ru')
bot.lang.set({k: v['label_en']}, 'en')
bot.lang.ru(
start_message="Выберите датчик на клавиатуре",
unknown_command="Неизвестная команда",
temperature="Температура",
humidity="Влажность",
plot_3h="График за 3 часа",
plot_6h="График за 6 часов",
plot_12h="График за 12 часов",
plot_24h="График за 24 часа",
unexpected_callback_data="Ошибка: неверные данные",
loading="Загрузка...",
n_hrs="график за %d ч."
)
bot.lang.en(
start_message="Select the sensor on the keyboard",
unknown_command="Unknown command",
temperature="Temperature",
humidity="Relative humidity",
plot_3h="Graph for 3 hours",
plot_6h="Graph for 6 hours",
plot_12h="Graph for 12 hours",
plot_24h="Graph for 24 hours",
unexpected_callback_data="Unexpected callback data",
loading="Loading...",
n_hrs="graph for %d hours"
)
plt.rcParams['font.size'] = 7
logger = logging.getLogger(__name__)
plot_hours = [3, 6, 12, 24]
def read_sensor(sensor: str, ctx: Context) -> None:
@bot.handler(messages=_sensor_names)
def read_sensor(sensor: str, ctx: bot.Context) -> None:
host = config['sensors'][sensor]['ip']
port = config['sensors'][sensor]['port']
@ -55,7 +90,8 @@ def read_sensor(sensor: str, ctx: Context) -> None:
ctx.reply(text, markup=InlineKeyboardMarkup(chunks(buttons, 2)))
def callback_handler(ctx: Context) -> None:
@bot.callbackhandler
def callback_handler(ctx: bot.Context) -> None:
query = ctx.callback_query
sensors_variants = '|'.join(config['sensors'].keys())
@ -82,7 +118,7 @@ def callback_handler(ctx: Context) -> None:
plot = draw_plot(data, title,
ctx.lang('temperature'),
ctx.lang('humidity'))
bot.updater.bot.send_photo(ctx.user_id, plot)
bot.send_photo(ctx.user_id, photo=plot)
gc.collect()
@ -129,46 +165,8 @@ def draw_plot(data,
return buf
class SensorsBot(Wrapper):
def __init__(self):
super().__init__()
self.lang.ru(
start_message="Выберите датчик на клавиатуре",
unknown_command="Неизвестная команда",
temperature="Температура",
humidity="Влажность",
plot_3h="График за 3 часа",
plot_6h="График за 6 часов",
plot_12h="График за 12 часов",
plot_24h="График за 24 часа",
unexpected_callback_data="Ошибка: неверные данные",
loading="Загрузка...",
n_hrs="график за %d ч."
)
self.lang.en(
start_message="Select the sensor on the keyboard",
unknown_command="Unknown command",
temperature="Temperature",
humidity="Relative humidity",
plot_3h="Graph for 3 hours",
plot_6h="Graph for 6 hours",
plot_12h="Graph for 12 hours",
plot_24h="Graph for 24 hours",
unexpected_callback_data="Unexpected callback data",
loading="Loading...",
n_hrs="graph for %d hours"
)
for k, v in config['sensors'].items():
self.lang.set({k: v['label_ru']}, 'ru')
self.lang.set({k: v['label_en']}, 'en')
self.add_handler(MessageHandler(text_filter(self.lang.all(k)), self.wrap(partial(read_sensor, k))))
self.add_handler(CallbackQueryHandler(self.wrap(callback_handler)))
def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]:
@bot.defaultreplymarkup
def markup(self, ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = []
for k in config['sensors'].keys():
buttons.append(ctx.lang(k))
@ -177,9 +175,6 @@ class SensorsBot(Wrapper):
if __name__ == '__main__':
config.load('sensors_bot')
bot = SensorsBot()
if 'api' in config:
bot.enable_logging(BotType.SENSORS)
bot.run()