144 lines
5.0 KiB
Python
Executable File
144 lines
5.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import include_homekit
|
|
import sys
|
|
import os
|
|
import subprocess
|
|
import asyncio
|
|
import signal
|
|
|
|
from typing import TextIO
|
|
from argparse import ArgumentParser
|
|
from socket import gethostname
|
|
from asyncio.streams import StreamReader
|
|
from homekit.config import config as homekit_config
|
|
from homekit.linux import LinuxBoardsConfig
|
|
from homekit.camera import IpcamConfig, CaptureType
|
|
from homekit.camera.util import get_hls_directory, get_hls_channel_name, get_recordings_path
|
|
|
|
ipcam_config = IpcamConfig()
|
|
lbc_config = LinuxBoardsConfig()
|
|
channels = (1, 2)
|
|
tasks = []
|
|
restart_delay = 3
|
|
lock = asyncio.Lock()
|
|
worker_type: CaptureType
|
|
|
|
|
|
async def read_output(stream: StreamReader,
|
|
thread_name: str,
|
|
output: TextIO):
|
|
try:
|
|
while True:
|
|
line = await stream.readline()
|
|
if not line:
|
|
break
|
|
print(f"[{thread_name}] {line.decode().strip()}", file=output)
|
|
|
|
except asyncio.LimitOverrunError:
|
|
print(f"[{thread_name}] Output limit exceeded.", file=output)
|
|
|
|
except Exception as e:
|
|
print(f"[{thread_name}] Error occurred while reading output: {e}", file=sys.stderr)
|
|
|
|
|
|
async def run_ffmpeg(cam: int, channel: int):
|
|
prefix = get_hls_channel_name(cam, channel)
|
|
|
|
if homekit_config.app_config.logging_is_verbose():
|
|
debug_args = ['-v', '-info']
|
|
else:
|
|
debug_args = ['-nostats', '-loglevel', 'error']
|
|
|
|
# protocol = 'tcp' if ipcam_config.should_use_tcp_for_rtsp(cam) else 'udp'
|
|
protocol = 'tcp'
|
|
user, pw = ipcam_config.get_rtsp_creds()
|
|
ip = ipcam_config.get_camera_ip(cam)
|
|
path = ipcam_config.get_camera_type(cam).get_channel_url(channel)
|
|
ext = ipcam_config.get_camera_container(cam)
|
|
ffmpeg_command = ['ffmpeg', *debug_args,
|
|
'-rtsp_transport', protocol,
|
|
'-i', f'rtsp://{user}:{pw}@{ip}:554{path}',
|
|
'-c', 'copy',]
|
|
|
|
if worker_type == CaptureType.HLS:
|
|
ffmpeg_command.extend(['-bufsize', '1835k',
|
|
'-pix_fmt', 'yuv420p',
|
|
'-flags', '-global_header',
|
|
'-hls_time', '2',
|
|
'-hls_list_size', '3',
|
|
'-hls_flags', 'delete_segments',
|
|
os.path.join(get_hls_directory(cam, channel), 'live.m3u8')])
|
|
|
|
elif worker_type == CaptureType.RECORD:
|
|
ffmpeg_command.extend(['-f', 'segment',
|
|
'-strftime', '1',
|
|
'-segment_time', '00:10:00',
|
|
'-segment_atclocktime', '1',
|
|
os.path.join(get_recordings_path(cam), f'record_%Y-%m-%d-%H.%M.%S.{ext.value}')])
|
|
|
|
else:
|
|
raise ValueError(f'invalid worker type: {worker_type}')
|
|
|
|
while True:
|
|
try:
|
|
process = await asyncio.create_subprocess_exec(
|
|
*ffmpeg_command,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE
|
|
)
|
|
|
|
stdout_task = asyncio.create_task(read_output(process.stdout, prefix, sys.stdout))
|
|
stderr_task = asyncio.create_task(read_output(process.stderr, prefix, sys.stderr))
|
|
|
|
await asyncio.gather(stdout_task, stderr_task)
|
|
|
|
# check the return code of the process
|
|
if process.returncode != 0:
|
|
raise subprocess.CalledProcessError(process.returncode, ffmpeg_command)
|
|
|
|
except (FileNotFoundError, PermissionError, subprocess.CalledProcessError) as e:
|
|
# an error occurred, print the error message
|
|
error_message = f"Error occurred in {prefix}: {e}"
|
|
print(error_message, file=sys.stderr)
|
|
|
|
# sleep for 5 seconds before restarting the process
|
|
await asyncio.sleep(restart_delay)
|
|
|
|
|
|
async def run():
|
|
kwargs = {}
|
|
if worker_type == CaptureType.RECORD:
|
|
kwargs['filter_by_server'] = gethostname()
|
|
for cam in ipcam_config.get_all_cam_names(**kwargs):
|
|
for channel in channels:
|
|
task = asyncio.create_task(run_ffmpeg(cam, channel))
|
|
tasks.append(task)
|
|
|
|
try:
|
|
await asyncio.gather(*tasks)
|
|
except KeyboardInterrupt:
|
|
print('KeyboardInterrupt: stopping processes...', file=sys.stderr)
|
|
for task in tasks:
|
|
task.cancel()
|
|
|
|
# wait for subprocesses to terminate
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# send termination signal to all subprocesses
|
|
for task in tasks:
|
|
process = task.get_stack()
|
|
if process:
|
|
process.send_signal(signal.SIGTERM)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
capture_types = [t.value for t in CaptureType]
|
|
parser = ArgumentParser()
|
|
parser.add_argument('type', type=str, metavar='CAPTURE_TYPE', choices=tuple(capture_types),
|
|
help='capture type (variants: '+', '.join(capture_types)+')')
|
|
|
|
arg = homekit_config.load_app(no_config=True, parser=parser)
|
|
worker_type = CaptureType(arg['type'])
|
|
|
|
asyncio.run(run())
|