611 lines
19 KiB
Python
Executable File
611 lines
19 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import include_homekit
|
|
import asyncio
|
|
import logging
|
|
import jinja2
|
|
import aiohttp_jinja2
|
|
import json
|
|
import re
|
|
import inverterd
|
|
import phonenumbers
|
|
import time
|
|
import os.path
|
|
|
|
from io import StringIO
|
|
from aiohttp import web
|
|
from typing import Optional, Union
|
|
from urllib.parse import quote_plus
|
|
from contextvars import ContextVar
|
|
from homekit.config import config, AppConfigUnit, is_development_mode, Translation, Language
|
|
from homekit.camera import IpcamConfig
|
|
from homekit.util import homekit_path, filesize_fmt, seconds_to_human_readable_string, json_serial, validate_ipv4
|
|
from homekit.modem import E3372, ModemsConfig, MacroNetWorkType
|
|
from homekit.inverter.config import InverterdConfig
|
|
from homekit.relay.sunxi_h3_client import RelayClient
|
|
from homekit import openwrt, http
|
|
|
|
|
|
class WebKbnConfig(AppConfigUnit):
    """Configuration unit describing the settings of the ``web_kbn`` app."""

    NAME = 'web_kbn'

    @classmethod
    def schema(cls) -> Optional[dict]:
        """Return the schema dict for this unit's configuration keys.

        Address-like keys use the ``_addr_schema`` helper of the base
        class; the remaining keys are plain strings.
        """
        fields = {}
        fields['listen_addr'] = cls._addr_schema(required=True)
        fields['assets_public_path'] = {'type': 'string'}
        fields['pump_addr'] = cls._addr_schema(required=True)
        fields['hls_local_host'] = cls._addr_schema(required=True, only_ip=True)
        fields['inverter_grafana_url'] = {'type': 'string'}
        fields['sensors_grafana_url'] = {'type': 'string'}
        return fields
|
|
|
|
|
|
# Static assets known to the page head builder, mapped to the version number
# that is appended to their URL as ``?version=N``.
# files marked with + at the beginning are included by default
common_static_files = {
    # stylesheets
    '+bootstrap.min.css': 1,
    # scripts
    '+bootstrap.bundle.min.js': 1,
    '+polyfills.js': 1,
    '+app.js': 10,
    '+app.css': 6,
    # opt-in only (no leading '+'): must be requested explicitly by a page
    'hls.js': 1,
}

# Route table that the handlers below register themselves on.
routes = web.RouteTableDef()

logger = logging.getLogger(__name__)

# Language selected for the current request; set by language_middleware.
lang_context_var = ContextVar('lang', default=Translation.DEFAULT_LANGUAGE)
|
|
|
|
|
|
def get_js_link(file, version) -> str:
    """Return a ``<script>`` tag for an asset below ``assets_public_path``.

    ``version`` is appended to the URL as a query parameter; in development
    mode it is replaced by the current Unix timestamp.
    """
    version = int(time.time()) if is_development_mode() else version
    file += f'?version={version}'
    return f'<script src="{config.app_config["assets_public_path"]}/{file}" type="text/javascript"></script>'
|
|
|
|
|
|
def get_css_link(file, version) -> str:
    """Return a stylesheet ``<link>`` tag for an asset below ``assets_public_path``.

    ``version`` is appended to the URL as a query parameter; in development
    mode it is replaced by the current Unix timestamp.
    """
    version = int(time.time()) if is_development_mode() else version
    file += f'?version={version}'
    return f'<link rel="stylesheet" type="text/css" href="{config.app_config["assets_public_path"]}/{file}">'
|
|
|
|
|
|
def get_head_static(additional_files=None) -> str:
    """Build the HTML with ``<script>``/``<link>`` tags for the page head.

    Every entry of ``common_static_files`` whose name starts with ``+`` is
    always emitted (without the ``+``); other entries are emitted only when
    their name is listed in ``additional_files``.

    :param additional_files: names of opt-in assets to include, or None
    :return: concatenated tags, in ``common_static_files`` order
    """
    extras = [] if additional_files is None else additional_files
    tags = []

    for name, version in common_static_files.items():
        if name.startswith('+'):
            # default asset: drop the marker to get the real file name
            name = name[1:]
        elif name not in extras:
            continue

        make_tag = get_js_link if name.endswith('.js') else get_css_link
        tags.append(make_tag(name, version))

    return ''.join(tags)
|
|
|
|
|
|
def get_modem_client(modem_cfg: dict) -> E3372:
    """Create an ``E3372`` client from a modem config entry.

    Reads the ``ip`` and ``legacy_auth`` keys of ``modem_cfg``.
    """
    address = modem_cfg['ip']
    use_legacy_auth = modem_cfg['legacy_auth']
    return E3372(address, legacy_token_auth=use_legacy_auth)
|
|
|
|
|
|
def get_modem_data(modem_cfg: dict, get_raw=False) -> Union[dict, tuple]:
    """Query a modem and return its state.

    :param modem_cfg: modem config entry, passed to ``get_modem_client``
    :param get_raw: when true, return the unprocessed client responses
    :return: with ``get_raw`` a tuple ``(signal, status, traffic,
        device_info, dialup_conn)``; otherwise a dict with the keys
        ``type``, ``level``, ``rssi``, ``sinr``, ``connected_time``,
        ``downloaded`` and ``uploaded``
    """
    client = get_modem_client(modem_cfg)

    signal = client.device_signal
    status = client.monitoring_status
    traffic = client.traffic_stats

    if get_raw:
        device_info = client.device_information
        dialup_conn = client.dialup_connection
        return signal, status, traffic, device_info, dialup_conn

    # Strip the common enum-name prefix, leaving only the network type itself.
    network_type = MacroNetWorkType(int(status['CurrentNetworkType']))
    network_type_label = re.sub('^MACRO_NET_WORK_TYPE(_EX)?_', '', network_type.name)

    summary = {}
    summary['type'] = network_type_label
    # SignalIcon may be absent from the status response; fall back to 0.
    summary['level'] = int(status['SignalIcon']) if 'SignalIcon' in status else 0
    summary['rssi'] = signal['rssi']
    summary['sinr'] = signal['sinr']
    summary['connected_time'] = seconds_to_human_readable_string(int(traffic['CurrentConnectTime']))
    summary['downloaded'] = filesize_fmt(int(traffic['CurrentDownload']))
    summary['uploaded'] = filesize_fmt(int(traffic['CurrentUpload']))
    return summary
|
|
|
|
|
|
def get_pump_client() -> RelayClient:
    """Return a ``RelayClient`` connected to the configured ``pump_addr``."""
    pump_addr = config.app_config['pump_addr']
    client = RelayClient(host=pump_addr.host, port=pump_addr.port)
    client.connect()
    return client
|
|
|
|
|
|
def get_inverter_client() -> inverterd.Client:
    """Return a connected inverterd client switched to JSON output.

    The host is taken from the ``remote_addr`` entry of ``InverterdConfig``.
    """
    remote_host = InverterdConfig()['remote_addr'].host
    client = inverterd.Client(host=remote_host)
    client.connect()
    client.format(inverterd.Format.JSON)
    return client
|
|
|
|
|
|
def get_inverter_data() -> tuple:
    """Fetch inverter status and ratings and render a short HTML summary.

    :return: tuple ``(status, rated, html)`` where ``status`` and ``rated``
        are the ``data`` members of the ``get-status`` / ``get-rated``
        responses and ``html`` is a ``<br>``-separated summary string
    """
    client = get_inverter_client()

    status = json.loads(client.exec('get-status'))['data']
    rated = json.loads(client.exec('get-rated'))['data']

    def reading(name):
        # (value, unit) pair of one status entry, ready for '%s %s' formatting
        entry = status[name]
        return entry['value'], entry['unit']

    # e.g. "charge" -> "charging", "discharge" -> "discharging"
    power_direction = re.sub('ge$', 'ging', status['battery_power_direction'].lower())

    if power_direction == 'charging':
        charging_rate = ' @ %s %s' % reading('battery_charge_current')
    elif power_direction == 'discharging':
        charging_rate = ' @ %s %s' % reading('battery_discharge_current')
    else:
        charging_rate = ''

    rows = []

    battery_row = '<b>Battery:</b> %s %s' % reading('battery_voltage')
    battery_row += ' (%s%s, ' % reading('battery_capacity')
    battery_row += '%s%s)' % (power_direction, charging_rate)
    rows.append(battery_row)

    load_row = '<b>Load:</b> %s %s' % reading('ac_output_active_power')
    load_row += ' (%s%%)' % (status['output_load_percent']['value'],)
    rows.append(load_row)

    # PV input is shown only while it delivers power.
    if status['pv1_input_power']['value'] > 0:
        rows.append('<b>Input power:</b> %s %s' % reading('pv1_input_power'))

    # Grid input is shown only when a voltage or frequency is measured.
    if status['grid_voltage']['value'] > 0 or status['grid_freq']['value'] > 0:
        grid_row = '<b>AC input:</b> %s %s' % reading('grid_voltage')
        grid_row += ', %s %s' % reading('grid_freq')
        rows.append(grid_row)

    rows.append('<b>Priority:</b> %s' % (rated['output_source_priority'],))

    html = "\n".join(rows).replace("\n", '<br>')

    return status, rated, html
|
|
|
|
|
|
def get_current_upstream() -> str:
    """Return the ``ModemsConfig`` key of the modem used as default route.

    The default route reported by ``openwrt.get_default_route()`` is first
    mapped from a modem's ``gateway_ip`` (if any matches) to that modem's
    ``ip``, then looked up among the modems' ``ip`` values.

    :raises RuntimeError: if no modem matches the default route
    """
    r = openwrt.get_default_route()
    logger.info(f'default route: {r}')

    mc = ModemsConfig()

    # Translate a gateway address into the address of the modem behind it.
    for _, modem in mc.items():
        if 'gateway_ip' not in modem:
            continue
        if modem['gateway_ip'] == r:
            r = modem['ip']
            break

    # When several modems share the address, the last one wins.
    candidates = [name for name, modem in mc.items() if r == modem['ip']]
    upstream = candidates[-1] if candidates else None

    if not upstream:
        raise RuntimeError('failed to determine current upstream!')
    return upstream
|
|
|
|
|
|
def get_preferred_lang(req: web.Request) -> Language:
    """Return the language requested through the ``lang`` cookie.

    Falls back to ``Translation.DEFAULT_LANGUAGE`` when the cookie is absent
    or does not name a valid ``Language`` member.
    """
    lang_cookie = req.cookies.get('lang', None)
    if lang_cookie is not None:
        try:
            return Language(lang_cookie)
        except ValueError:
            logger.debug(f"unsupported lang_cookie value: {lang_cookie}")
    return Translation.DEFAULT_LANGUAGE
|
|
|
|
|
|
@web.middleware
async def language_middleware(request, handler):
    """Store the request's preferred language in ``lang_context_var``,
    then pass the request on to ``handler``."""
    preferred = get_preferred_lang(request)
    lang_context_var.set(preferred)
    response = await handler(request)
    return response
|
|
|
|
|
|
def lang(key, unit='web_kbn'):
    """Look up a translated string for the current request language.

    :param key: translation key; string keys containing a dot are resolved
        with ``.get(key)``, all other keys with item access
    :param unit: name of the translation unit to read from
    """
    localized = Translation(unit).get(lang_context_var.get())
    is_dotted_key = isinstance(key, str) and '.' in key
    if not is_dotted_key:
        return localized[key]
    return localized.get(key)
|
|
|
|
|
|
async def render(req: web.Request,
                 template_name: str,
                 title: Optional[str] = None,
                 context: Optional[dict] = None,
                 assets: Optional[list] = None):
    """Render the ``<template_name>.j2`` template into a response.

    The template context is a copy of ``context`` extended with:

    - ``head_static``: asset tags built by ``get_head_static(assets)``
    - ``user_lang``: value of the current request language
    - ``title``: only when ``title`` is not None

    :param req: current request
    :param template_name: template name without the ``.j2`` suffix
    :param title: optional page title
    :param context: optional template variables (not modified)
    :param assets: optional list of opt-in static files for the page head
    """
    template_vars = {}
    if context is not None:
        template_vars.update(context)

    template_vars['head_static'] = get_head_static(assets)
    template_vars['user_lang'] = lang_context_var.get().value
    if title is not None:
        template_vars['title'] = title

    return aiohttp_jinja2.render_template(template_name+'.j2', req, context=template_vars)
|
|
|
|
|
|
@routes.get('/')
async def index0(req: web.Request):
    """Redirect the site root to ``main.cgi``."""
    redirect = web.HTTPFound('main.cgi')
    raise redirect
|
|
|
|
|
|
@routes.get('/main.cgi')
async def index(req: web.Request):
    """Render the main page.

    The optional ``tab`` query parameter selects one of the known tabs.
    An unknown tab, or an explicit request for the first (default) tab,
    is redirected to the bare ``main.cgi`` URL.
    """
    tabs = ['zones', 'list']
    tab = req.query.get('tab', None)

    # Only a known, non-default tab may stay in the URL.
    tab_is_acceptable = tab in tabs and tab != tabs[0]
    if tab and not tab_is_acceptable:
        raise web.HTTPFound('main.cgi')
    tab = tabs[0] if tab is None else tab

    ctx = {f'{k}_grafana_url': config.app_config[f'{k}_grafana_url']
           for k in ('inverter', 'sensors')}

    cc = IpcamConfig()
    ctx.update({
        'camzones': cc['zones'].keys(),
        'allcams': cc.get_all_cam_names(),
        'lang_enum': Language,
        'lang_selected': lang_context_var.get(),
        'tab_selected': tab,
        'tabs': tabs,
    })

    page_title = lang('sitename')
    return await render(req, 'index', title=page_title, context=ctx)
|
|
|
|
|
|
@routes.get('/modems.cgi')
async def modems(req: web.Request):
    """Render the modem status page.

    The template receives all configured modems plus ``modems_js_list``,
    the keys of those modems whose ``type`` is ``e3372``.
    """
    page_title = lang('modem_statuses')
    all_modems = ModemsConfig()
    e3372_keys = []
    for key, value in ModemsConfig().items():
        if value['type'] == 'e3372':
            e3372_keys.append(key)

    template_vars = {'modems': all_modems, 'modems_js_list': e3372_keys}
    return await render(req, 'modems', title=page_title, context=template_vars)
|
|
|
|
|
|
@routes.get('/modems_info.ajx')
async def modems_ajx(req: web.Request):
    """Return the rendered ``modem_data.j2`` fragment for one modem.

    The modem is selected by the ``id`` query parameter.

    :raises ValueError: if the id is unknown or the modem is not an e3372
    """
    mc = ModemsConfig()
    modem = req.query.get('id', None)
    if modem not in mc.keys():
        raise ValueError('invalid modem id')

    modem_cfg = mc.get(modem)
    if modem_cfg['type'] != 'e3372':
        raise ValueError('invalid modem type')

    def fetch():
        return get_modem_data(modem_cfg)

    # The modem client is blocking, so query it in the default executor.
    modem_data = await asyncio.get_event_loop().run_in_executor(None, fetch)

    template_vars = {'modem_data': modem_data, 'modem': modem}
    html = aiohttp_jinja2.render_string('modem_data.j2', req, context=template_vars)

    return http.ajax_ok({'html': html})
|
|
|
|
|
|
@routes.get('/modems_verbose.cgi')
async def modems_verbose(req: web.Request):
    """Render the page with the raw information reported by one modem.

    The modem is selected by the ``id`` query parameter.

    :raises ValueError: if the id is unknown or the modem is not an e3372
    """
    modem = req.query.get('id', None)
    if modem not in ModemsConfig().keys():
        raise ValueError('invalid modem id')

    modem_cfg = ModemsConfig().get(modem)
    if modem_cfg['type'] != 'e3372':
        raise ValueError('invalid modem type')

    def fetch_raw():
        return get_modem_data(modem_cfg, True)

    # The modem client is blocking, so query it in the default executor.
    raw = await asyncio.get_event_loop().run_in_executor(None, fetch_raw)
    signal, status, traffic, device, dialup_conn = raw

    # [section title, section payload] pairs, in display order
    data = [
        ['Signal', signal],
        ['Connection', status],
        ['Traffic', traffic],
        ['Device info', device],
        ['Dialup connection', dialup_conn],
    ]

    modem_name = Translation('modems').get(lang_context_var.get())[modem]['full']
    page_title = lang('modem_verbose_info_about_modem') % (modem_name,)
    return await render(req, 'modem_verbose',
                        title=page_title,
                        context={'data': data, 'modem_name': modem_name})
|
|
|
|
|
|
@routes.get('/sms.cgi')
async def sms(req: web.Request):
    """Render the SMS inbox or outbox of a modem.

    Query parameters:

    - ``id``: modem key; defaults to the first configured modem
    - ``outbox``: ``1`` to list the outbox instead of the inbox
    - ``error``: error text to show, passed through to the template
    - ``sent``: ``1`` is passed to the template as ``is_sent``

    :raises ValueError: if the modem id is unknown, the modem is not an
        e3372, or ``outbox``/``sent`` are not integers
    """
    mc = ModemsConfig()

    modem = req.query.get('id', list(mc.keys())[0])
    is_outbox = int(req.query.get('outbox', 0)) == 1
    error = req.query.get('error', None)
    sent = int(req.query.get('sent', 0)) == 1

    # Reject unknown ids the same way the other modem handlers do,
    # instead of failing with a bare KeyError on the lookup below.
    if modem not in mc:
        raise ValueError('invalid modem id')

    input_modem = mc[modem]
    if input_modem['type'] != 'e3372':
        raise ValueError('invalid modem')

    def fetch_messages():
        cl = get_modem_client(input_modem)
        return cl.sms_list(1, 20, is_outbox)

    # The modem client is blocking; run it in the default executor so the
    # event loop keeps serving other requests meanwhile.
    messages = await asyncio.get_event_loop().run_in_executor(None, fetch_messages)

    return await render(req, 'sms',
                        title=lang('sms_page_title') % (lang('sms_outbox') if is_outbox else lang('sms_inbox'), modem),
                        context=dict(
                            modems=mc,
                            selected_modem=modem,
                            is_outbox=is_outbox,
                            error=error,
                            is_sent=sent,
                            messages=messages
                        ))
|
|
|
|
|
|
@routes.post('/sms.cgi')
async def sms_post(req: web.Request):
    """Send an SMS through a modem and redirect back to the SMS page.

    Query parameters ``id`` and ``outbox`` are the same as for the GET
    handler and are carried over into the redirect URL. The form fields
    ``phone`` and ``text`` hold the recipient and the message.

    Numbers longer than 4 characters are validated with ``phonenumbers``
    (region ``RU`` is assumed when there is no leading ``+``) and converted
    to E.164. Shorter numbers are passed to the modem unchanged.

    A missing, empty, unparsable or invalid number redirects back with the
    ``error`` query parameter set.
    """
    modem = req.query.get('id', list(ModemsConfig().keys())[0])
    is_outbox = int(req.query.get('outbox', 0)) == 1

    fd = await req.post()
    phone = fd.get('phone', None)
    text = fd.get('text', None)

    return_url = f'sms.cgi?id={modem}&outbox={int(is_outbox)}'
    invalid_number_url = f'{return_url}&error=Неверный+номер'

    # A request without a textual phone field used to crash in re.sub().
    if not isinstance(phone, str):
        raise web.HTTPFound(invalid_number_url)
    phone = re.sub(r'\s+', '', phone)
    if not phone:
        raise web.HTTPFound(invalid_number_url)

    if len(phone) > 4:
        country = None
        if not phone.startswith('+'):
            country = 'RU'
        try:
            number = phonenumbers.parse(phone, country)
        except phonenumbers.NumberParseException:
            # Unparsable input is a user error, not a server error.
            raise web.HTTPFound(invalid_number_url) from None
        if not phonenumbers.is_valid_number(number):
            raise web.HTTPFound(invalid_number_url)
        phone = phonenumbers.format_number(number, phonenumbers.PhoneNumberFormat.E164)

    modem_cfg = ModemsConfig()[modem]

    def send():
        cl = get_modem_client(modem_cfg)
        cl.sms_send(phone, text)

    # The modem client is blocking; keep it off the event loop.
    await asyncio.get_event_loop().run_in_executor(None, send)
    raise web.HTTPFound(return_url)
|
|
|
|
|
|
@routes.get('/inverter.cgi')
async def inverter(req: web.Request):
    """Render the inverter page, or change the output source priority.

    With ``do=set-osp`` and ``value`` being ``sub`` or ``sbu`` the priority
    is sent to the inverter (upper-cased) and the client is redirected back
    to ``inverter.cgi``. Otherwise the current inverter data is rendered.

    :raises ValueError: if ``value`` is not one of the accepted priorities
    """
    loop = asyncio.get_event_loop()

    action = req.query.get('do', None)
    if action == 'set-osp':
        val = req.query.get('value')
        if val not in ('sub', 'sbu'):
            raise ValueError('invalid osp value')

        def set_priority():
            cl = get_inverter_client()
            cl.exec('set-output-source-priority',
                    arguments=(val.upper(),))

        # Connecting to and talking to the inverter blocks; run it in the
        # default executor, like the status query below.
        await loop.run_in_executor(None, set_priority)
        raise web.HTTPFound('inverter.cgi')

    status, rated, html = await loop.run_in_executor(None, get_inverter_data)
    return await render(req, 'inverter',
                        title=lang('inverter'),
                        context=dict(status=status, rated=rated, html=html))
|
|
|
|
|
|
@routes.get('/inverter.ajx')
async def inverter_ajx(req: web.Request):
    """Return the inverter summary HTML as an AJAX response."""
    loop = asyncio.get_event_loop()
    # get_inverter_data() blocks, so it runs in the default executor.
    inverter_data = await loop.run_in_executor(None, get_inverter_data)
    status, rated, html = inverter_data
    return http.ajax_ok({'html': html})
|
|
|
|
|
|
@routes.get('/pump.cgi')
async def pump(req: web.Request):
    """Render the pump page, or switch the pump relay.

    With ``set=on`` or ``set=off`` the relay client method of the same name
    is called and the client is redirected back to ``pump.cgi``.
    """
    # TODO
    # these are blocking calls
    # should be rewritten using aio

    relay = get_pump_client()

    requested_state = req.query.get('set', None)
    if requested_state in ('on', 'off'):
        switch = getattr(relay, requested_state)
        switch()
        raise web.HTTPFound('pump.cgi')

    status = relay.status()
    page_title = lang('pump')
    return await render(req, 'pump', title=page_title, context={'status': status})
|
|
|
|
|
|
@routes.get('/cams.cgi')
async def cams(req: web.Request):
    """Render the camera page for one camera, one zone, or all cameras.

    Query parameters:

    - ``id``: show a single camera (takes precedence over ``zone``)
    - ``zone``: show all cameras of a zone
    - ``debug_hls`` / ``debug_video_events``: any non-empty value enables
      the corresponding flag in the JS config

    :raises ValueError: if the camera id or the zone is unknown
    """
    cc = IpcamConfig()

    query = req.query
    cam = query.get('id', None)
    zone = query.get('zone', None)
    debug_hls = bool(query.get('debug_hls', False))
    debug_video_events = bool(query.get('debug_video_events', False))

    if cam is not None:
        cam_id = int(cam)
        if not cc.has_camera(cam_id):
            raise ValueError('invalid camera id')
        cam_list = [cam_id]
        mode = {'type': 'single', 'cam': cam_id}
    elif zone is not None:
        if not cc.has_zone(zone):
            raise ValueError('invalid zone')
        cam_list = cc['zones'][zone]
        mode = {'type': 'zone', 'zone': zone}
    else:
        cam_list = cc.get_all_cam_names()
        mode = {'type': 'all'}

    # Requests for a *.manor.id host get an https prefix derived from the
    # X-Real-URI header; everything else uses the configured local HLS host.
    if req.headers.get('Host').endswith('.manor.id'):
        real_uri_dir = os.path.dirname(req.headers.get('X-Real-URI'))
        hls_pfx = 'https://'+req.headers.get('Host')
        hls_pfx += re.sub(r'/home/?$', '/ipcam/', real_uri_dir)
    else:
        hls_pfx = 'http://'+str(config.app_config['hls_local_host'])+'/ipcam/'

    hls_opts = {
        'startPosition': -1,
        # https://github.com/video-dev/hls.js/issues/3884#issuecomment-842380784
        'liveSyncDuration': 2,
        'liveMaxLatencyDuration': 3,
        'maxLiveSyncPlaybackRate': 2,
        'liveDurationInfinity': True,
    }
    js_config = {
        'pfx': hls_pfx,
        # 'host': config.app_config['hls_local_host'],
        # 'proto': 'http',
        'cams': cam_list,
        'hlsConfig': {
            'opts': hls_opts,
            'debugVideoEvents': debug_video_events,
            'debug': debug_hls,
        },
    }

    return await render(req, 'cams',
                        title=lang('cams'),
                        assets=['hls.js'],
                        context={'mode': mode, 'js_config': js_config})
|
|
|
|
|
|
@routes.get('/routing_main.cgi')
async def routing_main(req: web.Request):
    """Render the main routing tab, or switch the upstream modem.

    When ``set-upstream-to`` names a configured modem other than the current
    upstream, the upstream is changed to that modem's ``gateway_ip`` (or its
    ``ip`` when it has no ``gateway_ip``) and the client is redirected back
    to ``routing_main.cgi``.
    """
    upstream = get_current_upstream()

    set_upstream_to = req.query.get('set-upstream-to', None)
    mc = ModemsConfig()

    if set_upstream_to and set_upstream_to in mc and set_upstream_to != upstream:
        modem = mc[set_upstream_to]
        if 'gateway_ip' in modem:
            target = modem['gateway_ip']
        else:
            target = modem['ip']
        openwrt.set_upstream(str(target))
        raise web.HTTPFound('routing_main.cgi')

    context = {
        'upstream': upstream,
        'selected_tab': 'main',
        'modems': mc.keys(),
    }
    return await render(req, 'routing_main', title=lang('routing'), context=context)
|
|
|
|
|
|
@routes.get('/routing_rules.cgi')
async def routing_rules(req: web.Request):
    """Render the routing rules tab, or add/delete an ipset entry.

    Query parameters:

    - ``action``: ``add`` or ``del`` to modify a set; anything else renders
      the page
    - ``set``: set name; must be a key of ``ModemsConfig``
    - ``ip``: IPv4 address, optionally followed by ``/<mask>``
    - ``error``: error text to show, passed through to the template

    After a modification the client is redirected back to
    ``routing_rules.cgi``; validation errors and non-empty output of the
    ipset call are reported through the ``error`` query parameter.
    """
    mc = ModemsConfig()

    action = req.query.get('action', None)
    error = req.query.get('error', None)
    set_name = req.query.get('set', None)
    ip = req.query.get('ip', None)

    def validate_input():
        # validate set
        if not set_name or set_name not in mc:
            raise ValueError(f'invalid set \'{set_name}\'')

        # validate ip
        if not isinstance(ip, str):
            raise ValueError('invalid ip')

        ip_without_mask, slash, mask = ip.partition('/')
        if not validate_ipv4(ip_without_mask):
            raise ValueError(f'invalid ip \'{ip}\'')

        # validate mask: the part after the slash comes straight from the
        # query string and is handed to openwrt.ipset_*, so only a plain
        # decimal prefix length in the range 0..32 is accepted.
        if slash:
            if re.fullmatch(r'[0-9]{1,2}', mask) is None or int(mask) > 32:
                raise ValueError(f'invalid ip \'{ip}\'')

    base_url = 'routing_rules.cgi'
    if action in ('add', 'del'):
        try:
            validate_input()
        except ValueError as e:
            raise web.HTTPFound(f'{base_url}?error='+quote_plus(str(e)))
        f = getattr(openwrt, f'ipset_{action}')
        output = f(set_name, ip)
        url = base_url
        if output != '':
            url += '?error='+quote_plus(output)
        raise web.HTTPFound(url)

    ipsets = openwrt.ipset_list_all()
    context = dict(
        sets=ipsets,
        selected_tab='rules',
        error=error
    )
    return await render(req, 'routing_rules',
                        title=lang('routing') + ' // ' + lang('routing_rules'),
                        context=context)
|
|
|
|
|
|
@routes.get('/routing_dhcp.cgi')
async def routing_dhcp(req: web.Request):
    """Render the DHCP tab with the leases reported by ``openwrt``."""
    leases = openwrt.get_dhcp_leases()
    page_title = lang('routing') + ' // DHCP'
    template_vars = {'leases': leases, 'selected_tab': 'dhcp'}
    return await render(req, 'routing_dhcp', title=page_title, context=template_vars)
|
|
|
|
|
|
@routes.get('/debug.cgi')
async def debug(req: web.Request):
    """Return the request's headers, Host header, URL and method as an
    AJAX response."""
    info = {
        'headers': dict(req.headers),
        'host': req.headers.get('Host'),
        'url': str(req.url),
        'method': req.method,
    }
    return http.ajax_ok(info)
|
|
|
|
|
|
def init_web_app(app: web.Application):
    """Prepare the aiohttp application before it starts serving.

    Installs the language middleware, sets up Jinja2 with the templates from
    ``web/kbn_templates``, registers the ``tojson`` and ``lang`` template
    filters and serves ``web/kbn_assets`` under ``/assets/``.
    """
    app.middlewares.append(language_middleware)

    templates_dir = homekit_path('web', 'kbn_templates')
    # NOTE(review): render() loads templates named '<name>.j2', and the
    # extensions given to select_autoescape here are 'html' and 'xml', so
    # autoescaping presumably stays off for them - confirm this is intended.
    aiohttp_jinja2.setup(
        app,
        loader=jinja2.FileSystemLoader(templates_dir),
        autoescape=jinja2.select_autoescape(['html', 'xml']),
    )
    env = aiohttp_jinja2.get_env(app)

    # @pass_context is used only to prevent jinja2 from caching the result of lang filter results of constant values.
    # as of now i don't know a better way of doing it
    @jinja2.pass_context
    def filter_lang(ctx, key, unit='web_kbn'):
        return lang(key, unit)

    def filter_tojson(obj):
        # compact JSON; json_serial handles otherwise unserializable objects
        return json.dumps(obj, separators=(',', ':'), default=json_serial)

    env.filters['tojson'] = filter_tojson
    env.filters['lang'] = filter_lang

    assets_dir = homekit_path('web', 'kbn_assets')
    app.router.add_static('/assets/', path=assets_dir)
|
|
|
|
|
|
if __name__ == '__main__':
    # Load this app's configuration, then serve the routes defined above.
    config.load_app(WebKbnConfig)
    listen_addr = config.app_config['listen_addr']
    http.serve(addr=listen_addr, routes=routes, before_start=init_web_app)
|