bookscan_utils/pagenum-mass.py
2023-10-14 11:35:01 +03:00

140 lines
4.4 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import logging
import shutil
import sys
from queue import Queue, Empty
from threading import Thread, Lock
from argparse import ArgumentParser, ArgumentError
from typing import Optional
from pagenum.system import ensure_dependencies
from pagenum.image import pdf2png, zonecrop, img2pagenum, Zone
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
# Work queue of PDF file names (relative to the input directory); it is
# filled by the __main__ block and drained by PagenumWorker threads.
_queue = Queue()
# Zones to search for a page number, parsed from --zones by the __main__ block.
# Declared without a value here, so it only exists after the script has
# parsed its arguments.
_zones: Optional[list[Zone]]
# When true, results are only logged and nothing is copied (--pretend).
_pretend: bool = False
# Value of --max-page-num-length; assigned by the __main__ block.
_max_page_num_length: Optional[int] = None
# Held by safe_copyfile() while it works on the output directory.
_rename_lock = Lock()
# Output and input directories (--output-directory / --input-directory);
# declared here, assigned by the __main__ block.
_outdir: str
_indir: str
# Optional value of --tesseract-psm; None means the option was not given.
_psm: Optional[int] = None
def safe_copyfile(oldname, newname):
    """Copy ``oldname`` from the input directory to ``newname`` in the output directory.

    When the output directory already contains ``newname``, a ``-v1``,
    ``-v2``, ... suffix is inserted before the extension and the first
    name that does not exist yet is used instead.

    The existence probing and the copy both run while ``_rename_lock`` is
    held, so concurrent callers do not settle on the same target name.

    :param oldname: file name relative to ``_indir``
    :param newname: desired file name relative to ``_outdir``
    """
    with _rename_lock:
        target = newname
        if os.path.exists(os.path.join(_outdir, target)):
            stem, ext = os.path.splitext(newname)
            version = 1
            target = f'{stem}-v{version}{ext}'
            # Walk the version numbers upwards until a free name is found.
            while os.path.exists(os.path.join(_outdir, target)):
                version += 1
                target = f'{stem}-v{version}{ext}'

        source_path = os.path.join(_indir, oldname)
        target_path = os.path.join(_outdir, target)
        shutil.copyfile(source_path, target_path)
class PagenumWorker(Thread):
    """Worker thread that takes PDF file names off ``_queue`` and processes them.

    Each worker keeps pulling file names until the queue is empty and then
    terminates. The queue must therefore be fully populated before the
    workers are started.
    """

    def __init__(self, name):
        """Create a worker.

        :param name: thread name
        """
        Thread.__init__(self)
        self.name = name

    def run(self):
        """Process queued files until the queue is exhausted."""
        while True:
            # Only the queue access is guarded by the Empty handler, so an
            # Empty raised somewhere inside process() cannot be mistaken for
            # "no more work".
            try:
                file = _queue.get(block=False)
            except Empty:
                break

            # One broken file must not kill the worker: otherwise the
            # remaining queue items could be left unprocessed.
            try:
                self.process(file)
            except Exception:
                _logger.exception(f'{file}: processing failed')

    def process(self, file):
        """Detect the page number of one PDF and copy it under that number.

        The PDF is converted with ``pdf2png``; every zone from ``_zones`` is
        cropped with ``zonecrop`` and passed to ``img2pagenum`` until one of
        them yields a value other than ``None``. Depending on ``_pretend``
        the result is either logged or the file is copied to the output
        directory as ``<num>.pdf`` (``unknown.pdf`` when nothing was found).

        The intermediate files returned by ``pdf2png`` and ``zonecrop`` are
        removed even when a later step raises.

        :param file: PDF file name relative to ``_indir``
        """
        file_path = os.path.join(_indir, file)
        png_file = pdf2png(file_path)

        # The extra keyword arguments are the same for every zone.
        i2pn_kw = {}
        if _psm is not None:
            i2pn_kw['psm'] = _psm

        num = None
        try:
            for z in _zones:
                cropped = zonecrop(png_file, z)
                try:
                    # Use the module-level setting instead of the ``args``
                    # object, which only exists when run as a script.
                    num = img2pagenum(cropped, _max_page_num_length, **i2pn_kw)
                finally:
                    os.unlink(cropped)
                if num is not None:
                    break
        finally:
            os.unlink(png_file)

        if _pretend:
            _logger.info(f'{file}: {num}')
        else:
            newname = f'{num}.pdf' if num is not None else 'unknown.pdf'
            safe_copyfile(file, newname)

        _logger.debug(f'{file} done ({num})')
if __name__ == '__main__':
    # Script entry point: parse the command line, publish the settings in
    # the module-level globals used by the workers, prepare the output
    # directory, queue every PDF of the input directory and process the
    # queue with a pool of PagenumWorker threads.
    ensure_dependencies()

    parser = ArgumentParser()
    parser.add_argument('-i', '--input-directory', type=str, required=True)
    parser.add_argument('-o', '--output-directory', type=str, required=True)
    parser.add_argument('-z', '--zones', type=str, nargs='+', required=True,
                        help=("One or more zones to search for page number. Format: zone,w,h,mt,mr,mb,ml. "
                              "See pagenumb-probe.py for more info"))
    parser.add_argument('-t', '--threads', type=int, default=4,
                        help="Number of threads")
    parser.add_argument('-p', '--pretend', action='store_true',
                        help="Don't save files but print info to stdout")
    parser.add_argument('--max-page-num-length', type=int, default=3)
    parser.add_argument('--tesseract-psm', type=int)
    parser.add_argument('-V', '--verbose', action='store_true')
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    # With zero (or negative) threads nothing would ever be processed and
    # the script would exit silently.
    if args.threads < 1:
        parser.error('--threads must be a positive integer')

    _zones = [Zone.from_string(zs) for zs in args.zones]
    _pretend = args.pretend
    _max_page_num_length = args.max_page_num_length
    _outdir = args.output_directory
    _indir = args.input_directory
    # "is not None" so that an explicit value of 0 is not silently dropped.
    if args.tesseract_psm is not None:
        _psm = args.tesseract_psm

    if not os.path.isdir(_indir):
        raise OSError(f'{_indir}: no such directory')

    # The output directory is erased below, so it must neither be the input
    # directory nor contain it. Compare resolved paths: a plain string
    # comparison is fooled by trailing slashes, relative paths and symlinks.
    real_indir = os.path.normcase(os.path.realpath(_indir))
    real_outdir = os.path.normcase(os.path.realpath(_outdir))
    if real_indir == real_outdir:
        parser.error('--input-directory must be different than --output-directory')
    try:
        outdir_contains_indir = os.path.commonpath([real_indir, real_outdir]) == real_outdir
    except ValueError:
        # Raised e.g. for paths on different drives; they cannot be nested.
        outdir_contains_indir = False
    if outdir_contains_indir:
        parser.error('--output-directory must not contain --input-directory')

    if not os.path.isdir(_outdir):
        os.makedirs(_outdir)
    else:
        try:
            input(f'Directory {_outdir} already exists. Press ENTER to erase it and continue or Ctrl+C to exit.')
        except KeyboardInterrupt:
            sys.exit(0)
        except EOFError:
            # No interactive stdin: nobody confirmed, so do not erase anything.
            sys.exit(f'{_outdir}: not erased, confirmation could not be read from stdin')
        shutil.rmtree(_outdir)
        os.makedirs(_outdir)

    # Queue every regular file with a .pdf extension (case-insensitive).
    for file in os.listdir(_indir):
        if not os.path.isfile(os.path.join(_indir, file)):
            continue
        if os.path.splitext(file)[1].lower() != '.pdf':
            continue
        _queue.put(file)

    # The queue is complete before any worker starts, so a worker that
    # finds it empty can safely terminate.
    threads = []
    for i in range(args.threads):
        thread = PagenumWorker(f'thread-{i}')
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()