2021-12-28 01:13:47 +03:00

40 lines
1.1 KiB
Python

import logging
from threading import Thread
from lib.scanner import TCPScanner
# Module-level logger named after this module; Worker.run uses it for start/finish messages.
logger = logging.getLogger(__name__)
class Worker(Thread):
    """Thread that runs a TCP port scan of a single host through ``TCPScanner``.

    The scanner is created in the constructor; the scan itself happens in
    :meth:`run`, i.e. after the thread has been started.

    Attributes:
        done: ``False`` until :meth:`run` finishes the scan; afterwards it is
            the negation of the scanner's ``failed`` attribute.
        concurrency: value forwarded to the scanner as ``num_threads``.
        opened: optional collection of ports checked by :meth:`is_expected`.
        ignore: optional collection of ports checked by :meth:`is_expected`.
        scanner: the ``TCPScanner`` instance doing the actual work.
    """

    # Ports handed to the scanner. TCP port numbers go up to 65535 and
    # ``range`` excludes its stop value, so the stop has to be 65536 for the
    # highest port to be included.
    PORT_RANGE = range(0, 65536)

    def __init__(self, name, host, opened=None, ignore=None, concurrency=None, timeout=None):
        """Prepare a scan of *host*.

        Args:
            name: thread name; also used in the log messages of :meth:`run`.
            host: target passed as first argument to ``TCPScanner``.
            opened: optional collection of ports treated as expected.
            ignore: optional collection of ports treated as expected as well.
            concurrency: required; forwarded to ``scanner.scan`` as
                ``num_threads``.
            timeout: optional; forwarded to ``TCPScanner`` as ``timeout`` only
                when given, so the scanner's own default applies otherwise.

        Raises:
            AssertionError: if *concurrency* is ``None``.
        """
        super().__init__()
        # NOTE(review): assertions are stripped under ``python -O``; kept as an
        # assertion so the exception type seen by callers does not change.
        assert concurrency is not None, 'concurrency is required'

        self.done = False
        self.name = name
        self.concurrency = concurrency
        self.opened = opened
        self.ignore = ignore

        # Only pass ``timeout`` when the caller supplied one.
        scanner_kw = {}
        if timeout is not None:
            scanner_kw['timeout'] = timeout

        self.scanner = TCPScanner(host, self.PORT_RANGE, **scanner_kw)

    def run(self):
        """Run the scan and record in ``self.done`` whether it did not fail."""
        logger.info(f'starting {self.name} ({self.concurrency} threads)')
        self.scanner.scan(num_threads=self.concurrency)
        self.done = not self.scanner.failed
        logger.info(f'finished {self.name}')

    def get_results(self):
        """Return the scanner's ``results`` attribute as-is."""
        return self.scanner.results

    def is_expected(self, port):
        """Return ``True`` if *port* is in ``opened`` or in ``ignore``.

        A collection that is ``None`` is skipped rather than searched.
        """
        for ports in (self.opened, self.ignore):
            if ports is not None and port in ports:
                return True
        return False

    def get_host(self):
        """Return the scanner's ``host`` attribute."""
        return self.scanner.host