]> git.za3k.com Git - mqlg.git/commitdiff
working v1
authorZachary Vance <za3k@za3k.com>
Thu, 12 Aug 2021 20:21:30 +0000 (13:21 -0700)
committerZachary Vance <za3k@za3k.com>
Thu, 12 Aug 2021 20:21:30 +0000 (13:21 -0700)
__pycache__/lib.cpython-37.pyc [new file with mode: 0644]
__pycache__/sm.cpython-37.pyc [new file with mode: 0644]
lib.py [new file with mode: 0644]
main.py
sm-extract.py [deleted file]
sm.py [new file with mode: 0644]

diff --git a/__pycache__/lib.cpython-37.pyc b/__pycache__/lib.cpython-37.pyc
new file mode 100644 (file)
index 0000000..8138fba
Binary files /dev/null and b/__pycache__/lib.cpython-37.pyc differ
diff --git a/__pycache__/sm.cpython-37.pyc b/__pycache__/sm.cpython-37.pyc
new file mode 100644 (file)
index 0000000..9e0a2d6
Binary files /dev/null and b/__pycache__/sm.cpython-37.pyc differ
diff --git a/lib.py b/lib.py
new file mode 100644 (file)
index 0000000..8e47e24
--- /dev/null
+++ b/lib.py
@@ -0,0 +1,14 @@
+#!/usr/local/bin python3
+import time
+
+class timer():
+    def __init__(self, name, print=False):
+        self.name = name
+        self.print = print
+    def __enter__(self):
+        self._time = time.time()
+    def __exit__(self, *exec_info):
+        elapsed = time.time() - self._time
+        if self.print:
+            print(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
+
diff --git a/main.py b/main.py
index 241ea92dbd14020a5ca73821563e32d6b67dc5a1..ffcf85cb04ffa89d826f05b0aaa66f28db39d7db 100644 (file)
--- a/main.py
+++ b/main.py
 #!/usr/local/bin python3
 import pika, requests
-import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
-import sm_extract 
+import base64, json, tempfile, time, os, shutil, sys
 
-SM_ZIP='sm_zip'
-SM_ZIP_RESP='sm_zip_resp'
-HOST='https://germinate.za3k.com'
-SM_OUTPUT_DIR='/jbod0/text-scimag'
+import sm
+from lib import timer
 
+# Queues
+FF_TAR="ff_tar"
+FF_TAR_RESP="ff_tar_resp"
+LG_TAR="lg_tar"
+LG_TAR_RESP="lg_tar_resp"
+
+# Login credentials
+MQ_HOST = "localhost"
 USER="sm"
 PASS="McFnE9I4OdwTB"
 
 import socket
 IS_GERMINATE = socket.gethostname() == "germinate"
 
-pool = None
-def make_pool():
-    global pool
-    if pool is None:
-        pool = multiprocessing.Pool()
-    return pool
-
-class timer():
-    def __init__(self, name, print=False):
-        self.name = name
-        self.print = print
-    def __enter__(self):
-        self._time = time.time()
-    def __exit__(self, *exec_info):
-        elapsed = time.time() - self._time
-        if self.print:
-            print(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
-
+# Library functions
 class QueueConnection():
-    def __init__(self):
-        pass
+    def __init__(self, queues):
+        self.queues = queues
     def __enter__(self):
         credentials = pika.PlainCredentials(USER, PASS)
-        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
+        self.connection = pika.BlockingConnection(pika.ConnectionParameters(MQ_HOST, 5672, '/', credentials))
         self.channel = self.connection.channel()
 
-        self.channel.queue_declare(queue=SM_ZIP, durable=True)
-        self.channel.queue_declare(queue=SM_ZIP_RESP, durable=True)
+        for queue in self.queues:
+            self.channel.queue_declare(queue=queue, durable=True)
         return self.channel
     def __exit__(self, *exec_info):
         self.connection.close()
 
+#def threaded_consumer_multiple(queue, consumer_callback):
+    #while True:
+        #try:
+        #except pika.exceptions.ConnectionClosed:
+        #    print("connection close")
+        #    pass
+
 def threaded_consumer(queue, consumer_callback):
-    while True:
+    _threaded_consumer(queue, consumer_callback)
+
+def _threaded_consumer(queue, consumer_callback):
+    with QueueConnection([queue]) as channel:
+        channel.basic_qos(prefetch_count=1)
+        channel.basic_consume(
+            queue=queue,
+            consumer_callback=consumer_callback,
+        )
         try:
-            with QueueConnection() as channel:
-                channel.basic_qos(prefetch_count=1)
-                channel.basic_consume(
-                    queue=queue,
-                    consumer_callback=consumer_callback,
-                )
-                print('[*] Waiting for messages. To exit press CTRL+C')
-                channel.start_consuming()
-        except pika.exceptions.ConnectionClosed:
-            pass
-
-def worker_sm():
-    threaded_consumer(SM_ZIP, do_work_sm)
-def listener_sm():
-    threaded_consumer(SM_ZIP_RESP, do_listen_sm)
+            print('[*] Waiting for messages. To exit press CTRL+C')
+            channel.start_consuming()
+        except KeyboardInterrupt:
+            channel.stop_consuming()
 
-# Library functions
 def remove_file(path):
     try:
         os.remove(path)
     except FileNotFoundError:
         pass
 
-# "Meat" -- actually specific to what's going on.
-def enqueuer_sm():
+def enqueue_all(queue, objects):
     """Load up all the text extraction work to be done"""
-    with QueueConnection() as channel:
-        urls = []
-        for drive_num in range(1,9):
-            drive="/libgen12-{drive_num:02}".format(drive_num=drive_num)
-            for dirname, subdirList, fileList in os.walk(drive):
-                for fname in fileList:
-                    if fname.endswith(".zip"):
-                        fpath = os.path.join(dirname, fname)
-                        url=HOST + fpath
-                        urls.append(url)
-        urls.sort()
-        for url in urls:
+    with QueueConnection([queue]) as channel:
+        for o in objects:
             channel.basic_publish(
                 exchange='',
-                routing_key=SM_ZIP,
-                body=url,
+                routing_key=queue,
+                body=json.dumps(o),
                 properties=pika.spec.BasicProperties(
                     delivery_mode=2,
                 )
             )
 
-def do_listen_sm(channel, method, properties, content):
+def do_work(response_queue, file_processor):
+    def _inner_do_work(channel, method, properties, message):
+        message = json.loads(message.decode('ascii'))
+        task_id = message["task_id"]
+        remote_url = message["remote_url"]
+        remote_output_path = message["remote_output_path"]
+        local_input_path = message.get("local_input_path")
+        local_output_path = message.get("local_output_path")
+
+        with tempfile.TemporaryDirectory() as d:
+            print("received", task_id)
+            input_path = os.path.join(d, "INPUT.zip")
+            output_path = os.path.join(d, "OUTPUT.tar.gz")
+
+            if IS_GERMINATE and local_input_path is not None:
+                print(" local copy", local_input_path)
+                with timer("copy-input", print=True):
+                    shutil.copyfile(local_input_path, input_path)
+            else:
+                print(" downloading", remote_url)
+                with timer("download", print=True):
+                    with requests.get(remote_url, stream=True) as r:
+                        with open(input_path, 'wb') as f:
+                            shutil.copyfileobj(r.raw, f)
+
+            print(" processing", task_id)
+            with timer("process", print=True):
+                file_processor(input_path, output_path)
+
+            if IS_GERMINATE and local_output_path is not None:
+                print(" local copy of output", local_output_path)
+                with timer("copy-output", print=True):
+                    shutil.copyfile(output_path, local_output_path)
+            else:
+                print(" sending-response", output_path)
+                with timer("send-response", print=True):
+                    with open(output_path, 'rb') as f:
+                        file_contents = base64.b64encode(f.read()).decode('ascii')
+                    channel.basic_publish(
+                        exchange='',
+                        routing_key=response_queue,
+                        body=json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents}),
+                        properties=pika.BasicProperties(delivery_mode=2),
+                    )
+
+        channel.basic_ack(delivery_tag=method.delivery_tag)
+        print(" sent", output_path)
+    return _inner_do_work
+
+def do_listen(channel, method, properties, message):
     """Receive extracted text and write it to disk"""
-    response = json.loads(content)
-    output_path = os.path.join(SM_OUTPUT_DIR, response["filename"])
+    response = json.loads(message)
+    response_id = response["response_id"]
+    output_path = response["local_output_path"]
 
-    print("saving", output_file)
-    content = base64.b64decode(response["content"].encode('ascii'))
+    print("received", response_id)
     if os.path.exists(output_path):
         print("skipped", output_path)
     else:
+        content = base64.b64decode(response["content"].encode('ascii'))
         with open(output_file, "wb") as f:
             f.write(content)
         print("saved", output_path)
     channel.basic_ack(delivery_tag=method.delivery_tag)
 
-def do_work_sm(channel, method, properties, url):
-    url = url.decode('ascii')
-    input_filename = url.split("/")[-1]
-    output_filename = input_filename[:-len(".zip")] + "-text.tar.gz"
-    output_path = url.split("/")[-2] + "/" + output_filename
-
-    if IS_GERMINATE:
-        source_file = url.replace(HOST, "")
-        final_save_path = os.path.join(SM_OUTPUT_DIR, output_path)
-
-    try:
-        if IS_GERMINATE:
-            print("local copy", source_file)
-            with timer("copy-input", print=True):
-                shutil.copyfile(source_file, input_filename)
-        else:
-            print("downloading", url, print=True)
-            with timer("download"):
-                response = requests.get(url)
-                with requests.get(url, stream=True) as r:
-                    with open(input_filename, 'wb') as f:
-                        shutil.copyfileobj(r.raw, f)
-
-        print(" processing", input_filename)
-        with timer("process", print=True):
-            sm_extract.txt_extract_sm(input_filename, output_filename)
-
-        if IS_GERMINATE:
-            print(" local copy of output", final_save_path)
-            with timer("copy-output", print=True):
-                shutil.copyfile(output_filename, final_save_path)
-        else:
-            print(" sending-response", output_path)
-            with timer("send-response", print=True):
-                with open(output_filename, 'rb') as f:
-                    file_contents = base64.b64encode(f.read()).decode('ascii')
-                channel.basic_publish(
-                    exchange='',
-                    routing_key=SM_ZIP_RESP,
-                    body=json.dumps({"url": url, "filename": output_path, "content": file_contents}),
-                    delivery_mode=2,
-                )
-    finally:
-        remove_file(input_filename)
-        remove_file(output_filename)
-
-    channel.basic_ack(delivery_tag=method.delivery_tag)
-    print(" sent", output_filename)
-
 if __name__ == '__main__':
     args = sys.argv
-    if len(args) <= 1 or args[1] == "worker":
-        if True: # single-threaded
-            worker_sm()
-        else: # multi-threaded does not work
-            pool = make_pool()
-            pool.apply_async(worker_sm)
-            try:
-                time.sleep(10000000)
-            except KeyboardInterrupt:
-                print(' [*] Terminating...')
-                pool.terminate()
-                pool.join()
-    elif args[1] == "listener":
-        listener_sm()
-    elif args[1] == "enqueue":
-        enqueuer_sm()
-    elif args[1] == "debug":
-        sm_extract.txt_extract_sm(args[2], args[3], debug=True)
+    if   len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "worker":
+        threaded_consumer(sm.QUEUE, do_work(sm.QUEUE_RESP, sm.extract_text))
+    elif len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "listener":
+        threaded_consumer(sm.QUEUE_RESP, do_listen)
+    elif len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "enqueue":
+        enqueue_all(sm.QUEUE, sm.generate_tasks())
+    elif len(args) == 5 and sys.argv[1] == "sm" and sys.argv[2] == "debug":
+        sm.extract_text(args[3], args[4], debug=True)
     else:
-        print("Usage: rabbitmq.py worker|enqueue")
+        print("Usage: {} sm worker|listener|enqueue".format(sys.argv[0].split("/")[-1]))
         sys.exit(1)
diff --git a/sm-extract.py b/sm-extract.py
deleted file mode 100644 (file)
index 5a05053..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-#!/usr/local/bin python3
-import pika, requests
-import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
-
-def txt_extract_sm(input_path, output_path, debug=False):
-    """Extract text from a .zip file of ~1000 PDFs. Single-threaded."""
-    with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
-        #if debug:
-        #    td_in = "tmp.in"
-        #    td_out = "tmp.out"; os.mkdir(td_out)
-        #print("working in temp dirs: ", td_in, td_out)
-
-        # Extract .zip file
-        with timer("zip", print=debug):
-            subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
-
-        # Make a list of pdf files extracted
-        pdfs = []
-        with timer("ls", print=debug):
-            with open(os.path.join(OUTPUT_DIR, "non-pdfs"), "a") as logfile:
-                for dirname, subdirList, fileList in os.walk(td_in):
-                    assert dirname.startswith(td_in)
-                    short = dirname[len(td_in):].lstrip("/")
-                    for dname in subdirList:
-                        os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname))
-                    for fname in fileList:
-                        file_name = os.path.join(short, fname)
-                        if fname.lower().endswith(".pdf"):
-                            pdfs.append(file_name)
-                        else:
-                            print(input_path, fname, file=logfile)
-                            print("other", input_path, fname)
-
-        # Extract the text from each (single-threaded)
-        # INPUT.pdf -> INPUT.pdf.txt
-        with timer("pdftotext", print=debug):
-            for pdf in pdfs:
-                input_pdf = os.path.join(td_in, pdf)
-                output_txt = os.path.join(td_out, pdf + ".txt")
-                subprocess.call(["pdftotext", input_pdf, output_txt])
-
-        # Put the results into a .tar.gz file
-        with timer("tar", print=debug):
-            subprocess.call(
-                ["tar", "czf", output_path, "."], # gzip = z, J = '.xz'
-                stderr=subprocess.DEVNULL,
-                cwd=td_out,
-                env={"XZ_OPT":"-9","GZIP":"-9",}
-            )
-
-if __name__ == '__main__':
-    args = sys.argv
-    if len(args) == 2:
-        txt_extract_sm(args[1], args[2])
-    else
-        print("Usage: sm-extract INPUT.zip OUTPUT.tar.gz")
-        sys.exit(1)
diff --git a/sm.py b/sm.py
new file mode 100644 (file)
index 0000000..008a878
--- /dev/null
+++ b/sm.py
@@ -0,0 +1,91 @@
+#!/usr/local/bin python3
+import tempfile, os, subprocess, sys
+from lib import timer
+
+SM_URL='https://germinate.za3k.com{path}'
+SM_OUTPUT_PATH='/jbod0/text-scimag/{path}'
+
+QUEUE='sm_zip'
+QUEUE_RESP='sm_zip_resp'
+
+ERROR_FILE="/var/tmp/sm.nonpdfs"
+
+def extract_text(input_path, output_path, debug=False):
+    """Extract text from a .zip file of ~1000 PDFs. Single-threaded."""
+    with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
+        if debug:
+            td_in = "tmp.in"
+            td_out = "tmp.out"; os.mkdir(td_out)
+        #print("working in temp dirs: ", td_in, td_out)
+
+        # Extract .zip file
+        with timer("zip", print=debug):
+            subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
+
+        # Make a list of pdf files extracted
+        pdfs = []
+        other = []
+        with timer("ls", print=debug):
+            for dirname, subdirList, fileList in os.walk(td_in):
+                assert dirname.startswith(td_in)
+                short = dirname[len(td_in):].lstrip("/")
+                for dname in subdirList:
+                    os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname))
+                for fname in fileList:
+                    file_name = os.path.join(short, fname)
+                    if fname.lower().endswith(".pdf"):
+                        pdfs.append(file_name)
+                    else:
+                        other.append(fname)
+                        print(" other", input_path, fname)
+
+        if len(other) > 0:
+            with open(ERROR_FILE, "a") as logfile:
+                print(input_path, fname, file=logfile)
+
+        # Extract the text from each (single-threaded)
+        # INPUT.pdf -> INPUT.pdf.txt
+        with timer("pdftotext", print=debug):
+            for pdf in pdfs:
+                input_pdf = os.path.join(td_in, pdf)
+                output_txt = os.path.join(td_out, pdf + ".txt")
+                subprocess.call(["pdftotext", input_pdf, output_txt])
+
+        # Put the results into a .tar.gz file
+        with timer("tar", print=debug):
+            output_path = os.path.abspath(output_path)
+            subprocess.call(
+                ["tar", "czf", output_path, "."], # gzip = z, J = '.xz'
+                stderr=subprocess.DEVNULL,
+                cwd=td_out,
+                env={"XZ_OPT":"-9","GZIP":"-9",}
+            )
+
+def generate_tasks():
+    tasks = []
+    for drive_num in range(1,9):
+        drive="/libgen12-{drive_num:02}".format(drive_num=drive_num)
+        for dirname, subdirList, fileList in os.walk(drive):
+            for filename in fileList:
+                if filename.endswith(".zip"):
+                    fullpath = os.path.join(dirname, filename)
+                    short_in = '/'.join(fullpath.split("/")[-2:])
+                    short_out = short_in[:-len(".zip")] + ".text.tar.gz"
+                    tasks.append({
+                        "task_id": filename,
+                        "remote_url": SM_URL.format(path=fullpath),
+                        "local_input_path": fullpath,
+                        "local_output_path": SM_OUTPUT_PATH.format(path=short_out),
+                        "remote_output_path": SM_OUTPUT_PATH.format(path=short_out),
+                        "dataset": "sm",
+                    })
+    tasks.sort(key=lambda x: x["remote_url"])
+    return tasks
+
+if __name__ == '__main__':
+    if len(sys.argv) == 3:
+        extract_text(sys.argv[1], sys.argv[2])
+    else:
+        print(sys.argv)
+        print("Usage: sm-extract INPUT.zip OUTPUT.tar.gz")
+        sys.exit(1)