From 09ba8de385765cfba3e283490f821a1c81ae74a7 Mon Sep 17 00:00:00 2001
From: Zachary Vance
Date: Thu, 12 Aug 2021 17:52:00 -0700
Subject: [PATCH] v1.2, multithreading and keepalive working

---
 .gitignore                     |   3 +
 __pycache__/lib.cpython-37.pyc | Bin 813 -> 0 bytes
 __pycache__/sm.cpython-37.pyc  | Bin 2664 -> 0 bytes
 lib.py                         |  15 ++-
 main.py                        | 175 ++++++++++++++++++++-------------
 sm.py                          |  37 ++++---
 6 files changed, 143 insertions(+), 87 deletions(-)
 create mode 100644 .gitignore
 delete mode 100644 __pycache__/lib.cpython-37.pyc
 delete mode 100644 __pycache__/sm.cpython-37.pyc

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..5da64ee
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*.swp
+__pycache__/
+*.py[cod]
diff --git a/__pycache__/lib.cpython-37.pyc b/__pycache__/lib.cpython-37.pyc
deleted file mode 100644
index 8138fbab496a1270c6f0d7e776549d3a251096ea..0000000000000000000000000000000000000000
Binary files a/__pycache__/lib.cpython-37.pyc and /dev/null differ
diff --git a/__pycache__/sm.cpython-37.pyc b/__pycache__/sm.cpython-37.pyc
deleted file mode 100644
index 9e0a2d62a390c4fc54f336471ab81d5a560b3b25..0000000000000000000000000000000000000000
Binary files a/__pycache__/sm.cpython-37.pyc and /dev/null differ
diff --git a/lib.py b/lib.py
index 8e47e24..27ccca6 100644
--- a/lib.py
+++ b/lib.py
@@ -1,14 +1,21 @@
 #!/usr/local/bin python3
 import time
 
+try:
+    from tqdm import tqdm
+    tqdm_avail = True
+except ImportError:
+    tqdm = lambda x, **kwargs: x
+    tqdm_avail = False
+
 class timer():
-    def __init__(self, name, print=False):
+    def __init__(self, name, logger=None):
         self.name = name
-        self.print = print
+        self.logger = logger
     def __enter__(self):
         self._time = time.time()
     def __exit__(self, *exec_info):
         elapsed = time.time() - self._time
-        if self.print:
-            print(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
+        if self.logger is not None:
+            self.logger(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
diff --git a/main.py b/main.py
index ffcf85c..261ae54 100644
--- a/main.py
+++ b/main.py
@@ -1,9 +1,16 @@
 #!/usr/local/bin python3
 import pika, requests
-import base64, json, tempfile, time, os, shutil, sys
-
-import sm
-from lib import timer
+import base64, concurrent.futures, functools, logging, json, tempfile, time, multiprocessing, os, shutil, sys
+import lib, sm
+
+logging.basicConfig(filename='/var/log/mqlg.log',
+                    filemode='a',
+                    format='[mq-lg] %(threadName)s %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
+                    datefmt='%H:%M:%S',
+                    level=logging.INFO)
+printed = logging.StreamHandler()
+printed.setLevel(logging.INFO)  # or logging.WARNING
+#logging.getLogger().addHandler(printed)
 
 # Queues
 FF_TAR="ff_tar"
@@ -17,7 +24,7 @@ USER="sm"
 PASS="McFnE9I4OdwTB"
 
 import socket
-IS_GERMINATE = socket.gethostname() == "germinate"
+IS_GERMINATE = False #socket.gethostname() == "germinate"
 
 # Library functions
 class QueueConnection():
@@ -34,27 +41,47 @@ class QueueConnection():
     def __exit__(self, *exec_info):
         self.connection.close()
 
-#def threaded_consumer_multiple(queue, consumer_callback):
-    #while True:
-        #try:
-        #except pika.exceptions.ConnectionClosed:
-        #    print("connection close")
-        #    pass
-
-def threaded_consumer(queue, consumer_callback):
-    _threaded_consumer(queue, consumer_callback)
+def threaded_consumer(queue, response_queue, consumer_callback, threads=None):
+    if threads is None:
+        threads = multiprocessing.cpu_count()
+    executor = concurrent.futures.ThreadPoolExecutor(max_workers=threads)
+    def _inner_cb(channel, method, properties, message):
+        # Queue async work on another thread
+        future = executor.submit(consumer_callback, message)
+        mq_conn = channel.connection
+        # Once the thread is complete, finish the remaining work back on this thread
+        #future.add_done_callback(lambda future: mq_conn.add_callback_threadsafe(functools.partial(_done_cb, channel, method, future)))
+        future.add_done_callback(lambda future: mq_conn.add_timeout(0, functools.partial(_done_cb, channel, method, future)))
+
+    def _done_cb(channel, method, future):
+        # Always executed on main thread
+        response = future.result()
+        if response is not None:
+            with lib.timer("send-response 1/2", logging.warning):
+                channel.basic_publish(
+                    exchange='',
+                    routing_key=response_queue,
+                    body=response,
+                    properties=pika.BasicProperties(delivery_mode=2),
+                )
+            logging.warning(" sent")
+        #logging.error("skipping ack during debug test")
+        #sys.exit(0)
+        channel.basic_ack(delivery_tag=method.delivery_tag)
+        logging.info(" done")
 
-def _threaded_consumer(queue, consumer_callback):
     with QueueConnection([queue]) as channel:
-        channel.basic_qos(prefetch_count=1)
+        mq_conn = channel.connection
+        channel.basic_qos(prefetch_count=threads)
         channel.basic_consume(
             queue=queue,
-            consumer_callback=consumer_callback,
+            consumer_callback=_inner_cb,
         )
         try:
-            print('[*] Waiting for messages. To exit press CTRL+C')
+            logging.error('[*] Waiting for messages. To exit press CTRL+C')
             channel.start_consuming()
         except KeyboardInterrupt:
+            executor.shutdown()
             channel.stop_consuming()
 
 def remove_file(path):
@@ -76,80 +103,88 @@ def enqueue_all(queue, objects):
             )
         )
 
-def do_work(response_queue, file_processor):
-    def _inner_do_work(channel, method, properties, message):
-        message = json.loads(message.decode('ascii'))
-        task_id = message["task_id"]
-        remote_url = message["remote_url"]
-        remote_output_path = message["remote_output_path"]
-        local_input_path = message.get("local_input_path")
-        local_output_path = message.get("local_output_path")
-
-        with tempfile.TemporaryDirectory() as d:
-            print("received", task_id)
-            input_path = os.path.join(d, "INPUT.zip")
-            output_path = os.path.join(d, "OUTPUT.tar.gz")
-
-            if IS_GERMINATE and local_input_path is not None:
-                print(" local copy", local_input_path)
-                with timer("copy-input", print=True):
-                    shutil.copyfile(local_input_path, input_path)
-            else:
-                print(" downloading", remote_url)
-                with timer("download", print=True):
-                    with requests.get(remote_url, stream=True) as r:
+def do_work(file_processor, message):
+    """
+    File processing RPC.
+    - Takes JSON describing the file to work on and where to put the result.
+    - Either writes the output file directly and returns None, or returns a JSON response.
+
+    Any non-None return value is published to the response queue.
+    """
+    message = json.loads(message.decode('ascii'))
+    task_id = message["task_id"]
+    remote_url = message["remote_url"]
+    remote_output_path = message["remote_output_path"]
+    local_input_path = message.get("local_input_path")
+    local_output_path = message.get("local_output_path")
+
+    with tempfile.TemporaryDirectory(prefix="mqlg.") as d:
+        logging.error("received {}".format(task_id))
+        input_path = os.path.join(d, "INPUT.zip")
+        output_path = os.path.join(d, "OUTPUT.tar.gz")
+
+        if IS_GERMINATE and local_input_path is not None:
+            logging.warning(" local copy {}".format(local_input_path))
+            with lib.timer("copy-input", logging.info):
+                shutil.copyfile(local_input_path, input_path)
+        else:
+            logging.warning(" downloading {}".format(remote_url))
+            with lib.timer("download", logging.warning):
+                with requests.get(remote_url, stream=True) as r:
+                    if lib.tqdm_avail:
+                        # Show a progress bar; chunked read/write is slightly slower than copyfileobj.
+                        total_size = int(r.headers.get('content-length', 0))
+                        progress_bar = lib.tqdm(desc="download", total=total_size, unit='iB', unit_scale=True)
+                        with open(input_path, 'wb') as f:
+                            for data in r.iter_content(100000):
+                                f.write(data)
+                                progress_bar.update(len(data))
+                        progress_bar.close()
+                    else:
                         with open(input_path, 'wb') as f:
                             shutil.copyfileobj(r.raw, f)
 
-            print(" processing", task_id)
-            with timer("process", print=True):
-                file_processor(input_path, output_path)
-
-            if IS_GERMINATE and local_output_path is not None:
-                print(" local copy of output", local_output_path)
-                with timer("copy-output", print=True):
-                    shutil.copyfile(output_path, local_output_path)
-            else:
-                print(" sending-response", output_path)
-                with timer("send-response", print=True):
-                    with open(output_path, 'rb') as f:
-                        file_contents = base64.b64encode(f.read()).decode('ascii')
-                    channel.basic_publish(
-                        exchange='',
-                        routing_key=response_queue,
-                        body=json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents}),
-                        properties=pika.BasicProperties(delivery_mode=2),
-                    )
-
-            channel.basic_ack(delivery_tag=method.delivery_tag)
-            print(" sent", output_path)
-    return _inner_do_work
-
-def do_listen(channel, method, properties, message):
+        logging.warning(" processing {}".format(task_id))
+        with lib.timer("process", logging.warning):
+            file_processor(input_path, output_path)
+
+        if IS_GERMINATE and local_output_path is not None:
+            logging.warning(" local copy of output {}".format(local_output_path))
+            with lib.timer("copy-output", logging.warning):
+                shutil.copyfile(output_path, local_output_path)
+            return None
+        else:
+            logging.warning(" sending-response {}".format(output_path))
+            with lib.timer("send-response 1/2", logging.warning):
+                with open(output_path, 'rb') as f:
+                    file_contents = base64.b64encode(f.read()).decode('ascii')
+                return json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents})
+
+def do_listen(message):
     """Receive extracted text and write it to disk"""
     response = json.loads(message)
     response_id = response["response_id"]
     output_path = response["local_output_path"]
-    print("received", response_id)
+    logging.error("received {}".format(response_id))
     if os.path.exists(output_path):
-        print("skipped", output_path)
+        logging.error("skipped {}".format(output_path))
     else:
         content = base64.b64decode(response["content"].encode('ascii'))
-        with open(output_file, "wb") as f:
+        with open(output_path, "wb") as f:
             f.write(content)
-        print("saved", output_path)
-        channel.basic_ack(delivery_tag=method.delivery_tag)
+        logging.warning("saved {}".format(output_path))
 
 if __name__ == '__main__':
     args = sys.argv
     if len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "worker":
-        threaded_consumer(sm.QUEUE, do_work(sm.QUEUE_RESP, sm.extract_text))
+        threaded_consumer(sm.QUEUE, sm.QUEUE_RESP, functools.partial(do_work, sm.extract_text))
     elif len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "listener":
-        threaded_consumer(sm.QUEUE_RESP, do_listen)
+        threaded_consumer(sm.QUEUE_RESP, None, do_listen)
     elif len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "enqueue":
         enqueue_all(sm.QUEUE, sm.generate_tasks())
     elif len(args) == 5 and sys.argv[1] == "sm" and sys.argv[2] == "debug":
+        printed.setLevel(logging.DEBUG)
        sm.extract_text(args[3], args[4], debug=True)
     else:
         print("Usage: {} sm worker|listener|enqueue".format(sys.argv[0].split("/")[-1]))
diff --git a/sm.py b/sm.py
index 008a878..b221c42 100644
--- a/sm.py
+++ b/sm.py
@@ -1,9 +1,10 @@
 #!/usr/local/bin python3
-import tempfile, os, subprocess, sys
-from lib import timer
+import logging, tempfile, os, subprocess, sys
+import lib
 
 SM_URL='https://germinate.za3k.com{path}'
 SM_OUTPUT_PATH='/jbod0/text-scimag/{path}'
+PDF_CONVERSION_TIMEOUT = 30
 
 QUEUE='sm_zip'
 QUEUE_RESP='sm_zip_resp'
@@ -12,20 +13,20 @@ ERROR_FILE="/var/tmp/sm.nonpdfs"
 
 def extract_text(input_path, output_path, debug=False):
     """Extract text from a .zip file of ~1000 PDFs. Single-threaded."""
-    with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
+    with tempfile.TemporaryDirectory(prefix="mqlg-in.") as td_in, tempfile.TemporaryDirectory(prefix="mqlg-out.") as td_out:
         if debug:
             td_in = "tmp.in"
             td_out = "tmp.out"; os.mkdir(td_out)
-        #print("working in temp dirs: ", td_in, td_out)
+        logging.info("working in temp dirs. in:{} out:{}".format(td_in, td_out))
 
         # Extract .zip file
-        with timer("zip", print=debug):
+        with lib.timer("zip", logging.info):
             subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
 
         # Make a list of pdf files extracted
         pdfs = []
         other = []
-        with timer("ls", print=debug):
+        with lib.timer("ls", logging.info):
             for dirname, subdirList, fileList in os.walk(td_in):
                 assert dirname.startswith(td_in)
                 short = dirname[len(td_in):].lstrip("/")
@@ -37,22 +38,33 @@
                     pdfs.append(file_name)
                 else:
                     other.append(fname)
-                    print(" other", input_path, fname)
+                    logging.warning(" other {} {}".format(input_path, fname))
 
         if len(other) > 0:
             with open(ERROR_FILE, "a") as logfile:
-                print(input_path, fname, file=logfile)
+                for fname in other:
+                    print("unrecognized-file", input_path, fname, file=logfile)
 
         # Extract the text from each (single-threaded)
        # INPUT.pdf -> INPUT.pdf.txt
-        with timer("pdftotext", print=debug):
-            for pdf in pdfs:
+        with lib.timer("pdftotext", logging.warning):
+            for pdf in lib.tqdm(pdfs, desc="pdftotext"):
                 input_pdf = os.path.join(td_in, pdf)
                 output_txt = os.path.join(td_out, pdf + ".txt")
-                subprocess.call(["pdftotext", input_pdf, output_txt])
+                #logging.info("converting {} -> {}".format(input_pdf, output_txt))
+                try:
+                    subprocess.call(
+                        ["pdftotext", input_pdf, output_txt],
+                        timeout=PDF_CONVERSION_TIMEOUT,
+                        stderr=subprocess.DEVNULL,
+                    )
+                except subprocess.TimeoutExpired:
+                    with open(ERROR_FILE, "a") as logfile:
+                        print("timeout pdftotext", PDF_CONVERSION_TIMEOUT, input_path, input_pdf, output_txt, file=logfile)
+                    logging.warning(" timeout-pdftotext {}s {}".format(PDF_CONVERSION_TIMEOUT, input_pdf))
 
         # Put the results into a .tar.gz file
-        with timer("tar", print=debug):
+        with lib.timer("tar", logging.warning):
             output_path = os.path.abspath(output_path)
             subprocess.call(
                 ["tar", "czf", output_path, "."], # gzip = z, J = '.xz'
@@ -86,6 +98,6 @@ if __name__ == '__main__':
     if len(sys.argv) == 3:
         extract_text(sys.argv[1], sys.argv[2])
     else:
-        print(sys.argv)
+        print(sys.argv, file=sys.stderr)
         print("Usage: sm-extract INPUT.zip OUTPUT.tar.gz")
         sys.exit(1)
-- 
2.47.3
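
Note on the ack/keepalive pattern above: `_done_cb` runs `basic_publish` and
`basic_ack` back on the connection's I/O thread via `add_timeout(0, ...)`,
which is what lets heartbeats keep flowing while `pdftotext` grinds away on
the pool threads. `add_timeout` and the `consumer_callback=` keyword are pika
0.x API; pika 1.x removed `add_timeout` and renamed the consume parameter to
`on_message_callback`, and the commented-out `add_callback_threadsafe` call is
the preferred spelling there. A minimal sketch of the same pattern against
pika 1.x (the host, queue, handler, and thread count are illustrative
placeholders, not values from this repo):

    import concurrent.futures
    import functools
    import pika

    def consume(queue, handler, threads=4):
        # One blocking connection; channel operations must stay on its thread.
        connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
        channel = connection.channel()
        channel.basic_qos(prefetch_count=threads)  # one in-flight message per worker
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=threads)

        def on_message(channel, method, properties, body):
            # Run the slow work off-thread so the I/O loop keeps servicing
            # heartbeats, then marshal the ack back onto the connection's
            # thread. (Real code should check f.exception() before acking.)
            future = executor.submit(handler, body)
            future.add_done_callback(lambda f: connection.add_callback_threadsafe(
                functools.partial(channel.basic_ack, method.delivery_tag)))

        channel.basic_consume(queue=queue, on_message_callback=on_message)
        channel.start_consuming()

The invariant is the same in both spellings: `basic_ack` and `basic_publish`
only ever execute on the thread that owns the `BlockingConnection`.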