homekit/include/py/xmeye/nwipcam.py

327 lines
11 KiB
Python
Executable File

#!/usr/bin/python3
# numenworld-ipcam - Reverse engineering of the NuMenWorld NCV-I536A IP camera
# Copyright (C) 2019-2019 Johannes Bauer
#
# Changes and improvements:
# Copyright (C) 2024 Evgeny Zinoviev
#
# This file is part of numenworld-ipcam (https://github.com/johndoe31415/numenworld-ipcam).
#
# numenworld-ipcam is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; this program is ONLY licensed under
# version 3 of the License, later versions are explicitly excluded.
#
# numenworld-ipcam is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with numenworld-ipcam; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Johannes Bauer <JohannesBauer@gmx.de>
import collections
import struct
import socket
import enum
import json
import subprocess
from .HorrificallyBrokenPasswordFunction import HorrificallyBrokenPasswordFunction
class XMEyeMsgCode(enum.IntEnum):
LoginCmd = 1000
LoginReply = LoginCmd + 1
KeepAliveCmd = 1006
KeepAliveReply = KeepAliveCmd + 1
SetConfigCmd = 1040
SetConfigReply = SetConfigCmd + 1
GetConfigCmd = 1042
GetConfidReply = GetConfigCmd + 1
GetSystemInfoCmd = 1020
GetSystemInfoReply = GetSystemInfoCmd + 1
ChannelTitleCmd = 1048
ChannelTitleReply = ChannelTitleCmd + 1
SystemFunctionCmd = 1360
SystemFunctionReply = SystemFunctionCmd + 1
OPMonitorStartStopCmd = 1410
OPMonitorStartStopReply = OPMonitorStartStopCmd + 1
OPMonitorClaimCmd = 1413
OPMonitorClaimReply = OPMonitorClaimCmd + 1
OPTimeQueryCmd = 1452
OPTimeQueryReply = OPTimeQueryCmd + 1
VideoStreamData = 1412
class AudioVideoDataType(enum.IntEnum):
VideoIncomplete = 0xfc
VideoComplete = 0xfd
AudioComplete = 0xfa
class AudioVideoPayload():
_HeaderFields = collections.namedtuple("HeaderFields", ["unknown1", "channel", "datatype", "unknown2", "length"])
_HeaderStruct = struct.Struct("< H B B H H")
def __init__(self, payload, hint=""):
self._header = self._HeaderFields(*self._HeaderStruct.unpack(payload[:self._HeaderStruct.size]))
print(
"%20s [%5d]: %s %s" % (hint, len(payload), " ".join("%02x" % (c) for c in payload[:8]), str(self._header)))
# if len(payload) != (self._HeaderStruct.size + self._header.length):
# raise Exception("Unexpected AV payload, expected %d bytes but got %d." % (self._HeaderStruct.size + self._header.length, len(payload)))
# print(self._header)
self._data = payload[self._HeaderStruct.size:]
@property
def data(self):
return self._data
class XMEyeMessage():
_HeaderFields = collections.namedtuple("HeaderFields", ["station", "session", "unknown", "msgcode", "length"])
_HeaderStruct = struct.Struct("< L L 6s H L")
def __init__(self, station, session, msgcode, message):
if isinstance(message, (bytes, bytearray)):
self._message = bytes(message)
else:
self._message = json.dumps(message).encode("ascii")
self._header = self._HeaderFields(station=station,
session=session,
unknown=bytes(6),
msgcode=msgcode,
length=len(self._message))
@property
def header(self):
return self._header
@property
def message(self):
return self._message
@property
def payload(self):
msg = self.message.rstrip(bytes(1))
try:
data = json.loads(msg)
except (json.JSONDecodeError, UnicodeError):
return self.message
return data
@property
def length(self):
return len(self.message)
def __bytes__(self):
header = self._HeaderStruct.pack(*self._header)
return header + self._message
@classmethod
def deserialize(cls, msg):
header_data = msg[:20]
header = cls._HeaderFields(*cls._HeaderStruct.unpack(header_data))
payload = msg[20: 20 + header.length]
if len(payload) < header.length:
payload += bytes(header.length - len(payload))
try:
msgcode = XMEyeMsgCode(header.msgcode)
except ValueError:
msgcode = header.msgcode
return cls(station=header.station, session=header.session, msgcode=msgcode, message=payload)
@classmethod
def deserialize_all(cls, msg):
msg = bytearray(msg)
while len(msg) >= 20:
next_msg = cls.deserialize(msg)
yield next_msg
msg = msg[20 + next_msg.length:]
def dump(self):
print("%s (%d bytes payload):" % (self._header.msgcode, self.length))
if isinstance(self.payload, bytes):
print(self.payload)
else:
print(json.dumps(self.payload, indent=4, sort_keys=True))
print()
def __repr__(self):
return "Msg(%s): %s" % (self.header, str(self.payload))
class XMEyeCamera():
def __init__(self, hostname, username="admin", password="", port=34567):
self._hostname = hostname
self._conn = socket.create_connection((hostname, port))
self._session = None
self._username = username
self._password = password
@property
def derived_password(self):
return HorrificallyBrokenPasswordFunction.derive(self._password)
@property
def rtsp_uri(self):
rtsp_port = 554
return "rtsp://%s:%d/user=%s&password=%s&channel=" % (
self._hostname, rtsp_port, self._username, self.derived_password)
def _rx_bytes(self, length):
result = bytearray()
while len(result) < length:
remaining_bytes = length - len(result)
rx_data = self._conn.recv(remaining_bytes)
result += rx_data
return result
def _rx(self):
response_header = self._rx_bytes(20)
header = XMEyeMessage.deserialize(response_header)
payload_data = self._rx_bytes(header.length)
response_data = response_header + payload_data
response = XMEyeMessage.deserialize(response_data)
# print("<-", response)
return response
def _tx(self, command):
# print("->", command)
data = bytes(command)
self._conn.send(data)
def _tx_rx(self, command):
self._tx(command)
return self._rx()
def login(self):
data = {
"EncryptType": "MD5",
"LoginType": "DVRIP-Web",
"UserName": self._username,
"PassWord": self.derived_password,
}
command = XMEyeMessage(station=0xff, session=0, msgcode=XMEyeMsgCode.LoginCmd, message=data)
response = self._tx_rx(command)
if int(response.payload["Ret"]) == 100:
# Login successful
self._session = int(response.payload["SessionID"], 16)
else:
raise Exception("Login failed:", response)
def _generic_cmd(self, name, msgcode):
data = {
"Name": name,
"SessionID": "0x%x" % (self._session,),
}
command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
return self._tx_rx(command)
def get_systeminfo(self):
return self._generic_cmd("SystemInfo", XMEyeMsgCode.GetSystemInfoCmd)
def get_channel_title(self):
return self._generic_cmd("ChannelTitle", XMEyeMsgCode.ChannelTitleCmd)
def get_system_function(self):
return self._generic_cmd("SystemFunction", XMEyeMsgCode.SystemFunctionCmd)
def get_talk_audio_format(self):
return self._generic_cmd("TalkAudioFormat", XMEyeMsgCode.SystemFunctionCmd)
def get_ntp_server(self):
ntp_config = self._generic_cmd("NetWork.NetNTP", XMEyeMsgCode.GetConfigCmd)
return ntp_config.payload['NetWork.NetNTP']['Server']['Name']
def set_ntp_server(self,
ntp_host: str,
ntp_port: int = 123) -> None:
data = {
'Name': 'NetWork.NetNTP',
'NetWork.NetNTP': {
'Enable': True,
'Server': {
'Address': '0x00000000',
'Anonymity': False,
'Name': ntp_host,
'Password': '',
'Port': ntp_port,
'UserName': ''
},
'TimeZone': 13, # Moscow time
'UpdatePeriod': 60
},
"SessionID": "0x%x" % (self._session,),
}
command = XMEyeMessage(station=0xff, session=self._session, msgcode=XMEyeMsgCode.SetConfigCmd, message=data)
self._tx_rx(command)
def _opmonitor_cmd(self, action, msgcode, rx_msg=True):
data = {
"Name": "OPMonitor",
"OPMonitor": {
"Action": action,
"Parameter": {
"Channel": 0,
"CombinMode": "CONNECT_ALL",
"StreamType": "Main",
"TransMode": "TCP",
},
},
"SessionID": "0x%x" % (self._session,),
}
command = XMEyeMessage(station=0xff, session=self._session, msgcode=msgcode, message=data)
if rx_msg:
return self._tx_rx(command)
else:
self._tx(command)
def get_stream(self, packet_callback):
self._opmonitor_cmd("Claim", XMEyeMsgCode.OPMonitorClaimCmd)
self._opmonitor_cmd("Start", XMEyeMsgCode.OPMonitorStartStopCmd, rx_msg=False)
while True:
rx_pkt = self._rx()
packet_callback(rx_pkt)
# def playback_stream(self):
# mplayer_process = subprocess.Popen(["mplayer", "-demuxer", "h264es", "-"], stdin=subprocess.PIPE)
# with open("audio.raw", "wb") as f, open("video.raw", "wb") as video_f:
# def pkt_callback(pkt):
# if (pkt.header.station == 511) and (pkt.header.msgcode == XMEyeMsgCode.VideoStreamData):
# avpayload = AudioVideoPayload(pkt.payload, hint="video")
# mplayer_process.stdin.raw.write(pkt.payload)
# video_f.write(pkt.payload)
# elif pkt.header.msgcode == XMEyeMsgCode.VideoStreamData:
# # Audio data?
# avpayload = AudioVideoPayload(pkt.payload, hint="audio")
# f.write(avpayload.data)
# elif pkt.header.msgcode != XMEyeMsgCode.VideoStreamData:
# print(pkt)
#
# self.get_stream(pkt_callback)
if __name__ == '__main__':
cam = XMEyeCamera("192.168.0.47", password="DerPr03ess")
cam.login()
print(cam.get_systeminfo())
print(cam.get_channel_title())
# print(cam.get_talk_audio_format())
# print(cam.get_system_function())