pump-bot/main.py
2021-08-08 00:11:21 +03:00

179 lines
4.5 KiB
Python

#!/usr/bin/env python3
import logging, re, datetime, json, subprocess, os
from typing import Optional
from argparse import ArgumentParser
from html import escape
import socket
from pprint import pprint
from time import sleep
from strings import lang as _
from telegram import (
Update,
ParseMode,
KeyboardButton,
ReplyKeyboardMarkup
)
from telegram.ext import (
Updater,
Filters,
CommandHandler,
MessageHandler,
CallbackContext
)
from telegram.error import TimedOut
class RelayClient:
    """Minimal line-oriented TCP client for the relay daemon.

    Each command is written as a text line terminated by CRLF, and the
    reply is read until a CRLF shows up in the received data.
    """

    def __init__(self, port=8307, host='127.0.0.1'):
        """Create the TCP socket; nothing is connected until connect()."""
        self._host = host
        self._port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def __del__(self):
        # Best-effort cleanup for callers that never close explicitly.
        self.close()

    def close(self):
        """Close the socket.

        Safe to call more than once, and on an instance whose __init__
        never got as far as creating the socket.
        """
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()

    def connect(self):
        """Connect to the host/port given to the constructor."""
        self.sock.connect((self._host, self._port))

    def _write(self, line):
        """Send ``line`` followed by CRLF."""
        self.sock.sendall((line + '\r\n').encode())

    def _read(self):
        """Read until a CRLF has been received and return the stripped text.

        Raises:
            ConnectionError: the peer closed the connection before a
                complete line arrived.
        """
        buf = bytearray()
        while b'\r\n' not in buf:
            chunk = self.sock.recv(256)
            if not chunk:
                # recv() returns b'' once the peer has closed the socket;
                # without this check the loop would spin forever.
                raise ConnectionError(
                    'relay connection closed before a complete response was received')
            buf.extend(chunk)
        return buf.decode().strip()

    def on(self):
        """Send the ``on`` command and return the daemon's reply."""
        self._write('on')
        return self._read()

    def off(self):
        """Send the ``off`` command and return the daemon's reply."""
        self._write('off')
        return self._read()

    def status(self):
        """Send the ``get`` command and return the daemon's reply."""
        self._write('get')
        return self._read()
#
# helpers
#
def get_markup() -> ReplyKeyboardMarkup:
    """Build the reply keyboard shown with every bot message.

    The first row holds the on/off buttons and the second row the status
    button; the keyboard is not one-time.
    """
    power_row = ['Включить', 'Выключить']
    status_row = ['Статус']
    return ReplyKeyboardMarkup([power_row, status_row], one_time_keyboard=False)
def reply(update: Update, text: str) -> None:
    """Reply to the incoming update with ``text`` and the command keyboard.

    Args:
        update: the update being answered.
        text: message body; it is sent with HTML parse mode, so it must
            already be valid Telegram HTML.
    """
    # update.message is None for edited messages and channel posts;
    # effective_message resolves to whichever message the update carries.
    update.effective_message.reply_text(text,
                                        reply_markup=get_markup(),
                                        parse_mode=ParseMode.HTML)
def handle_exc(update: Update, e) -> None:
    """Log an exception raised by a handler and report it to the user.

    Telegram timeouts are logged but not reported back to the chat.

    Args:
        update: the update whose handler failed.
        e: the exception that was caught.
    """
    logging.exception(str(e))
    if isinstance(e, TimedOut):
        return
    # reply() sends with HTML parse mode, so the exception text must be
    # escaped: a stray '<', '>' or '&' would make Telegram reject the message.
    reply(update, 'exception: ' + escape(str(e), quote=False))
#
# command/message handlers
#
def start(update: Update, context: CallbackContext) -> None:
    """Handle /start by asking the user to pick a command on the keyboard."""
    prompt = 'Выберите команду на клавиатуре.'
    reply(update, prompt)
def msg_status(update: Update, context: CallbackContext) -> None:
    """Query the relay and tell the user whether it is on or off.

    The relay is reported as on only when the daemon answers exactly
    ``on``; any other answer is reported as off. Any exception is passed
    to handle_exc().
    """
    try:
        relay = RelayClient()
        try:
            relay.connect()
            response = relay.status()
        finally:
            # Close the socket deterministically instead of relying on
            # garbage collection, and before the slower Telegram reply.
            relay.sock.close()
        status = 'Включен' if response == 'on' else 'Выключен'
        reply(update, status)
    except Exception as e:
        handle_exc(update, e)
def msg_on(update: Update, context: CallbackContext) -> None:
    """Send the ``on`` command to the relay and confirm to the user.

    Any exception is passed to handle_exc().
    """
    try:
        relay = RelayClient()
        try:
            relay.connect()
            relay.on()
        finally:
            # Close the socket deterministically instead of relying on
            # garbage collection, and before the slower Telegram reply.
            relay.sock.close()
        reply(update, 'Готово')
    except Exception as e:
        handle_exc(update, e)
def msg_off(update: Update, context: CallbackContext) -> None:
    """Send the ``off`` command to the relay and confirm to the user.

    Any exception is passed to handle_exc().
    """
    try:
        relay = RelayClient()
        try:
            relay.connect()
            relay.off()
        finally:
            # Close the socket deterministically instead of relying on
            # garbage collection, and before the slower Telegram reply.
            relay.sock.close()
        reply(update, 'Готово')
    except Exception as e:
        handle_exc(update, e)
def msg_all(update: Update, context: CallbackContext) -> None:
    """Fallback handler: tell the user the command was not recognised."""
    unknown_command = "Неизвестная команда."
    reply(update, unknown_command)
if __name__ == '__main__':
    from functools import partial

    # command-line arguments
    parser = ArgumentParser()
    parser.add_argument('--token', required=True, type=str,
                        help='Telegram bot token')
    # required + type=int: a missing or non-numeric whitelist is reported by
    # argparse instead of crashing later with a TypeError/ValueError traceback.
    parser.add_argument('--users-whitelist', nargs='+', type=int, required=True,
                        help='ID of users allowed to use the bot')
    parser.add_argument('--gpio-relayd-host', default='127.0.0.1', type=str)
    parser.add_argument('--gpio-relayd-port', default=8307, type=int)
    args = parser.parse_args()

    whitelist = args.users_whitelist

    # The handlers construct RelayClient() with no arguments, so the
    # host/port options above would otherwise be silently ignored. Rebinding
    # the module-level name to a pre-configured factory makes every handler
    # connect to the daemon selected on the command line.
    RelayClient = partial(RelayClient,
                          port=args.gpio_relayd_port,
                          host=args.gpio_relayd_host)

    # configure logging
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                        level=logging.INFO)

    # configure bot
    updater = Updater(args.token, request_kwargs={'read_timeout': 6, 'connect_timeout': 7})
    dispatcher = updater.dispatcher

    # Only whitelisted users reach the message handlers below.
    # NOTE(review): /start is registered without the user filter, so it
    # answers anyone — confirm this is intended.
    user_filter = Filters.user(whitelist)

    dispatcher.add_handler(CommandHandler('start', start))
    dispatcher.add_handler(MessageHandler(Filters.text('Включить') & user_filter, msg_on))
    dispatcher.add_handler(MessageHandler(Filters.text('Выключить') & user_filter, msg_off))
    dispatcher.add_handler(MessageHandler(Filters.text('Статус') & user_filter, msg_status))
    # Registered last so it only catches what the handlers above did not.
    dispatcher.add_handler(MessageHandler(Filters.all & user_filter, msg_all))

    # start the bot
    updater.start_polling()

    # run the bot until the user presses Ctrl-C or the process receives SIGINT, SIGTERM or SIGABRT
    updater.idle()