git.za3k.com Git - mqlg.git/commitdiff
v1.2, multithreading and keepalive working
author Zachary Vance <za3k@za3k.com>
Fri, 13 Aug 2021 00:52:00 +0000 (17:52 -0700)
committer Zachary Vance <za3k@za3k.com>
Fri, 13 Aug 2021 00:53:55 +0000 (17:53 -0700)
.gitignore [new file with mode: 0644]
__pycache__/lib.cpython-37.pyc [deleted file]
__pycache__/sm.cpython-37.pyc [deleted file]
lib.py
main.py
sm.py

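The core of this commit is the reworked threaded_consumer in main.py: long-running jobs are handed to a ThreadPoolExecutor while the pika connection stays on its own thread, so heartbeats (keepalive) continue to be serviced, and the publish/ack is marshalled back onto the connection's thread when a job finishes. Below is a minimal, self-contained sketch of that pattern, not the repo's code: it assumes pika >= 1.0 (on_message_callback, BlockingConnection.add_callback_threadsafe), whereas the committed code targets an older pika API (consumer_callback=, connection.add_timeout). Queue names, credentials, and the handle() body are placeholders.

#!/usr/bin/env python3
# Sketch of the worker pattern introduced in this commit (illustrative only).
import concurrent.futures
import functools
import multiprocessing

import pika

QUEUE = "tasks"             # placeholder queue name
RESPONSE_QUEUE = "results"  # placeholder response queue name


def handle(message: bytes) -> bytes:
    """Long-running work; runs on a pool thread and never touches the channel."""
    return message.upper()  # placeholder for real processing


def main(threads: int = multiprocessing.cpu_count()) -> None:
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=threads)
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE, durable=True)
    channel.queue_declare(queue=RESPONSE_QUEUE, durable=True)
    channel.basic_qos(prefetch_count=threads)

    def done(method, future):
        # Runs back on the connection's thread: safe to publish and ack here.
        response = future.result()
        if response is not None:
            channel.basic_publish(
                exchange="",
                routing_key=RESPONSE_QUEUE,
                body=response,
                properties=pika.BasicProperties(delivery_mode=2),
            )
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def on_message(channel, method, properties, body):
        # Hand the slow work to a pool thread; the connection thread keeps
        # running pika's I/O loop, so heartbeats (keepalive) are still sent.
        future = executor.submit(handle, body)
        future.add_done_callback(
            lambda fut: connection.add_callback_threadsafe(
                functools.partial(done, method, fut)
            )
        )

    channel.basic_consume(queue=QUEUE, on_message_callback=on_message)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        executor.shutdown()
        channel.stop_consuming()


if __name__ == "__main__":
    main()
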
diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..5da64ee
--- /dev/null
@@ -0,0 +1,3 @@
+*.swp
+__pycache__/
+*.py[cod]
diff --git a/__pycache__/lib.cpython-37.pyc b/__pycache__/lib.cpython-37.pyc
deleted file mode 100644 (file)
index 8138fba..0000000
Binary files a/__pycache__/lib.cpython-37.pyc and /dev/null differ
diff --git a/__pycache__/sm.cpython-37.pyc b/__pycache__/sm.cpython-37.pyc
deleted file mode 100644 (file)
index 9e0a2d6..0000000
Binary files a/__pycache__/sm.cpython-37.pyc and /dev/null differ
diff --git a/lib.py b/lib.py
index 8e47e24813751d88c3d505478aeffb3fc241fe5b..27ccca6972799aa5b9d9ccc33ec1b772e8942173 100644 (file)
--- a/lib.py
+++ b/lib.py
@@ -1,14 +1,21 @@
 #!/usr/local/bin python3
 import time
 
+try:
+    from tqdm import tqdm
+    tqdm_avail = True
+except ImportError:
+    tqdm = lambda x, **kwargs: x
+    tqdm_avail = False
+
 class timer():
-    def __init__(self, name, print=False):
+    def __init__(self, name, logger=None):
         self.name = name
-        self.print = print
+        self.logger = logger
     def __enter__(self):
         self._time = time.time()
     def __exit__(self, *exec_info):
         elapsed = time.time() - self._time
-        if self.print:
-            print(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
+        if self.logger is not None:
+            self.logger(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
 
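A short usage sketch of the two helpers this hunk changes: timer() now takes an optional logger callable instead of a print flag, and tqdm degrades to a pass-through iterator when the package is not installed. The work items and loop body here are placeholders.

# Usage sketch for the updated lib.py helpers (illustrative only).
import logging
import lib

logging.basicConfig(level=logging.INFO)

items = range(3)  # placeholder work items
with lib.timer("demo", logging.info):          # logs " timer[demo] = ...ms" on exit
    for item in lib.tqdm(items, desc="demo"):  # plain iterator if tqdm is missing
        pass  # real work goes here
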
diff --git a/main.py b/main.py
index ffcf85cb04ffa89d826f05b0aaa66f28db39d7db..261ae5454fb80c341d280976ddc5ec5402e3d751 100644 (file)
--- a/main.py
+++ b/main.py
@@ -1,9 +1,16 @@
 #!/usr/local/bin python3
 import pika, requests
-import base64, json, tempfile, time, os, shutil, sys
-
-import sm
-from lib import timer
+import base64, concurrent.futures, functools, logging, json, tempfile, time, multiprocessing, os, shutil, sys
+import lib, sm
+
+logging.basicConfig(filename='/var/log/mqlg.log',
+                                       filemode='a',
+                                       format='[mq-lg] %(threadName)s %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
+                                       datefmt='%H:%M:%S',
+                                       level=logging.INFO)
+printed = logging.StreamHandler()
+printed.setLevel(logging.INFO)#logging.WARNING)
+#logging.getLogger().addHandler(printed)
 
 # Queues
 FF_TAR="ff_tar"
@@ -17,7 +24,7 @@ USER="sm"
 PASS="McFnE9I4OdwTB"
 
 import socket
-IS_GERMINATE = socket.gethostname() == "germinate"
+IS_GERMINATE = False #socket.gethostname() == "germinate"
 
 # Library functions
 class QueueConnection():
@@ -34,27 +41,47 @@ class QueueConnection():
     def __exit__(self, *exec_info):
         self.connection.close()
 
-#def threaded_consumer_multiple(queue, consumer_callback):
-    #while True:
-        #try:
-        #except pika.exceptions.ConnectionClosed:
-        #    print("connection close")
-        #    pass
-
-def threaded_consumer(queue, consumer_callback):
-    _threaded_consumer(queue, consumer_callback)
+def threaded_consumer(queue, response_queue, consumer_callback, threads=None):
+    if threads is None:
+        threads = multiprocessing.cpu_count()
+    executor = concurrent.futures.ThreadPoolExecutor(max_workers=threads)
+    def _inner_cb(channel, method, properties, message):
+        # Queue async work on another thread
+        future = executor.submit(consumer_callback, message)
+        mq_conn = channel.connection
+        # Once the thread is complete, finish the remaining work back on this thread
+        #future.add_done_callback(lambda future: mq_conn.add_callback_threadsafe(functools.partial(_done_cb, channel, method, future)))
+        future.add_done_callback(lambda future: mq_conn.add_timeout(0, functools.partial(_done_cb, channel, method, future)))
+
+    def _done_cb(channel, method, future):
+        # Always executed on main thread
+        response = future.result()
+        if response is not None:
+            with lib.timer("send-response 1/2", logging.warning):
+                channel.basic_publish(
+                    exchange='',
+                    routing_key=response_queue,
+                    body=response,
+                    properties=pika.BasicProperties(delivery_mode=2),
+                )
+                logging.warning(" sent")
+        #logging.error("skipping ack during debug test")
+        #sys.exit(0)
+        channel.basic_ack(delivery_tag=method.delivery_tag)
+        logging.info(" done")
 
-def _threaded_consumer(queue, consumer_callback):
     with QueueConnection([queue]) as channel:
-        channel.basic_qos(prefetch_count=1)
+        mq_conn = channel.connection
+        channel.basic_qos(prefetch_count=threads)
         channel.basic_consume(
             queue=queue,
-            consumer_callback=consumer_callback,
+            consumer_callback=_inner_cb,
         )
         try:
-            print('[*] Waiting for messages. To exit press CTRL+C')
+            logging.error('[*] Waiting for messages. To exit press CTRL+C')
             channel.start_consuming()
         except KeyboardInterrupt:
+            executor.shutdown()
             channel.stop_consuming()
 
 def remove_file(path):
@@ -76,80 +103,88 @@ def enqueue_all(queue, objects):
                 )
             )
 
-def do_work(response_queue, file_processor):
-    def _inner_do_work(channel, method, properties, message):
-        message = json.loads(message.decode('ascii'))
-        task_id = message["task_id"]
-        remote_url = message["remote_url"]
-        remote_output_path = message["remote_output_path"]
-        local_input_path = message.get("local_input_path")
-        local_output_path = message.get("local_output_path")
-
-        with tempfile.TemporaryDirectory() as d:
-            print("received", task_id)
-            input_path = os.path.join(d, "INPUT.zip")
-            output_path = os.path.join(d, "OUTPUT.tar.gz")
-
-            if IS_GERMINATE and local_input_path is not None:
-                print(" local copy", local_input_path)
-                with timer("copy-input", print=True):
-                    shutil.copyfile(local_input_path, input_path)
-            else:
-                print(" downloading", remote_url)
-                with timer("download", print=True):
-                    with requests.get(remote_url, stream=True) as r:
+def do_work(file_processor, message):
+    """
+    File processing RPC.
+    - Takes in JSON describing the file to work on and where to put it.
+    - Can directly write the file, returning None, or returns a JSON response
+    
+    Any return value will be sent in the return queue.
+    """
+    message = json.loads(message.decode('ascii'))
+    task_id = message["task_id"]
+    remote_url = message["remote_url"]
+    remote_output_path = message["remote_output_path"]
+    local_input_path = message.get("local_input_path")
+    local_output_path = message.get("local_output_path")
+
+    with tempfile.TemporaryDirectory(prefix="mqlg.") as d:
+        logging.error("received {}".format(task_id))
+        input_path = os.path.join(d, "INPUT.zip")
+        output_path = os.path.join(d, "OUTPUT.tar.gz")
+
+        if IS_GERMINATE and local_input_path is not None:
+            logging.warning(" local copy {}".format(local_input_path))
+            with lib.timer("copy-input", logging.info):
+                shutil.copyfile(local_input_path, input_path)
+        else:
+            logging.warning(" downloading {}".format(remote_url))
+            with lib.timer("download", logging.warning):
+                with requests.get(remote_url, stream=True) as r:
+                    if lib.tqdm_avail:
+                        # A nice progress bar but read/write instead of copyfileobj slows it down a little.
+                        total_size = int(r.headers.get('content-length', 0))
+                        progress_bar = lib.tqdm(desc="download", total=total_size, unit='iB', unit_scale=True)
+                        with open(input_path, 'wb') as f:
+                            for data in r.iter_content(100000):
+                                f.write(data)
+                                progress_bar.update(len(data))
+                            progress_bar.close()
+                    else:
                         with open(input_path, 'wb') as f:
                             shutil.copyfileobj(r.raw, f)
 
-            print(" processing", task_id)
-            with timer("process", print=True):
-                file_processor(input_path, output_path)
-
-            if IS_GERMINATE and local_output_path is not None:
-                print(" local copy of output", local_output_path)
-                with timer("copy-output", print=True):
-                    shutil.copyfile(output_path, local_output_path)
-            else:
-                print(" sending-response", output_path)
-                with timer("send-response", print=True):
-                    with open(output_path, 'rb') as f:
-                        file_contents = base64.b64encode(f.read()).decode('ascii')
-                    channel.basic_publish(
-                        exchange='',
-                        routing_key=response_queue,
-                        body=json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents}),
-                        properties=pika.BasicProperties(delivery_mode=2),
-                    )
-
-        channel.basic_ack(delivery_tag=method.delivery_tag)
-        print(" sent", output_path)
-    return _inner_do_work
-
-def do_listen(channel, method, properties, message):
+        logging.warning(" processing {}".format(task_id))
+        with lib.timer("process", logging.warning):
+            file_processor(input_path, output_path)
+
+        if IS_GERMINATE and local_output_path is not None:
+            logging.warning(" local copy of output {}".format(local_output_path))
+            with lib.timer("copy-output", logging.warning):
+                shutil.copyfile(output_path, local_output_path)
+            return None
+        else:
+            logging.warning(" sending-response {}".format(output_path))
+            with lib.timer("send-response 1/2", logging.warning):
+                with open(output_path, 'rb') as f:
+                    file_contents = base64.b64encode(f.read()).decode('ascii')
+                    return json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents})
+
+def do_listen(message):
     """Receive extracted text and write it to disk"""
     response = json.loads(message)
     response_id = response["response_id"]
     output_path = response["local_output_path"]
 
-    print("received", response_id)
+    logging.error("received {}".format(response_id))
     if os.path.exists(output_path):
-        print("skipped", output_path)
+        logging.error("skipped {}".format(output_path))
     else:
         content = base64.b64decode(response["content"].encode('ascii'))
         with open(output_file, "wb") as f:
             f.write(content)
-        print("saved", output_path)
-    channel.basic_ack(delivery_tag=method.delivery_tag)
+        logging.warning("saved {}".format(output_path))
 
 if __name__ == '__main__':
     args = sys.argv
     if   len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "worker":
-        threaded_consumer(sm.QUEUE, do_work(sm.QUEUE_RESP, sm.extract_text))
+        threaded_consumer(sm.QUEUE, sm.QUEUE_RESP, functools.partial(do_work, sm.extract_text))
     elif len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "listener":
         threaded_consumer(sm.QUEUE_RESP, do_listen)
     elif len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "enqueue":
         enqueue_all(sm.QUEUE, sm.generate_tasks())
     elif len(args) == 5 and sys.argv[1] == "sm" and sys.argv[2] == "debug":
+        printed.setLevel(logging.DEBUG)
         sm.extract_text(args[3], args[4], debug=True)
     else:
         print("Usage: {} sm worker|listener|enqueue".format(sys.argv[0].split("/")[-1]))
diff --git a/sm.py b/sm.py
index 008a878a590ffa7f31868850e5f7353f6464c4dd..b221c42606905beca09be8e518b2581d8f326290 100644 (file)
--- a/sm.py
+++ b/sm.py
@@ -1,9 +1,10 @@
 #!/usr/local/bin python3
-import tempfile, os, subprocess, sys
-from lib import timer
+import logging, tempfile, os, subprocess, sys
+import lib
 
 SM_URL='https://germinate.za3k.com{path}'
 SM_OUTPUT_PATH='/jbod0/text-scimag/{path}'
+PDF_CONVERSION_TIMEOUT = 30
 
 QUEUE='sm_zip'
 QUEUE_RESP='sm_zip_resp'
@@ -12,20 +13,20 @@ ERROR_FILE="/var/tmp/sm.nonpdfs"
 
 def extract_text(input_path, output_path, debug=False):
     """Extract text from a .zip file of ~1000 PDFs. Single-threaded."""
-    with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
+    with tempfile.TemporaryDirectory(prefix="mqlg-in.") as td_in, tempfile.TemporaryDirectory(prefix="mqlg-out.") as td_out:
         if debug:
             td_in = "tmp.in"
             td_out = "tmp.out"; os.mkdir(td_out)
-        #print("working in temp dirs: ", td_in, td_out)
+        logging.info("working in temp dirs. in:{} out:{}".format(td_in, td_out))
 
         # Extract .zip file
-        with timer("zip", print=debug):
+        with lib.timer("zip", logging.info):
             subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
 
         # Make a list of pdf files extracted
         pdfs = []
         other = []
-        with timer("ls", print=debug):
+        with lib.timer("ls", logging.info):
             for dirname, subdirList, fileList in os.walk(td_in):
                 assert dirname.startswith(td_in)
                 short = dirname[len(td_in):].lstrip("/")
@@ -37,22 +38,32 @@ def extract_text(input_path, output_path, debug=False):
                         pdfs.append(file_name)
                     else:
                         other.append(fname)
-                        print(" other", input_path, fname)
+                        logging.warning(" other {} {}".format(input_path, fname))
 
         if len(other) > 0:
             with open(ERROR_FILE, "a") as logfile:
-                print(input_path, fname, file=logfile)
+                print("unrecognized-file", input_path, fname, file=logfile)
 
         # Extract the text from each (single-threaded)
         # INPUT.pdf -> INPUT.pdf.txt
-        with timer("pdftotext", print=debug):
-            for pdf in pdfs:
+        with lib.timer("pdftotext", logging.warning):
+            for pdf in lib.tqdm(pdfs, desc="pdftotext"):
                 input_pdf = os.path.join(td_in, pdf)
                 output_txt = os.path.join(td_out, pdf + ".txt")
-                subprocess.call(["pdftotext", input_pdf, output_txt])
+                #logging.info("converting {} -> {}".format(input_pdf, output_txt))
+                try:
+                    subprocess.call(
+                        ["pdftotext", input_pdf, output_txt],
+                        timeout=PDF_CONVERSION_TIMEOUT,
+                        stderr=subprocess.DEVNULL,
+                    )
+                except subprocess.TimeoutExpired:
+                    with open(ERROR_FILE, "a") as logfile:
+                        print("timeout pdftotext", PDF_CONVERSION_TIMEOUT, input_path, input_pdf, output_txt, file=logfile)
+                        logging.warning(" timeout-pdftotext {}s {}".format(PDF_CONVERSION_TIMEOUT, input_pdf))
 
         # Put the results into a .tar.gz file
-        with timer("tar", print=debug):
+        with lib.timer("tar", logging.warning):
             output_path = os.path.abspath(output_path)
             subprocess.call(
                 ["tar", "czf", output_path, "."], # gzip = z, J = '.xz'
@@ -86,6 +97,6 @@ if __name__ == '__main__':
     if len(sys.argv) == 3:
         extract_text(sys.argv[1], sys.argv[2])
     else:
-        print(sys.argv)
+        print(sys.argv, file=sys.stderr)
         print("Usage: sm-extract INPUT.zip OUTPUT.tar.gz")
         sys.exit(1)