ipcam_ntp_util: support chinese noname cameras

This commit is contained in:
Evgeny Zinoviev 2024-02-17 23:20:49 +03:00
parent 77b80dd9b3
commit 70b4a4f044
12 changed files with 637 additions and 206 deletions

View File

@ -1,4 +1,4 @@
Copyright 2022, Evgeny Zinoviev
Copyright 2022-2024, Evgeny Zinoviev
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,

View File

@ -1,16 +1,13 @@
#!/usr/bin/env python3
import __py_include
import requests
import hashlib
import xml.etree.ElementTree as ET
import homekit.camera.hikvision as hikvision
import homekit.camera.alinoname as alinoname
from enum import Enum, auto
from time import time
from typing import Optional
from argparse import ArgumentParser, ArgumentError
from homekit.util import validate_ipv4, validate_ipv4_or_hostname
from homekit.camera import IpcamConfig
from homekit.util import validate_ipv4_or_hostname
from homekit.camera import IpcamConfig, CameraType
ipcam_config = IpcamConfig()
@ -20,216 +17,63 @@ class Action(Enum):
SET_NTP = auto()
def xml_to_dict(xml_data: str) -> dict:
# Parse the XML data
root = ET.fromstring(xml_data)
# Function to remove namespace from the tag name
def remove_namespace(tag):
return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace
# Function to recursively convert XML elements to a dictionary
def elem_to_dict(elem):
tag = remove_namespace(elem.tag)
elem_dict = {tag: {}}
# If the element has attributes, add them to the dictionary
elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()})
# Handle the element's text content, if present and not just whitespace
text = elem.text.strip() if elem.text and elem.text.strip() else None
if text:
elem_dict[tag]['#text'] = text
# Process child elements
for child in elem:
child_dict = elem_to_dict(child)
child_tag = remove_namespace(child.tag)
if child_tag not in elem_dict[tag]:
elem_dict[tag][child_tag] = []
elem_dict[tag][child_tag].append(child_dict[child_tag])
# Simplify structure if there's only text or no children and no attributes
if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]:
return {tag: elem_dict[tag]['#text']}
elif not elem_dict[tag]:
return {tag: ''}
return elem_dict
# Convert the root element to dictionary
return elem_to_dict(root)
def sha256_hex(input_string: str) -> str:
return hashlib.sha256(input_string.encode()).hexdigest()
class ResponseError(RuntimeError):
pass
class AuthError(ResponseError):
pass
class HikvisionISAPIClient:
def __init__(self, host):
self.host = host
self.cookies = {}
def auth(self, username: str, password: str):
r = requests.get(self.isapi_uri('Security/sessionLogin/capabilities'),
{'username': username},
headers={
'X-Requested-With': 'XMLHttpRequest',
})
r.raise_for_status()
caps = xml_to_dict(r.text)['SessionLoginCap']
is_irreversible = caps['isIrreversible'][0].lower() == 'true'
# https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
# also look into webAuth.js and utils.js
if 'salt' in caps and is_irreversible:
p = sha256_hex(username + caps['salt'][0] + password)
p = sha256_hex(p + caps['challenge'][0])
for i in range(int(caps['iterations'][0])-2):
p = sha256_hex(p)
else:
p = sha256_hex(password) + caps['challenge'][0]
for i in range(int(caps['iterations'][0])-1):
p = sha256_hex(p)
data = '<SessionLogin>'
data += f'<userName>{username}</userName>'
data += f'<password>{p}</password>'
data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
data += '</SessionLogin>'
r = requests.post(self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'), data=data, headers={
'Accept-Encoding': 'gzip, deflate',
'If-Modified-Since': '0',
'X-Requested-With': 'XMLHttpRequest',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
})
r.raise_for_status()
resp = xml_to_dict(r.text)['SessionLogin']
status_value = int(resp['statusValue'][0])
status_string = resp['statusString'][0]
if status_value != 200:
raise AuthError(f'{status_value}: {status_string}')
self.cookies = r.cookies.get_dict()
def get_ntp_server(self) -> str:
r = requests.get(self.isapi_uri('System/time/ntpServers/capabilities'), cookies=self.cookies)
r.raise_for_status()
ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0]
if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
ntp_host = ntp_server['hostName'][0]
else:
ntp_host = ntp_server['ipAddress'][0]
return ntp_host
def set_timezone(self):
data = '<?xml version="1.0" encoding="UTF-8"?>'
data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With': 'XMLHttpRequest',
})
self.isapi_check_put_response(r)
def set_ntp_server(self,
ntp_host: str,
ntp_port: int = 123):
format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
# test ntp server first
data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>'
r = requests.post(self.isapi_uri('System/time/ntpServers/test'), data=data, headers={
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With': 'XMLHttpRequest',
}, cookies=self.cookies)
r.raise_for_status()
resp = xml_to_dict(r.text)['NTPTestResult']
error_code = int(resp['errorCode'][0])
error_description = resp['errorDescription'][0]
if error_code != 0 or error_description.lower() != 'ok':
raise ResponseError('response status looks bad')
# then set it
data = '<?xml version="1.0" encoding="UTF-8"?>'
data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
r = requests.put(self.isapi_uri('System/time/ntpServers/1'),
data=data,
cookies=self.cookies,
headers={
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With': 'XMLHttpRequest',
})
self.isapi_check_put_response(r)
def isapi_uri(self, path: str) -> str:
return f'http://{self.host}/ISAPI/{path}'
def isapi_check_put_response(self, r):
def process_camera(host: str,
action: Action,
login: str,
password: str,
camera_type: CameraType,
ntp_server: Optional[str] = None):
if camera_type.is_hikvision():
client = hikvision.ISAPIClient(host)
try:
r.raise_for_status()
except requests.exceptions.HTTPError as e:
# print(r.text)
raise e
client.auth(login, password)
if action == Action.GET_NTP:
print(f'[{host}] {client.get_ntp_server()}')
return
resp = xml_to_dict(r.text)['ResponseStatus']
client.set_ntp_server(ntp_server)
print(f'[{host}] done')
status_code = int(resp['statusCode'][0])
status_string = resp['statusString'][0]
except hikvision.AuthError as e:
print(f'[{host}] ({str(e)})')
if status_code != 1 or status_string.lower() != 'ok':
raise ResponseError('response status looks bad')
except hikvision.ResponseError as e:
print(f'[{host}] ({str(e)})')
elif camera_type.is_ali():
try:
client = alinoname.XMEyeCamera(hostname=host, username=login, password=password)
client.login()
def process_hikvision_camera(host: str,
action: Action,
login: str,
password: str,
ntp_server: Optional[str] = None):
client = HikvisionISAPIClient(host)
try:
client.auth(login, password)
if action == Action.GET_NTP:
print(f'[{host}] {client.get_ntp_server()}')
return
client.set_ntp_server(ntp_server)
print(f'[{host}] done')
except AuthError as e:
print(f'[{host}] ({str(e)})')
except ResponseError as e:
print(f'[{host}] ({str(e)})')
if action == Action.GET_NTP:
print(f'[{host}] {client.get_ntp_server()}')
return
client.set_ntp_server(ntp_server)
print(f'[{host}] done')
except OSError as e:
print(f'[{host}] ({str(e)})')
def main():
camera_types = ['hikvision', 'ali']
parser = ArgumentParser()
parser.add_argument('--host', type=str)
parser.add_argument('--camera', type=str)
parser.add_argument('--camera-type', type=str, choices=camera_types)
parser.add_argument('--all', action='store_true')
parser.add_argument('--all-of-type', type=str, choices=camera_types)
parser.add_argument('--get-ntp-server', action='store_true')
parser.add_argument('--set-ntp-server', type=str)
parser.add_argument('--username', type=str)
parser.add_argument('--password', type=str)
args = parser.parse_args()
if not args.host and not args.all:
raise ArgumentError(None, 'either --all or --host <IP> is required')
if args.all and args.all_of_type:
raise ArgumentError(None, 'you can\'t pass both --all and --all-of-type')
if not args.camera and not args.all and not args.all_of_type:
raise ArgumentError(None, 'either --all, --all-of-type <TYPE> or --camera <NUM> is required')
if not args.get_ntp_server and not args.set_ntp_server:
raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required')
@ -247,13 +91,32 @@ def main():
kwargs = {}
if args.set_ntp_server:
kwargs['ntp_server'] = args.set_ntp_server
if not args.all:
process_hikvision_camera(args.host, action, login, password, **kwargs)
if not args.all and not args.all_of_type:
if not args.camera_type:
raise ArgumentError(None, '--camera-type is required')
if not ipcam_config.has_camera(int(args.camera)):
raise ArgumentError(None, f'invalid camera {args.camera}')
camera_host = ipcam_config.get_camera_ip(args.camera)
if args.camera_type == 'hikvision':
camera_type = CameraType.HIKVISION_264
elif args.camera_type == 'ali':
camera_type = CameraType.ALIEXPRESS_NONAME
else:
raise ValueError('invalid --camera-type')
process_camera(camera_host, action, login, password, camera_type, **kwargs)
else:
for cam in ipcam_config.get_all_cam_names():
if not ipcam_config.get_camera_type(cam).is_hikvision():
if not ipcam_config.is_camera_enabled(cam):
continue
process_hikvision_camera(ipcam_config.get_camera_ip(cam), action, login, password, **kwargs)
cam_type = ipcam_config.get_camera_type(cam)
if args.all_of_type == 'hikvision' and not cam_type.is_hikvision():
continue
if args.all_of_type == 'ali' and not ipcam_config.get_camera_type(cam).is_ali():
continue
process_camera(ipcam_config.get_camera_ip(cam), action, login, password, cam_type, **kwargs)
if __name__ == '__main__':

View File

@ -1,2 +1,2 @@
from .types import CameraType, VideoContainerType, VideoCodecType, CaptureType
from .config import IpcamConfig
from .config import IpcamConfig

View File

@ -0,0 +1,42 @@
# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
# Copyright (C) 2019-2019 Johannes Bauer
#
# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
#
# numenworld-ipcam is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; this program is ONLY licensed under
# version 3 of the License, later versions are explicitly excluded.
#
# numenworld-ipcam is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with numenworld-ipcam; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Johannes Bauer <JohannesBauer@gmx.de>
import hashlib
class HorrificallyBrokenPasswordFunction():
@classmethod
def derive(self, passphrase):
alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
assert (len(alphabet) == 62)
passphrase = passphrase.encode("utf-8")
hashval = hashlib.md5(passphrase).digest()
encoded = ""
for i in range(0, 16, 2):
index = (hashval[i] + hashval[i + 1]) % len(alphabet)
char = alphabet[index]
encoded += char
return encoded
if __name__ == "__main__":
assert (HorrificallyBrokenPasswordFunction.derive("") == "tlJwpbo6")
assert (HorrificallyBrokenPasswordFunction.derive("abc") == "LkM7s2Ht")

View File

@ -0,0 +1 @@
from .nwipcam import XMEyeCamera

View File

@ -0,0 +1,326 @@
#!/usr/bin/python3
# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
# Copyright (C) 2019-2019 Johannes Bauer
#
# Changes and improvements:
# Copyright (C) 2024 Evgeny Zinoviev
#
# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
#
# numenworld-ipcam is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; this program is ONLY licensed under
# version 3 of the License, later versions are explicitly excluded.
#
# numenworld-ipcam is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with numenworld-ipcam; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Johannes Bauer <JohannesBauer@gmx.de>
import collections
import struct
import socket
import enum
import json
import subprocess
from .HorrificallyBrokenPasswordFunction import HorrificallyBrokenPasswordFunction
class XMEyeMsgCode(enum.IntEnum):
LoginCmd = 1000
LoginReply = LoginCmd + 1
KeepAliveCmd = 1006
KeepAliveReply = KeepAliveCmd + 1
SetConfigCmd = 1040
SetConfigReply = SetConfigCmd + 1
GetConfigCmd = 1042
GetConfidReply = GetConfigCmd + 1
GetSystemInfoCmd = 1020
GetSystemInfoReply = GetSystemInfoCmd + 1
ChannelTitleCmd = 1048
ChannelTitleReply = ChannelTitleCmd + 1
SystemFunctionCmd = 1360
SystemFunctionReply = SystemFunctionCmd + 1
OPMonitorStartStopCmd = 1410
OPMonitorStartStopReply = OPMonitorStartStopCmd + 1
OPMonitorClaimCmd = 1413
OPMonitorClaimReply = OPMonitorClaimCmd + 1
OPTimeQueryCmd = 1452
OPTimeQueryReply = OPTimeQueryCmd + 1
VideoStreamData = 1412
class AudioVideoDataType(enum.IntEnum):
VideoIncomplete = 0xfc
VideoComplete = 0xfd
AudioComplete = 0xfa
class AudioVideoPayload():
_HeaderFields = collections.namedtuple("HeaderFields", ["unknown1", "channel", "datatype", "unknown2", "length"])
_HeaderStruct = struct.Struct("< H B B H H")
def __init__(self, payload, hint=""):
self._header = self._HeaderFields(*self._HeaderStruct.unpack(payload[:self._HeaderStruct.size]))
print(
"%20s [%5d]: %s %s" % (hint, len(payload), " ".join("%02x" % (c) for c in payload[:8]), str(self._header)))
# if len(payload) != (self._HeaderStruct.size + self._header.length):
# raise Exception("Unexpected AV payload, expected %d bytes but got %d." % (self._HeaderStruct.size + self._header.length, len(payload)))
# print(self._header)
self._data = payload[self._HeaderStruct.size:]
@property
def data(self):
return self._data
class XMEyeMessage():
_HeaderFields = collections.namedtuple("HeaderFields", ["station", "session", "unknown", "msgcode", "length"])
_HeaderStruct = struct.Struct("< L L 6s H L")
def __init__(self, station, session, msgcode, message):
if isinstance(message, (bytes, bytearray)):
self._message = bytes(message)
else:
self._message = json.dumps(message).encode("ascii")
self._header = self._HeaderFields(station=station,
session=session,
unknown=bytes(6),
msgcode=msgcode,
length=len(self._message))
@property
def header(self):
return self._header
@property
def message(self):
return self._message
@property
def payload(self):
msg = self.message.rstrip(bytes(1))
try:
data = json.loads(msg)
except (json.JSONDecodeError, UnicodeError):
return self.message
return data
@property
def length(self):
return len(self.message)
def __bytes__(self):
header = self._HeaderStruct.pack(*self._header)
return header + self._message
@classmethod
def deserialize(cls, msg):
header_data = msg[:20]
header = cls._HeaderFields(*cls._HeaderStruct.unpack(header_data))
payload = msg[20: 20 + header.length]
if len(payload) < header.length:
payload += bytes(header.length - len(payload))
try:
msgcode = XMEyeMsgCode(header.msgcode)
except ValueError:
msgcode = header.msgcode
return cls(station=header.station, session=header.session, msgcode=msgcode, message=payload)
@classmethod
def deserialize_all(cls, msg):
msg = bytearray(msg)
while len(msg) >= 20:
next_msg = cls.deserialize(msg)
yield next_msg
msg = msg[20 + next_msg.length:]
def dump(self):
print("%s (%d bytes payload):" % (self._header.msgcode, self.length))
if isinstance(self.payload, bytes):
print(self.payload)
else:
print(json.dumps(self.payload, indent=4, sort_keys=True))
print()
def __repr__(self):
return "Msg(%s): %s" % (self.header, str(self.payload))
class XMEyeCamera():
def __init__(self, hostname, username="admin", password="", port=34567):
self._hostname = hostname
self._conn = socket.create_connection((hostname, port))
self._session = None
self._username = username
self._password = password
@property
def derived_password(self):
return HorrificallyBrokenPasswordFunction.derive(self._password)
@property
def rtsp_uri(self):
rtsp_port = 554
return "rtsp://%s:%d/user=%s&password=%s&channel=" % (
self._hostname, rtsp_port, self._username, self.derived_password)
def _rx_bytes(self, length):
result = bytearray()
while len(result) < length:
remaining_bytes = length - len(result)
rx_data = self._conn.recv(remaining_bytes)
result += rx_data
return result
def _rx(self):
response_header = self._rx_bytes(20)
header = XMEyeMessage.deserialize(response_header)
payload_data = self._rx_bytes(header.length)
response_data = response_header + payload_data
response = XMEyeMessage.deserialize(response_data)
# print("<-", response)
return response
def _tx(self, command):
# print("->", command)
data = bytes(command)
self._conn.send(data)
def _tx_rx(self, command):
self._tx(command)
return self._rx()
def login(self):
data = {
"EncryptType": "MD5",
"LoginType": "DVRIP-Web",
"UserName": self._username,
"PassWord": self.derived_password,
}
command = XMEyeMessage(station=0xff, session=0, msgcode=XMEyeMsgCode.LoginCmd, message=data)
response = self._tx_rx(command)
if int(response.payload["Ret"]) == 100:
# Login successful
self._session = int(response.payload["SessionID"], 16)
else:
raise Exception("Login failed:", response)
def _generic_cmd(self, name, msgcode):
data = {
"Name": name,
"SessionID": "0x%x" % (self._session,),
}
command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
return self._tx_rx(command)
def get_systeminfo(self):
return self._generic_cmd("SystemInfo", XMEyeMsgCode.GetSystemInfoCmd)
def get_channel_title(self):
return self._generic_cmd("ChannelTitle", XMEyeMsgCode.ChannelTitleCmd)
def get_system_function(self):
return self._generic_cmd("SystemFunction", XMEyeMsgCode.SystemFunctionCmd)
def get_talk_audio_format(self):
return self._generic_cmd("TalkAudioFormat", XMEyeMsgCode.SystemFunctionCmd)
def get_ntp_server(self):
ntp_config = self._generic_cmd("NetWork.NetNTP", XMEyeMsgCode.GetConfigCmd)
return ntp_config.payload['NetWork.NetNTP']['Server']['Name']
def set_ntp_server(self,
ntp_host: str,
ntp_port: int = 123) -> None:
data = {
'Name': 'NetWork.NetNTP',
'NetWork.NetNTP': {
'Enable': True,
'Server': {
'Address': '0x00000000',
'Anonymity': False,
'Name': ntp_host,
'Password': '',
'Port': ntp_port,
'UserName': ''
},
'TimeZone': 13, # Moscow time
'UpdatePeriod': 60
},
"SessionID": "0x%x" % (self._session,),
}
command = XMEyeMessage(station=0xff, session=self._session, msgcode=XMEyeMsgCode.SetConfigCmd, message=data)
self._tx_rx(command)
def _opmonitor_cmd(self, action, msgcode, rx_msg=True):
data = {
"Name": "OPMonitor",
"OPMonitor": {
"Action": action,
"Parameter": {
"Channel": 0,
"CombinMode": "CONNECT_ALL",
"StreamType": "Main",
"TransMode": "TCP",
},
},
"SessionID": "0x%x" % (self._session,),
}
command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
if rx_msg:
return self._tx_rx(command)
else:
self._tx(command)
def get_stream(self, packet_callback):
self._opmonitor_cmd("Claim", XMEyeMsgCode.OPMonitorClaimCmd)
self._opmonitor_cmd("Start", XMEyeMsgCode.OPMonitorStartStopCmd, rx_msg=False)
while True:
rx_pkt = self._rx()
packet_callback(rx_pkt)
# def playback_stream(self):
# mplayer_process = subprocess.Popen(["mplayer", "-demuxer", "h264es", "-"], stdin=subprocess.PIPE)
# with open("audio.raw", "wb") as f, open("video.raw", "wb") as video_f:
# def pkt_callback(pkt):
# if (pkt.header.station == 511) and (pkt.header.msgcode == XMEyeMsgCode.VideoStreamData):
# avpayload = AudioVideoPayload(pkt.payload, hint="video")
# mplayer_process.stdin.raw.write(pkt.payload)
# video_f.write(pkt.payload)
# elif pkt.header.msgcode == XMEyeMsgCode.VideoStreamData:
# # Audio data?
# avpayload = AudioVideoPayload(pkt.payload, hint="audio")
# f.write(avpayload.data)
# elif pkt.header.msgcode != XMEyeMsgCode.VideoStreamData:
# print(pkt)
#
# self.get_stream(pkt_callback)
if __name__ == '__main__':
cam = XMEyeCamera("192.168.0.47", password="DerPr03ess")
cam.login()
print(cam.get_systeminfo())
print(cam.get_channel_title())
# print(cam.get_talk_audio_format())
# print(cam.get_system_function())

View File

@ -132,6 +132,9 @@ class IpcamConfig(ConfigUnit):
# def get_cam_server_and_disk(self, cam: int) -> tuple[str, int]:
# return self['cameras'][cam]['server'], self['cameras'][cam]['disk']
def has_camera(self, camera: int) -> bool:
return camera in tuple(self['cameras'].keys())
def get_camera_container(self, camera: int) -> VideoContainerType:
return self.get_camera_type(camera).get_container()
@ -143,3 +146,9 @@ class IpcamConfig(ConfigUnit):
def get_camera_ip(self, camera: int) -> str:
return self['camera_ip_template'] % (str(camera),)
def is_camera_enabled(self, camera: int) -> bool:
try:
return self['cameras'][camera]['enabled']
except KeyError:
return True

View File

@ -0,0 +1 @@
from .isapi import ISAPIClient, ResponseError, AuthError

View File

@ -0,0 +1,137 @@
import requests
from time import time
from .util import xml_to_dict, sha256_hex
from ...util import validate_ipv4
from ...http import HTTPMethod
from typing import Optional, Union
class ResponseError(RuntimeError):
pass
class AuthError(ResponseError):
pass
class ISAPIClient:
def __init__(self, host):
self.host = host
self.cookies = {}
def call(self,
path: str,
method: HTTPMethod = HTTPMethod.GET,
data: Optional[Union[dict, str, bytes]] = None,
raise_for_status=True,
check_put_response=False):
f = getattr(requests, method.value.lower())
headers = {
'X-Requested-With': 'XMLHttpRequest',
}
if method in (HTTPMethod.PUT, HTTPMethod.POST):
headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
kwargs = {}
if data:
kwargs['data' if method is not HTTPMethod.GET else 'params'] = data
if len(self.cookies) > 0:
kwargs['cookies'] = self.cookies
r = f(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs)
if raise_for_status or check_put_response:
r.raise_for_status()
parsed_xml = None
if check_put_response:
parsed_xml = xml_to_dict(r.text)
resp = parsed_xml['ResponseStatus']
status_code = int(resp['statusCode'][0])
status_string = resp['statusString'][0]
if status_code != 1 or status_string.lower() != 'ok':
raise ResponseError('response status looks bad')
self.cookies.update(r.cookies.get_dict())
if parsed_xml is None:
parsed_xml = xml_to_dict(r.text)
return parsed_xml
def auth(self, username: str, password: str):
xml = self.call('Security/sessionLogin/capabilities', data={'username': username})
caps = xml['SessionLoginCap']
is_irreversible = caps['isIrreversible'][0].lower() == 'true'
# https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
# also look into webAuth.js and utils.js
if 'salt' in caps and is_irreversible:
p = sha256_hex(username + caps['salt'][0] + password)
p = sha256_hex(p + caps['challenge'][0])
for i in range(int(caps['iterations'][0])-2):
p = sha256_hex(p)
else:
p = sha256_hex(password) + caps['challenge'][0]
for i in range(int(caps['iterations'][0])-1):
p = sha256_hex(p)
data = '<SessionLogin>'
data += f'<userName>{username}</userName>'
data += f'<password>{p}</password>'
data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
data += '</SessionLogin>'
resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin']
status_value = int(resp['statusValue'][0])
status_string = resp['statusString'][0]
if status_value != 200:
raise AuthError(f'{status_value}: {status_string}')
def get_ntp_server(self) -> str:
try:
# works on newer 1080p cams
xml = self.call('System/time/ntpServers/capabilities')
ntp_server = xml['NTPServerList']['NTPServer'][0]
if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
ntp_host = ntp_server['hostName'][0]
else:
ntp_host = ntp_server['ipAddress'][0]
except requests.exceptions.HTTPError:
# works on older 720p cams
ntp_server = self.call('System/time/ntpServers/1')['NTPServer']
if ntp_server['addressingFormatType'][0] == 'hostname':
ntp_host = ntp_server['hostName'][0]
else:
ntp_host = ntp_server['ipAddress'][0]
return ntp_host
def set_timezone(self):
data = '<?xml version="1.0" encoding="UTF-8"?>'
data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True)
def set_ntp_server(self,
ntp_host: str,
ntp_port: int = 123):
format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
# test ntp server first
data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo></NTPTestDescription>'
resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult']
error_code = int(resp['errorCode'][0])
error_description = resp['errorDescription'][0]
if error_code != 0 or error_description.lower() != 'ok':
raise ResponseError('response status looks bad')
# then set it
data = '<?xml version="1.0" encoding="UTF-8"?>'
data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True)

View File

@ -0,0 +1,48 @@
import requests
import hashlib
import xml.etree.ElementTree as ET
def xml_to_dict(xml_data: str) -> dict:
# Parse the XML data
root = ET.fromstring(xml_data)
# Function to remove namespace from the tag name
def remove_namespace(tag):
return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace
# Function to recursively convert XML elements to a dictionary
def elem_to_dict(elem):
tag = remove_namespace(elem.tag)
elem_dict = {tag: {}}
# If the element has attributes, add them to the dictionary
elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()})
# Handle the element's text content, if present and not just whitespace
text = elem.text.strip() if elem.text and elem.text.strip() else None
if text:
elem_dict[tag]['#text'] = text
# Process child elements
for child in elem:
child_dict = elem_to_dict(child)
child_tag = remove_namespace(child.tag)
if child_tag not in elem_dict[tag]:
elem_dict[tag][child_tag] = []
elem_dict[tag][child_tag].append(child_dict[child_tag])
# Simplify structure if there's only text or no children and no attributes
if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]:
return {tag: elem_dict[tag]['#text']}
elif not elem_dict[tag]:
return {tag: ''}
return elem_dict
# Convert the root element to dictionary
return elem_to_dict(root)
def sha256_hex(input_string: str) -> str:
return hashlib.sha256(input_string.encode()).hexdigest()

View File

@ -44,6 +44,9 @@ class CameraType(Enum):
def is_hikvision(self) -> bool:
return self in (CameraType.HIKVISION_264, CameraType.HIKVISION_265)
def is_ali(self) -> bool:
return self == CameraType.ALIEXPRESS_NONAME
class TimeFilterType(Enum):
FIX = 'fix'

View File

@ -113,3 +113,4 @@ class HTTPServer:
class HTTPMethod(Enum):
GET = 'GET'
POST = 'POST'
PUT = 'PUT'