save
This commit is contained in:
parent
d237e81873
commit
05c85757b8
199
bin/ipcam_ntp_util.py
Executable file
199
bin/ipcam_ntp_util.py
Executable file
@ -0,0 +1,199 @@
|
||||
#!/usr/bin/env python3
|
||||
import __py_include
|
||||
import requests
|
||||
import hashlib
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from time import time
|
||||
from argparse import ArgumentParser, ArgumentError
|
||||
from homekit.util import validate_ipv4, validate_ipv4_or_hostname
|
||||
from homekit.camera import IpcamConfig
|
||||
|
||||
|
||||
def xml_to_dict(xml_data: str) -> dict:
|
||||
# Parse the XML data
|
||||
root = ET.fromstring(xml_data)
|
||||
|
||||
# Function to remove namespace from the tag name
|
||||
def remove_namespace(tag):
|
||||
return tag.split('}')[-1] # Splits on '}' and returns the last part, the actual tag name without namespace
|
||||
|
||||
# Function to recursively convert XML elements to a dictionary
|
||||
def elem_to_dict(elem):
|
||||
tag = remove_namespace(elem.tag)
|
||||
elem_dict = {tag: {}}
|
||||
|
||||
# If the element has attributes, add them to the dictionary
|
||||
elem_dict[tag].update({'@' + remove_namespace(k): v for k, v in elem.attrib.items()})
|
||||
|
||||
# Handle the element's text content, if present and not just whitespace
|
||||
text = elem.text.strip() if elem.text and elem.text.strip() else None
|
||||
if text:
|
||||
elem_dict[tag]['#text'] = text
|
||||
|
||||
# Process child elements
|
||||
for child in elem:
|
||||
child_dict = elem_to_dict(child)
|
||||
child_tag = remove_namespace(child.tag)
|
||||
if child_tag not in elem_dict[tag]:
|
||||
elem_dict[tag][child_tag] = []
|
||||
elem_dict[tag][child_tag].append(child_dict[child_tag])
|
||||
|
||||
# Simplify structure if there's only text or no children and no attributes
|
||||
if len(elem_dict[tag]) == 1 and '#text' in elem_dict[tag]:
|
||||
return {tag: elem_dict[tag]['#text']}
|
||||
elif not elem_dict[tag]:
|
||||
return {tag: ''}
|
||||
|
||||
return elem_dict
|
||||
|
||||
# Convert the root element to dictionary
|
||||
return elem_to_dict(root)
|
||||
|
||||
|
||||
def sha256_hex(input_string: str) -> str:
|
||||
return hashlib.sha256(input_string.encode()).hexdigest()
|
||||
|
||||
|
||||
class ResponseError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
class AuthError(ResponseError):
|
||||
pass
|
||||
|
||||
|
||||
class HikvisionISAPIClient:
|
||||
def __init__(self, host):
|
||||
self.host = host
|
||||
self.cookies = {}
|
||||
|
||||
def auth(self, username: str, password: str):
|
||||
r = requests.get(self.isapi_uri('Security/sessionLogin/capabilities'),
|
||||
{'username': username},
|
||||
headers={
|
||||
'X-Requested-With': 'XMLHttpRequest',
|
||||
})
|
||||
r.raise_for_status()
|
||||
caps = xml_to_dict(r.text)['SessionLoginCap']
|
||||
is_irreversible = caps['isIrreversible'][0].lower() == 'true'
|
||||
|
||||
# https://github.com/JakeVincet/nvt/blob/master/2018/hikvision/gb_hikvision_ip_camera_default_credentials.nasl
|
||||
# also look into webAuth.js and utils.js
|
||||
|
||||
if 'salt' in caps and is_irreversible:
|
||||
p = sha256_hex(username + caps['salt'][0] + password)
|
||||
p = sha256_hex(p + caps['challenge'][0])
|
||||
for i in range(int(caps['iterations'][0])-2):
|
||||
p = sha256_hex(p)
|
||||
else:
|
||||
p = sha256_hex(password) + caps['challenge'][0]
|
||||
for i in range(int(caps['iterations'][0])-1):
|
||||
p = sha256_hex(p)
|
||||
|
||||
data = '<SessionLogin>'
|
||||
data += f'<userName>{username}</userName>'
|
||||
data += f'<password>{p}</password>'
|
||||
data += f'<sessionID>{caps["sessionID"][0]}</sessionID>'
|
||||
data += '<isSessionIDValidLongTerm>false</isSessionIDValidLongTerm>'
|
||||
data += f'<sessionIDVersion>{caps["sessionIDVersion"][0]}</sessionIDVersion>'
|
||||
data += '</SessionLogin>'
|
||||
|
||||
r = requests.post(self.isapi_uri(f'Security/sessionLogin?timeStamp={int(time())}'), data=data, headers={
|
||||
'Accept-Encoding': 'gzip, deflate',
|
||||
'If-Modified-Since': '0',
|
||||
'X-Requested-With': 'XMLHttpRequest',
|
||||
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
|
||||
})
|
||||
r.raise_for_status()
|
||||
resp = xml_to_dict(r.text)['SessionLogin']
|
||||
status_value = int(resp['statusValue'][0])
|
||||
status_string = resp['statusString'][0]
|
||||
if status_value != 200:
|
||||
raise AuthError(f'{status_value}: {status_string}')
|
||||
|
||||
self.cookies = r.cookies.get_dict()
|
||||
|
||||
def get_ntp_server(self) -> str:
|
||||
r = requests.get(self.isapi_uri('System/time/ntpServers/capabilities'), cookies=self.cookies)
|
||||
r.raise_for_status()
|
||||
ntp_server = xml_to_dict(r.text)['NTPServerList']['NTPServer'][0]
|
||||
|
||||
if ntp_server['addressingFormatType'][0]['#text'] == 'hostname':
|
||||
ntp_host = ntp_server['hostName'][0]
|
||||
else:
|
||||
ntp_host = ntp_server['ipAddress'][0]
|
||||
|
||||
return ntp_host
|
||||
|
||||
def set_timezone(self):
|
||||
data = '<?xml version="1.0" encoding="UTF-8"?>'
|
||||
data += '<Time><timeMode>NTP</timeMode><timeZone>CST-3:00:00</timeZone></Time>'
|
||||
|
||||
r = requests.put(self.isapi_uri('System/time'), cookies=self.cookies, data=data, headers={
|
||||
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
|
||||
})
|
||||
self.isapi_check_put_response(r)
|
||||
|
||||
def set_ntp_server(self, ntp_host: str, ntp_port: int = 123):
|
||||
format = 'ipaddress' if validate_ipv4(ntp_host) else 'hostname'
|
||||
|
||||
data = '<?xml version="1.0" encoding="UTF-8"?>'
|
||||
data += f'<NTPServer><id>1</id><addressingFormatType>{format}</addressingFormatType><ipAddress>{ntp_host}</ipAddress><portNo>{ntp_port}</portNo><synchronizeInterval>1440</synchronizeInterval></NTPServer>'
|
||||
|
||||
r = requests.put(self.isapi_uri('System/time/ntpServers/1'),
|
||||
data=data,
|
||||
cookies=self.cookies,
|
||||
headers={
|
||||
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
|
||||
})
|
||||
self.isapi_check_put_response(r)
|
||||
|
||||
def isapi_uri(self, path: str) -> str:
|
||||
return f'http://{self.host}/ISAPI/{path}'
|
||||
|
||||
def isapi_check_put_response(self, r):
|
||||
r.raise_for_status()
|
||||
resp = xml_to_dict(r.text)['ResponseStatus']
|
||||
|
||||
status_code = int(resp['statusCode'][0])
|
||||
status_string = resp['statusString'][0]
|
||||
|
||||
if status_code != 1 or status_string.lower() != 'ok':
|
||||
raise ResponseError('response status looks bad')
|
||||
|
||||
|
||||
def main():
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument('--host', type=str, required=True)
|
||||
parser.add_argument('--get-ntp-server', action='store_true')
|
||||
parser.add_argument('--set-ntp-server', type=str)
|
||||
parser.add_argument('--username', type=str)
|
||||
parser.add_argument('--password', type=str)
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.get_ntp_server and not args.set_ntp_server:
|
||||
raise ArgumentError(None, 'either --get-ntp-server or --set-ntp-server is required')
|
||||
|
||||
ipcam_config = IpcamConfig()
|
||||
login = args.username if args.username else ipcam_config['web_creds']['login']
|
||||
password = args.password if args.password else ipcam_config['web_creds']['password']
|
||||
|
||||
client = HikvisionISAPIClient(args.host)
|
||||
client.auth(args.username, args.password)
|
||||
|
||||
if args.get_ntp_server:
|
||||
print(client.get_ntp_server())
|
||||
return
|
||||
|
||||
if not args.set_ntp_server:
|
||||
raise ArgumentError(None, '--set-ntp-server is required')
|
||||
|
||||
if not validate_ipv4_or_hostname(args.set_ntp_server):
|
||||
raise ArgumentError(None, 'input ntp server is neither ip address nor a valid hostname')
|
||||
|
||||
client.set_ntp_server(args.set_ntp_server)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -73,6 +73,15 @@ class IpcamConfig(ConfigUnit):
|
||||
'login': {'type': 'string', 'required': True},
|
||||
'password': {'type': 'string', 'required': True},
|
||||
}
|
||||
},
|
||||
|
||||
'web_creds': {
|
||||
'required': True,
|
||||
'type': 'dict',
|
||||
'schema': {
|
||||
'login': {'type': 'string', 'required': True},
|
||||
'password': {'type': 'string', 'required': True},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,7 @@ class LinuxBoardsConfig(ConfigUnit):
|
||||
'schema': {
|
||||
'mdns': {'type': 'string', 'required': True},
|
||||
'board': {'type': 'string', 'required': True},
|
||||
'location': {'type': 'string', 'required': True},
|
||||
'network': {
|
||||
'type': 'list',
|
||||
'required': True,
|
||||
|
@ -10,6 +10,7 @@ import string
|
||||
import random
|
||||
import re
|
||||
import os
|
||||
import ipaddress
|
||||
|
||||
from enum import Enum
|
||||
from datetime import datetime
|
||||
@ -37,6 +38,14 @@ def validate_ipv4_or_hostname(address: str, raise_exception: bool = False) -> bo
|
||||
return False
|
||||
|
||||
|
||||
def validate_ipv4(address: str) -> bool:
|
||||
try:
|
||||
ipaddress.IPv6Address(address)
|
||||
return True
|
||||
except ipaddress.AddressValueError:
|
||||
return False
|
||||
|
||||
|
||||
def validate_mac_address(mac_address: str) -> bool:
|
||||
mac_pattern = r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$'
|
||||
if re.match(mac_pattern, mac_address):
|
||||
|
Loading…
x
Reference in New Issue
Block a user