138 lines
5.4 KiB
Python
138 lines
5.4 KiB
Python
from time import time
from typing import Optional, Union
from xml.sax.saxutils import escape

import requests

from homekit.http import HTTPMethod
from homekit.util import validate_ipv4

from .util import xml_to_dict, sha256_hex
|
|
|
|
|
|
class ResponseError(RuntimeError):
    """Signals that a device response carried a failure status."""
|
|
|
|
|
|
class AuthError(ResponseError):
    """Signals that the device rejected a session login attempt."""
|
|
|
|
|
|
class ISAPIClient:
    """Small HTTP client for a device's ``/ISAPI/`` endpoints.

    Every request goes to ``http://<host>/ISAPI/<path>``. Cookies returned by
    the device are remembered on the instance and sent with later requests,
    which is how the session opened by :meth:`auth` is carried along.
    """

    def __init__(self, host, timeout: Optional[float] = None):
        """Create a client.

        :param host: host (optionally ``host:port``) placed into the URL.
        :param timeout: value forwarded to ``requests`` as ``timeout``; the
            default ``None`` keeps the previous behaviour of not passing any
            timeout at all.
        """
        self.host = host
        self.timeout = timeout
        self.cookies = {}

    @staticmethod
    def _xml_text(value) -> str:
        """Return ``value`` as a string that is safe to use as XML element text."""
        return escape(str(value))

    def call(self,
             path: str,
             method: HTTPMethod = HTTPMethod.GET,
             data: Optional[Union[dict, str, bytes]] = None,
             raise_for_status=True,
             check_put_response=False):
        """Perform one ISAPI request and return the parsed response.

        :param path: path appended to ``http://<host>/ISAPI/``.
        :param method: HTTP method; its lower-cased ``value`` selects the
            ``requests`` function to call.
        :param data: payload; sent as query parameters for GET and as the
            request body for every other method. Falsy values are not sent.
        :param raise_for_status: call ``raise_for_status()`` on the response.
        :param check_put_response: additionally require the response's
            ``ResponseStatus`` to have ``statusCode`` 1 and ``statusString``
            ``ok`` (case-insensitive). Implies ``raise_for_status``.
        :return: result of ``xml_to_dict`` applied to the response text.
        :raises requests.exceptions.HTTPError: via ``raise_for_status()``.
        :raises ResponseError: when ``check_put_response`` is set and the
            status check fails.
        """
        send = getattr(requests, method.value.lower())

        headers = {
            'X-Requested-With': 'XMLHttpRequest',
        }
        if method in (HTTPMethod.PUT, HTTPMethod.POST):
            headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'

        kwargs = {}
        if data:
            kwargs['data' if method is not HTTPMethod.GET else 'params'] = data
        if self.cookies:
            kwargs['cookies'] = self.cookies
        if self.timeout is not None:
            kwargs['timeout'] = self.timeout

        r = send(f'http://{self.host}/ISAPI/{path}', headers=headers, **kwargs)

        if raise_for_status or check_put_response:
            r.raise_for_status()

        parsed_xml = None
        if check_put_response:
            parsed_xml = xml_to_dict(r.text)
            resp = parsed_xml['ResponseStatus']
            status_code = int(resp['statusCode'][0])
            status_string = resp['statusString'][0]
            if status_code != 1 or status_string.lower() != 'ok':
                raise ResponseError('response status looks bad')

        # Cookies are only remembered once the response passed the checks above.
        self.cookies.update(r.cookies.get_dict())

        if parsed_xml is None:
            parsed_xml = xml_to_dict(r.text)

        return parsed_xml

    def auth(self, username: str, password: str):
        """Log in and store the resulting session cookies on this client.

        Fetches the login capabilities for ``username``, derives the password
        digest from the returned challenge (and salt, when present), and posts
        it to ``Security/sessionLogin``.

        :raises AuthError: when the reported ``statusValue`` is not 200.
        """
        xml = self.call('Security/sessionLogin/capabilities', data={'username': username})
        caps = xml['SessionLoginCap']
        is_irreversible = caps['isIrreversible'][0].lower() == 'true'
        challenge = caps['challenge'][0]
        iterations = int(caps['iterations'][0])

        # https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
        # also look into webAuth.js and utils.js

        if 'salt' in caps and is_irreversible:
            # Two hashing rounds happen here, the rest in the loop below.
            p = sha256_hex(username + caps['salt'][0] + password)
            p = sha256_hex(p + challenge)
            remaining_rounds = iterations - 2
        else:
            # The challenge is appended to the digest, not hashed with the
            # password; the first loop round below hashes the concatenation.
            p = sha256_hex(password) + challenge
            remaining_rounds = iterations - 1
        for _ in range(remaining_rounds):
            p = sha256_hex(p)

        # Escape every interpolated value so characters such as '&' or '<'
        # cannot produce a malformed document.
        data = (
            '<SessionLogin>'
            f'<userName>{self._xml_text(username)}</userName>'
            f'<password>{self._xml_text(p)}</password>'
            f'<sessionID>{self._xml_text(caps["sessionID"][0])}</sessionID>'
            '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
            f'<sessionIDVersion>{self._xml_text(caps["sessionIDVersion"][0])}</sessionIDVersion>'
            '</SessionLogin>'
        )

        resp = self.call(f'Security/sessionLogin?timeStamp={int(time())}', HTTPMethod.POST, data=data)['SessionLogin']
        status_value = int(resp['statusValue'][0])
        status_string = resp['statusString'][0]
        if status_value != 200:
            raise AuthError(f'{status_value}: {status_string}')

    def get_ntp_server(self) -> str:
        """Return the configured NTP server's host name or IP address.

        Tries ``System/time/ntpServers/capabilities`` first and falls back to
        ``System/time/ntpServers/1`` when that request fails with an HTTP
        error.
        """
        try:
            # works on newer 1080p cams
            xml = self.call('System/time/ntpServers/capabilities')
            ntp_server = xml['NTPServerList']['NTPServer'][0]
            # In this response the value is nested under '#text'.
            addressing = ntp_server['addressingFormatType'][0]['#text']
        except requests.exceptions.HTTPError:
            # works on older 720p cams
            ntp_server = self.call('System/time/ntpServers/1')['NTPServer']
            addressing = ntp_server['addressingFormatType'][0]

        address_key = 'hostName' if addressing == 'hostname' else 'ipAddress'
        return ntp_server[address_key][0]

    def set_timezone(self, timezone: str = 'CST-3:00:00'):
        """Switch the device to NTP time mode and set its time zone.

        :param timezone: value written into ``<timeZone>``; defaults to the
            previously hard-coded ``CST-3:00:00``.
        :raises ResponseError: when the device does not acknowledge with an
            OK status.
        """
        data = '<?xml version="1.0" encoding="UTF-8"?>'
        data += f'<Time><timeMode>NTP</timeMode><timeZone>{self._xml_text(timezone)}</timeZone></Time>'
        self.call('System/time', HTTPMethod.PUT, data=data, check_put_response=True)

    def set_ntp_server(self,
                       ntp_host: str,
                       ntp_port: int = 123,
                       sync_interval: int = 1440):
        """Test an NTP server through the device and then store it as server 1.

        :param ntp_host: IPv4 address or host name of the NTP server.
        :param ntp_port: NTP port number.
        :param sync_interval: value written into ``<synchronizeInterval>``
            (presumably minutes -- confirm against the device documentation).
        :raises ResponseError: when the test reports an error or the device
            does not acknowledge the update with an OK status.
        """
        # The element carrying the address must match the addressing type:
        # host names go into <hostName> (the element get_ntp_server() reads
        # back), IPv4 addresses into <ipAddress>.
        if validate_ipv4(ntp_host):
            addressing, address_tag = 'ipaddress', 'ipAddress'
        else:
            addressing, address_tag = 'hostname', 'hostName'

        address_xml = (
            f'<addressingFormatType>{addressing}</addressingFormatType>'
            f'<{address_tag}>{self._xml_text(ntp_host)}</{address_tag}>'
            f'<portNo>{self._xml_text(ntp_port)}</portNo>'
        )

        # test ntp server first
        data = f'<?xml version="1.0" encoding="UTF-8"?><NTPTestDescription>{address_xml}</NTPTestDescription>'
        resp = self.call('System/time/ntpServers/test', HTTPMethod.POST, data=data)['NTPTestResult']
        error_code = int(resp['errorCode'][0])
        error_description = resp['errorDescription'][0]
        if error_code != 0 or error_description.lower() != 'ok':
            raise ResponseError('response status looks bad')

        # then set it
        data = '<?xml version="1.0" encoding="UTF-8"?>'
        data += (
            f'<NTPServer><id>1</id>{address_xml}'
            f'<synchronizeInterval>{self._xml_text(sync_interval)}</synchronizeInterval></NTPServer>'
        )
        self.call('System/time/ntpServers/1', HTTPMethod.PUT, data=data, check_put_response=True)
|