--- /dev/null
+#!/usr/bin/env python3
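+"""Distributed text extraction for the scimag PDF zips.
+
+Run with a role as argv[1]:
+  enqueue  - walk the /libgen12-NN drives and publish one message per .zip
+  worker   - fetch a zip of ~1000 PDFs, run pdftotext on each, return a tarball
+  listener - receive finished tarballs and save them under OUTPUT_DIR
+  debug    - run the extraction step locally on a given input/output path
+"""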
+import pika, requests
+import base64, json, multiprocessing, socket, tempfile, time, os, shutil, subprocess, sys
+
+SM_ZIP='sm_zip'
+SM_ZIP_RESP='sm_zip_resp'
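+# SM_ZIP carries zip URLs to workers; SM_ZIP_RESP carries extracted text back.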
+HOST='https://germinate.za3k.com'
+OUTPUT_DIR='/jbod0/text-scimag'
+
+USER="sm"
+PASS="McFnE9I4OdwTB"
+
+# Workers running on the storage host itself copy files directly instead of over HTTP.
+IS_GERMINATE = socket.gethostname() == "germinate"
+
+_pool = None
+def get_pool():
+    """Lazily create one shared multiprocessing pool (one process per core)."""
+    global _pool
+    if _pool is None:
+        _pool = multiprocessing.Pool()
+    return _pool
+
+class timer():
+    """Context manager that prints the wall-clock time its block took."""
+    def __init__(self, name):
+        self.name = name
+    def __enter__(self):
+        self._time = time.time()
+    def __exit__(self, *exc_info):
+        elapsed = time.time() - self._time
+        print("timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
+
+def main(args):
+ credentials = pika.PlainCredentials(USER, PASS)
+ connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
+ channel = connection.channel()
+
+ channel.queue_declare(queue=SM_ZIP, durable=True)
+ channel.queue_declare(queue=SM_ZIP_RESP, durable=True)
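+    # Durable queues, so queued work survives a broker restart.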
+    mode = args[1] if len(args) > 1 else "worker"
+    if mode == "worker":
+        worker_sm(channel)
+    elif mode == "listener":
+        listener_sm(channel)
+    elif mode == "enqueue":
+        enqueuer_sm(channel)
+    elif mode == "debug":
+        txt_extract_sm(args[2], args[3], debug=True)
+    else:
+        print("Usage: rabbitmq.py worker|listener|enqueue|debug <input.zip> <output.tar.gz>")
+        sys.exit(1)
+ connection.close()
+
+def worker_sm(channel):
+    """Consume zip URLs and extract text from each one."""
+    # QoS must be set before basic_consume, or the broker floods this
+    # consumer with the whole queue before prefetch_count takes effect.
+    channel.basic_qos(prefetch_count=1)
+    channel.basic_consume(
+        queue=SM_ZIP,
+        on_message_callback=do_work_sm,
+    )
+    print(' [*] Waiting for messages. To exit press CTRL+C')
+    channel.start_consuming()
+
+def listener_sm(channel):
+    """Consume extraction results and write them to disk."""
+    channel.basic_qos(prefetch_count=1)
+    channel.basic_consume(
+        queue=SM_ZIP_RESP,
+        on_message_callback=do_listen_sm,
+    )
+    print(' [*] Waiting for messages. To exit press CTRL+C')
+    channel.start_consuming()
+
+# Library functions
+def remove_file(path):
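+    """Delete path, ignoring the case where it is already gone."""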
+ try:
+ os.remove(path)
+ except FileNotFoundError:
+ pass
+
+# "Meat" -- actually specific to what's going on.
+def enqueuer_sm(channel):
+    """Load up all the text extraction work to be done"""
+    # The scimag zips are spread across nine drives, /libgen12-00 .. /libgen12-08.
+    for drive_num in range(0, 9):
+        drive = "/libgen12-{drive_num:02}".format(drive_num=drive_num)
+        for dirname, subdirList, fileList in os.walk(drive):
+            for fname in fileList:
+                if fname.endswith(".zip"):
+                    fpath = os.path.join(dirname, fname)
+                    url = HOST + fpath
+                    print("enqueue", url)
+                    channel.basic_publish(
+                        exchange='',
+                        routing_key=SM_ZIP,
+                        body=url,
+                        # Persistent delivery, to match the durable queue.
+                        properties=pika.BasicProperties(delivery_mode=2),
+                    )
+    sys.exit(0)
+
+def do_listen_sm(channel, method, properties, body):
+    """Receive extracted text and write it to disk"""
+    response = json.loads(body)
+    output_file = os.path.join(OUTPUT_DIR, response["filename"])
+
+    if os.path.exists(output_file):
+        # Already saved on a previous run; don't decode or rewrite it.
+        print("skipped", output_file)
+    else:
+        print("saving", output_file)
+        content = base64.b64decode(response["content"].encode('ascii'))
+        with open(output_file, "wb") as f:
+            f.write(content)
+        print("saved", output_file)
+    channel.basic_ack(delivery_tag=method.delivery_tag)
+
+def txt_extract_sm(input_path, output_path, debug=False):
+    """Extract text from a .zip file of ~1000 PDFs"""
+    with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
+        if debug:
+            # Fixed directory names, so the intermediate files survive the run.
+            td_in = "tmp.in"
+            td_out = "tmp.out"
+            os.makedirs(td_out, exist_ok=True)
+        print("working in temp dirs: ", td_in, td_out)
+        # Step 1: Extract the zip.
+        with timer("zip"):
+            subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
+        # Step 2: Walk the extracted tree, mirroring its directory layout under
+        # td_out and collecting every PDF's path relative to td_in.
+        pdfs = []
+        with timer("ls"):
+            for dirname, subdirList, fileList in os.walk(td_in):
+                assert dirname.startswith(td_in)
+                short = dirname[len(td_in):].lstrip("/")
+                for dname in subdirList:
+                    os.makedirs(os.path.join(td_out, short, dname), exist_ok=True)
+                for fname in fileList:
+                    if fname.endswith(".pdf"):
+                        pdfs.append(os.path.join(short, fname))
+                    else:
+                        print(fname)
+        # Step 3: Run pdftotext on every PDF, spread across cores.
+        with timer("pdftotext"):
+            jobs = [["pdftotext", os.path.join(td_in, pdf), os.path.join(td_out, pdf + ".txt")]
+                    for pdf in pdfs]
+            get_pool().map(subprocess.call, jobs)
+        # Step 4: Tar up the text tree; -C keeps temp-dir paths out of the archive.
+        with timer("tar"):
+            subprocess.call(["tar", "czf", output_path, "-C", td_out, "."])
+
+def do_work_sm(channel, method, properties, url):
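+    """Fetch one zip of PDFs, extract its text, and deliver the result."""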
+ url = url.decode('ascii')
+ input_filename = url.split("/")[-1]
+ output_filename = input_filename + ".out"
+
+ if IS_GERMINATE:
+ source_file = url.replace(HOST, "")
+ final_save_path = os.path.join(OUTPUT_DIR, output_filename)
+
+ try:
+ if IS_GERMINATE:
+ print("local copy", source_file)
+ with timer("copy"):
+ shutil.copyfile(source_file, input_filename)
+        else:
+            print("downloading", url)
+            with timer("download"):
+                # Stream straight to disk; the zips are too large to buffer in memory.
+                with requests.get(url, stream=True) as r:
+                    r.raise_for_status()
+                    with open(input_filename, 'wb') as f:
+                        shutil.copyfileobj(r.raw, f)
+
+ print("processing", input_filename)
+ with timer("process"):
+ txt_extract_sm(input_filename, output_filename)
+
+ if IS_GERMINATE:
+ print("local copy of output", output_filename)
+ with timer("copy"):
+ shutil.copyfile(output_filename, final_save_path)
+ else:
+ print("sending-response", output_filename)
+ with timer("send-response"):
+ with open(output_filename, 'rb') as f:
+ file_contents = base64.b64encode(f.read()).decode('ascii')
+ channel.basic_publish(
+ exchange='',
+ routing_key=SM_ZIP_RESP,
+ body=json.dumps({"url": url, "filename": output_filename, "content": file_contents}),
+ )
+ finally:
+ remove_file(input_filename)
+ remove_file(output_filename)
+
+    channel.basic_ack(delivery_tag=method.delivery_tag)
+    print("done", output_filename)
+
+
+if __name__ == '__main__':
+ main(sys.argv)