pump-bot/main.py
2021-11-16 02:59:27 +03:00

188 lines
5.0 KiB
Python

#!/usr/bin/env python3
import logging
import solarmon_api
from typing import Optional
from argparse import ArgumentParser
import socket
from telegram import (
Update,
ParseMode,
ReplyKeyboardMarkup
)
from telegram.ext import (
Updater,
Filters,
CommandHandler,
MessageHandler,
CallbackContext
)
from telegram.error import TimedOut
solarmon: Optional[solarmon_api.Client] = None
class RelayClient:
def __init__(self, port=8307, host='127.0.0.1'):
self._host = host
self._port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def __del__(self):
self.sock.close()
def connect(self):
self.sock.connect((self._host, self._port))
def _write(self, line):
self.sock.sendall((line+'\r\n').encode())
def _read(self):
buf = bytearray()
while True:
buf.extend(self.sock.recv(256))
if b'\r\n' in buf:
break
response = buf.decode().strip()
return response
def on(self):
self._write('on')
return self._read()
def off(self):
self._write('off')
return self._read()
def status(self):
self._write('get')
return self._read()
#
# helpers
#
def get_markup() -> ReplyKeyboardMarkup:
button = [
[
'Включить',
'Выключить'
],
[
'Статус'
]
]
return ReplyKeyboardMarkup(button, one_time_keyboard=False)
def reply(update: Update, text: str) -> None:
update.message.reply_text(text,
reply_markup=get_markup(),
parse_mode=ParseMode.HTML)
def handle_exc(update: Update, e) -> None:
logging.exception(str(e))
if not isinstance(e, TimedOut):
reply(update, 'exception: ' + str(e))
#
# command/message handlers
#
def start(update: Update, context: CallbackContext) -> None:
reply(update, 'Выберите команду на клавиатуре.')
def msg_status(update: Update, context: CallbackContext) -> None:
try:
relay = RelayClient()
relay.connect()
response = relay.status()
status = 'Включен ✅' if response == 'on' else 'Выключен ❌'
reply(update, status)
solarmon.log_bot_request(solarmon_api.BotType.PUMP, update.message.chat_id, 'Статус')
except Exception as e:
handle_exc(update, e)
def msg_on(update: Update, context: CallbackContext) -> None:
try:
relay = RelayClient()
relay.connect()
relay.on()
reply(update, 'Готово')
solarmon.log_bot_request(solarmon_api.BotType.PUMP, update.message.chat_id, 'Включить')
except Exception as e:
handle_exc(update, e)
def msg_off(update: Update, context: CallbackContext) -> None:
try:
relay = RelayClient()
relay.connect()
relay.off()
reply(update, 'Готово')
solarmon.log_bot_request(solarmon_api.BotType.PUMP, update.message.chat_id, 'Выключить')
except Exception as e:
handle_exc(update, e)
def msg_all(update: Update, context: CallbackContext) -> None:
reply(update, "Неизвестная команда.")
if __name__ == '__main__':
# command-line arguments
parser = ArgumentParser()
parser.add_argument('--token', required=True, type=str,
help='Telegram bot token')
parser.add_argument('--users-whitelist', nargs='+',
help='ID of users allowed to use the bot')
parser.add_argument('--gpio-relayd-host', default='127.0.0.1', type=str)
parser.add_argument('--gpio-relayd-port', default=8307, type=int)
parser.add_argument('--solarmon-api-token', type=str, required=True)
args = parser.parse_args()
whitelist = list(map(lambda x: int(x), args.users_whitelist))
# configure logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
# configure bot
updater = Updater(args.token, request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
dispatcher = updater.dispatcher
user_filter = Filters.user(whitelist)
dispatcher.add_handler(CommandHandler('start', start))
dispatcher.add_handler(MessageHandler(Filters.text('Включить') & user_filter, msg_on))
dispatcher.add_handler(MessageHandler(Filters.text('Выключить') & user_filter, msg_off))
dispatcher.add_handler(MessageHandler(Filters.text('Статус') & user_filter, msg_status))
dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all))
# create api client instance
solarmon = solarmon_api.Client(args.solarmon_api_token, timeout=3)
solarmon.enable_async()
# start the bot
updater.start_polling()
# run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT
updater.idle()