355 lines
12 KiB
Python
355 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
import asyncio
|
||
import jinja2
|
||
import aiohttp_jinja2
|
||
import json
|
||
import re
|
||
import inverterd
|
||
import phonenumbers
|
||
import __py_include
|
||
|
||
from io import StringIO
|
||
from aiohttp.web import HTTPFound
|
||
from typing import Optional, Union
|
||
from homekit.config import config, AppConfigUnit
|
||
from homekit.util import homekit_path, filesize_fmt, seconds_to_human_readable_string
|
||
from homekit.modem import E3372, ModemsConfig, MacroNetWorkType
|
||
from homekit.inverter.config import InverterdConfig
|
||
from homekit.relay.sunxi_h3_client import RelayClient
|
||
from homekit import http
|
||
|
||
|
||
class WebKbnConfig(AppConfigUnit):
    """Configuration unit for the ``web_kbn`` application."""

    NAME = 'web_kbn'

    @classmethod
    def schema(cls) -> Optional[dict]:
        """Return the schema describing the keys this application's config accepts.

        The two address entries are built with ``cls._addr_schema(required=True)``;
        the remaining entries are declared as plain strings.
        """
        fields = {}
        fields['listen_addr'] = cls._addr_schema(required=True)
        fields['assets_public_path'] = dict(type='string')
        fields['pump_addr'] = cls._addr_schema(required=True)
        fields['inverter_grafana_url'] = dict(type='string')
        fields['sensors_grafana_url'] = dict(type='string')
        return fields
|
||
|
||
|
||
# Asset file names that get_head_static() turns into <script>/<link> tags,
# in this order. An entry may carry its own version as a "?<version>" suffix;
# entries without one get the default version chosen by get_head_static().
STATIC_FILES = [
    'bootstrap.min.css',
    'bootstrap.min.js',
    'polyfills.js',
    'app.js',
    'app.css',
]
|
||
|
||
|
||
def get_js_link(file, version) -> str:
    """Return a ``<script>`` tag that loads *file* from the public assets path.

    :param file: asset file name, relative to ``assets_public_path``
    :param version: when truthy, appended to the URL as ``?version=<version>``
    """
    src = f'{file}?version={version}' if version else file
    assets_root = config.app_config["assets_public_path"]
    return f'<script src="{assets_root}/{src}" type="text/javascript"></script>'
|
||
|
||
|
||
def get_css_link(file, version) -> str:
    """Return a stylesheet ``<link>`` tag for *file* under the public assets path.

    :param file: asset file name, relative to ``assets_public_path``
    :param version: when truthy, appended to the URL as ``?version=<version>``
    """
    href = f'{file}?version={version}' if version else file
    assets_root = config.app_config["assets_public_path"]
    return f'<link rel="stylesheet" type="text/css" href="{assets_root}/{href}">'
|
||
|
||
|
||
def get_head_static() -> str:
    """Build the markup that includes every file listed in ``STATIC_FILES``.

    Each entry is split at its first ``?``: the part before it is the file
    name, the part after it is the version. Entries without a ``?`` use
    version ``2``. Names ending in ``.js`` become script tags, everything
    else becomes a stylesheet link. The tags are concatenated in list order.
    """
    tags = []
    for entry in STATIC_FILES:
        name, has_version, version = entry.partition('?')
        if not has_version:
            version = 2
        make_tag = get_js_link if name.endswith('.js') else get_css_link
        tags.append(make_tag(name, version))
    return ''.join(tags)
|
||
|
||
|
||
def get_modem_client(modem_cfg: dict) -> E3372:
    """Create an ``E3372`` client from a modem config entry.

    :param modem_cfg: mapping that provides the ``ip`` and ``legacy_auth`` keys
    """
    address = modem_cfg['ip']
    use_legacy_auth = modem_cfg['legacy_auth']
    return E3372(address, legacy_token_auth=use_legacy_auth)
|
||
|
||
|
||
def get_modem_data(modem_cfg: dict, get_raw=False) -> Union[dict, tuple]:
    """Query a modem and return either its raw responses or a display summary.

    :param modem_cfg: modem config entry, passed to ``get_modem_client()``
    :param get_raw: when true, return the tuple
        ``(signal, status, traffic, device_info, dialup_conn)`` as read from
        the client; otherwise return a dict with the keys ``type``, ``level``,
        ``rssi``, ``sinr``, ``connected_time``, ``downloaded`` and ``uploaded``
    """
    client = get_modem_client(modem_cfg)

    # These three are read in both modes, in this order.
    signal = client.device_signal
    status = client.monitoring_status
    traffic = client.traffic_stats

    if get_raw:
        return (
            signal,
            status,
            traffic,
            client.device_information,
            client.dialup_connection,
        )

    # Drop the enum-name prefix so that only the network type itself is shown.
    net_type = MacroNetWorkType(int(status['CurrentNetworkType']))
    summary = {
        'type': re.sub('^MACRO_NET_WORK_TYPE(_EX)?_', '', net_type.name),
    }
    # Signal level falls back to 0 when the status carries no 'SignalIcon'.
    summary['level'] = int(status['SignalIcon']) if 'SignalIcon' in status else 0
    summary['rssi'] = signal['rssi']
    summary['sinr'] = signal['sinr']
    summary['connected_time'] = seconds_to_human_readable_string(int(traffic['CurrentConnectTime']))
    summary['downloaded'] = filesize_fmt(int(traffic['CurrentDownload']))
    summary['uploaded'] = filesize_fmt(int(traffic['CurrentUpload']))
    return summary
|
||
|
||
|
||
def get_pump_client() -> RelayClient:
    """Return a ``RelayClient`` connected to the configured ``pump_addr``."""
    pump_addr = config.app_config['pump_addr']
    client = RelayClient(host=pump_addr.host, port=pump_addr.port)
    client.connect()
    return client
|
||
|
||
|
||
def get_inverter_client() -> inverterd.Client:
    """Return a connected ``inverterd.Client`` switched to the JSON format.

    The host is taken from the ``remote_addr`` entry of ``InverterdConfig``.
    """
    remote_host = InverterdConfig()['remote_addr'].host
    client = inverterd.Client(host=remote_host)
    client.connect()
    client.format(inverterd.Format.JSON)
    return client
|
||
|
||
|
||
def get_inverter_data() -> tuple:
    """Fetch inverter status and rated info and render an HTML summary.

    Runs ``get-status`` and ``get-rated`` on a fresh inverter client and
    builds a short summary: battery, load, optionally PV input power and AC
    input, and the output source priority, separated by ``<br>``.

    :return: ``(status, rated, html)`` where ``status`` and ``rated`` are the
        ``data`` members of the two decoded JSON responses
    """
    client = get_inverter_client()

    status = json.loads(client.exec('get-status'))['data']
    rated = json.loads(client.exec('get-rated'))['data']

    def reading(key: str, sep: str = ' ') -> str:
        # Render a status entry as "<value><sep><unit>".
        entry = status[key]
        return f"{entry['value']}{sep}{entry['unit']}"

    # Turn a trailing "ge" into "ging" (e.g. "charge" -> "charging").
    direction = re.sub('ge$', 'ging', status['battery_power_direction'].lower())

    if direction == 'charging':
        rate = ' @ ' + reading('battery_charge_current')
    elif direction == 'discharging':
        rate = ' @ ' + reading('battery_discharge_current')
    else:
        rate = ''

    battery_line = '<b>Battery:</b> ' + reading('battery_voltage')
    battery_line += ' (' + reading('battery_capacity', sep='') + ', '
    battery_line += direction + rate + ')'

    load_line = '<b>Load:</b> ' + reading('ac_output_active_power')
    load_line += f" ({status['output_load_percent']['value']}%)"

    lines = [battery_line, load_line]

    # PV input is only listed while it reports a positive power value.
    if status['pv1_input_power']['value'] > 0:
        lines.append('<b>Input power:</b> ' + reading('pv1_input_power'))

    # AC input is listed when either the grid voltage or frequency is positive.
    if status['grid_voltage']['value'] > 0 or status['grid_freq']['value'] > 0:
        lines.append('<b>AC input:</b> ' + reading('grid_voltage') + ', ' + reading('grid_freq'))

    lines.append(f"<b>Priority:</b> {rated['output_source_priority']}")

    # Join with newlines first, then convert every newline to a line break.
    html = "\n".join(lines).replace("\n", '<br>')

    return status, rated, html
|
||
|
||
|
||
class WebSite(http.HTTPServer):
    """Web front-end with pages for the modems, SMS, the inverter and the pump.

    Blocking device clients (modem, inverter, pump relay) are always invoked
    through the event loop's default executor so that a slow device does not
    stall the other requests.
    """

    _modems_config: ModemsConfig

    def __init__(self, *args, **kwargs):
        """Set up templating, static assets and the URL routes."""
        super().__init__(*args, **kwargs)

        self._modems_config = ModemsConfig()

        # NOTE(review): select_autoescape(['html', 'xml']) decides by template
        # file extension, while render_page() loads '<name>.j2' templates --
        # confirm whether autoescaping is actually in effect for them.
        aiohttp_jinja2.setup(
            self.app,
            loader=jinja2.FileSystemLoader(homekit_path('web', 'kbn_templates')),
            autoescape=jinja2.select_autoescape(['html', 'xml']),
        )
        env = aiohttp_jinja2.get_env(self.app)
        # Replace the 'tojson' filter with compact json.dumps() output.
        env.filters['tojson'] = lambda obj: json.dumps(obj, separators=(',', ':'))

        self.app.router.add_static('/assets/', path=homekit_path('web', 'kbn_assets'))

        self.get('/main.cgi', self.index)

        self.get('/modems.cgi', self.modems)
        self.get('/modems/info.ajx', self.modems_ajx)
        self.get('/modems/verbose.cgi', self.modems_verbose)

        self.get('/inverter.cgi', self.inverter)
        self.get('/inverter.ajx', self.inverter_ajx)
        self.get('/pump.cgi', self.pump)
        self.get('/sms.cgi', self.sms)
        self.post('/sms.cgi', self.sms_post)

    async def render_page(self,
                          req: http.Request,
                          template_name: str,
                          title: Optional[str] = None,
                          context: Optional[dict] = None):
        """Render ``<template_name>.j2`` with the common page context.

        :param req: current request
        :param template_name: template name without the ``.j2`` suffix
        :param title: stored in the context as ``title`` when not ``None``
        :param context: extra template variables; the passed dict is not modified
        :return: the response produced by ``aiohttp_jinja2.render_template``
        """
        if context is None:
            context = {}
        context = {
            **context,
            'head_static': get_head_static()
        }
        if title is not None:
            context['title'] = title
        response = aiohttp_jinja2.render_template(template_name+'.j2', req, context=context)
        return response

    async def index(self, req: http.Request):
        """Render the main page with the configured Grafana URLs."""
        ctx = {}
        for k in 'inverter', 'sensors':
            ctx[f'{k}_grafana_url'] = config.app_config[f'{k}_grafana_url']
        return await self.render_page(req, 'index',
                                      title="Home web site",
                                      context=ctx)

    async def modems(self, req: http.Request):
        """Render the modems overview page."""
        return await self.render_page(req, 'modems',
                                      title='Состояние модемов',
                                      context=dict(modems=self._modems_config))

    async def modems_ajx(self, req: http.Request):
        """Return the rendered ``modem_data.j2`` fragment for the modem given by ``id``.

        :raises ValueError: when ``id`` is not a configured modem
        """
        modem = req.query.get('id', None)
        if modem not in self._modems_config.keys():
            raise ValueError('invalid modem id')

        modem_cfg = self._modems_config.get(modem)
        loop = asyncio.get_event_loop()
        modem_data = await loop.run_in_executor(None, lambda: get_modem_data(modem_cfg))

        html = aiohttp_jinja2.render_string('modem_data.j2', req, context=dict(
            modem_data=modem_data,
            modem=modem
        ))

        return self.ok({'html': html})

    async def modems_verbose(self, req: http.Request):
        """Render the page with the raw data sets of the modem given by ``id``.

        :raises ValueError: when ``id`` is not a configured modem
        """
        modem = req.query.get('id', None)
        if modem not in self._modems_config.keys():
            raise ValueError('invalid modem id')

        modem_cfg = self._modems_config.get(modem)
        loop = asyncio.get_event_loop()
        signal, status, traffic, device, dialup_conn = await loop.run_in_executor(
            None, lambda: get_modem_data(modem_cfg, True))
        data = [
            ['Signal', signal],
            ['Connection', status],
            ['Traffic', traffic],
            ['Device info', device],
            ['Dialup connection', dialup_conn]
        ]

        modem_name = self._modems_config.getfullname(modem)
        return await self.render_page(req, 'modem_verbose',
                                      title=f'Подробная информация о модеме "{modem_name}"',
                                      context=dict(data=data, modem_name=modem_name))

    async def sms(self, req: http.Request):
        """Render the SMS page (inbox or outbox) of the selected modem.

        Query parameters: ``id`` (defaults to the first configured modem),
        ``outbox``, ``error`` and ``sent``.
        """
        modem = req.query.get('id', list(self._modems_config.keys())[0])
        is_outbox = int(req.query.get('outbox', 0)) == 1
        error = req.query.get('error', None)
        sent = int(req.query.get('sent', 0)) == 1

        modem_cfg = self._modems_config[modem]
        # The modem client is blocking, so query it off the event loop.
        loop = asyncio.get_event_loop()
        messages = await loop.run_in_executor(
            None, lambda: get_modem_client(modem_cfg).sms_list(1, 20, is_outbox))

        return await self.render_page(req, 'sms',
                                      title=f"SMS-сообщения ({'исходящие' if is_outbox else 'входящие'}, {modem})",
                                      context=dict(
                                          modems=self._modems_config,
                                          selected_modem=modem,
                                          is_outbox=is_outbox,
                                          error=error,
                                          is_sent=sent,
                                          messages=messages
                                      ))

    async def sms_post(self, req: http.Request):
        """Send an SMS through the selected modem, then redirect to the SMS page.

        Form fields: ``phone`` and ``text``. Numbers longer than four
        characters are validated and normalised to E.164 (region ``RU`` is
        assumed when there is no leading ``+``); shorter ones are passed
        through unchanged. A missing, empty, unparsable or invalid number
        redirects back with the ``error`` query parameter set.
        """
        modem = req.query.get('id', list(self._modems_config.keys())[0])
        is_outbox = int(req.query.get('outbox', 0)) == 1

        fd = await req.post()
        phone = fd.get('phone', None)
        text = fd.get('text', None)

        return_url = f'/sms.cgi?id={modem}&outbox={int(is_outbox)}'
        invalid_number_url = f'{return_url}&error=Неверный+номер'

        # A missing (or non-text) field must not reach re.sub().
        if not isinstance(phone, str):
            raise HTTPFound(invalid_number_url)

        phone = re.sub(r'\s+', '', phone)
        if not phone:
            raise HTTPFound(invalid_number_url)

        if len(phone) > 4:
            country = None if phone.startswith('+') else 'RU'
            try:
                number = phonenumbers.parse(phone, country)
            except phonenumbers.NumberParseException:
                # Unparsable input is reported the same way as an invalid number.
                raise HTTPFound(invalid_number_url) from None
            if not phonenumbers.is_valid_number(number):
                raise HTTPFound(invalid_number_url)
            phone = phonenumbers.format_number(number, phonenumbers.PhoneNumberFormat.E164)

        modem_cfg = self._modems_config[modem]
        # The modem client is blocking, so send off the event loop.
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(
            None, lambda: get_modem_client(modem_cfg).sms_send(phone, text))
        raise HTTPFound(return_url)

    async def inverter(self, req: http.Request):
        """Render the inverter page, or handle the ``do=set-osp`` action.

        :raises ValueError: when ``do=set-osp`` is given with a ``value``
            other than ``sub`` or ``sbu``
        """
        loop = asyncio.get_event_loop()

        action = req.query.get('do', None)
        if action == 'set-osp':
            val = req.query.get('value')
            if val not in ('sub', 'sbu'):
                raise ValueError('invalid osp value')
            # The inverter client is blocking, so run the command off the event loop.
            await loop.run_in_executor(
                None,
                lambda: get_inverter_client().exec('set-output-source-priority',
                                                   arguments=(val.upper(),)))
            raise HTTPFound('/inverter.cgi')

        status, rated, html = await loop.run_in_executor(None, get_inverter_data)
        return await self.render_page(req, 'inverter',
                                      title='Инвертор',
                                      context=dict(status=status, rated=rated, html=html))

    async def inverter_ajx(self, req: http.Request):
        """Return the inverter summary HTML for periodic page refreshes."""
        status, rated, html = await asyncio.get_event_loop().run_in_executor(None, get_inverter_data)
        return self.ok({'html': html})

    async def pump(self, req: http.Request):
        """Render the pump page, or switch the pump when ``set`` is ``on``/``off``."""
        # TODO: the relay client is blocking; until it is rewritten using aio,
        # every call to it goes through the executor.
        loop = asyncio.get_event_loop()

        action = req.query.get('set', None)
        if action in ('on', 'off'):
            await loop.run_in_executor(
                None, lambda: getattr(get_pump_client(), action)())
            raise HTTPFound('/pump.cgi')

        status = await loop.run_in_executor(
            None, lambda: get_pump_client().status())
        return await self.render_page(req, 'pump',
                                      title='Насос',
                                      context=dict(status=status))
|
||
|
||
|
||
if __name__ == '__main__':
    # Load this application's config unit before config.app_config is read.
    config.load_app(WebKbnConfig)

    # Start the web server on the configured listen address.
    listen_addr = config.app_config['listen_addr']
    server = WebSite(listen_addr)
    server.run()
|