Evgeny Zinoviev e97f98e5e2 wip
2023-06-19 23:19:35 +03:00

169 lines
5.3 KiB
Python

import asyncio
import os.path
import logging
import psutil
import re
from datetime import datetime
from typing import List, Tuple
from ..util import chunks
from ..config import config, LinuxBoardsConfig
from .config import IpcamConfig
from .types import VideoContainerType
_logger = logging.getLogger(__name__)
_ipcam_config = IpcamConfig()
_lbc_config = LinuxBoardsConfig()
datetime_format = '%Y-%m-%d-%H.%M.%S'
datetime_format_re = r'\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}.\d{2}'
def _get_ffmpeg_path() -> str:
return 'ffmpeg' if 'ffmpeg' not in config else config['ffmpeg']['path']
def time2seconds(time: str) -> int:
time, frac = time.split('.')
frac = int(frac)
h, m, s = [int(i) for i in time.split(':')]
return round(s + m*60 + h*3600 + frac/1000)
async def ffmpeg_recreate(filename: str):
filedir = os.path.dirname(filename)
_, fileext = os.path.splitext(filename)
tempname = os.path.join(filedir, f'.temporary_fixing.{fileext}')
mtime = os.path.getmtime(filename)
args = [_get_ffmpeg_path(), '-nostats', '-loglevel', 'error', '-i', filename, '-c', 'copy', '-y', tempname]
proc = await asyncio.create_subprocess_exec(*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
_logger.error(f'fix_timestamps({filename}): ffmpeg returned {proc.returncode}, stderr: {stderr.decode().strip()}')
if os.path.isfile(tempname):
os.unlink(filename)
os.rename(tempname, filename)
os.utime(filename, (mtime, mtime))
_logger.info(f'fix_timestamps({filename}): OK')
else:
_logger.error(f'fix_timestamps({filename}): temp file \'{tempname}\' does not exists, fix failed')
async def ffmpeg_cut(input: str,
output: str,
start_pos: int,
duration: int):
args = [_get_ffmpeg_path(), '-nostats', '-loglevel', 'error', '-i', input,
'-ss', str(start_pos), '-t', str(duration),
'-c', 'copy', '-y', output]
proc = await asyncio.create_subprocess_exec(*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
_logger.error(f'ffmpeg_cut({input}, start_pos={start_pos}, duration={duration}): ffmpeg returned {proc.returncode}, stderr: {stderr.decode().strip()}')
else:
_logger.info(f'ffmpeg_cut({input}): OK')
def dvr_scan_timecodes(timecodes: str) -> List[Tuple[int, int]]:
tc_backup = timecodes
timecodes = timecodes.split(',')
if len(timecodes) % 2 != 0:
raise DVRScanInvalidTimecodes(f'invalid number of timecodes. input: {tc_backup}')
timecodes = list(map(time2seconds, timecodes))
timecodes = list(chunks(timecodes, 2))
# sort out invalid fragments (dvr-scan returns them sometimes, idk why...)
timecodes = list(filter(lambda f: f[0] < f[1], timecodes))
if not timecodes:
raise DVRScanInvalidTimecodes(f'no valid timecodes. input: {tc_backup}')
# https://stackoverflow.com/a/43600953
timecodes.sort(key=lambda interval: interval[0])
merged = [timecodes[0]]
for current in timecodes:
previous = merged[-1]
if current[0] <= previous[1]:
previous[1] = max(previous[1], current[1])
else:
merged.append(current)
return merged
class DVRScanInvalidTimecodes(Exception):
pass
def has_handle(fpath):
for proc in psutil.process_iter():
try:
for item in proc.open_files():
if fpath == item.path:
return True
except Exception:
pass
return False
def get_recordings_path(cam: int) -> str:
server, disk = _ipcam_config.get_cam_server_and_disk(cam)
disks = _lbc_config.get_board_disks(server)
disk_mountpoint = disks[disk-1]
return f'{disk_mountpoint}/cam-{cam}'
def get_motion_path(cam: int) -> str:
return f'{get_recordings_path(cam)}/motion'
def is_valid_recording_name(filename: str) -> bool:
if not filename.startswith('record_'):
return False
for container_type in VideoContainerType:
if filename.endswith(f'.{container_type.value}'):
return True
return False
def datetime_from_filename(name: str) -> datetime:
name = os.path.basename(name)
exts = '|'.join([t.value for t in VideoContainerType])
if name.startswith('record_'):
return datetime.strptime(re.match(rf'record_(.*?)\.(?:{exts})', name).group(1), datetime_format)
m = re.match(rf'({datetime_format_re})__{datetime_format_re}\.(?:{exts})', name)
if m:
return datetime.strptime(m.group(1), datetime_format)
raise ValueError(f'unrecognized filename format: {name}')
def get_hls_channel_name(cam: int, channel: int) -> str:
name = str(cam)
if channel == 2:
name += '-low'
return name
def get_hls_directory(cam, channel) -> str:
dirname = os.path.join(
_ipcam_config['hls_path'],
get_hls_channel_name(cam, channel)
)
if not os.path.exists(dirname):
os.makedirs(dirname)
return dirname