refactor a bit
This commit is contained in:
parent
d554e1c1c9
commit
3bfca2f2fb
5
mosgorsud/__init__.py
Normal file
5
mosgorsud/__init__.py
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
from .parser import get_cases
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
'get_cases'
|
||||||
|
]
|
@ -5,9 +5,15 @@ import os
|
|||||||
import tempfile
|
import tempfile
|
||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
|
import logging
|
||||||
|
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
from typing import List, Dict
|
from typing import List, Dict
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
BASE_URL = "https://mos-gorsud.ru/mgs/defend"
|
||||||
|
|
||||||
headers = {
|
headers = {
|
||||||
'Referer': 'https://mos-gorsud.ru/',
|
'Referer': 'https://mos-gorsud.ru/',
|
||||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0'
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0'
|
||||||
@ -24,35 +30,55 @@ def get_links(s: str) -> List[str]:
|
|||||||
return list(set(re.findall(regex, s)))
|
return list(set(re.findall(regex, s)))
|
||||||
|
|
||||||
|
|
||||||
class MGSPiracy:
|
def get_full_url(url: str) -> str:
|
||||||
BASE_URL = "https://mos-gorsud.ru/mgs/defend"
|
if not url.startswith('http:') and not url.startswith('https:'):
|
||||||
|
if not url.startswith('/'):
|
||||||
|
url = '/' + url
|
||||||
|
url = 'https://mos-gorsud.ru' + url
|
||||||
|
return url
|
||||||
|
|
||||||
def __init__(self, from_page: int, to_page: int):
|
|
||||||
self.from_page = from_page
|
|
||||||
self.to_page = to_page
|
|
||||||
|
|
||||||
def get_cases(self) -> List[Dict]:
|
def get_document_text(url: str) -> str:
|
||||||
cases = []
|
print(f'downloading {url}')
|
||||||
|
|
||||||
for page in range(self.from_page, self.to_page+1):
|
r = requests.get(url, allow_redirects=True, headers=headers)
|
||||||
print(f'page {page}')
|
content_disposition = r.headers['Content-Disposition']
|
||||||
|
filename, file_extension = os.path.splitext(re.search('attachment; filename="(.*?)"', content_disposition).group(1))
|
||||||
|
|
||||||
url = self.BASE_URL + '?page=' + str(page)
|
tempname = '%s/%s%s' % (tempfile.gettempdir(), strgen(10), file_extension)
|
||||||
r = requests.get(url, headers=headers)
|
|
||||||
|
|
||||||
soup = BeautifulSoup(r.text, "html.parser")
|
with open(tempname, 'wb') as f:
|
||||||
rows = soup.select('.searchResultContainer table.custom_table tbody tr')
|
f.write(r.content)
|
||||||
|
|
||||||
for row in rows:
|
text = textract.process(tempname).decode('utf-8')
|
||||||
cols = row.find_all('td')
|
os.unlink(tempname)
|
||||||
|
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
|
def get_cases(from_page: int, to_page: int) -> List[Dict]:
|
||||||
|
cases = []
|
||||||
|
|
||||||
|
for page in range(from_page, to_page+1):
|
||||||
|
url = f'{BASE_URL}?page={page}'
|
||||||
|
print(f'page {page} ({url})')
|
||||||
|
|
||||||
|
r = requests.get(url, headers=headers)
|
||||||
|
|
||||||
|
soup = BeautifulSoup(r.text, "html.parser")
|
||||||
|
rows = soup.select('.searchResultContainer table.custom_table tbody tr')
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
cols = row.find_all('td')
|
||||||
|
|
||||||
|
try:
|
||||||
date = cols[0].get_text().strip()
|
date = cols[0].get_text().strip()
|
||||||
statement_number = cols[1].get_text().strip()
|
statement_number = cols[1].get_text().strip()
|
||||||
applicant = cols[3].get_text().strip()
|
applicant = cols[3].get_text().strip()
|
||||||
object = cols[4].get_text().strip()
|
object = cols[4].get_text().strip()
|
||||||
link = self.mgs_url(cols[5].find('a')['href'])
|
link = get_full_url(cols[5].find('a')['href'])
|
||||||
|
|
||||||
decision_text = self.get_document_text(link)
|
decision_text = get_document_text(link)
|
||||||
violation_links = '\n'.join(get_links(decision_text))
|
violation_links = '\n'.join(get_links(decision_text))
|
||||||
|
|
||||||
cases.append(dict(
|
cases.append(dict(
|
||||||
@ -65,28 +91,7 @@ class MGSPiracy:
|
|||||||
decision_text=decision_text
|
decision_text=decision_text
|
||||||
))
|
))
|
||||||
|
|
||||||
return cases
|
except (TypeError, KeyError) as e:
|
||||||
|
logger.exception(e)
|
||||||
|
|
||||||
def mgs_url(self, url: str) -> str:
|
return cases
|
||||||
if not url.startswith('http:') and not url.startswith('https:'):
|
|
||||||
if not url.startswith('/'):
|
|
||||||
url = '/' + url
|
|
||||||
url = 'https://mos-gorsud.ru' + url
|
|
||||||
return url
|
|
||||||
|
|
||||||
def get_document_text(self, url: str) -> str:
|
|
||||||
print(f'downloading {url}')
|
|
||||||
|
|
||||||
r = requests.get(url, allow_redirects=True, headers=headers)
|
|
||||||
content_disposition = r.headers['Content-Disposition']
|
|
||||||
filename, file_extension = os.path.splitext(re.search('attachment; filename="(.*?)"', content_disposition).group(1))
|
|
||||||
|
|
||||||
tempname = '%s/%s%s' % (tempfile.gettempdir(), strgen(10), file_extension)
|
|
||||||
|
|
||||||
with open(tempname, 'wb') as f:
|
|
||||||
f.write(r.content)
|
|
||||||
|
|
||||||
text = textract.process(tempname).decode('utf-8')
|
|
||||||
os.unlink(tempname)
|
|
||||||
|
|
||||||
return text
|
|
@ -1,6 +1,9 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import traceback
|
import traceback
|
||||||
from mgs import MGSPiracy
|
import mosgorsud
|
||||||
|
import requests
|
||||||
|
import urllib3.exceptions
|
||||||
|
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
from ch1p import State, telegram_notify
|
from ch1p import State, telegram_notify
|
||||||
from html import escape
|
from html import escape
|
||||||
@ -10,21 +13,24 @@ if __name__ == '__main__':
|
|||||||
# parse arguments
|
# parse arguments
|
||||||
parser = ArgumentParser()
|
parser = ArgumentParser()
|
||||||
parser.add_argument('--state-file', required=True)
|
parser.add_argument('--state-file', required=True)
|
||||||
parser.add_argument('--token', help='Telegram bot token', required=True)
|
parser.add_argument('--token', required=True,
|
||||||
parser.add_argument('--chat-id', type=int, help='Telegram chat id (with bot)', required=True)
|
help='Telegram bot token',)
|
||||||
parser.add_argument('--from', type=int, default=1, help='First page', dest='_from')
|
parser.add_argument('--chat-id', type=int, required=True,
|
||||||
parser.add_argument('--to', type=int, default=5, help='Last page')
|
help='Telegram chat id (with bot)')
|
||||||
|
parser.add_argument('--from', type=int, default=1, dest='_from',
|
||||||
|
help='First page')
|
||||||
|
parser.add_argument('--to', type=int, default=5,
|
||||||
|
help='Last page')
|
||||||
parser.add_argument('--domains', nargs='+', required=True)
|
parser.add_argument('--domains', nargs='+', required=True)
|
||||||
|
|
||||||
args = parser.parse_args()
|
arg = parser.parse_args()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# get recent cases
|
# get recent cases
|
||||||
mgs = MGSPiracy(from_page=args._from, to_page=args.to)
|
cases = mosgorsud.get_cases(from_page=arg._from, to_page=arg.to)
|
||||||
cases = mgs.get_cases()
|
|
||||||
|
|
||||||
# read state
|
# read state
|
||||||
state = State(file=args.state_file,
|
state = State(file=arg.state_file,
|
||||||
default=dict(cases=[]))
|
default=dict(cases=[]))
|
||||||
|
|
||||||
# loop through cases
|
# loop through cases
|
||||||
@ -34,10 +40,10 @@ if __name__ == '__main__':
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
matched = False
|
matched = False
|
||||||
for mydomain in args.domains:
|
for mydomain in arg.domains:
|
||||||
if mydomain in case['decision_text']:
|
if mydomain in case['decision_text']:
|
||||||
matched = True
|
matched = True
|
||||||
results.append('%s found in %s' % (mydomain, case['statement_number']))
|
results.append('%s found in %s (%s)' % (mydomain, case['statement_number'], case['doc_link']))
|
||||||
state['cases'].append(case['statement_number'])
|
state['cases'].append(case['statement_number'])
|
||||||
|
|
||||||
if matched:
|
if matched:
|
||||||
@ -50,14 +56,19 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
telegram_notify(text=escape(text),
|
telegram_notify(text=escape(text),
|
||||||
parse_mode='HTML',
|
parse_mode='HTML',
|
||||||
token=args.token,
|
token=arg.token,
|
||||||
chat_id=args.chat_id)
|
chat_id=arg.chat_id)
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
except (TimeoutError, requests.exceptions.ConnectionError, urllib3.exceptions.MaxRetryError):
|
||||||
|
telegram_notify(text='mosgorsud error: network timeout',
|
||||||
|
token=arg.token,
|
||||||
|
chat_id=arg.chat_id)
|
||||||
|
|
||||||
except:
|
except:
|
||||||
telegram_notify(text='error: '+escape(traceback.format_exc()),
|
telegram_notify(text='mosgorsud error: '+escape(traceback.format_exc()),
|
||||||
parse_mode='HTML',
|
parse_mode='HTML',
|
||||||
token=args.token,
|
token=arg.token,
|
||||||
chat_id=args.chat_id)
|
chat_id=arg.chat_id)
|
||||||
|
20
to_csv.py
20
to_csv.py
@ -1,22 +1,26 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import csv
|
import csv
|
||||||
from mgs import MGSPiracy
|
import mosgorsud
|
||||||
|
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# parse arguments
|
# parse arguments
|
||||||
argp = ArgumentParser()
|
argp = ArgumentParser()
|
||||||
argp.add_argument('--output', type=str, default='output.csv', help='CSV output file name')
|
argp.add_argument('--output', type=str, default='output.csv',
|
||||||
argp.add_argument('--from', type=int, default=0, help='First page', dest='_from')
|
help='CSV output file name')
|
||||||
argp.add_argument('--to', type=int, default=10, help='Last page')
|
argp.add_argument('--from', type=int, default=1, dest='_from',
|
||||||
args = argp.parse_args()
|
help='First page')
|
||||||
|
argp.add_argument('--to', type=int, default=10,
|
||||||
|
help='Last page')
|
||||||
|
arg = argp.parse_args()
|
||||||
|
|
||||||
# get cases
|
# get cases
|
||||||
mgs = MGSPiracy(from_page=args._from, to_page=args.to)
|
cases = mosgorsud.get_cases(from_page=arg._from, to_page=arg.to)
|
||||||
cases = mgs.get_cases()
|
|
||||||
|
|
||||||
# write to csv
|
# write to csv
|
||||||
f = open(args.output, 'w', newline='')
|
f = open(arg.output, 'w', newline='')
|
||||||
csv_writer = csv.writer(f)
|
csv_writer = csv.writer(f)
|
||||||
|
|
||||||
for case in cases:
|
for case in cases:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user