homekit/bin/web_kbn.py
2024-02-19 01:44:11 +03:00

540 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import __py_include
import asyncio
import logging
import jinja2
import aiohttp_jinja2
import json
import re
import inverterd
import phonenumbers
import time
from io import StringIO
from aiohttp import web
from typing import Optional, Union
from urllib.parse import quote_plus
from homekit.config import config, AppConfigUnit, is_development_mode, Translation
from homekit.camera import IpcamConfig
from homekit.util import homekit_path, filesize_fmt, seconds_to_human_readable_string, json_serial, validate_ipv4
from homekit.modem import E3372, ModemsConfig, MacroNetWorkType
from homekit.inverter.config import InverterdConfig
from homekit.relay.sunxi_h3_client import RelayClient
from homekit import openwrt, http
class WebKbnConfig(AppConfigUnit):
NAME = 'web_kbn'
@classmethod
def schema(cls) -> Optional[dict]:
return {
'listen_addr': cls._addr_schema(required=True),
'assets_public_path': {'type': 'string'},
'pump_addr': cls._addr_schema(required=True),
'cam_hls_host': cls._addr_schema(required=True, only_ip=True),
'inverter_grafana_url': {'type': 'string'},
'sensors_grafana_url': {'type': 'string'},
}
common_static_files = [
'bootstrap.min.css',
'bootstrap.min.js',
'polyfills.js',
'app.js',
'app.css'
]
routes = web.RouteTableDef()
webkbn_strings = Translation('web_kbn')
logger = logging.getLogger(__name__)
def get_js_link(file, version) -> str:
if is_development_mode():
version = int(time.time())
if version:
file += f'?version={version}'
return f'<script src="{config.app_config["assets_public_path"]}/{file}" type="text/javascript"></script>'
def get_css_link(file, version) -> str:
if is_development_mode():
version = int(time.time())
if version:
file += f'?version={version}'
return f'<link rel="stylesheet" type="text/css" href="{config.app_config["assets_public_path"]}/{file}">'
def get_head_static(files=None) -> str:
buf = StringIO()
if files is None:
files = []
for file in common_static_files + files:
v = 2
try:
q_ind = file.index('?')
v = file[q_ind+1:]
file = file[:file.index('?')]
except ValueError:
pass
if file.endswith('.js'):
buf.write(get_js_link(file, v))
else:
buf.write(get_css_link(file, v))
return buf.getvalue()
def get_modem_client(modem_cfg: dict) -> E3372:
return E3372(modem_cfg['ip'], legacy_token_auth=modem_cfg['legacy_auth'])
def get_modem_data(modem_cfg: dict, get_raw=False) -> Union[dict, tuple]:
cl = get_modem_client(modem_cfg)
signal = cl.device_signal
status = cl.monitoring_status
traffic = cl.traffic_stats
if get_raw:
device_info = cl.device_information
dialup_conn = cl.dialup_connection
return signal, status, traffic, device_info, dialup_conn
else:
network_type_label = re.sub('^MACRO_NET_WORK_TYPE(_EX)?_', '', MacroNetWorkType(int(status['CurrentNetworkType'])).name)
return {
'type': network_type_label,
'level': int(status['SignalIcon']) if 'SignalIcon' in status else 0,
'rssi': signal['rssi'],
'sinr': signal['sinr'],
'connected_time': seconds_to_human_readable_string(int(traffic['CurrentConnectTime'])),
'downloaded': filesize_fmt(int(traffic['CurrentDownload'])),
'uploaded': filesize_fmt(int(traffic['CurrentUpload']))
}
def get_pump_client() -> RelayClient:
addr = config.app_config['pump_addr']
cl = RelayClient(host=addr.host, port=addr.port)
cl.connect()
return cl
def get_inverter_client() -> inverterd.Client:
cl = inverterd.Client(host=InverterdConfig()['remote_addr'].host)
cl.connect()
cl.format(inverterd.Format.JSON)
return cl
def get_inverter_data() -> tuple:
cl = get_inverter_client()
status = json.loads(cl.exec('get-status'))['data']
rated = json.loads(cl.exec('get-rated'))['data']
power_direction = status['battery_power_direction'].lower()
power_direction = re.sub('ge$', 'ging', power_direction)
charging_rate = ''
if power_direction == 'charging':
charging_rate = ' @ %s %s' % (
status['battery_charge_current']['value'],
status['battery_charge_current']['unit'])
elif power_direction == 'discharging':
charging_rate = ' @ %s %s' % (
status['battery_discharge_current']['value'],
status['battery_discharge_current']['unit'])
html = '<b>Battery:</b> %s %s' % (
status['battery_voltage']['value'],
status['battery_voltage']['unit'])
html += ' (%s%s, ' % (
status['battery_capacity']['value'],
status['battery_capacity']['unit'])
html += '%s%s)' % (power_direction, charging_rate)
html += "\n"
html += '<b>Load:</b> %s %s' % (
status['ac_output_active_power']['value'],
status['ac_output_active_power']['unit'])
html += ' (%s%%)' % (status['output_load_percent']['value'],)
if status['pv1_input_power']['value'] > 0:
html += "\n"
html += '<b>Input power:</b> %s %s' % (
status['pv1_input_power']['value'],
status['pv1_input_power']['unit'])
if status['grid_voltage']['value'] > 0 or status['grid_freq']['value'] > 0:
html += "\n"
html += '<b>AC input:</b> %s %s' % (
status['grid_voltage']['value'],
status['grid_voltage']['unit'])
html += ', %s %s' % (
status['grid_freq']['value'],
status['grid_freq']['unit'])
html += "\n"
html += '<b>Priority:</b> %s' % (rated['output_source_priority'],)
html = html.replace("\n", '<br>')
return status, rated, html
def get_current_upstream() -> str:
r = openwrt.get_default_route()
logger.info(f'default route: {r}')
mc = ModemsConfig()
for k, v in mc.items():
if 'gateway_ip' in v and v['gateway_ip'] == r:
r = v['ip']
break
upstream = None
for k, v in mc.items():
if r == v['ip']:
upstream = k
if not upstream:
raise RuntimeError('failed to determine current upstream!')
return upstream
def lang(key: str):
return webkbn_strings.get()[key]
async def render(req: web.Request,
template_name: str,
title: Optional[str] = None,
context: Optional[dict] = None,
assets: Optional[list] = None):
if context is None:
context = {}
context = {
**context,
'head_static': get_head_static(assets)
}
if title is not None:
context['title'] = title
response = aiohttp_jinja2.render_template(template_name+'.j2', req, context=context)
return response
@routes.get('/main.cgi')
async def index(req: web.Request):
ctx = {}
for k in 'inverter', 'sensors':
ctx[f'{k}_grafana_url'] = config.app_config[f'{k}_grafana_url']
cc = IpcamConfig()
ctx['camzones'] = cc['zones'].keys()
ctx['allcams'] = cc.get_all_cam_names()
return await render(req, 'index',
title=lang('sitename'),
context=ctx)
@routes.get('/modems.cgi')
async def modems(req: web.Request):
return await render(req, 'modems',
title='Состояние модемов',
context=dict(modems=ModemsConfig()))
@routes.get('/modems/info.ajx')
async def modems_ajx(req: web.Request):
mc = ModemsConfig()
modem = req.query.get('id', None)
if modem not in mc.keys():
raise ValueError('invalid modem id')
modem_cfg = mc.get(modem)
loop = asyncio.get_event_loop()
modem_data = await loop.run_in_executor(None, lambda: get_modem_data(modem_cfg))
html = aiohttp_jinja2.render_string('modem_data.j2', req, context=dict(
modem_data=modem_data,
modem=modem
))
return http.ajax_ok({'html': html})
@routes.get('/modems/verbose.cgi')
async def modems_verbose(req: web.Request):
modem = req.query.get('id', None)
if modem not in ModemsConfig().keys():
raise ValueError('invalid modem id')
modem_cfg = ModemsConfig().get(modem)
loop = asyncio.get_event_loop()
signal, status, traffic, device, dialup_conn = await loop.run_in_executor(None, lambda: get_modem_data(modem_cfg, True))
data = [
['Signal', signal],
['Connection', status],
['Traffic', traffic],
['Device info', device],
['Dialup connection', dialup_conn]
]
modem_name = ModemsConfig().getfullname(modem)
return await render(req, 'modem_verbose',
title=f'Подробная информация о модеме "{modem_name}"',
context=dict(data=data, modem_name=modem_name))
@routes.get('/sms.cgi')
async def sms(req: web.Request):
modem = req.query.get('id', list(ModemsConfig().keys())[0])
is_outbox = int(req.query.get('outbox', 0)) == 1
error = req.query.get('error', None)
sent = int(req.query.get('sent', 0)) == 1
cl = get_modem_client(ModemsConfig()[modem])
messages = cl.sms_list(1, 20, is_outbox)
return await render(req, 'sms',
title=f"SMS-сообщения ({'исходящие' if is_outbox else 'входящие'}, {modem})",
context=dict(
modems=ModemsConfig(),
selected_modem=modem,
is_outbox=is_outbox,
error=error,
is_sent=sent,
messages=messages
))
@routes.post('/sms.cgi')
async def sms_post(req: web.Request):
modem = req.query.get('id', list(ModemsConfig().keys())[0])
is_outbox = int(req.query.get('outbox', 0)) == 1
fd = await req.post()
phone = fd.get('phone', None)
text = fd.get('text', None)
return_url = f'/sms.cgi?id={modem}&outbox={int(is_outbox)}'
phone = re.sub('\s+', '', phone)
if len(phone) > 4:
country = None
if not phone.startswith('+'):
country = 'RU'
number = phonenumbers.parse(phone, country)
if not phonenumbers.is_valid_number(number):
raise web.HTTPFound(f'{return_url}&error=Неверный+номер')
phone = phonenumbers.format_number(number, phonenumbers.PhoneNumberFormat.E164)
cl = get_modem_client(ModemsConfig()[modem])
cl.sms_send(phone, text)
raise web.HTTPFound(return_url)
@routes.get('/inverter.cgi')
async def inverter(req: web.Request):
action = req.query.get('do', None)
if action == 'set-osp':
val = req.query.get('value')
if val not in ('sub', 'sbu'):
raise ValueError('invalid osp value')
cl = get_inverter_client()
cl.exec('set-output-source-priority',
arguments=(val.upper(),))
raise web.HTTPFound('/inverter.cgi')
status, rated, html = await asyncio.get_event_loop().run_in_executor(None, get_inverter_data)
return await render(req, 'inverter',
title=lang('inverter'),
context=dict(status=status, rated=rated, html=html))
@routes.get('/inverter.ajx')
async def inverter_ajx(req: web.Request):
status, rated, html = await asyncio.get_event_loop().run_in_executor(None, get_inverter_data)
return http.ajax_ok({'html': html})
@routes.get('/pump.cgi')
async def pump(req: web.Request):
# TODO
# these are blocking calls
# should be rewritten using aio
cl = get_pump_client()
action = req.query.get('set', None)
if action in ('on', 'off'):
getattr(cl, action)()
raise web.HTTPFound('/pump.cgi')
status = cl.status()
return await render(req, 'pump',
title=lang('pump'),
context=dict(status=status))
@routes.get('/cams.cgi')
async def cams(req: web.Request):
cc = IpcamConfig()
cam = req.query.get('id', None)
zone = req.query.get('zone', None)
debug_hls = bool(req.query.get('debug_hls', False))
debug_video_events = bool(req.query.get('debug_video_events', False))
if cam is not None:
if not cc.has_camera(int(cam)):
raise ValueError('invalid camera id')
cams = [int(cam)]
mode = {'type': 'single', 'cam': cam}
elif zone is not None:
if not cc.has_zone(zone):
raise ValueError('invalid zone')
cams = cc['zones'][zone]
mode = {'type': 'zone', 'zone': zone}
else:
raise web.HTTPBadRequest(text='no camera id or zone found')
js_config = {
'host': config.app_config['cam_hls_host'],
'proto': 'http',
'cams': cams,
'hlsConfig': {
'opts': {
'startPosition': -1,
# https://github.com/video-dev/hls.js/issues/3884#issuecomment-842380784
'liveSyncDuration': 2,
'liveMaxLatencyDuration': 3,
'maxLiveSyncPlaybackRate': 2,
'liveDurationInfinity': True
},
'debugVideoEvents': debug_video_events,
'debug': debug_hls
}
}
return await render(req, 'cams',
title=lang('cams'),
assets=['hls.js'],
context=dict(
mode=mode,
js_config=js_config,
))
@routes.get('/routing/main.cgi')
async def routing_main(req: web.Request):
upstream = get_current_upstream()
set_upstream_to = req.query.get('set-upstream-to', None)
mc = ModemsConfig()
if set_upstream_to and set_upstream_to in mc and set_upstream_to != upstream:
modem = mc[set_upstream_to]
new_upstream = str(modem['gateway_ip'] if 'gateway_ip' in modem else modem['ip'])
openwrt.set_upstream(new_upstream)
raise web.HTTPFound('/routing/main.cgi')
context = dict(
upstream=upstream,
selected_tab='main',
modems=mc.keys()
)
return await render(req, 'routing_main', title=lang('routing'), context=context)
@routes.get('/routing/rules.cgi')
async def routing_rules(req: web.Request):
mc = ModemsConfig()
action = req.query.get('action', None)
error = req.query.get('error', None)
set_name = req.query.get('set', None)
ip = req.query.get('ip', None)
def validate_input():
# validate set
if not set_name or set_name not in mc:
raise ValueError(f'invalid set \'{set_name}\'')
# validate ip
if not isinstance(ip, str):
raise ValueError('invalid ip')
slash_pos = None
try:
slash_pos = ip.index('/')
except ValueError:
pass
if slash_pos is not None:
ip_without_mask = ip[0:slash_pos]
else:
ip_without_mask = ip
if not validate_ipv4(ip_without_mask):
raise ValueError(f'invalid ip \'{ip}\'')
base_url = '/routing/rules.cgi'
if action in ('add', 'del'):
try:
validate_input()
except ValueError as e:
raise web.HTTPFound(f'{base_url}?error='+quote_plus(str(e)))
f = getattr(openwrt, f'ipset_{action}')
output = f(set_name, ip)
url = base_url
if output != '':
url += '?error='+quote_plus(output)
raise web.HTTPFound(url)
ipsets = openwrt.ipset_list_all()
context = dict(
sets=ipsets,
selected_tab='rules',
error=error
)
return await render(req, 'routing_rules',
title=lang('routing') + ' // ' + lang('routing_rules'),
context=context)
@routes.get('/routing/dhcp.cgi')
async def routing_dhcp(req: web.Request):
leases = openwrt.get_dhcp_leases()
return await render(req, 'routing_dhcp',
title=lang('routing') + ' // DHCP',
context=dict(leases=leases, selected_tab='dhcp'))
def init_web_app(app: web.Application):
aiohttp_jinja2.setup(
app,
loader=jinja2.FileSystemLoader(homekit_path('web', 'kbn_templates')),
autoescape=jinja2.select_autoescape(['html', 'xml']),
)
env = aiohttp_jinja2.get_env(app)
def filter_lang(key, unit='web_kbn'):
strings = Translation(unit)
if isinstance(key, str) and '.' in key:
return strings.get().get(key)
else:
return strings.get()[key]
env.filters['tojson'] = lambda obj: json.dumps(obj, separators=(',', ':'), default=json_serial)
env.filters['lang'] = filter_lang
app.router.add_static('/assets/', path=homekit_path('web', 'kbn_assets'))
if __name__ == '__main__':
config.load_app(WebKbnConfig)
http.serve(addr=config.app_config['listen_addr'],
routes=routes,
before_start=init_web_app)