#!/usr/bin/env python3
# mqlg main.py -- RabbitMQ driver for bulk PDF text extraction (scimag .zip archives).
# Reconstructed from a whitespace-mangled `git commitdiff_plain` export
# (commit dfae21e, "initial commit", Zachary Vance, 2021-08-12): this is the
# post-commit main.py with the patch's obvious defects fixed in place.
import pika, requests
import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
# NOTE(review): the repo names the companion file "sm-extract.py"; it must be
# named sm_extract.py (underscore) for this import to resolve -- confirm.
import sm_extract

SM_ZIP = 'sm_zip'            # work queue: URLs of input .zip archives
SM_ZIP_RESP = 'sm_zip_resp'  # response queue: extracted text .tar.gz payloads
HOST = 'https://germinate.za3k.com'
SM_OUTPUT_DIR = '/jbod0/text-scimag'
USER = "sm"
PASS = "McFnE9I4OdwTB"

import socket
IS_GERMINATE = socket.gethostname() == "germinate"  # worker runs locally on the file server

pool = None
def make_pool():
    """Lazily create and return the shared multiprocessing pool.

    Currently only referenced by the disabled multi-threaded branch in
    __main__."""
    global pool
    if pool is None:
        pool = multiprocessing.Pool()
    return pool

class timer():
    """Context manager that optionally prints the elapsed wall time of its body.

    name: label shown in the report line.
    print: if True, emit " timer[name] = Nms" on exit (keyword shadows the
           builtin only inside __init__; callers use timer(..., print=True)).
    """
    def __init__(self, name, print=False):
        self.name = name
        self.print = print
    def __enter__(self):
        self._time = time.time()
    def __exit__(self, *exec_info):
        elapsed = time.time() - self._time
        if self.print:
            print("  timer[{}] = {}ms".format(self.name, int(elapsed*1000)))

class QueueConnection():
    """Context manager yielding a RabbitMQ channel with both queues declared.

    Connects to localhost:5672 with the module credentials; closes the
    connection on exit."""
    def __init__(self):
        pass
    def __enter__(self):
        credentials = pika.PlainCredentials(USER, PASS)
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost', 5672, '/', credentials))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=SM_ZIP, durable=True)
        self.channel.queue_declare(queue=SM_ZIP_RESP, durable=True)
        return self.channel
    def __exit__(self, *exec_info):
        self.connection.close()

def threaded_consumer(queue, consumer_callback):
    """Consume `queue` forever with `consumer_callback`, reconnecting whenever
    the broker connection drops."""
    while True:
        try:
            with QueueConnection() as channel:
                channel.basic_qos(prefetch_count=1)  # at most one unacked message
                channel.basic_consume(
                    queue=queue,
                    consumer_callback=consumer_callback,
                )
                print('[*] Waiting for messages. To exit press CTRL+C')
                channel.start_consuming()
        except pika.exceptions.ConnectionClosed:
            pass  # reconnect on the next loop iteration

def worker_sm():
    """Run the extraction worker: consume .zip URLs from SM_ZIP."""
    threaded_consumer(SM_ZIP, do_work_sm)
def listener_sm():
    """Run the result listener: consume extracted text from SM_ZIP_RESP."""
    threaded_consumer(SM_ZIP_RESP, do_listen_sm)

# Library functions
def remove_file(path):
    """Delete `path`, ignoring the error if it does not exist.

    NOTE(review): this body was partially elided as unchanged context in the
    patch (only the trailing `pass` was visible); reconstructed as the
    standard best-effort delete -- confirm against the repository."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass

# "Meat" -- actually specific to what's going on.
def enqueuer_sm():
    """Load up all the text extraction work to be done"""
    with QueueConnection() as channel:
        urls = []
        # Drives are mounted as /libgen12-01 .. /libgen12-08.
        for drive_num in range(1, 9):
            drive = "/libgen12-{drive_num:02}".format(drive_num=drive_num)
            for dirname, subdirList, fileList in os.walk(drive):
                for fname in fileList:
                    if fname.endswith(".zip"):
                        fpath = os.path.join(dirname, fname)
                        urls.append(HOST + fpath)
        urls.sort()  # deterministic enqueue order
        for url in urls:
            channel.basic_publish(
                exchange='',
                routing_key=SM_ZIP,
                body=url,
                properties=pika.spec.BasicProperties(
                    delivery_mode=2,  # persistent: survives broker restart
                )
            )

def do_listen_sm(channel, method, properties, content):
    """Receive extracted text and write it to disk"""
    response = json.loads(content)
    output_path = os.path.join(SM_OUTPUT_DIR, response["filename"])
    print("saving", output_path)  # fixed: patch referenced undefined `output_file`
    content = base64.b64decode(response["content"].encode('ascii'))
    if os.path.exists(output_path):
        print("skipped", output_path)
    else:
        # fixed: response["filename"] includes a drive subdirectory which may
        # not exist under SM_OUTPUT_DIR yet
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with open(output_path, "wb") as f:
            f.write(content)
        print("saved", output_path)
    channel.basic_ack(delivery_tag=method.delivery_tag)

def do_work_sm(channel, method, properties, url):
    """Fetch one .zip of PDFs, extract its text, and deliver the .tar.gz.

    On germinate the input/output are copied via the local filesystem;
    elsewhere the input is downloaded over HTTP and the output is published
    (base64-encoded JSON) on SM_ZIP_RESP."""
    url = url.decode('ascii')
    input_filename = url.split("/")[-1]
    output_filename = input_filename[:-len(".zip")] + "-text.tar.gz"
    output_path = url.split("/")[-2] + "/" + output_filename  # keep drive subdir
    if IS_GERMINATE:
        source_file = url.replace(HOST, "")
        final_save_path = os.path.join(SM_OUTPUT_DIR, output_path)
    try:
        if IS_GERMINATE:
            print("local copy", source_file)
            with timer("copy-input", print=True):
                shutil.copyfile(source_file, input_filename)
        else:
            print("downloading", url)  # fixed: print() was passed print=True (TypeError)
            with timer("download", print=True):
                # fixed: dropped a duplicate non-streaming requests.get(url)
                # that downloaded the whole archive a second time
                with requests.get(url, stream=True) as r:
                    with open(input_filename, 'wb') as f:
                        shutil.copyfileobj(r.raw, f)
        print(" processing", input_filename)
        with timer("process", print=True):
            sm_extract.txt_extract_sm(input_filename, output_filename)
        if IS_GERMINATE:
            print(" local copy of output", final_save_path)
            with timer("copy-output", print=True):
                shutil.copyfile(output_filename, final_save_path)
        else:
            print(" sending-response", output_path)
            with timer("send-response", print=True):
                with open(output_filename, 'rb') as f:
                    file_contents = base64.b64encode(f.read()).decode('ascii')
                channel.basic_publish(
                    exchange='',
                    routing_key=SM_ZIP_RESP,
                    body=json.dumps({"url": url, "filename": output_path,
                                     "content": file_contents}),
                    # fixed: delivery_mode is not a basic_publish kwarg; it
                    # must be carried in the message properties
                    properties=pika.spec.BasicProperties(delivery_mode=2),
                )
    finally:
        # NOTE(review): acking in `finally` acks even when extraction failed,
        # permanently dropping the message -- kept as written; confirm intended.
        remove_file(input_filename)
        remove_file(output_filename)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        print(" sent", output_filename)

if __name__ == '__main__':
    args = sys.argv
    if len(args) <= 1 or args[1] == "worker":
        if True:  # single-threaded
            worker_sm()
        else:  # multi-threaded does not work
            pool = make_pool()
            pool.apply_async(worker_sm)
            try:
                time.sleep(10000000)
            except KeyboardInterrupt:
                print(' [*] Terminating...')
                pool.terminate()
                pool.join()
    elif args[1] == "listener":
        listener_sm()
    elif args[1] == "enqueue":
        enqueuer_sm()
    elif args[1] == "debug":
        sm_extract.txt_extract_sm(args[2], args[3], debug=True)
    else:
        print("Usage: rabbitmq.py worker|enqueue")
        sys.exit(1)
#!/usr/bin/env python3
# sm-extract.py -- extract text from .zip archives of PDFs (scimag).
# NOTE(review): main.py does `import sm_extract`; a file named sm-extract.py
# (hyphen) cannot be imported under that name -- rename to sm_extract.py.
import os, subprocess, sys, tempfile, time

# Where the "non-pdfs" log is appended. fixed: the patch used an undefined
# OUTPUT_DIR name; this matches SM_OUTPUT_DIR in main.py -- confirm.
OUTPUT_DIR = '/jbod0/text-scimag'

class timer():
    """Context manager that optionally prints the elapsed wall time of its body.

    fixed: the patch referenced `timer` without defining or importing it in
    this file (NameError); defined locally so the module stands alone."""
    def __init__(self, name, print=False):
        self.name = name
        self.print = print  # whether to report the timing on exit
    def __enter__(self):
        self._time = time.time()
    def __exit__(self, *exec_info):
        elapsed = time.time() - self._time
        if self.print:
            print("  timer[{}] = {}ms".format(self.name, int(elapsed*1000)))

def txt_extract_sm(input_path, output_path, debug=False):
    """Extract text from a .zip file of ~1000 PDFs. Single-threaded.

    input_path: .zip archive of PDFs.
    output_path: destination .tar.gz of INPUT.pdf.txt files (tree preserved).
    debug: if True, print per-stage timings.
    Non-PDF members are skipped and logged to OUTPUT_DIR/non-pdfs.
    """
    # fixed: tar runs with cwd=td_out, so a relative output_path would be
    # written inside the temp dir and destroyed on exit; resolve it first.
    output_path = os.path.abspath(output_path)
    with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
        # Extract the .zip file (-n: never overwrite)
        with timer("zip", print=debug):
            subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])

        # Make a list of pdf files extracted, mirroring the directory tree
        # of td_in into td_out as we go
        pdfs = []
        with timer("ls", print=debug):
            with open(os.path.join(OUTPUT_DIR, "non-pdfs"), "a") as logfile:
                for dirname, subdirList, fileList in os.walk(td_in):
                    assert dirname.startswith(td_in)
                    short = dirname[len(td_in):].lstrip("/")
                    for dname in subdirList:
                        os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname))
                    for fname in fileList:
                        file_name = os.path.join(short, fname)
                        if fname.lower().endswith(".pdf"):
                            pdfs.append(file_name)
                        else:
                            print(input_path, fname, file=logfile)
                            print("other", input_path, fname)

        # Extract the text from each (single-threaded)
        # INPUT.pdf -> INPUT.pdf.txt
        with timer("pdftotext", print=debug):
            for pdf in pdfs:
                input_pdf = os.path.join(td_in, pdf)
                output_txt = os.path.join(td_out, pdf + ".txt")
                subprocess.call(["pdftotext", input_pdf, output_txt])

        # Put the results into a .tar.gz file
        with timer("tar", print=debug):
            subprocess.call(
                ["tar", "czf", output_path, "."],  # gzip = z, J = '.xz'
                stderr=subprocess.DEVNULL,
                cwd=td_out,
                env={"XZ_OPT": "-9", "GZIP": "-9"},
            )

if __name__ == '__main__':
    args = sys.argv
    if len(args) == 3:  # fixed: was == 2, but INPUT and OUTPUT are both required
        txt_extract_sm(args[1], args[2])
    else:  # fixed: missing colon (SyntaxError in the patch)
        print("Usage: sm-extract INPUT.zip OUTPUT.tar.gz")
        sys.exit(1)