889 lines
28 KiB
Python
Executable File
889 lines
28 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
import logging
|
||
import os
|
||
import tempfile
|
||
import include_homekit
|
||
|
||
from enum import Enum
|
||
from datetime import datetime, timedelta
|
||
from html import escape
|
||
from typing import Optional, List, Dict, Tuple
|
||
|
||
from homekit.config import config
|
||
from homekit.api import WebApiClient
|
||
from homekit.api.types import SoundSensorLocation
|
||
from homekit.api.errors import ApiResponseError
|
||
from homekit.media import SoundNodeClient, SoundRecordClient, SoundRecordFile, CameraNodeClient
|
||
from homekit.soundsensor import SoundSensorServerGuardClient
|
||
from homekit.util import Addr, chunks, filesize_fmt
|
||
|
||
from homekit.telegram import bot
|
||
|
||
from telegram.error import TelegramError
|
||
from telegram import ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton, User
|
||
|
||
from PIL import Image
|
||
|
||
config.load_app('sound_bot')
|
||
|
||
nodes = {}
|
||
for nodename, nodecfg in config['nodes'].items():
|
||
nodes[nodename] = Addr.fromstring(nodecfg['addr'])
|
||
|
||
bot.initialize()
|
||
bot.lang.ru(
|
||
start_message="Выберите команду на клавиатуре",
|
||
unknown_command="Неизвестная команда",
|
||
unexpected_callback_data="Ошибка: неверные данные",
|
||
settings="Настройки микшера",
|
||
record="Запись",
|
||
loading="Загрузка...",
|
||
select_place="Выберите место:",
|
||
invalid_location="Неверное место",
|
||
invalid_interval="Неверная длительность",
|
||
unsupported_action="Неподдерживаемое действие",
|
||
# select_control="Выберите контрол для изменения настроек:",
|
||
control_state="Состояние контрола %s",
|
||
incr="громкость +",
|
||
decr="громкость -",
|
||
back="◀️ Назад",
|
||
n_min="%d мин.",
|
||
n_sec="%d сек.",
|
||
select_interval="Выберите длительность:",
|
||
place="Место",
|
||
beginning="Начало",
|
||
end="Конец",
|
||
record_result="Результат записи",
|
||
record_started='Запись запущена!',
|
||
record_error="Ошибка записи",
|
||
files="Локальные файлы",
|
||
remote_files="Файлы на сервере",
|
||
file_line="— Запись с <b>%s</b> до <b>%s</b> <i>(%s)</i>",
|
||
access_denied="Доступ запрещён",
|
||
|
||
guard_disable="Снять с охраны",
|
||
guard_enable="Поставить на охрану",
|
||
guard_status="Статус охраны",
|
||
guard_user_action_notification='Пользователь <a href="tg://user?id=%d">%s</a> %s.',
|
||
guard_user_action_enable="включил охрану ✅",
|
||
guard_user_action_disable="выключил охрану ❌",
|
||
guard_status_enabled="Включена ✅",
|
||
guard_status_disabled="Выключена ❌",
|
||
|
||
done="Готово 👌",
|
||
|
||
sound_sensors="Датчики звука",
|
||
sound_sensors_info="Здесь можно получить информацию о последних срабатываниях датчиков звука.",
|
||
sound_sensors_no_24h_data="За последние 24 часа данных нет.",
|
||
sound_sensors_show_anything="Показать, что есть",
|
||
|
||
cameras="Камеры",
|
||
select_option="Выберите опцию",
|
||
w_flash="Со вспышкой",
|
||
wo_flash="Без вспышки",
|
||
)
|
||
|
||
bot.lang.en(
|
||
start_message="Select command on the keyboard",
|
||
unknown_command="Unknown command",
|
||
settings="Mixer settings",
|
||
record="Record",
|
||
unexpected_callback_data="Unexpected callback data",
|
||
loading="Loading...",
|
||
select_place="Select place:",
|
||
invalid_location="Invalid place",
|
||
invalid_interval="Invalid duration",
|
||
unsupported_action="Unsupported action",
|
||
# select_control="Select control to adjust its parameters:",
|
||
control_state="%s control state",
|
||
incr="vol +",
|
||
decr="vol -",
|
||
back="◀️ Back",
|
||
n_min="%d min.",
|
||
n_sec="%d s.",
|
||
select_interval="Select duration:",
|
||
place="Place",
|
||
beginning="Started",
|
||
end="Ended",
|
||
record_result="Result",
|
||
record_started='Recording started!',
|
||
record_error="Recording error",
|
||
files="Local files",
|
||
remote_files="Remote files",
|
||
file_line="— From <b>%s</b> to <b>%s</b> <i>(%s)</i>",
|
||
access_denied="Access denied",
|
||
|
||
guard_disable="Disable guard",
|
||
guard_enable="Enable guard",
|
||
guard_status="Guard status",
|
||
guard_user_action_notification='User <a href="tg://user?id=%d">%s</a> %s.',
|
||
guard_user_action_enable="turned the guard ON ✅",
|
||
guard_user_action_disable="turn the guard OFF ❌",
|
||
guard_status_enabled="Active ✅",
|
||
guard_status_disabled="Disabled ❌",
|
||
done="Done 👌",
|
||
|
||
sound_sensors="Sound sensors",
|
||
sound_sensors_info="Here you can get information about last sound sensors hits.",
|
||
sound_sensors_no_24h_data="No data for the last 24 hours.",
|
||
sound_sensors_show_anything="Show me at least something",
|
||
|
||
cameras="Cameras",
|
||
select_option="Select option",
|
||
w_flash="With flash",
|
||
wo_flash="Without flash",
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
RenderedContent = Tuple[str, Optional[InlineKeyboardMarkup]]
|
||
record_client: Optional[SoundRecordClient] = None
|
||
node_client_links: Dict[str, SoundNodeClient] = {}
|
||
cam_client_links: Dict[str, CameraNodeClient] = {}
|
||
|
||
|
||
def node_client(node: str) -> SoundNodeClient:
|
||
if node not in node_client_links:
|
||
node_client_links[node] = SoundNodeClient(Addr.fromstring(config['nodes'][node]['addr']))
|
||
return node_client_links[node]
|
||
|
||
|
||
def camera_client(cam: str) -> CameraNodeClient:
|
||
if cam not in node_client_links:
|
||
cam_client_links[cam] = CameraNodeClient(Addr.fromstring(config['cameras'][cam]['addr']))
|
||
return cam_client_links[cam]
|
||
|
||
|
||
def node_exists(node: str) -> bool:
|
||
return node in config['nodes']
|
||
|
||
|
||
def camera_exists(name: str) -> bool:
|
||
return name in config['cameras']
|
||
|
||
|
||
def camera_settings(name: str) -> Optional[dict]:
|
||
try:
|
||
return config['cameras'][name]['settings']
|
||
except KeyError:
|
||
return None
|
||
|
||
|
||
def have_cameras() -> bool:
|
||
return 'cameras' in config and config['cameras']
|
||
|
||
|
||
def sound_sensor_exists(node: str) -> bool:
|
||
return node in config['sound_sensors']
|
||
|
||
|
||
def interval_defined(interval: int) -> bool:
|
||
return interval in config['bot']['record_intervals']
|
||
|
||
|
||
def callback_unpack(ctx: bot.Context) -> List[str]:
|
||
return ctx.callback_query.data[3:].split('/')
|
||
|
||
|
||
def manual_recording_allowed(user_id: int) -> bool:
|
||
return 'manual_record_allowlist' not in config['bot'] or user_id in config['bot']['manual_record_allowlist']
|
||
|
||
|
||
def guard_client() -> SoundSensorServerGuardClient:
|
||
return SoundSensorServerGuardClient(Addr.fromstring(config['bot']['guard_server']))
|
||
|
||
|
||
# message renderers
|
||
# -----------------
|
||
|
||
class Renderer:
|
||
@classmethod
|
||
def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup:
|
||
buttons = []
|
||
for node, nodeconfig in config['nodes'].items():
|
||
buttons.append([InlineKeyboardButton(nodeconfig['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{node}')])
|
||
return InlineKeyboardMarkup(buttons)
|
||
|
||
@classmethod
|
||
def back_button(cls,
|
||
ctx: bot.Context,
|
||
buttons: list,
|
||
callback_data: str):
|
||
buttons.append([
|
||
InlineKeyboardButton(ctx.lang('back'), callback_data=callback_data)
|
||
])
|
||
|
||
|
||
class SettingsRenderer(Renderer):
|
||
@classmethod
|
||
def index(cls, ctx: bot.Context) -> RenderedContent:
|
||
html = f'<b>{ctx.lang("settings")}</b>\n\n'
|
||
html += ctx.lang('select_place')
|
||
return html, cls.places_markup(ctx, callback_prefix='s0')
|
||
|
||
@classmethod
|
||
def node(cls, ctx: bot.Context,
|
||
controls: List[dict]) -> RenderedContent:
|
||
node, = callback_unpack(ctx)
|
||
|
||
html = []
|
||
buttons = []
|
||
for control in controls:
|
||
html.append(f'<b>{control["name"]}</b>\n{escape(control["info"])}')
|
||
buttons.append([
|
||
InlineKeyboardButton(control['name'], callback_data=f's1/{node}/{control["name"]}')
|
||
])
|
||
|
||
html = "\n\n".join(html)
|
||
cls.back_button(ctx, buttons, callback_data='s0')
|
||
|
||
return html, InlineKeyboardMarkup(buttons)
|
||
|
||
@classmethod
|
||
def control(cls, ctx: bot.Context, data) -> RenderedContent:
|
||
node, control, *rest = callback_unpack(ctx)
|
||
|
||
html = '<b>' + ctx.lang('control_state', control) + '</b>\n\n'
|
||
html += escape(data['info'])
|
||
buttons = []
|
||
callback_prefix = f's2/{node}/{control}'
|
||
for cap in data['caps']:
|
||
if cap == 'mute':
|
||
muted = 'dB] [off]' in data['info']
|
||
act = 'unmute' if muted else 'mute'
|
||
buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')])
|
||
|
||
elif cap == 'cap':
|
||
cap_dis = 'Capture [off]' in data['info']
|
||
act = 'cap' if cap_dis else 'nocap'
|
||
buttons.append([InlineKeyboardButton(act, callback_data=f'{callback_prefix}/{act}')])
|
||
|
||
elif cap == 'volume':
|
||
buttons.append(
|
||
list(map(lambda s: InlineKeyboardButton(ctx.lang(s), callback_data=f'{callback_prefix}/{s}'),
|
||
['decr', 'incr']))
|
||
)
|
||
|
||
cls.back_button(ctx, buttons, callback_data=f's0/{node}')
|
||
|
||
return html, InlineKeyboardMarkup(buttons)
|
||
|
||
|
||
class RecordRenderer(Renderer):
|
||
@classmethod
|
||
def index(cls, ctx: bot.Context) -> RenderedContent:
|
||
html = f'<b>{ctx.lang("record")}</b>\n\n'
|
||
html += ctx.lang('select_place')
|
||
return html, cls.places_markup(ctx, callback_prefix='r0')
|
||
|
||
@classmethod
|
||
def node(cls, ctx: bot.Context, durations: List[int]) -> RenderedContent:
|
||
node, = callback_unpack(ctx)
|
||
|
||
html = ctx.lang('select_interval')
|
||
|
||
buttons = []
|
||
for s in durations:
|
||
if s >= 60:
|
||
m = int(s / 60)
|
||
label = ctx.lang('n_min', m)
|
||
else:
|
||
label = ctx.lang('n_sec', s)
|
||
buttons.append(InlineKeyboardButton(label, callback_data=f'r1/{node}/{s}'))
|
||
buttons = list(chunks(buttons, 3))
|
||
cls.back_button(ctx, buttons, callback_data=f'r0')
|
||
|
||
return html, InlineKeyboardMarkup(buttons)
|
||
|
||
@classmethod
|
||
def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent:
|
||
node, *rest = callback_unpack(ctx)
|
||
|
||
place = config['nodes'][node]['label'][ctx.user_lang]
|
||
|
||
html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})'
|
||
return html, None
|
||
|
||
@classmethod
|
||
def record_done(cls, info: dict, node: str, uid: int) -> str:
|
||
ulang = bot.db.get_user_lang(uid)
|
||
|
||
def lang(key, *args):
|
||
return bot.lang.get(key, ulang, *args)
|
||
|
||
rid = info['id']
|
||
fmt = '%d.%m.%y %H:%M:%S'
|
||
start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt)
|
||
stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt)
|
||
|
||
place = config['nodes'][node]['label'][ulang]
|
||
|
||
html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n'
|
||
html += f'<b>{lang("beginning")}</b>: {start_time}\n'
|
||
html += f'<b>{lang("end")}</b>: {stop_time}'
|
||
|
||
return html
|
||
|
||
@classmethod
|
||
def record_error(cls, info: dict, node: str, uid: int) -> str:
|
||
ulang = bot.db.get_user_lang(uid)
|
||
|
||
def lang(key, *args):
|
||
return bot.lang.get(key, ulang, *args)
|
||
|
||
place = config['nodes'][node]['label'][ulang]
|
||
rid = info['id']
|
||
|
||
html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})'
|
||
if 'error' in info:
|
||
html += '\n'+str(info['error'])
|
||
|
||
return html
|
||
|
||
|
||
class FilesRenderer(Renderer):
|
||
@classmethod
|
||
def index(cls, ctx: bot.Context) -> RenderedContent:
|
||
html = f'<b>{ctx.lang("files")}</b>\n\n'
|
||
html += ctx.lang('select_place')
|
||
return html, cls.places_markup(ctx, callback_prefix='f0')
|
||
|
||
@classmethod
|
||
def filelist(cls, ctx: bot.Context, files: List[SoundRecordFile]) -> RenderedContent:
|
||
node, = callback_unpack(ctx)
|
||
|
||
html_files = map(lambda file: cls.file(ctx, file, node), files)
|
||
html = '\n\n'.join(html_files)
|
||
|
||
buttons = []
|
||
cls.back_button(ctx, buttons, callback_data='f0')
|
||
|
||
return html, InlineKeyboardMarkup(buttons)
|
||
|
||
@classmethod
|
||
def file(cls, ctx: bot.Context, file: SoundRecordFile, node: str) -> str:
|
||
html = ctx.lang('file_line', file.start_humantime, file.stop_humantime, filesize_fmt(file.filesize))
|
||
if file.file_id is not None:
|
||
html += f'/audio_{node}_{file.file_id}'
|
||
return html
|
||
|
||
|
||
class RemoteFilesRenderer(FilesRenderer):
|
||
@classmethod
|
||
def index(cls, ctx: bot.Context) -> RenderedContent:
|
||
html = f'<b>{ctx.lang("remote_files")}</b>\n\n'
|
||
html += ctx.lang('select_place')
|
||
return html, cls.places_markup(ctx, callback_prefix='g0')
|
||
|
||
|
||
class SoundSensorRenderer(Renderer):
|
||
@classmethod
|
||
def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup:
|
||
buttons = []
|
||
for sensor, sensor_label in config['sound_sensors'].items():
|
||
buttons.append(
|
||
[InlineKeyboardButton(sensor_label[ctx.user_lang], callback_data=f'{callback_prefix}/{sensor}')])
|
||
return InlineKeyboardMarkup(buttons)
|
||
|
||
@classmethod
|
||
def index(cls, ctx: bot.Context) -> RenderedContent:
|
||
html = f'{ctx.lang("sound_sensors_info")}\n\n'
|
||
html += ctx.lang('select_place')
|
||
return html, cls.places_markup(ctx, callback_prefix='S0')
|
||
|
||
@classmethod
|
||
def hits(cls, ctx: bot.Context, data, is_last=False) -> RenderedContent:
|
||
node, = callback_unpack(ctx)
|
||
buttons = []
|
||
|
||
if not data:
|
||
html = ctx.lang('sound_sensors_no_24h_data')
|
||
if not is_last:
|
||
buttons.append([InlineKeyboardButton(ctx.lang('sound_sensors_show_anything'), callback_data=f'S1/{node}')])
|
||
else:
|
||
html = ''
|
||
prev_date = None
|
||
for item in data:
|
||
item_date = item['time'].strftime('%d.%m.%y')
|
||
if prev_date is None or prev_date != item_date:
|
||
if html != '':
|
||
html += '\n\n'
|
||
html += f'<b>{item_date}</b>'
|
||
prev_date = item_date
|
||
html += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})'
|
||
cls.back_button(ctx, buttons, callback_data='S0')
|
||
return html, InlineKeyboardMarkup(buttons)
|
||
|
||
@classmethod
|
||
def hits_plain(cls, ctx: bot.Context, data, is_last=False) -> bytes:
|
||
node, = callback_unpack(ctx)
|
||
|
||
text = ''
|
||
prev_date = None
|
||
for item in data:
|
||
item_date = item['time'].strftime('%d.%m.%y')
|
||
if prev_date is None or prev_date != item_date:
|
||
if text != '':
|
||
text += '\n\n'
|
||
text += item_date
|
||
prev_date = item_date
|
||
text += '\n' + item['time'].strftime('%H:%M:%S') + f' (+{item["hits"]})'
|
||
|
||
return text.encode()
|
||
|
||
|
||
class CamerasRenderer(Renderer):
|
||
@classmethod
|
||
def index(cls, ctx: bot.Context) -> RenderedContent:
|
||
html = f'<b>{ctx.lang("cameras")}</b>\n\n'
|
||
html += ctx.lang('select_place')
|
||
return html, cls.places_markup(ctx, callback_prefix='c0')
|
||
|
||
@classmethod
|
||
def places_markup(cls, ctx: bot.Context, callback_prefix: str) -> InlineKeyboardMarkup:
|
||
buttons = []
|
||
for camera_name, camera_data in config['cameras'].items():
|
||
buttons.append(
|
||
[InlineKeyboardButton(camera_data['label'][ctx.user_lang], callback_data=f'{callback_prefix}/{camera_name}')])
|
||
return InlineKeyboardMarkup(buttons)
|
||
|
||
@classmethod
|
||
def camera(cls, ctx: bot.Context, flash_available: bool) -> RenderedContent:
|
||
node, = callback_unpack(ctx)
|
||
|
||
html = ctx.lang('select_option')
|
||
|
||
buttons = []
|
||
if flash_available:
|
||
buttons.append(InlineKeyboardButton(ctx.lang('w_flash'), callback_data=f'c1/{node}/1'))
|
||
buttons.append(InlineKeyboardButton(ctx.lang('wo_flash'), callback_data=f'c1/{node}/0'))
|
||
|
||
cls.back_button(ctx, [buttons], callback_data=f'c0')
|
||
|
||
return html, InlineKeyboardMarkup([buttons])
|
||
#
|
||
# @classmethod
|
||
# def record_started(cls, ctx: bot.Context, rid: int) -> RenderedContent:
|
||
# node, *rest = callback_unpack(ctx)
|
||
#
|
||
# place = config['nodes'][node]['label'][ctx.user_lang]
|
||
#
|
||
# html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})'
|
||
# return html, None
|
||
#
|
||
# @classmethod
|
||
# def record_done(cls, info: dict, node: str, uid: int) -> str:
|
||
# ulang = bot.db.get_user_lang(uid)
|
||
#
|
||
# def lang(key, *args):
|
||
# return bot.lang.get(key, ulang, *args)
|
||
#
|
||
# rid = info['id']
|
||
# fmt = '%d.%m.%y %H:%M:%S'
|
||
# start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt)
|
||
# stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt)
|
||
#
|
||
# place = config['nodes'][node]['label'][ulang]
|
||
#
|
||
# html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n'
|
||
# html += f'<b>{lang("beginning")}</b>: {start_time}\n'
|
||
# html += f'<b>{lang("end")}</b>: {stop_time}'
|
||
#
|
||
# return html
|
||
#
|
||
# @classmethod
|
||
# def record_error(cls, info: dict, node: str, uid: int) -> str:
|
||
# ulang = bot.db.get_user_lang(uid)
|
||
#
|
||
# def lang(key, *args):
|
||
# return bot.lang.get(key, ulang, *args)
|
||
#
|
||
# place = config['nodes'][node]['label'][ulang]
|
||
# rid = info['id']
|
||
#
|
||
# html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})'
|
||
# if 'error' in info:
|
||
# html += '\n'+str(info['error'])
|
||
#
|
||
# return html
|
||
|
||
|
||
# cameras handlers
|
||
# ----------------
|
||
|
||
@bot.handler(message='cameras', callback=r'^c0$')
|
||
def cameras(ctx: bot.Context):
|
||
""" List of cameras """
|
||
|
||
text, markup = CamerasRenderer.index(ctx)
|
||
if not ctx.is_callback_context():
|
||
return ctx.reply(text, markup=markup)
|
||
else:
|
||
ctx.answer()
|
||
return ctx.edit(text, markup=markup)
|
||
|
||
|
||
@bot.callbackhandler(callback=r'^c0/.*')
|
||
def camera_options(ctx: bot.Context) -> None:
|
||
""" List of options (with/without flash etc) """
|
||
|
||
cam, = callback_unpack(ctx)
|
||
if not camera_exists(cam):
|
||
ctx.answer(ctx.lang('invalid_location'))
|
||
return
|
||
|
||
ctx.answer()
|
||
flash_available = 'flash_available' in config['cameras'][cam] and config['cameras'][cam]['flash_available'] is True
|
||
|
||
text, markup = CamerasRenderer.camera(ctx, flash_available)
|
||
ctx.edit(text, markup)
|
||
|
||
|
||
@bot.callbackhandler(callback=r'^c1/.*')
|
||
def camera_capture(ctx: bot.Context) -> None:
|
||
""" Cheese """
|
||
|
||
cam, flash = callback_unpack(ctx)
|
||
flash = int(flash)
|
||
if not camera_exists(cam):
|
||
ctx.answer(ctx.lang('invalid_location'))
|
||
return
|
||
|
||
ctx.answer()
|
||
|
||
client = camera_client(cam)
|
||
fd = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
|
||
fd.close()
|
||
|
||
client.capture(fd.name, with_flash=bool(flash))
|
||
logger.debug(f'captured photo ({cam}), saved to {fd.name}')
|
||
|
||
camera_config = config['cameras'][cam]
|
||
if 'rotate' in camera_config:
|
||
im = Image.open(fd.name)
|
||
im.rotate(camera_config['rotate'], expand=True)
|
||
# im.show()
|
||
im.save(fd.name)
|
||
logger.debug(f"rotated image {camera_config['rotate']} degrees")
|
||
|
||
try:
|
||
with open(fd.name, 'rb') as f:
|
||
bot.send_photo(ctx.user_id, photo=f)
|
||
except TelegramError as exc:
|
||
logger.exception(exc)
|
||
|
||
try:
|
||
os.unlink(fd.name)
|
||
except OSError as exc:
|
||
logger.exception(exc)
|
||
|
||
|
||
# settings handlers
|
||
# -----------------
|
||
|
||
@bot.handler(message='settings', callback=r'^s0$')
|
||
def settings(ctx: bot.Context):
|
||
""" List of nodes """
|
||
|
||
text, markup = SettingsRenderer.index(ctx)
|
||
if not ctx.is_callback_context():
|
||
return ctx.reply(text, markup=markup)
|
||
else:
|
||
ctx.answer()
|
||
return ctx.edit(text, markup=markup)
|
||
|
||
|
||
@bot.callbackhandler(callback=r'^s0/.*')
|
||
def settings_place(ctx: bot.Context):
|
||
""" List of controls """
|
||
|
||
node, = callback_unpack(ctx)
|
||
if not node_exists(node):
|
||
ctx.answer(ctx.lang('invalid_location'))
|
||
return
|
||
|
||
cl = node_client(node)
|
||
controls = cl.amixer_get_all()
|
||
|
||
ctx.answer()
|
||
|
||
text, markup = SettingsRenderer.node(ctx, controls)
|
||
ctx.edit(text, markup)
|
||
|
||
|
||
@bot.callbackhandler(callback=r'^s1/.*')
|
||
def settings_place_control(ctx: bot.Context):
|
||
""" List of available tunes for control """
|
||
|
||
node, control = callback_unpack(ctx)
|
||
if not node_exists(node):
|
||
ctx.answer(ctx.lang('invalid_location'))
|
||
return
|
||
|
||
cl = node_client(node)
|
||
control_data = cl.amixer_get(control)
|
||
|
||
ctx.answer()
|
||
|
||
text, markup = SettingsRenderer.control(ctx, control_data)
|
||
ctx.edit(text, markup)
|
||
|
||
|
||
@bot.callbackhandler(callback=r'^s2/.*')
|
||
def settings_place_control_action(ctx: bot.Context):
|
||
""" Tuning """
|
||
|
||
node, control, action = callback_unpack(ctx)
|
||
if not node_exists(node):
|
||
return
|
||
|
||
cl = node_client(node)
|
||
if not hasattr(cl, f'amixer_{action}'):
|
||
ctx.answer(ctx.lang('invalid_action'))
|
||
return
|
||
|
||
func = getattr(cl, f'amixer_{action}')
|
||
control_data = func(control)
|
||
|
||
ctx.answer()
|
||
|
||
text, markup = SettingsRenderer.control(ctx, control_data)
|
||
ctx.edit(text, markup)
|
||
|
||
|
||
# recording handlers
|
||
# ------------------
|
||
|
||
@bot.handler(message='record', callback=r'^r0$')
|
||
def record(ctx: bot.Context):
|
||
""" List of nodes """
|
||
|
||
if not manual_recording_allowed(ctx.user_id):
|
||
return ctx.reply(ctx.lang('access_denied'))
|
||
|
||
text, markup = RecordRenderer.index(ctx)
|
||
if not ctx.is_callback_context():
|
||
return ctx.reply(text, markup=markup)
|
||
else:
|
||
ctx.answer()
|
||
return ctx.edit(text, markup=markup)
|
||
|
||
|
||
@bot.callbackhandler(callback=r'^r0/.*')
|
||
def record_place(ctx: bot.Context):
|
||
""" List of available intervals """
|
||
|
||
node, = callback_unpack(ctx)
|
||
if not node_exists(node):
|
||
ctx.answer(ctx.lang('invalid_location'))
|
||
return
|
||
|
||
ctx.answer()
|
||
|
||
text, markup = RecordRenderer.node(ctx, config['bot']['record_intervals'])
|
||
ctx.edit(text, markup)
|
||
|
||
|
||
@bot.callbackhandler(callback=r'^r1/.*')
|
||
def record_place_interval(ctx: bot.Context):
|
||
""" Do record! """
|
||
|
||
node, interval = callback_unpack(ctx)
|
||
interval = int(interval)
|
||
if not node_exists(node):
|
||
ctx.answer(ctx.lang('invalid_location'))
|
||
return
|
||
if not interval_defined(interval):
|
||
ctx.answer(ctx.lang('invalid_interval'))
|
||
return
|
||
|
||
try:
|
||
record_id = record_client.record(node, interval, {'user_id': ctx.user_id, 'node': node})
|
||
except ApiResponseError as e:
|
||
ctx.answer(e.error_message)
|
||
logger.error(e)
|
||
return
|
||
|
||
ctx.answer()
|
||
|
||
html, markup = RecordRenderer.record_started(ctx, record_id)
|
||
ctx.edit(html, markup)
|
||
|
||
|
||
# sound sensor handlers
|
||
# ---------------------
|
||
|
||
@bot.handler(message='sound_sensors', callback=r'^S0$')
|
||
def sound_sensors(ctx: bot.Context):
|
||
""" List of places """
|
||
|
||
text, markup = SoundSensorRenderer.index(ctx)
|
||
if not ctx.is_callback_context():
|
||
return ctx.reply(text, markup=markup)
|
||
else:
|
||
ctx.answer()
|
||
return ctx.edit(text, markup=markup)
|
||
|
||
|
||
@bot.callbackhandler(callback=r'^S0/.*')
|
||
def sound_sensors_last_24h(ctx: bot.Context):
|
||
""" Last 24h log """
|
||
|
||
node, = callback_unpack(ctx)
|
||
if not sound_sensor_exists(node):
|
||
ctx.answer(ctx.lang('invalid location'))
|
||
return
|
||
|
||
ctx.answer()
|
||
|
||
cl = WebApiClient()
|
||
data = cl.get_sound_sensor_hits(location=SoundSensorLocation[node.upper()],
|
||
after=datetime.now() - timedelta(hours=24))
|
||
|
||
text, markup = SoundSensorRenderer.hits(ctx, data)
|
||
if len(text) > 4096:
|
||
plain = SoundSensorRenderer.hits_plain(ctx, data)
|
||
bot.send_file(ctx.user_id, document=plain, filename='data.txt')
|
||
else:
|
||
ctx.edit(text, markup=markup)
|
||
|
||
|
||
@bot.callbackhandler(callback=r'^S1/.*')
|
||
def sound_sensors_last_anything(ctx: bot.Context):
|
||
""" Last _something_ """
|
||
|
||
node, = callback_unpack(ctx)
|
||
if not sound_sensor_exists(node):
|
||
ctx.answer(ctx.lang('invalid location'))
|
||
return
|
||
|
||
ctx.answer()
|
||
|
||
cl = WebApiClient()
|
||
data = cl.get_last_sound_sensor_hits(location=SoundSensorLocation[node.upper()],
|
||
last=20)
|
||
|
||
text, markup = SoundSensorRenderer.hits(ctx, data, is_last=True)
|
||
if len(text) > 4096:
|
||
plain = SoundSensorRenderer.hits_plain(ctx, data)
|
||
bot.send_file(ctx.user_id, document=plain, filename='data.txt')
|
||
else:
|
||
ctx.edit(text, markup=markup)
|
||
|
||
|
||
# guard enable/disable handlers
|
||
# -----------------------------
|
||
|
||
class GuardUserAction(Enum):
|
||
ENABLE = 'enable'
|
||
DISABLE = 'disable'
|
||
|
||
|
||
if 'guard_server' in config['bot']:
|
||
@bot.handler(message='guard_status')
|
||
def guard_status(ctx: bot.Context):
|
||
guard = guard_client()
|
||
resp = guard.guard_status()
|
||
|
||
key = 'enabled' if resp['enabled'] is True else 'disabled'
|
||
ctx.reply(ctx.lang(f'guard_status_{key}'))
|
||
|
||
|
||
@bot.handler(message='guard_enable')
|
||
def guard_enable(ctx: bot.Context):
|
||
guard = guard_client()
|
||
guard.guard_enable()
|
||
ctx.reply(ctx.lang('done'))
|
||
|
||
_guard_notify(ctx.user, GuardUserAction.ENABLE)
|
||
|
||
|
||
@bot.handler(message='guard_disable')
|
||
def guard_disable(ctx: bot.Context):
|
||
guard = guard_client()
|
||
guard.guard_disable()
|
||
ctx.reply(ctx.lang('done'))
|
||
|
||
_guard_notify(ctx.user, GuardUserAction.DISABLE)
|
||
|
||
|
||
def _guard_notify(user: User, action: GuardUserAction):
|
||
def text_getter(lang: str):
|
||
action_name = bot.lang.get(f'guard_user_action_{action.value}', lang)
|
||
user_name = bot.user_any_name(user)
|
||
return 'ℹ ' + bot.lang.get('guard_user_action_notification', lang,
|
||
user.id, user_name, action_name)
|
||
|
||
bot.notify_all(text_getter, exclude=(user.id,))
|
||
|
||
|
||
@bot.defaultreplymarkup
|
||
def markup(ctx: Optional[bot.Context]) -> Optional[ReplyKeyboardMarkup]:
|
||
buttons = [
|
||
[ctx.lang('record'), ctx.lang('settings')],
|
||
# [ctx.lang('files'), ctx.lang('remote_files')],
|
||
]
|
||
if 'guard_server' in config['bot']:
|
||
buttons.append([
|
||
ctx.lang('guard_enable'), ctx.lang('guard_disable'), ctx.lang('guard_status')
|
||
])
|
||
buttons.append([ctx.lang('sound_sensors')])
|
||
if have_cameras():
|
||
buttons.append([ctx.lang('cameras')])
|
||
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)
|
||
|
||
|
||
# record client callbacks
|
||
# -----------------------
|
||
|
||
def record_onerror(info: dict, userdata: dict):
|
||
uid = userdata['user_id']
|
||
node = userdata['node']
|
||
|
||
html = RecordRenderer.record_error(info, node, uid)
|
||
try:
|
||
bot.notify_user(userdata['user_id'], html)
|
||
except TelegramError as exc:
|
||
logger.exception(exc)
|
||
finally:
|
||
record_client.forget(node, info['id'])
|
||
|
||
|
||
def record_onfinished(info: dict, fn: str, userdata: dict):
|
||
logger.info('record finished: ' + str(info))
|
||
|
||
uid = userdata['user_id']
|
||
node = userdata['node']
|
||
|
||
html = RecordRenderer.record_done(info, node, uid)
|
||
bot.notify_user(uid, html)
|
||
|
||
try:
|
||
# sending audiofile to telegram
|
||
with open(fn, 'rb') as f:
|
||
bot.send_audio(uid, audio=f, filename='audio.mp3')
|
||
|
||
# deleting temp file
|
||
try:
|
||
os.unlink(fn)
|
||
except OSError as exc:
|
||
logger.exception(exc)
|
||
bot.notify_user(uid, exc)
|
||
|
||
# remove the recording from sound_node's history
|
||
record_client.forget(node, info['id'])
|
||
|
||
# remove file from storage
|
||
# node_client(node).storage_delete(info['file']['fileid'])
|
||
except Exception as e:
|
||
logger.exception(e)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
record_client = SoundRecordClient(nodes,
|
||
error_handler=record_onerror,
|
||
finished_handler=record_onfinished,
|
||
download_on_finish=True)
|
||
|
||
bot.run()
|
||
record_client.stop()
|