#!/usr/bin/env python3
"""Distributed text extraction for sci-mag PDF zip archives.

Role is chosen by argv[1]:
  worker   -- consume zip URLs from RabbitMQ, extract text, return/save results
  listener -- consume extraction results and write them under OUTPUT_DIR
  enqueue  -- walk the libgen drives and publish every .zip URL to the queue
  debug    -- one local extraction: main.py debug INPUT.zip OUTPUT.tar.gz
"""
import base64
import json
import multiprocessing
import os
import shutil
import socket
import subprocess
import sys
import tempfile
import time

import pika
import requests

SM_ZIP = 'sm_zip'            # work queue: URLs of zip archives to process
SM_ZIP_RESP = 'sm_zip_resp'  # response queue: base64 text payloads from workers
HOST = 'https://germinate.za3k.com'
OUTPUT_DIR = '/jbod0/text-scimag'

USER = "sm"
# NOTE(review): credential committed in source -- move to env var/config file.
PASS = "McFnE9I4OdwTB"

# On the "germinate" host the archives are local files, so the worker copies
# instead of downloading, and saves results straight to OUTPUT_DIR instead of
# publishing them back over RabbitMQ.
IS_GERMINATE = socket.gethostname() == "germinate"

# BUG FIX: the original bound both a module global `pool = None` and a
# function `def pool()` to the same name; after the def, `pool` is the
# function object, so `pool is None` could never be true and the lazy
# initialization was broken. Cache under a private name instead.
_pool = None


def pool():
    """Return the shared multiprocessing pool, creating it on first use."""
    global _pool
    if _pool is None:
        _pool = multiprocessing.Pool()
    return _pool


class timer():
    """Context manager that prints the wall-clock duration of its block.

    Usage: ``with timer("zip"): ...`` prints ``timer[zip] = 123ms`` on exit.
    """

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        self._time = time.time()

    def __exit__(self, *exc_info):
        elapsed = time.time() - self._time
        print("timer[{}] = {}ms".format(self.name, int(elapsed * 1000)))


def main(args):
    """Connect to RabbitMQ and dispatch to the role named in args[1].

    Defaults to the worker role when no argument is given.
    """
    credentials = pika.PlainCredentials(USER, PASS)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, '/', credentials))
    channel = connection.channel()

    # Durable queues so enqueued work survives a broker restart.
    channel.queue_declare(queue=SM_ZIP, durable=True)
    channel.queue_declare(queue=SM_ZIP_RESP, durable=True)

    # BUG FIX: the original used two separate `if` statements here, so
    # running with no argument entered the worker branch and then raised
    # IndexError evaluating `args[1]` in the next test. One elif chain.
    if len(args) <= 1 or args[1] == "worker":
        worker_sm(channel)
    elif args[1] == "listener":
        listener_sm(channel)
    elif args[1] == "enqueue":
        enqueuer_sm(channel)
    elif args[1] == "debug":
        txt_extract_sm(args[2], args[3], debug=True)
    else:
        print("Usage: rabbitmq.py worker|listener|enqueue|debug")
        sys.exit(1)
    connection.close()


def worker_sm(channel):
    """Consume zip URLs from SM_ZIP forever, one unacked message at a time."""
    # qos before consume so the prefetch limit applies from the first delivery.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue=SM_ZIP,
        consumer_callback=do_work_sm,
    )
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


def listener_sm(channel):
    """Consume extraction results from SM_ZIP_RESP forever and save them."""
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue=SM_ZIP_RESP,
        consumer_callback=do_listen_sm,
    )
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


# Library functions
def remove_file(path):
    """Delete path if it exists; silently ignore a missing file."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


# "Meat" -- actually specific to what's going on.
def enqueuer_sm(channel):
    """Load up all the text extraction work to be done.

    Walks /libgen12-00 .. /libgen12-08 and publishes an HTTP URL for every
    .zip file found, then exits the process.
    """
    for drive_num in range(0, 9):
        drive = "/libgen12-{drive_num:02}".format(drive_num=drive_num)
        for dirname, subdirs, files in os.walk(drive):
            for fname in files:
                if fname.endswith(".zip"):
                    # Workers fetch by URL; germinate itself maps the URL
                    # path back to the local file.
                    url = HOST + os.path.join(dirname, fname)
                    print("enqueue", url)
                    channel.basic_publish(
                        exchange='',
                        routing_key=SM_ZIP,
                        body=url,
                    )
    sys.exit(0)


def do_listen_sm(channel, method, properties, content):
    """Receive extracted text (JSON with base64 content) and write it to disk."""
    response = json.loads(content)
    output_file = os.path.join(OUTPUT_DIR, response["filename"])
    print("saving", output_file)

    # Never overwrite an existing result; decode only when actually writing.
    if os.path.exists(output_file):
        print("skipped", output_file)
    else:
        data = base64.b64decode(response["content"].encode('ascii'))
        with open(output_file, "wb") as f:
            f.write(data)
        print("saved", output_file)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def txt_extract_sm(input_path, output_path, debug=False):
    """Extract text from a .zip file of ~1000 PDFs.

    Unzips input_path, runs pdftotext on every contained PDF (mirroring the
    directory layout), and tars the .txt tree to output_path. With debug=True,
    fixed local dirs tmp.in/tmp.out are used so intermediates can be inspected.
    """
    with tempfile.TemporaryDirectory() as td_in, \
            tempfile.TemporaryDirectory() as td_out:
        if debug:
            td_in = "tmp.in"   # unzip -d creates it if missing
            td_out = "tmp.out"
            os.mkdir(td_out)
            print("working in temp dirs: ", td_in, td_out)

        with timer("zip"):
            subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])

        pdfs = []
        with timer("ls"):
            for dirname, subdirs, files in os.walk(td_in):
                assert dirname.startswith(td_in)
                # Path of this directory relative to the extraction root.
                short = dirname[len(td_in):].lstrip("/")
                for dname in subdirs:
                    # Mirror the directory tree into the output root.
                    os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname))
                for fname in files:
                    if fname.endswith(".pdf"):
                        pdfs.append(os.path.join(short, fname))
                    else:
                        print(fname)

        with timer("pdftotext"):
            # BUG FIX: the original referenced `pdf` after the walk loop and
            # never iterated the collected `pdfs` list, so only the last-seen
            # PDF was converted. Convert every PDF found.
            for pdf in pdfs:
                input_pdf = os.path.join(td_in, pdf)
                output_txt = os.path.join(td_out, pdf + ".txt")
                subprocess.call(["pdftotext", input_pdf, output_txt])

        with timer("tar"):
            subprocess.call(["tar", "czf", output_path, td_out])


def do_work_sm(channel, method, properties, url):
    """Worker callback: fetch one zip, extract its text, deliver the result.

    On germinate the zip is copied locally and the result saved to OUTPUT_DIR;
    elsewhere the zip is downloaded over HTTP and the result published to
    SM_ZIP_RESP as base64 inside JSON. Temp files are removed either way, and
    the message is acked only after successful processing.
    """
    url = url.decode('ascii')
    input_filename = url.split("/")[-1]
    output_filename = input_filename + ".out"

    if IS_GERMINATE:
        source_file = url.replace(HOST, "")
        final_save_path = os.path.join(OUTPUT_DIR, output_filename)

    try:
        if IS_GERMINATE:
            print("local copy", source_file)
            with timer("copy"):
                shutil.copyfile(source_file, input_filename)
        else:
            print("downloading", url)
            with timer("download"):
                # BUG FIX: the original issued two GETs for the same URL
                # (a full non-streaming fetch whose response was discarded,
                # then the streaming one), downloading every archive twice.
                # One streamed request straight to disk.
                with requests.get(url, stream=True) as r:
                    with open(input_filename, 'wb') as f:
                        shutil.copyfileobj(r.raw, f)

        print("processing", input_filename)
        with timer("process"):
            txt_extract_sm(input_filename, output_filename)

        if IS_GERMINATE:
            print("local copy of output", output_filename)
            with timer("copy"):
                shutil.copyfile(output_filename, final_save_path)
        else:
            print("sending-response", output_filename)
            with timer("send-response"):
                with open(output_filename, 'rb') as f:
                    file_contents = base64.b64encode(f.read()).decode('ascii')
                channel.basic_publish(
                    exchange='',
                    routing_key=SM_ZIP_RESP,
                    body=json.dumps({"url": url,
                                     "filename": output_filename,
                                     "content": file_contents}),
                )
    finally:
        # Always clean up local temp artifacts, even if processing failed.
        remove_file(input_filename)
        remove_file(output_filename)

    channel.basic_ack(delivery_tag=method.delivery_tag)
    print("sent", output_filename)


if __name__ == '__main__':
    main(sys.argv)