2023-06-10 23:20:37 +03:00

557 lines
21 KiB
Python

import asyncio
import logging
from inverterd import Format
from typing import Union
from enum import Enum
from ..util import Addr, stringify
class InverterEnum(Enum):
def as_text(self) -> str:
raise RuntimeError('abstract method')
class BatteryType(InverterEnum):
AGM = 0
Flooded = 1
User = 2
def as_text(self) -> str:
return ('AGM', 'Flooded', 'User')[self.value]
class InputVoltageRange(InverterEnum):
Appliance = 0
USP = 1
def as_text(self) -> str:
return ('Appliance', 'USP')[self.value]
class OutputSourcePriority(InverterEnum):
SolarUtilityBattery = 0
SolarBatteryUtility = 1
def as_text(self) -> str:
return ('Solar-Utility-Battery', 'Solar-Battery-Utility')[self.value]
class ChargeSourcePriority(InverterEnum):
SolarFirst = 0
SolarAndUtility = 1
SolarOnly = 2
def as_text(self) -> str:
return ('Solar-First', 'Solar-and-Utility', 'Solar-only')[self.value]
class MachineType(InverterEnum):
OffGridTie = 0
GridTie = 1
def as_text(self) -> str:
return ('Off-Grid-Tie', 'Grid-Tie')[self.value]
class Topology(InverterEnum):
TransformerLess = 0
Transformer = 1
def as_text(self) -> str:
return ('Transformer-less', 'Transformer')[self.value]
class OutputMode(InverterEnum):
SingleOutput = 0
ParallelOutput = 1
Phase_1_of_3 = 2
Phase_2_of_3 = 3
Phase_3_of_3 = 4
def as_text(self) -> str:
return (
'Single output',
'Parallel output',
'Phase 1 of 3-phase output',
'Phase 2 of 3-phase output',
'Phase 3 of 3-phase'
)[self.value]
class SolarPowerPriority(InverterEnum):
BatteryLoadUtility = 0
LoadBatteryUtility = 1
def as_text(self) -> str:
return ('Battery-Load-Utility', 'Load-Battery-Utility')[self.value]
class MPPTChargerStatus(InverterEnum):
Abnormal = 0
NotCharging = 1
Charging = 2
def as_text(self) -> str:
return ('Abnormal', 'Not charging', 'Charging')[self.value]
class BatteryPowerDirection(InverterEnum):
DoNothing = 0
Charge = 1
Discharge = 2
def as_text(self) -> str:
return ('Do nothing', 'Charge', 'Discharge')[self.value]
class DC_AC_PowerDirection(InverterEnum):
DoNothing = 0
AC_DC = 1
DC_AC = 2
def as_text(self) -> str:
return ('Do nothing', 'AC/DC', 'DC/AC')[self.value]
class LinePowerDirection(InverterEnum):
DoNothing = 0
Input = 1
Output = 2
def as_text(self) -> str:
return ('Do nothing', 'Input', 'Output')[self.value]
class WorkingMode(InverterEnum):
PowerOnMode = 0
StandbyMode = 1
BypassMode = 2
BatteryMode = 3
FaultMode = 4
HybridMode = 5
def as_text(self) -> str:
return (
'Power on mode',
'Standby mode',
'Bypass mode',
'Battery mode',
'Fault mode',
'Hybrid mode'
)[self.value]
class ParallelConnectionStatus(InverterEnum):
NotExistent = 0
Existent = 1
def as_text(self) -> str:
return ('Non-existent', 'Existent')[self.value]
class LoadConnectionStatus(InverterEnum):
Disconnected = 0
Connected = 1
def as_text(self) -> str:
return ('Disconnected', 'Connected')[self.value]
class ConfigurationStatus(InverterEnum):
Default = 0
Changed = 1
def as_text(self) -> str:
return ('Default', 'Changed')[self.value]
_g_human_readable = {"grid_voltage": "Grid voltage",
"grid_freq": "Grid frequency",
"ac_output_voltage": "AC output voltage",
"ac_output_freq": "AC output frequency",
"ac_output_apparent_power": "AC output apparent power",
"ac_output_active_power": "AC output active power",
"output_load_percent": "Output load percent",
"battery_voltage": "Battery voltage",
"battery_voltage_scc": "Battery voltage from SCC",
"battery_voltage_scc2": "Battery voltage from SCC2",
"battery_discharge_current": "Battery discharge current",
"battery_charge_current": "Battery charge current",
"battery_capacity": "Battery capacity",
"inverter_heat_sink_temp": "Inverter heat sink temperature",
"mppt1_charger_temp": "MPPT1 charger temperature",
"mppt2_charger_temp": "MPPT2 charger temperature",
"pv1_input_power": "PV1 input power",
"pv2_input_power": "PV2 input power",
"pv1_input_voltage": "PV1 input voltage",
"pv2_input_voltage": "PV2 input voltage",
"configuration_status": "Configuration state",
"mppt1_charger_status": "MPPT1 charger status",
"mppt2_charger_status": "MPPT2 charger status",
"load_connected": "Load connection",
"battery_power_direction": "Battery power direction",
"dc_ac_power_direction": "DC/AC power direction",
"line_power_direction": "Line power direction",
"local_parallel_id": "Local parallel ID",
"ac_input_rating_voltage": "AC input rating voltage",
"ac_input_rating_current": "AC input rating current",
"ac_output_rating_voltage": "AC output rating voltage",
"ac_output_rating_freq": "AC output rating frequency",
"ac_output_rating_current": "AC output rating current",
"ac_output_rating_apparent_power": "AC output rating apparent power",
"ac_output_rating_active_power": "AC output rating active power",
"battery_rating_voltage": "Battery rating voltage",
"battery_recharge_voltage": "Battery re-charge voltage",
"battery_redischarge_voltage": "Battery re-discharge voltage",
"battery_under_voltage": "Battery under voltage",
"battery_bulk_voltage": "Battery bulk voltage",
"battery_float_voltage": "Battery float voltage",
"battery_type": "Battery type",
"max_charge_current": "Max charge current",
"max_ac_charge_current": "Max AC charge current",
"input_voltage_range": "Input voltage range",
"output_source_priority": "Output source priority",
"charge_source_priority": "Charge source priority",
"parallel_max_num": "Parallel max num",
"machine_type": "Machine type",
"topology": "Topology",
"output_mode": "Output mode",
"solar_power_priority": "Solar power priority",
"mppt": "MPPT string",
"fault_code": "Fault code",
"line_fail": "Line fail",
"output_circuit_short": "Output circuit short",
"inverter_over_temperature": "Inverter over temperature",
"fan_lock": "Fan lock",
"battery_voltage_high": "Battery voltage high",
"battery_low": "Battery low",
"battery_under": "Battery under",
"over_load": "Over load",
"eeprom_fail": "EEPROM fail",
"power_limit": "Power limit",
"pv1_voltage_high": "PV1 voltage high",
"pv2_voltage_high": "PV2 voltage high",
"mppt1_overload_warning": "MPPT1 overload warning",
"mppt2_overload_warning": "MPPT2 overload warning",
"battery_too_low_to_charge_for_scc1": "Battery too low to charge for SCC1",
"battery_too_low_to_charge_for_scc2": "Battery too low to charge for SCC2",
"buzzer": "Buzzer",
"overload_bypass": "Overload bypass function",
"escape_to_default_screen_after_1min_timeout": "Escape to default screen after 1min timeout",
"overload_restart": "Overload restart",
"over_temp_restart": "Over temperature restart",
"backlight_on": "Backlight on",
"alarm_on_on_primary_source_interrupt": "Alarm on on primary source interrupt",
"fault_code_record": "Fault code record",
"wh": "Wh"}
class InverterEmulator:
def __init__(self, addr: Addr, wait=True):
self.status = {"grid_voltage": {"unit": "V", "value": 236.3},
"grid_freq": {"unit": "Hz", "value": 50.0},
"ac_output_voltage": {"unit": "V", "value": 229.9},
"ac_output_freq": {"unit": "Hz", "value": 50.0},
"ac_output_apparent_power": {"unit": "VA", "value": 207},
"ac_output_active_power": {"unit": "Wh", "value": 146},
"output_load_percent": {"unit": "%", "value": 4},
"battery_voltage": {"unit": "V", "value": 49.1},
"battery_voltage_scc": {"unit": "V", "value": 0.0},
"battery_voltage_scc2": {"unit": "V", "value": 0.0},
"battery_discharge_current": {"unit": "A", "value": 3},
"battery_charge_current": {"unit": "A", "value": 0},
"battery_capacity": {"unit": "%", "value": 69},
"inverter_heat_sink_temp": {"unit": "°C", "value": 17},
"mppt1_charger_temp": {"unit": "°C", "value": 0},
"mppt2_charger_temp": {"unit": "°C", "value": 0},
"pv1_input_power": {"unit": "Wh", "value": 0},
"pv2_input_power": {"unit": "Wh", "value": 0},
"pv1_input_voltage": {"unit": "V", "value": 0.0},
"pv2_input_voltage": {"unit": "V", "value": 0.0},
"configuration_status": ConfigurationStatus.Default,
"mppt1_charger_status": MPPTChargerStatus.Abnormal,
"mppt2_charger_status": MPPTChargerStatus.Abnormal,
"load_connected": LoadConnectionStatus.Connected,
"battery_power_direction": BatteryPowerDirection.Discharge,
"dc_ac_power_direction": DC_AC_PowerDirection.DC_AC,
"line_power_direction": LinePowerDirection.DoNothing,
"local_parallel_id": 0}
self.rated = {"ac_input_rating_voltage": {"unit": "V", "value": 230.0},
"ac_input_rating_current": {"unit": "A", "value": 21.7},
"ac_output_rating_voltage": {"unit": "V", "value": 230.0},
"ac_output_rating_freq": {"unit": "Hz", "value": 50.0},
"ac_output_rating_current": {"unit": "A", "value": 21.7},
"ac_output_rating_apparent_power": {"unit": "VA", "value": 5000},
"ac_output_rating_active_power": {"unit": "Wh", "value": 5000},
"battery_rating_voltage": {"unit": "V", "value": 48.0},
"battery_recharge_voltage": {"unit": "V", "value": 48.0},
"battery_redischarge_voltage": {"unit": "V", "value": 55.0},
"battery_under_voltage": {"unit": "V", "value": 42.0},
"battery_bulk_voltage": {"unit": "V", "value": 57.6},
"battery_float_voltage": {"unit": "V", "value": 54.0},
"battery_type": BatteryType.User,
"max_charge_current": {"unit": "A", "value": 60},
"max_ac_charge_current": {"unit": "A", "value": 30},
"input_voltage_range": InputVoltageRange.Appliance,
"output_source_priority": OutputSourcePriority.SolarBatteryUtility,
"charge_source_priority": ChargeSourcePriority.SolarAndUtility,
"parallel_max_num": 6,
"machine_type": MachineType.OffGridTie,
"topology": Topology.TransformerLess,
"output_mode": OutputMode.SingleOutput,
"solar_power_priority": SolarPowerPriority.LoadBatteryUtility,
"mppt": "2"}
self.errors = {"fault_code": 0,
"line_fail": False,
"output_circuit_short": False,
"inverter_over_temperature": False,
"fan_lock": False,
"battery_voltage_high": False,
"battery_low": False,
"battery_under": False,
"over_load": False,
"eeprom_fail": False,
"power_limit": False,
"pv1_voltage_high": False,
"pv2_voltage_high": False,
"mppt1_overload_warning": False,
"mppt2_overload_warning": False,
"battery_too_low_to_charge_for_scc1": False,
"battery_too_low_to_charge_for_scc2": False}
self.flags = {"buzzer": False,
"overload_bypass": True,
"escape_to_default_screen_after_1min_timeout": False,
"overload_restart": True,
"over_temp_restart": True,
"backlight_on": False,
"alarm_on_on_primary_source_interrupt": True,
"fault_code_record": False}
self.day_generated = 1000
self.logger = logging.getLogger(self.__class__.__name__)
host, port = addr
asyncio.run(self.run_server(host, port, wait))
# self.max_ac_charge_current = 30
# self.max_charge_current = 60
# self.charge_thresholds = [48, 54]
async def run_server(self, host, port, wait: bool):
server = await asyncio.start_server(self.client_handler, host, port)
async with server:
self.logger.info(f'listening on {host}:{port}')
if wait:
await server.serve_forever()
else:
asyncio.ensure_future(server.serve_forever())
async def client_handler(self, reader, writer):
client_fmt = Format.JSON
def w(s: str):
writer.write(s.encode('utf-8'))
def return_error(message=None):
w('err\r\n')
if message:
if client_fmt in (Format.JSON, Format.SIMPLE_JSON):
w(stringify({
'result': 'error',
'message': message
}))
elif client_fmt in (Format.TABLE, Format.SIMPLE_TABLE):
w(f'error: {message}')
w('\r\n')
w('\r\n')
def return_ok(data=None):
w('ok\r\n')
if client_fmt in (Format.JSON, Format.SIMPLE_JSON):
jdata = {
'result': 'ok'
}
if data:
jdata['data'] = data
w(stringify(jdata))
w('\r\n')
elif data:
w(data)
w('\r\n')
w('\r\n')
request = None
while request != 'quit':
try:
request = await reader.read(255)
if request == b'\x04':
break
request = request.decode('utf-8').strip()
except Exception:
break
if request.startswith('format '):
requested_format = request[7:]
try:
client_fmt = Format(requested_format)
except ValueError:
return_error('invalid format')
return_ok()
elif request.startswith('exec '):
buf = request[5:].split(' ')
command = buf[0]
args = buf[1:]
try:
return_ok(self.process_command(client_fmt, command, *args))
except ValueError as e:
return_error(str(e))
else:
return_error(f'invalid token: {request}')
try:
await writer.drain()
except ConnectionResetError as e:
# self.logger.exception(e)
pass
writer.close()
def process_command(self, fmt: Format, c: str, *args) -> Union[dict, str, list[int], None]:
ac_charge_currents = [2, 10, 20, 30, 40, 50, 60]
if c == 'get-status':
return self.format_dict(self.status, fmt)
elif c == 'get-rated':
return self.format_dict(self.rated, fmt)
elif c == 'get-errors':
return self.format_dict(self.errors, fmt)
elif c == 'get-flags':
return self.format_dict(self.flags, fmt)
elif c == 'get-day-generated':
return self.format_dict({'wh': 1000}, fmt)
elif c == 'get-allowed-ac-charge-currents':
return self.format_list(ac_charge_currents, fmt)
elif c == 'set-max-ac-charge-current':
if int(args[0]) != 0:
raise ValueError(f'invalid machine id: {args[0]}')
amps = int(args[1])
if amps not in ac_charge_currents:
raise ValueError(f'invalid value: {amps}')
self.rated['max_ac_charge_current']['value'] = amps
elif c == 'set-charge-thresholds':
self.rated['battery_recharge_voltage']['value'] = float(args[0])
self.rated['battery_redischarge_voltage']['value'] = float(args[1])
elif c == 'set-output-source-priority':
self.rated['output_source_priority'] = OutputSourcePriority.SolarBatteryUtility if args[0] == 'SBU' else OutputSourcePriority.SolarUtilityBattery
elif c == 'set-battery-cutoff-voltage':
self.rated['battery_under_voltage']['value'] = float(args[0])
elif c == 'set-flag':
flag = args[0]
val = bool(int(args[1]))
if flag == 'BUZZ':
k = 'buzzer'
elif flag == 'OLBP':
k = 'overload_bypass'
elif flag == 'LCDE':
k = 'escape_to_default_screen_after_1min_timeout'
elif flag == 'OLRS':
k = 'overload_restart'
elif flag == 'OTRS':
k = 'over_temp_restart'
elif flag == 'BLON':
k = 'backlight_on'
elif flag == 'ALRM':
k = 'alarm_on_on_primary_source_interrupt'
elif flag == 'FTCR':
k = 'fault_code_record'
else:
raise ValueError('invalid flag')
self.flags[k] = val
else:
raise ValueError(f'{c}: unsupported command')
@staticmethod
def format_list(values: list, fmt: Format) -> Union[str, list]:
if fmt in (Format.JSON, Format.SIMPLE_JSON):
return values
return '\n'.join(map(lambda v: str(v), values))
@staticmethod
def format_dict(data: dict, fmt: Format) -> Union[str, dict]:
new_data = {}
for k, v in data.items():
new_val = None
if fmt in (Format.JSON, Format.TABLE, Format.SIMPLE_TABLE):
if isinstance(v, dict):
new_val = v
elif isinstance(v, InverterEnum):
new_val = v.as_text()
else:
new_val = v
elif fmt == Format.SIMPLE_JSON:
if isinstance(v, dict):
new_val = v['value']
elif isinstance(v, InverterEnum):
new_val = v.value
else:
new_val = str(v)
new_data[k] = new_val
if fmt in (Format.JSON, Format.SIMPLE_JSON):
return new_data
lines = []
if fmt == Format.SIMPLE_TABLE:
for k, v in new_data.items():
buf = k
if isinstance(v, dict):
buf += ' ' + str(v['value']) + ' ' + v['unit']
elif isinstance(v, InverterEnum):
buf += ' ' + v.as_text()
else:
buf += ' ' + str(v)
lines.append(buf)
elif fmt == Format.TABLE:
max_k_len = 0
for k in new_data.keys():
if len(_g_human_readable[k]) > max_k_len:
max_k_len = len(_g_human_readable[k])
for k, v in new_data.items():
buf = _g_human_readable[k] + ':'
buf += ' ' * (max_k_len - len(_g_human_readable[k]) + 1)
if isinstance(v, dict):
buf += str(v['value']) + ' ' + v['unit']
elif isinstance(v, InverterEnum):
buf += v.as_text()
elif isinstance(v, bool):
buf += str(int(v))
else:
buf += str(v)
lines.append(buf)
return '\n'.join(lines)