initial camera support (only esp32-cam at the moment)

This commit is contained in:
Evgeny Zinoviev 2022-05-17 10:38:27 +03:00
parent f1b52a9220
commit 6f965e85a6
5 changed files with 360 additions and 8 deletions

View File

@ -271,6 +271,9 @@ class Wrapper:
text = exc2text(text)
self.updater.bot.send_message(chat_id=user_id, text=text, parse_mode='HTML')
def send_photo(self, user_id, **kwargs):
self.updater.bot.send_photo(chat_id=user_id, **kwargs)
def send_audio(self, user_id, **kwargs):
self.updater.bot.send_audio(chat_id=user_id, **kwargs)

View File

166
src/home/camera/esp32.py Normal file
View File

@ -0,0 +1,166 @@
import logging
import shutil
import requests
import json
from typing import Union, Optional
from time import sleep
from enum import Enum
from ..api.errors import ApiResponseError
from ..util import Addr
class FrameSize(Enum):
UXGA_1600x1200 = 13
SXGA_1280x1024 = 12
HD_1280x720 = 11
XGA_1024x768 = 10
SVGA_800x600 = 9
VGA_640x480 = 8
HVGA_480x320 = 7
CIF_400x296 = 6
QVGA_320x240 = 5
N_240x240 = 4
HQVGA_240x176 = 3
QCIF_176x144 = 2
QQVGA_160x120 = 1
N_96x96 = 0
class WBMode(Enum):
AUTO = 0
SUNNY = 1
CLOUDY = 2
OFFICE = 3
HOME = 4
def _assert_bounds(n: int, min: int, max: int):
if not min <= n <= max:
raise ValueError(f'value must be between {min} and {max}')
class WebClient:
def __init__(self, addr: Addr):
self.endpoint = f'http://{addr[0]}:{addr[1]}'
self.logger = logging.getLogger(self.__class__.__name__)
self.delay = 0
self.isfirstrequest = True
def setdelay(self, delay: int):
self.delay = delay
def capture(self, save_to: str):
self._call('capture', save_to=save_to)
def getstatus(self):
return json.loads(self._call('status'))
def setflash(self, enable: bool):
self._control('flash', int(enable))
def setresolution(self, fs: FrameSize):
self._control('framesize', fs.value)
def sethmirror(self, enable: bool):
self._control('hmirror', int(enable))
def setvflip(self, enable: bool):
self._control('vflip', int(enable))
def setawb(self, enable: bool):
self._control('awb', int(enable))
def setawbgain(self, enable: bool):
self._control('awb_gain', int(enable))
def setwbmode(self, mode: WBMode):
self._control('wb_mode', mode.value)
def setaecsensor(self, enable: bool):
self._control('aec', int(enable))
def setaecdsp(self, enable: bool):
self._control('aec2', int(enable))
def setagc(self, enable: bool):
self._control('agc', int(enable))
def setagcgain(self, gain: int):
_assert_bounds(gain, 1, 31)
self._control('agc_gain', gain)
def setgainceiling(self, gainceiling: int):
_assert_bounds(gainceiling, 2, 128)
self._control('gainceiling', gainceiling)
def setbpc(self, enable: bool):
self._control('bpc', int(enable))
def setwpc(self, enable: bool):
self._control('wpc', int(enable))
def setrawgma(self, enable: bool):
self._control('raw_gma', int(enable))
def setlenscorrection(self, enable: bool):
self._control('lenc', int(enable))
def setdcw(self, enable: bool):
self._control('dcw', int(enable))
def setcolorbar(self, enable: bool):
self._control('colorbar', int(enable))
def setquality(self, q: int):
_assert_bounds(q, 4, 63)
self._control('quality', q)
def setbrightness(self, brightness: int):
_assert_bounds(brightness, -2, -2)
self._control('brightness', brightness)
def setcontrast(self, contrast: int):
_assert_bounds(contrast, -2, 2)
self._control('contrast', contrast)
def setsaturation(self, saturation: int):
_assert_bounds(saturation, -2, 2)
self._control('saturation', saturation)
def _control(self, var: str, value: Union[int, str]):
self._call('control', params={'var': var, 'val': value})
def _call(self,
method: str,
params: Optional[dict] = None,
save_to: Optional[str] = None):
if not self.isfirstrequest and self.delay > 0:
sleeptime = self.delay / 1000
self.logger.debug(f'sleeping for {sleeptime}')
sleep(sleeptime)
self.isfirstrequest = False
url = f'{self.endpoint}/{method}'
self.logger.debug(f'calling {url}, params: {params}')
kwargs = {}
if params:
kwargs['params'] = params
if save_to:
kwargs['stream'] = True
r = requests.get(url, **kwargs)
if r.status_code != 200:
raise ApiResponseError(status_code=r.status_code)
if save_to:
r.raise_for_status()
with open(save_to, 'wb') as f:
shutil.copyfileobj(r.raw, f)
return True
return r.text

View File

@ -37,20 +37,23 @@ class ConfigStore:
log_default_fmt = False
log_file = None
log_verbose = False
no_config = name is False
path = None
if use_cli:
if parser is None:
parser = ArgumentParser()
parser.add_argument('--config', type=str, required=name is None,
if not no_config:
parser.add_argument('-c', '--config', type=str, required=name is None,
help='Path to the config in TOML format')
parser.add_argument('--verbose', action='store_true')
parser.add_argument('-V', '--verbose', action='store_true')
parser.add_argument('--log-file', type=str)
parser.add_argument('--log-default-fmt', action='store_true')
args = parser.parse_args()
if args.config:
if not no_config and args.config:
path = args.config
if args.verbose:
log_verbose = True
if args.log_file:
@ -58,10 +61,10 @@ class ConfigStore:
if args.log_default_fmt:
log_default_fmt = args.log_default_fmt
if name and path is None:
if not no_config and path is None:
path = _get_config_path(name)
self.data = toml.load(path)
self.data = {} if no_config else toml.load(path)
if 'logging' in self:
if not log_file and 'file' in self['logging']:

View File

@ -1,6 +1,8 @@
#!/usr/bin/env python3
import logging
import os
import time
import tempfile
from enum import Enum
from datetime import datetime, timedelta
@ -12,6 +14,7 @@ from home.api.types import BotType
from home.api.errors import ApiResponseError
from home.sound import SoundNodeClient, RecordClient, RecordFile
from home.soundsensor import SoundSensorServerGuardClient
from home.camera import esp32
from home.util import parse_addr, chunks, filesize_fmt
from home.api import WebAPIClient
from home.api.types import SoundSensorLocation
@ -28,6 +31,7 @@ RenderedContent = tuple[str, Optional[InlineKeyboardMarkup]]
record_client: Optional[RecordClient] = None
bot: Optional[Wrapper] = None
node_client_links: dict[str, SoundNodeClient] = {}
cam_client_links: dict[str, esp32.WebClient] = {}
def node_client(node: str) -> SoundNodeClient:
@ -36,10 +40,24 @@ def node_client(node: str) -> SoundNodeClient:
return node_client_links[node]
def camera_client(cam: str) -> esp32.WebClient:
if cam not in node_client_links:
cam_client_links[cam] = esp32.WebClient(parse_addr(config['cameras'][cam]['addr']))
return cam_client_links[cam]
def node_exists(node: str) -> bool:
return node in config['nodes']
def camera_exists(name: str) -> bool:
return name in config['cameras']
def have_cameras() -> bool:
return 'cameras' in config and config['cameras']
def sound_sensor_exists(node: str) -> bool:
return node in config['sound_sensors']
@ -299,6 +317,141 @@ class SoundSensorRenderer(Renderer):
return text.encode()
class CamerasRenderer(Renderer):
@classmethod
def index(cls, ctx: Context) -> RenderedContent:
html = f'<b>{ctx.lang("cameras")}</b>\n\n'
html += ctx.lang('select_place')
return html, cls.places_markup(ctx, callback_prefix='c0')
@classmethod
def places_markup(cls, ctx: Context, callback_prefix: str) -> InlineKeyboardMarkup:
buttons = []
for sensor, sensor_label in config['cameras'].items():
buttons.append(
[InlineKeyboardButton(sensor_label[ctx.user_lang], callback_data=f'{callback_prefix}/{sensor}')])
return InlineKeyboardMarkup(buttons)
@classmethod
def camera(cls, ctx: Context) -> RenderedContent:
node, = callback_unpack(ctx)
html = ctx.lang('select_interval')
buttons = [
[
InlineKeyboardButton(ctx.lang('w_flash'), callback_data=f'c1/{node}/1'),
InlineKeyboardButton(ctx.lang('wo_flash'), callback_data=f'c1/{node}/0'),
]
]
cls.back_button(ctx, buttons, callback_data=f'c0')
return html, InlineKeyboardMarkup(buttons)
#
# @classmethod
# def record_started(cls, ctx: Context, rid: int) -> RenderedContent:
# node, *rest = callback_unpack(ctx)
#
# place = config['nodes'][node]['label'][ctx.user_lang]
#
# html = f'<b>{ctx.lang("record_started")}</b> (<i>{place}</i>, id={rid})'
# return html, None
#
# @classmethod
# def record_done(cls, info: dict, node: str, uid: int) -> str:
# ulang = bot.store.get_user_lang(uid)
#
# def lang(key, *args):
# return bot.lang.get(key, ulang, *args)
#
# rid = info['id']
# fmt = '%d.%m.%y %H:%M:%S'
# start_time = datetime.fromtimestamp(int(info['start_time'])).strftime(fmt)
# stop_time = datetime.fromtimestamp(int(info['stop_time'])).strftime(fmt)
#
# place = config['nodes'][node]['label'][ulang]
#
# html = f'<b>{lang("record_result")}</b> (<i>{place}</i>, id={rid})\n\n'
# html += f'<b>{lang("beginning")}</b>: {start_time}\n'
# html += f'<b>{lang("end")}</b>: {stop_time}'
#
# return html
#
# @classmethod
# def record_error(cls, info: dict, node: str, uid: int) -> str:
# ulang = bot.store.get_user_lang(uid)
#
# def lang(key, *args):
# return bot.lang.get(key, ulang, *args)
#
# place = config['nodes'][node]['label'][ulang]
# rid = info['id']
#
# html = f'<b>{lang("record_error")}</b> (<i>{place}</i>, id={rid})'
# if 'error' in info:
# html += '\n'+str(info['error'])
#
# return html
# cameras handlers
# ----------------
def cameras(ctx: Context):
text, markup = CamerasRenderer.index(ctx)
if not ctx.is_callback_context():
return ctx.reply(text, markup=markup)
else:
ctx.answer()
return ctx.edit(text, markup=markup)
def camera_options(ctx: Context) -> None:
cam, = callback_unpack(ctx)
if not camera_exists(cam):
ctx.answer(ctx.lang('invalid_location'))
return
ctx.answer()
text, markup = CamerasRenderer.camera(ctx)
ctx.edit(text, markup)
def camera_capture(ctx: Context) -> None:
cam, flash = callback_unpack(ctx)
flash = int(flash)
if not camera_exists(cam):
ctx.answer(ctx.lang('invalid_location'))
return
ctx.answer()
client = camera_client(cam)
client.setflash(True if flash else False)
time.sleep(0.2)
fd = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
fd.close()
client.capture(fd.name)
logger.debug(f'captured photo ({cam}), saved to {fd.name}')
# disable flash led
if flash:
client.setflash(False)
try:
with open(fd.name, 'rb') as f:
bot.send_photo(ctx.user_id, photo=f)
except TelegramError as exc:
logger.exception(exc)
try:
os.unlink(fd.name)
except OSError as exc:
logger.exception(exc)
# settings handlers
# -----------------
@ -626,7 +779,12 @@ class SoundBot(Wrapper):
sound_sensors="Датчики звука",
sound_sensors_info="Здесь можно получить информацию о последних срабатываниях датчиков звука.",
sound_sensors_no_24h_data="За последние 24 часа данных нет.",
sound_sensors_show_anything="Показать, что есть"
sound_sensors_show_anything="Показать, что есть",
cameras="Камеры",
select_option="Выберите опцию",
w_flash="Со вспышкой",
wo_flash="Без вспышки",
)
self.lang.en(
@ -672,7 +830,12 @@ class SoundBot(Wrapper):
sound_sensors="Sound sensors",
sound_sensors_info="Here you can get information about last sound sensors hits.",
sound_sensors_no_24h_data="No data for the last 24 hours.",
sound_sensors_show_anything="Show me at least something"
sound_sensors_show_anything="Show me at least something",
cameras="Cameras",
select_option="Select option",
w_flash="With flash",
wo_flash="Without flash",
)
# ------
@ -750,6 +913,21 @@ class SoundBot(Wrapper):
# list of specific node's files
# self.add_handler(CallbackQueryHandler(self.wrap(files_list), pattern=r'^g0/.*'))
# ------
# cameras
# ------------
# list of cameras
self.add_handler(MessageHandler(text_filter(self.lang.all('cameras')), self.wrap(cameras)))
self.add_handler(CallbackQueryHandler(self.wrap(cameras), pattern=r'^c0$'))
# list of options (with/without flash etc)
self.add_handler(CallbackQueryHandler(self.wrap(camera_options), pattern=r'^c0/.*'))
# cheese
self.add_handler(CallbackQueryHandler(self.wrap(camera_capture), pattern=r'^c1/.*'))
def markup(self, ctx: Optional[Context]) -> Optional[ReplyKeyboardMarkup]:
buttons = [
[ctx.lang('record'), ctx.lang('settings')],
@ -760,6 +938,8 @@ class SoundBot(Wrapper):
ctx.lang('guard_enable'), ctx.lang('guard_disable'), ctx.lang('guard_status')
])
buttons.append([ctx.lang('sound_sensors')])
if have_cameras():
buttons.append([ctx.lang('cameras')])
return ReplyKeyboardMarkup(buttons, one_time_keyboard=False)