]> git.za3k.com Git - mqlg.git/commitdiff
initial commit
authorZachary Vance <za3k@za3k.com>
Thu, 12 Aug 2021 00:15:32 +0000 (17:15 -0700)
committerZachary Vance <za3k@za3k.com>
Thu, 12 Aug 2021 00:15:32 +0000 (17:15 -0700)
main.py
sm-extract.py [new file with mode: 0644]

diff --git a/main.py b/main.py
index 00f39d4ae58561438a6ed9d975074c2f56608540..241ea92dbd14020a5ca73821563e32d6b67dc5a1 100644 (file)
--- a/main.py
+++ b/main.py
@@ -1,11 +1,12 @@
 #!/usr/local/bin python3
 import pika, requests
 import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
+import sm_extract 
 
 SM_ZIP='sm_zip'
 SM_ZIP_RESP='sm_zip_resp'
 HOST='https://germinate.za3k.com'
-OUTPUT_DIR='/jbod0/text-scimag'
+SM_OUTPUT_DIR='/jbod0/text-scimag'
 
 USER="sm"
 PASS="McFnE9I4OdwTB"
@@ -14,56 +15,55 @@ import socket
 IS_GERMINATE = socket.gethostname() == "germinate"
 
 pool = None
-def pool():
+def make_pool():
     global pool
     if pool is None:
         pool = multiprocessing.Pool()
     return pool
 
 class timer():
-    def __init__(self, name):
+    def __init__(self, name, print=False):
         self.name = name
+        self.print = print
     def __enter__(self):
         self._time = time.time()
     def __exit__(self, *exec_info):
         elapsed = time.time() - self._time
-        print("timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
+        if self.print:
+            print(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
 
-def main(args):
-    credentials = pika.PlainCredentials(USER, PASS)
-    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
-    channel = connection.channel()
+class QueueConnection():
+    def __init__(self):
+        pass
+    def __enter__(self):
+        credentials = pika.PlainCredentials(USER, PASS)
+        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
+        self.channel = self.connection.channel()
 
-    channel.queue_declare(queue=SM_ZIP, durable=True)
-    channel.queue_declare(queue=SM_ZIP_RESP, durable=True)
-    if len(args) <= 1 or args[1] == "worker":
-        worker_sm(channel)
-    if args[1] == "listener":
-        listener_sm(channel)
-    elif args[1] == "enqueue":
-        enqueuer_sm(channel)
-    elif args[1] == "debug":
-        txt_extract_sm(args[2], args[3], debug=True)
-    else:
-        print("Usage: rabbitmq.py worker|enqueue")
-        sys.exit(1)
-    connection.close()
-def worker_sm(channel):
-    channel.basic_consume(
-        queue=SM_ZIP,
-        consumer_callback=do_work_sm,
-    )
-    channel.basic_qos(prefetch_count=1)
-    print(' [*] Waiting for messages. To exit press CTRL+C')
-    channel.start_consuming()
-def listener_sm(channel):
-    channel.basic_consume(
-        queue=SM_ZIP_RESP,
-        consumer_callback=do_listen_sm,
-    )
-    channel.basic_qos(prefetch_count=1)
-    print(' [*] Waiting for messages. To exit press CTRL+C')
-    channel.start_consuming()
+        self.channel.queue_declare(queue=SM_ZIP, durable=True)
+        self.channel.queue_declare(queue=SM_ZIP_RESP, durable=True)
+        return self.channel
+    def __exit__(self, *exec_info):
+        self.connection.close()
+
+def threaded_consumer(queue, consumer_callback):
+    while True:
+        try:
+            with QueueConnection() as channel:
+                channel.basic_qos(prefetch_count=1)
+                channel.basic_consume(
+                    queue=queue,
+                    consumer_callback=consumer_callback,
+                )
+                print('[*] Waiting for messages. To exit press CTRL+C')
+                channel.start_consuming()
+        except pika.exceptions.ConnectionClosed:
+            pass
+
+def worker_sm():
+    threaded_consumer(SM_ZIP, do_work_sm)
+def listener_sm():
+    threaded_consumer(SM_ZIP_RESP, do_listen_sm)
 
 # Library functions
 def remove_file(path):
@@ -73,115 +73,113 @@ def remove_file(path):
         pass
 
 # "Meat" -- actually specific to what's going on.
-def enqueuer_sm(channel):
+def enqueuer_sm():
     """Load up all the text extraction work to be done"""
-    for drive_num in range(0,9):
-        drive="/libgen12-{drive_num:02}".format(drive_num=drive_num)
-        for dirname, subdirList, fileList in os.walk(drive):
-            for fname in fileList:
-                if fname.endswith(".zip"):
-                    fpath = os.path.join(dirname, fname)
-                    url=HOST + fpath
-                    print("enqueue", url)
-                    channel.basic_publish(
-                        exchange='',
-                        routing_key=SM_ZIP,
-                        body=url,
-                    )
-                    sys.exit(0)
+    with QueueConnection() as channel:
+        urls = []
+        for drive_num in range(1,9):
+            drive="/libgen12-{drive_num:02}".format(drive_num=drive_num)
+            for dirname, subdirList, fileList in os.walk(drive):
+                for fname in fileList:
+                    if fname.endswith(".zip"):
+                        fpath = os.path.join(dirname, fname)
+                        url=HOST + fpath
+                        urls.append(url)
+        urls.sort()
+        for url in urls:
+            channel.basic_publish(
+                exchange='',
+                routing_key=SM_ZIP,
+                body=url,
+                properties=pika.spec.BasicProperties(
+                    delivery_mode=2,
+                )
+            )
 
 def do_listen_sm(channel, method, properties, content):
     """Receive extracted text and write it to disk"""
     response = json.loads(content)
-    output_file = os.path.join(OUTPUT_DIR, response["filename"])
+    output_path = os.path.join(SM_OUTPUT_DIR, response["filename"])
 
     print("saving", output_file)
     content = base64.b64decode(response["content"].encode('ascii'))
-    if os.path.exists(output_file):
-        print("skipped", output_file)
+    if os.path.exists(output_path):
+        print("skipped", output_path)
     else:
         with open(output_file, "wb") as f:
             f.write(content)
-        print("saved", output_file)
+        print("saved", output_path)
     channel.basic_ack(delivery_tag=method.delivery_tag)
 
-def txt_extract_sm(input_path, output_path, debug=False):
-    """Extract text from a .zip file of ~1000 PDFs"""
-    # Step 1: Extract
-    with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
-        if debug:
-            td_in = "tmp.in"
-            td_out = "tmp.out"; os.mkdir(td_out)
-        print("working in temp dirs: ", td_in, td_out)
-        with timer("zip"):
-            subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
-        pdfs = []
-        with timer("ls"):
-            for dirname, subdirList, fileList in os.walk(td_in):
-                assert dirname.startswith(td_in)
-                short = dirname[len(td_in):].lstrip("/")
-                for dname in subdirList:
-                    os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname))
-                for fname in fileList:
-                    if fname.endswith(".pdf"):
-                        pdf = os.path.join(short, fname)
-                        pdfs.append(pdf)
-                    else:
-                        print(fname)
-        with timer("pdftotext"):
-            input_pdf = os.path.join(td_in, pdf)
-            output_txt = os.path.join(td_out, pdf + ".txt")
-            subprocess.call(["pdftotext", input_pdf, output_txt])
-        with timer("tar"):
-            subprocess.call(["tar", "czf", output_path, td_out])
-
 def do_work_sm(channel, method, properties, url):
     url = url.decode('ascii')
     input_filename = url.split("/")[-1]
-    output_filename = input_filename + ".out"
+    output_filename = input_filename[:-len(".zip")] + "-text.tar.gz"
+    output_path = url.split("/")[-2] + "/" + output_filename
 
     if IS_GERMINATE:
         source_file = url.replace(HOST, "")
-        final_save_path = os.path.join(OUTPUT_DIR, output_filename)
+        final_save_path = os.path.join(SM_OUTPUT_DIR, output_path)
 
     try:
         if IS_GERMINATE:
             print("local copy", source_file)
-            with timer("copy"):
+            with timer("copy-input", print=True):
                 shutil.copyfile(source_file, input_filename)
         else:
-            print("downloading", url)
+            print("downloading", url, print=True)
             with timer("download"):
                 response = requests.get(url)
                 with requests.get(url, stream=True) as r:
                     with open(input_filename, 'wb') as f:
                         shutil.copyfileobj(r.raw, f)
 
-        print("processing", input_filename)
-        with timer("process"):
-            txt_extract_sm(input_filename, output_filename)
+        print(" processing", input_filename)
+        with timer("process", print=True):
+            sm_extract.txt_extract_sm(input_filename, output_filename)
 
         if IS_GERMINATE:
-            print("local copy of output", output_filename)
-            with timer("copy"):
+            print(" local copy of output", final_save_path)
+            with timer("copy-output", print=True):
                 shutil.copyfile(output_filename, final_save_path)
         else:
-            print("sending-response", output_filename)
-            with timer("send-response"):
+            print(" sending-response", output_path)
+            with timer("send-response", print=True):
                 with open(output_filename, 'rb') as f:
                     file_contents = base64.b64encode(f.read()).decode('ascii')
                 channel.basic_publish(
                     exchange='',
                     routing_key=SM_ZIP_RESP,
-                    body=json.dumps({"url": url, "filename": output_filename, "content": file_contents}),
+                    body=json.dumps({"url": url, "filename": output_path, "content": file_contents}),
+                    delivery_mode=2,
                 )
     finally:
         remove_file(input_filename)
         remove_file(output_filename)
 
     channel.basic_ack(delivery_tag=method.delivery_tag)
-    print("sent", output_filename)
-
+    print(" sent", output_filename)
 
 if __name__ == '__main__':
-    main(sys.argv)
+    args = sys.argv
+    if len(args) <= 1 or args[1] == "worker":
+        if True: # single-threaded
+            worker_sm()
+        else: # multi-threaded does not work
+            pool = make_pool()
+            pool.apply_async(worker_sm)
+            try:
+                time.sleep(10000000)
+            except KeyboardInterrupt:
+                print(' [*] Terminating...')
+                pool.terminate()
+                pool.join()
+    elif args[1] == "listener":
+        listener_sm()
+    elif args[1] == "enqueue":
+        enqueuer_sm()
+    elif args[1] == "debug":
+        sm_extract.txt_extract_sm(args[2], args[3], debug=True)
+    else:
+        print("Usage: rabbitmq.py worker|enqueue")
+        sys.exit(1)
diff --git a/sm-extract.py b/sm-extract.py
new file mode 100644 (file)
index 0000000..5a05053
--- /dev/null
@@ -0,0 +1,57 @@
+#!/usr/local/bin python3
+import pika, requests
+import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
+
+def txt_extract_sm(input_path, output_path, debug=False):
+    """Extract text from a .zip file of ~1000 PDFs. Single-threaded."""
+    with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
+        #if debug:
+        #    td_in = "tmp.in"
+        #    td_out = "tmp.out"; os.mkdir(td_out)
+        #print("working in temp dirs: ", td_in, td_out)
+
+        # Extract .zip file
+        with timer("zip", print=debug):
+            subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
+
+        # Make a list of pdf files extracted
+        pdfs = []
+        with timer("ls", print=debug):
+            with open(os.path.join(OUTPUT_DIR, "non-pdfs"), "a") as logfile:
+                for dirname, subdirList, fileList in os.walk(td_in):
+                    assert dirname.startswith(td_in)
+                    short = dirname[len(td_in):].lstrip("/")
+                    for dname in subdirList:
+                        os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname))
+                    for fname in fileList:
+                        file_name = os.path.join(short, fname)
+                        if fname.lower().endswith(".pdf"):
+                            pdfs.append(file_name)
+                        else:
+                            print(input_path, fname, file=logfile)
+                            print("other", input_path, fname)
+
+        # Extract the text from each (single-threaded)
+        # INPUT.pdf -> INPUT.pdf.txt
+        with timer("pdftotext", print=debug):
+            for pdf in pdfs:
+                input_pdf = os.path.join(td_in, pdf)
+                output_txt = os.path.join(td_out, pdf + ".txt")
+                subprocess.call(["pdftotext", input_pdf, output_txt])
+
+        # Put the results into a .tar.gz file
+        with timer("tar", print=debug):
+            subprocess.call(
+                ["tar", "czf", output_path, "."], # gzip = z, J = '.xz'
+                stderr=subprocess.DEVNULL,
+                cwd=td_out,
+                env={"XZ_OPT":"-9","GZIP":"-9",}
+            )
+
+if __name__ == '__main__':
+    args = sys.argv
+    if len(args) == 2:
+        txt_extract_sm(args[1], args[2])
+    else
+        print("Usage: sm-extract INPUT.zip OUTPUT.tar.gz")
+        sys.exit(1)