ipcam_ntp_util (wip: only supports hikvision cams for now)

This commit is contained in:
Evgeny Zinoviev 2024-02-17 03:51:08 +03:00
parent 0ce2e41a2b
commit c5e69cf2c9
6 changed files with 109 additions and 37 deletions

View File

@ -4,12 +4,22 @@ import requests
import hashlib
import xml.etree.ElementTree as ET
from enum import Enum, auto
from time import time
from typing import Optional
from argparse import ArgumentParser, ArgumentError
from homekit.util import validate_ipv4, validate_ipv4_or_hostname
from homekit.camera import IpcamConfig
ipcam_config = IpcamConfig()
class Action(Enum):
GET_NTP = auto()
SET_NTP = auto()
def xml_to_dict(xml_data: str) -> dict:
# Parse the XML data
root = ET.fromstring(xml_data)
@ -131,11 +141,14 @@ class HikvisionISAPIClient:
data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With': 'XMLHttpRequest',
})
self.isapi_check_put_response(r)
def set_ntp_server(self, ntp_host: str, ntp_port: int = 123):
def set_ntp_server(self,
ntp_host: str,
ntp_port: int = 123):
format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
data = '<?xml version="1.0" encoding="UTF-8"?>'
@ -145,7 +158,8 @@ class HikvisionISAPIClient:
data=data,
cookies=self.cookies,
headers={
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With': 'XMLHttpRequest',
})
self.isapi_check_put_response(r)
@ -153,7 +167,12 @@ class HikvisionISAPIClient:
return f'http://{self.host}/ISAPI/{path}'
def isapi_check_put_response(self, r):
r.raise_for_status()
try:
r.raise_for_status()
except requests.exceptions.HTTPError as e:
# print(r.text)
raise e
resp = xml_to_dict(r.text)['ResponseStatus']
status_code = int(resp['statusCode'][0])
@ -163,36 +182,60 @@ class HikvisionISAPIClient:
raise ResponseError('response status looks bad')
def process_hikvision_camera(host: str,
action: Action,
login: str,
password: str,
ntp_server: Optional[str] = None):
client = HikvisionISAPIClient(host)
try:
client.auth(login, password)
if action == Action.GET_NTP:
print(f'[{host}] {client.get_ntp_server()}')
return
client.set_ntp_server(ntp_server)
except AuthError as e:
print(f'[{host}] ({str(e)})')
except ResponseError as e:
print(f'[{host}] ({str(e)})')
def main():
parser = ArgumentParser()
parser.add_argument('--host', type=str, required=True)
parser.add_argument('--host', type=str)
parser.add_argument('--all', action='store_true')
parser.add_argument('--get-ntp-server', action='store_true')
parser.add_argument('--set-ntp-server', type=str)
parser.add_argument('--username', type=str)
parser.add_argument('--password', type=str)
args = parser.parse_args()
if not args.host and not args.all:
raise ArgumentError(None, 'either --all or --host <IP> is required')
if not args.get_ntp_server and not args.set_ntp_server:
raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required')
ipcam_config = IpcamConfig()
action = Action.GET_NTP if args.get_ntp_server else Action.SET_NTP
login = args.username if args.username else ipcam_config['web_creds']['login']
password = args.password if args.password else ipcam_config['web_creds']['password']
client = HikvisionISAPIClient(args.host)
client.auth(args.username, args.password)
if action == Action.SET_NTP:
if not args.set_ntp_server:
raise ArgumentError(None, '--set-ntp-server is required')
if not validate_ipv4_or_hostname(args.set_ntp_server):
raise ArgumentError(None, 'input ntp server is neither ip address nor a valid hostname')
if args.get_ntp_server:
print(client.get_ntp_server())
return
if not args.set_ntp_server:
raise ArgumentError(None, '--set-ntp-server is required')
if not validate_ipv4_or_hostname(args.set_ntp_server):
raise ArgumentError(None, 'input ntp server is neither ip address nor a valid hostname')
client.set_ntp_server(args.set_ntp_server)
kwargs = {}
if args.set_ntp_server:
kwargs['ntp_server'] = args.set_ntp_server
if not args.all:
process_hikvision_camera(args.host, action, login, password, **kwargs)
else:
for cam in ipcam_config.get_all_cam_names():
if not ipcam_config.get_camera_type(cam).is_hikvision():
continue
process_hikvision_camera(ipcam_config.get_camera_ip(cam), action, login, password, **kwargs)
if __name__ == '__main__':

View File

@ -30,6 +30,7 @@ class IpcamConfig(ConfigUnit):
'type': 'dict',
'schema': {
'type': {'type': 'string', 'allowed': [t.value for t in CameraType], 'required': True},
'enabled': {'type': 'boolean'},
'motion': {
'type': 'dict',
'schema': {
@ -87,13 +88,16 @@ class IpcamConfig(ConfigUnit):
@staticmethod
def custom_validator(data):
for n, cam in data['cams'].items():
linux_box = _lbc[cam['server']]
if 'ext_hdd' not in linux_box:
raise ValueError(f'cam-{n}: linux box {cam["server"]} must have ext_hdd defined')
disk = cam['disk']-1
if disk < 0 or disk >= len(linux_box['ext_hdd']):
raise ValueError(f'cam-{n}: invalid disk index for linux box {cam["server"]}')
pass
# FIXME rewrite or delete, looks kinda obsolete
# for n, cam in data['cameras'].items():
# linux_box = _lbc[cam['server']]
# if 'ext_hdd' not in linux_box:
# raise ValueError(f'cam-{n}: linux box {cam["server"]} must have ext_hdd defined')
# disk = cam['disk']-1
# if disk < 0 or disk >= len(linux_box['ext_hdd']):
# raise ValueError(f'cam-{n}: invalid disk index for linux box {cam["server"]}')
@classmethod
def _url_templates_schema(cls) -> dict:
@ -114,7 +118,7 @@ class IpcamConfig(ConfigUnit):
cams = []
if filter_by_server is not None and filter_by_server not in _lbc:
raise ValueError(f'invalid filter_by_server: {filter_by_server} not found in {_lbc.__class__.__name__}')
for cam, params in self['cams'].items():
for cam, params in self['cameras'].items():
if filter_by_server is None or params['server'] == filter_by_server:
if filter_by_disk is None or params['disk'] == filter_by_disk:
cams.append(int(cam))
@ -126,13 +130,13 @@ class IpcamConfig(ConfigUnit):
# filter_by_disk=filter_by_disk)
# def get_cam_server_and_disk(self, cam: int) -> tuple[str, int]:
# return self['cams'][cam]['server'], self['cams'][cam]['disk']
# return self['cameras'][cam]['server'], self['cameras'][cam]['disk']
def get_camera_container(self, camera: int) -> VideoContainerType:
return self.get_camera_type(camera).get_container()
def get_camera_type(self, camera: int) -> CameraType:
return CameraType(self['cams'][camera]['type'])
return CameraType(self['cameras'][camera]['type'])
def get_rtsp_creds(self) -> tuple[str, str]:
return self['rtsp_creds']['login'], self['rtsp_creds']['password']

View File

@ -23,7 +23,7 @@ class CameraType(Enum):
if channel == 1:
return ''
elif channel == 2:
if self.value in (CameraType.HIKVISION_264, CameraType.HIKVISION_265):
if self.is_hikvision():
return '/Streaming/Channels/2'
elif self.value == CameraType.ALIEXPRESS_NONAME:
return '/?stream=1.sdp'
@ -41,6 +41,9 @@ class CameraType(Enum):
def get_container(self) -> VideoContainerType:
return VideoContainerType.MP4 if self.get_codec(1) == VideoCodecType.H264 else VideoContainerType.MOV
def is_hikvision(self) -> bool:
return self in (CameraType.HIKVISION_264.value, CameraType.HIKVISION_265)
class TimeFilterType(Enum):
FIX = 'fix'

View File

@ -24,17 +24,18 @@ class LinuxBoardsConfig(ConfigUnit):
return {
'type': 'dict',
'schema': {
'mdns': {'type': 'string', 'required': True},
# 'mdns': {'type': 'string', 'required': True},
'board': {'type': 'string', 'required': True},
'location': {'type': 'string', 'required': True},
'mac': cls._addr_schema(mac=True, required=False), # FIXME mac should be required field
'network': {
'type': 'list',
'required': True,
'empty': False,
'allowed': ['wifi', 'ethernet']
},
'ram': {'type': 'integer', 'required': True},
'online': {'type': 'boolean', 'required': True},
'ram': {'type': 'integer', 'required': False}, # FIXME same as below
'online': {'type': 'boolean', 'required': False}, # FIXME made required=False temporarily, should be always required I guess
# optional
'services': {
@ -52,6 +53,12 @@ class LinuxBoardsConfig(ConfigUnit):
}
},
},
'misc': {
'type': 'dict',
'schema': {
'case': {'type': 'string', 'allowed': ['metal', 'plastic']}
}
},
}
}

View File

@ -3,6 +3,7 @@ import logging
import os
import cerberus
import cerberus.errors
import re
from abc import ABC
from typing import Optional, Any, MutableMapping, Union
@ -135,11 +136,25 @@ class ConfigUnit(BaseConfigUnit):
return None
@classmethod
def _addr_schema(cls, required=False, only_ip=False, **kwargs):
def _addr_schema(cls, required=False, mac=False, only_ip=False, **kwargs):
def validate_mac_address(field, value, error):
if not re.match("[0-9a-fA-F]{2}([-:])[0-9a-fA-F]{2}(\\1[0-9a-fA-F]{2}){4}$", value):
error(field, "Invalid MAC address format")
if mac:
l_kwargs = {
'type': 'string',
'check_with': validate_mac_address
}
else:
l_kwargs = {
'type': 'addr',
'coerce': Addr.fromstring if not only_ip else Addr.fromipstring,
}
return {
'type': 'addr',
'coerce': Addr.fromstring if not only_ip else Addr.fromipstring,
'required': required,
**l_kwargs,
**kwargs
}

View File

@ -41,7 +41,7 @@ def validate_ipv4_or_hostname(address: str, raise_exception: bool = False) -> bo
def validate_ipv4(address: str) -> bool:
try:
ipaddress.IPv6Address(address)
ipaddress.IPv4Address(address)
return True
except ipaddress.AddressValueError:
return False