]> git.za3k.com Git - mqlg.git/commitdiff
initial commit
authorZachary Vance <za3k@za3k.com>
Tue, 10 Aug 2021 03:38:50 +0000 (20:38 -0700)
committerZachary Vance <za3k@za3k.com>
Tue, 10 Aug 2021 03:38:50 +0000 (20:38 -0700)
main.py [new file with mode: 0644]

diff --git a/main.py b/main.py
new file mode 100644 (file)
index 0000000..00f39d4
--- /dev/null
+++ b/main.py
@@ -0,0 +1,187 @@
+#!/usr/local/bin python3
+import pika, requests
+import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
+
+SM_ZIP='sm_zip'
+SM_ZIP_RESP='sm_zip_resp'
+HOST='https://germinate.za3k.com'
+OUTPUT_DIR='/jbod0/text-scimag'
+
+USER="sm"
+PASS="McFnE9I4OdwTB"
+
+import socket
+IS_GERMINATE = socket.gethostname() == "germinate"
+
+pool = None
+def pool():
+    global pool
+    if pool is None:
+        pool = multiprocessing.Pool()
+    return pool
+
+class timer():
+    def __init__(self, name):
+        self.name = name
+    def __enter__(self):
+        self._time = time.time()
+    def __exit__(self, *exec_info):
+        elapsed = time.time() - self._time
+        print("timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
+
+def main(args):
+    credentials = pika.PlainCredentials(USER, PASS)
+    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
+    channel = connection.channel()
+
+    channel.queue_declare(queue=SM_ZIP, durable=True)
+    channel.queue_declare(queue=SM_ZIP_RESP, durable=True)
+    if len(args) <= 1 or args[1] == "worker":
+        worker_sm(channel)
+    if args[1] == "listener":
+        listener_sm(channel)
+    elif args[1] == "enqueue":
+        enqueuer_sm(channel)
+    elif args[1] == "debug":
+        txt_extract_sm(args[2], args[3], debug=True)
+    else:
+        print("Usage: rabbitmq.py worker|enqueue")
+        sys.exit(1)
+    connection.close()
+def worker_sm(channel):
+    channel.basic_consume(
+        queue=SM_ZIP,
+        consumer_callback=do_work_sm,
+    )
+    channel.basic_qos(prefetch_count=1)
+    print(' [*] Waiting for messages. To exit press CTRL+C')
+    channel.start_consuming()
+def listener_sm(channel):
+    channel.basic_consume(
+        queue=SM_ZIP_RESP,
+        consumer_callback=do_listen_sm,
+    )
+    channel.basic_qos(prefetch_count=1)
+    print(' [*] Waiting for messages. To exit press CTRL+C')
+    channel.start_consuming()
+
+# Library functions
+def remove_file(path):
+    try:
+        os.remove(path)
+    except FileNotFoundError:
+        pass
+
+# "Meat" -- actually specific to what's going on.
+def enqueuer_sm(channel):
+    """Load up all the text extraction work to be done"""
+    for drive_num in range(0,9):
+        drive="/libgen12-{drive_num:02}".format(drive_num=drive_num)
+        for dirname, subdirList, fileList in os.walk(drive):
+            for fname in fileList:
+                if fname.endswith(".zip"):
+                    fpath = os.path.join(dirname, fname)
+                    url=HOST + fpath
+                    print("enqueue", url)
+                    channel.basic_publish(
+                        exchange='',
+                        routing_key=SM_ZIP,
+                        body=url,
+                    )
+                    sys.exit(0)
+
+def do_listen_sm(channel, method, properties, content):
+    """Receive extracted text and write it to disk"""
+    response = json.loads(content)
+    output_file = os.path.join(OUTPUT_DIR, response["filename"])
+
+    print("saving", output_file)
+    content = base64.b64decode(response["content"].encode('ascii'))
+    if os.path.exists(output_file):
+        print("skipped", output_file)
+    else:
+        with open(output_file, "wb") as f:
+            f.write(content)
+        print("saved", output_file)
+    channel.basic_ack(delivery_tag=method.delivery_tag)
+
+def txt_extract_sm(input_path, output_path, debug=False):
+    """Extract text from a .zip file of ~1000 PDFs"""
+    # Step 1: Extract
+    with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
+        if debug:
+            td_in = "tmp.in"
+            td_out = "tmp.out"; os.mkdir(td_out)
+        print("working in temp dirs: ", td_in, td_out)
+        with timer("zip"):
+            subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
+        pdfs = []
+        with timer("ls"):
+            for dirname, subdirList, fileList in os.walk(td_in):
+                assert dirname.startswith(td_in)
+                short = dirname[len(td_in):].lstrip("/")
+                for dname in subdirList:
+                    os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname))
+                for fname in fileList:
+                    if fname.endswith(".pdf"):
+                        pdf = os.path.join(short, fname)
+                        pdfs.append(pdf)
+                    else:
+                        print(fname)
+        with timer("pdftotext"):
+            input_pdf = os.path.join(td_in, pdf)
+            output_txt = os.path.join(td_out, pdf + ".txt")
+            subprocess.call(["pdftotext", input_pdf, output_txt])
+        with timer("tar"):
+            subprocess.call(["tar", "czf", output_path, td_out])
+
+def do_work_sm(channel, method, properties, url):
+    url = url.decode('ascii')
+    input_filename = url.split("/")[-1]
+    output_filename = input_filename + ".out"
+
+    if IS_GERMINATE:
+        source_file = url.replace(HOST, "")
+        final_save_path = os.path.join(OUTPUT_DIR, output_filename)
+
+    try:
+        if IS_GERMINATE:
+            print("local copy", source_file)
+            with timer("copy"):
+                shutil.copyfile(source_file, input_filename)
+        else:
+            print("downloading", url)
+            with timer("download"):
+                response = requests.get(url)
+                with requests.get(url, stream=True) as r:
+                    with open(input_filename, 'wb') as f:
+                        shutil.copyfileobj(r.raw, f)
+
+        print("processing", input_filename)
+        with timer("process"):
+            txt_extract_sm(input_filename, output_filename)
+
+        if IS_GERMINATE:
+            print("local copy of output", output_filename)
+            with timer("copy"):
+                shutil.copyfile(output_filename, final_save_path)
+        else:
+            print("sending-response", output_filename)
+            with timer("send-response"):
+                with open(output_filename, 'rb') as f:
+                    file_contents = base64.b64encode(f.read()).decode('ascii')
+                channel.basic_publish(
+                    exchange='',
+                    routing_key=SM_ZIP_RESP,
+                    body=json.dumps({"url": url, "filename": output_filename, "content": file_contents}),
+                )
+    finally:
+        remove_file(input_filename)
+        remove_file(output_filename)
+
+    channel.basic_ack(delivery_tag=method.delivery_tag)
+    print("sent", output_filename)
+
+
+if __name__ == '__main__':
+    main(sys.argv)