75 lines
1.9 KiB
Python
Executable File
75 lines
1.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import include_homekit
|
|
import re
|
|
|
|
from html import escape
|
|
from typing import Optional
|
|
from homekit.config import AppConfigUnit, config
|
|
from homekit.modem import ModemsConfig, E3372
|
|
from homekit.database import MySQLHomeDatabase
|
|
from homekit.telegram import send_message
|
|
|
|
# Module-wide database handle. It is None at import time and is assigned a
# MySQLHomeDatabase instance in the __main__ guard; get_last_time() and
# set_last_time() read it as a global.
db: Optional[MySQLHomeDatabase] = None
|
|
|
|
|
|
class VkSmsCheckerConfig(AppConfigUnit):
    """Application config unit for the VK SMS checker script.

    It declares one setting, ``modem``, whose value must be contained in
    ``ModemsConfig()``.
    """

    NAME = 'vk_sms_checker'

    @classmethod
    def schema(cls) -> Optional[dict]:
        """Return the schema: ``modem`` is a required string."""
        modem_rule = {'type': 'string', 'required': True}
        return {'modem': modem_rule}

    @staticmethod
    def custom_validator(data):
        """Check that the configured modem is a known one.

        Raises:
            ValueError: if ``data['modem']`` is not in ``ModemsConfig()``.
        """
        modem_name = data['modem']
        if modem_name in ModemsConfig():
            return
        raise ValueError('invalid modem')
|
|
|
|
|
|
def get_last_time() -> int:
    """Return ``last_message_time`` from the first row of ``vk_sms`` as an int.

    Reads through the module-level ``db`` handle, which must already be set.
    """
    cursor = db.cursor()
    cursor.execute("SELECT last_message_time FROM vk_sms LIMIT 1")
    row = cursor.fetchone()
    stored_value = row[0]
    return int(stored_value)
|
|
|
|
|
|
def set_last_time(timestamp: int) -> None:
    """Store ``timestamp`` in ``vk_sms.last_message_time`` and commit.

    The UPDATE has no WHERE clause, so every row of ``vk_sms`` is updated.
    Writes through the module-level ``db`` handle.
    """
    query = "UPDATE vk_sms SET last_message_time=%s"
    params = (timestamp,)
    cursor = db.cursor()
    cursor.execute(query, params)
    db.commit()
|
|
|
|
|
|
def check_sms():
    """Forward new VK-related SMS from the configured modem to Telegram.

    Steps:
      1. Look up the modem named by ``config.app_config['modem']`` in
         ``ModemsConfig()`` and fetch its SMS list through ``E3372``.
      2. Skip every message whose ``UnixTime`` is not greater than the value
         returned by ``get_last_time()``.
      3. Of the remaining messages, send those whose ``Phone`` or ``Content``
         starts with "vk" (case-insensitive) to the ``vk_sms_checker`` chat.
      4. Persist the largest ``UnixTime`` seen among the new messages via
         ``set_last_time()``, whether or not any of them matched.

    Returns nothing. If the modem reports no messages, the function returns
    before touching the database.
    """
    modem = ModemsConfig()[config.app_config['modem']]
    cl = E3372(modem['ip'], legacy_token_auth=modem['legacy_auth'])
    messages = cl.sms_list()
    # NOTE(review): presumably sms_list() returns newest-first and this puts
    # the list in chronological order -- confirm against the E3372 client.
    messages.reverse()

    if not messages:
        # Empty inbox: nothing to compare, so skip the database query.
        return

    last_time = get_last_time()
    new_last_time = None
    results = []

    for m in messages:
        sent_at = m['UnixTime']
        if sent_at <= last_time:
            continue
        # Track the maximum timestamp rather than "the last one iterated", so
        # the stored mark is correct even if the list is not strictly ordered.
        if new_last_time is None or sent_at > new_last_time:
            new_last_time = sent_at
        # re.match only matches at the start of the string, so both patterns
        # are effectively "starts with vk".
        if re.match(r'^vk', m['Phone'], flags=re.IGNORECASE) or re.match(r'vk', m['Content'], flags=re.IGNORECASE):
            results.append(m)

    for m in results:
        # Every interpolated field is escaped: the message body is HTML markup.
        text = '<b>'+escape(m['Phone'])+'</b> ('+escape(m['Date'])+')'
        text += "\n"+escape(m['Content'])
        send_message(text=text, chat='vk_sms_checker')

    # Compare against None explicitly so a falsy timestamp is still stored.
    if new_last_time is not None:
        set_last_time(new_last_time)
|
|
|
|
|
|
# Script entry point: runs one SMS check per invocation.
if __name__ == '__main__':
    # Assign the module-level handle used by get_last_time()/set_last_time().
    db = MySQLHomeDatabase()
    # Load this application's config unit; check_sms() reads
    # config.app_config['modem'] afterwards.
    config.load_app(VkSmsCheckerConfig)
    check_sms()