From: Zachary Vance
Date: Thu, 12 Aug 2021 20:21:30 +0000 (-0700)
Subject: working v1
X-Git-Url: https://git.za3k.com/?a=commitdiff_plain;h=31d2d00caab2ea2772f3bba44e8c6f50d2638a52;p=mqlg.git

working v1
---

diff --git a/__pycache__/lib.cpython-37.pyc b/__pycache__/lib.cpython-37.pyc
new file mode 100644
index 0000000..8138fba
Binary files /dev/null and b/__pycache__/lib.cpython-37.pyc differ
diff --git a/__pycache__/sm.cpython-37.pyc b/__pycache__/sm.cpython-37.pyc
new file mode 100644
index 0000000..9e0a2d6
Binary files /dev/null and b/__pycache__/sm.cpython-37.pyc differ
diff --git a/lib.py b/lib.py
new file mode 100644
index 0000000..8e47e24
--- /dev/null
+++ b/lib.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python3
+import time
+
+class timer():
+    def __init__(self, name, print=False):
+        self.name = name
+        self.print = print
+    def __enter__(self):
+        self._time = time.time()
+    def __exit__(self, *exc_info):
+        elapsed = time.time() - self._time
+        if self.print:
+            print(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
+
diff --git a/main.py b/main.py
index 241ea92..ffcf85c 100644
--- a/main.py
+++ b/main.py
@@ -1,185 +1,156 @@
 #!/usr/local/bin python3
 import pika, requests
-import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
-import sm_extract
+import base64, json, tempfile, time, os, shutil, sys
 
-SM_ZIP='sm_zip'
-SM_ZIP_RESP='sm_zip_resp'
-HOST='https://germinate.za3k.com'
-SM_OUTPUT_DIR='/jbod0/text-scimag'
+import sm
+from lib import timer
 
+# Queues
+FF_TAR="ff_tar"
+FF_TAR_RESP="ff_tar_resp"
+LG_TAR="lg_tar"
+LG_TAR_RESP="lg_tar_resp"
+
+# Login credentials
+MQ_HOST = "localhost"
 USER="sm"
 PASS="McFnE9I4OdwTB"
 
 import socket
 IS_GERMINATE = socket.gethostname() == "germinate"
 
-pool = None
-def make_pool():
-    global pool
-    if pool is None:
-        pool = multiprocessing.Pool()
-    return pool
-
-class timer():
-    def __init__(self, name, print=False):
-        self.name = name
-        self.print = print
-    def __enter__(self):
-        self._time = time.time()
-    def __exit__(self, *exec_info):
-        elapsed = time.time() - self._time
-        if self.print:
-            print(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
-
+# Library functions
 class QueueConnection():
-    def __init__(self):
-        pass
+    def __init__(self, queues):
+        self.queues = queues
    def __enter__(self):
         credentials = pika.PlainCredentials(USER, PASS)
-        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
+        self.connection = pika.BlockingConnection(pika.ConnectionParameters(MQ_HOST, 5672, '/', credentials))
         self.channel = self.connection.channel()
-        self.channel.queue_declare(queue=SM_ZIP, durable=True)
-        self.channel.queue_declare(queue=SM_ZIP_RESP, durable=True)
+        for queue in self.queues:
+            self.channel.queue_declare(queue=queue, durable=True)
         return self.channel
     def __exit__(self, *exec_info):
         self.connection.close()
 
+#def threaded_consumer_multiple(queue, consumer_callback):
+    #while True:
+        #try:
+        #except pika.exceptions.ConnectionClosed:
+        #    print("connection close")
+        #    pass
+
 def threaded_consumer(queue, consumer_callback):
-    while True:
+    _threaded_consumer(queue, consumer_callback)
+
+def _threaded_consumer(queue, consumer_callback):
+    with QueueConnection([queue]) as channel:
+        channel.basic_qos(prefetch_count=1)
+        channel.basic_consume(
+            queue=queue,
+            consumer_callback=consumer_callback,
+        )
         try:
-            with QueueConnection() as channel:
-                channel.basic_qos(prefetch_count=1)
-                channel.basic_consume(
-                    queue=queue,
-                    consumer_callback=consumer_callback,
-                )
-
-                print('[*] Waiting for messages. To exit press CTRL+C')
-                channel.start_consuming()
-        except pika.exceptions.ConnectionClosed:
-            pass
-
-def worker_sm():
-    threaded_consumer(SM_ZIP, do_work_sm)
-def listener_sm():
-    threaded_consumer(SM_ZIP_RESP, do_listen_sm)
+            print('[*] Waiting for messages. To exit press CTRL+C')
+            channel.start_consuming()
+        except KeyboardInterrupt:
+            channel.stop_consuming()
 
-# Library functions
 def remove_file(path):
     try:
         os.remove(path)
     except FileNotFoundError:
         pass
 
-# "Meat" -- actually specific to what's going on.
-def enqueuer_sm():
+def enqueue_all(queue, objects):
     """Load up all the text extraction work to be done"""
-    with QueueConnection() as channel:
-        urls = []
-        for drive_num in range(1,9):
-            drive="/libgen12-{drive_num:02}".format(drive_num=drive_num)
-            for dirname, subdirList, fileList in os.walk(drive):
-                for fname in fileList:
-                    if fname.endswith(".zip"):
-                        fpath = os.path.join(dirname, fname)
-                        url=HOST + fpath
-                        urls.append(url)
-        urls.sort()
-        for url in urls:
+    with QueueConnection([queue]) as channel:
+        for o in objects:
             channel.basic_publish(
                 exchange='',
-                routing_key=SM_ZIP,
-                body=url,
+                routing_key=queue,
+                body=json.dumps(o),
                 properties=pika.spec.BasicProperties(
                     delivery_mode=2,
                 )
             )
 
-def do_listen_sm(channel, method, properties, content):
+def do_work(response_queue, file_processor):
+    def _inner_do_work(channel, method, properties, message):
+        message = json.loads(message.decode('ascii'))
+        task_id = message["task_id"]
+        remote_url = message["remote_url"]
+        remote_output_path = message["remote_output_path"]
+        local_input_path = message.get("local_input_path")
+        local_output_path = message.get("local_output_path")
+
+        with tempfile.TemporaryDirectory() as d:
+            print("received", task_id)
+            input_path = os.path.join(d, "INPUT.zip")
+            output_path = os.path.join(d, "OUTPUT.tar.gz")
+
+            if IS_GERMINATE and local_input_path is not None:
+                print(" local copy", local_input_path)
+                with timer("copy-input", print=True):
+                    shutil.copyfile(local_input_path, input_path)
+            else:
+                print(" downloading", remote_url)
+                with timer("download", print=True):
+                    with requests.get(remote_url, stream=True) as r:
+                        with open(input_path, 'wb') as f:
+                            shutil.copyfileobj(r.raw, f)
+
+            print(" processing", task_id)
+            with timer("process", print=True):
+                file_processor(input_path, output_path)
+
+            if IS_GERMINATE and local_output_path is not None:
+                print(" local copy of output", local_output_path)
+                with timer("copy-output", print=True):
+                    shutil.copyfile(output_path, local_output_path)
+            else:
+                print(" sending-response", output_path)
+                with timer("send-response", print=True):
+                    with open(output_path, 'rb') as f:
+                        file_contents = base64.b64encode(f.read()).decode('ascii')
+                    channel.basic_publish(
+                        exchange='',
+                        routing_key=response_queue,
+                        body=json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents}),
+                        properties=pika.BasicProperties(delivery_mode=2),
+                    )
+
+        channel.basic_ack(delivery_tag=method.delivery_tag)
+        print(" sent", output_path)
+    return _inner_do_work
+
+def do_listen(channel, method, properties, message):
     """Receive extracted text and write it to disk"""
-    response = json.loads(content)
-    output_path = os.path.join(SM_OUTPUT_DIR, response["filename"])
+    response = json.loads(message)
+    response_id = response["response_id"]
+    output_path = response["local_output_path"]
 
-    print("saving", output_file)
-    content = base64.b64decode(response["content"].encode('ascii'))
+    print("received", response_id)
     if os.path.exists(output_path):
         print("skipped", output_path)
     else:
-        with open(output_file, "wb") as f:
+        content = base64.b64decode(response["content"].encode('ascii'))
+        with open(output_path, "wb") as f:
             f.write(content)
         print("saved", output_path)
     channel.basic_ack(delivery_tag=method.delivery_tag)
 
-def do_work_sm(channel, method, properties, url):
-    url = url.decode('ascii')
-    input_filename = url.split("/")[-1]
-    output_filename = input_filename[:-len(".zip")] + "-text.tar.gz"
-    output_path = url.split("/")[-2] + "/" + output_filename
-
-    if IS_GERMINATE:
-        source_file = url.replace(HOST, "")
-        final_save_path = os.path.join(SM_OUTPUT_DIR, output_path)
-
-    try:
-        if IS_GERMINATE:
-            print("local copy", source_file)
-            with timer("copy-input", print=True):
-                shutil.copyfile(source_file, input_filename)
-        else:
-            print("downloading", url, print=True)
-            with timer("download"):
-                response = requests.get(url)
-                with requests.get(url, stream=True) as r:
-                    with open(input_filename, 'wb') as f:
-                        shutil.copyfileobj(r.raw, f)
-
-        print(" processing", input_filename)
-        with timer("process", print=True):
-            sm_extract.txt_extract_sm(input_filename, output_filename)
-
-        if IS_GERMINATE:
-            print(" local copy of output", final_save_path)
-            with timer("copy-output", print=True):
-                shutil.copyfile(output_filename, final_save_path)
-        else:
-            print(" sending-response", output_path)
-            with timer("send-response", print=True):
-                with open(output_filename, 'rb') as f:
-                    file_contents = base64.b64encode(f.read()).decode('ascii')
-                channel.basic_publish(
-                    exchange='',
-                    routing_key=SM_ZIP_RESP,
-                    body=json.dumps({"url": url, "filename": output_path, "content": file_contents}),
-                    delivery_mode=2,
-                )
-    finally:
-        remove_file(input_filename)
-        remove_file(output_filename)
-
-    channel.basic_ack(delivery_tag=method.delivery_tag)
-    print(" sent", output_filename)
-
 if __name__ == '__main__':
     args = sys.argv
-    if len(args) <= 1 or args[1] == "worker":
-        if True: # single-threaded
-            worker_sm()
-        else: # multi-threaded does not work
-            pool = make_pool()
-            pool.apply_async(worker_sm)
-            try:
-                time.sleep(10000000)
-            except KeyboardInterrupt:
-                print(' [*] Terminating...')
-                pool.terminate()
-                pool.join()
-    elif args[1] == "listener":
-        listener_sm()
-    elif args[1] == "enqueue":
-        enqueuer_sm()
-    elif args[1] == "debug":
-        sm_extract.txt_extract_sm(args[2], args[3], debug=True)
+    if len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "worker":
+        threaded_consumer(sm.QUEUE, do_work(sm.QUEUE_RESP, sm.extract_text))
+    elif len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "listener":
+        threaded_consumer(sm.QUEUE_RESP, do_listen)
+    elif len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "enqueue":
+        enqueue_all(sm.QUEUE, sm.generate_tasks())
+    elif len(args) == 5 and sys.argv[1] == "sm" and sys.argv[2] == "debug":
+        sm.extract_text(args[3], args[4], debug=True)
     else:
-        print("Usage: rabbitmq.py worker|enqueue")
+        print("Usage: {} sm worker|listener|enqueue".format(sys.argv[0].split("/")[-1]))
         sys.exit(1)
diff --git a/sm-extract.py b/sm-extract.py
deleted file mode 100644
index 5a05053..0000000
--- a/sm-extract.py
+++ /dev/null
@@ -1,57 +0,0 @@
-#!/usr/local/bin python3
-import pika, requests
-import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
-
-def txt_extract_sm(input_path, output_path, debug=False):
Single-threaded.""" - with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out: - #if debug: - # td_in = "tmp.in" - # td_out = "tmp.out"; os.mkdir(td_out) - #print("working in temp dirs: ", td_in, td_out) - - # Extract .zip file - with timer("zip", print=debug): - subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in]) - - # Make a list of pdf files extracted - pdfs = [] - with timer("ls", print=debug): - with open(os.path.join(OUTPUT_DIR, "non-pdfs"), "a") as logfile: - for dirname, subdirList, fileList in os.walk(td_in): - assert dirname.startswith(td_in) - short = dirname[len(td_in):].lstrip("/") - for dname in subdirList: - os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname)) - for fname in fileList: - file_name = os.path.join(short, fname) - if fname.lower().endswith(".pdf"): - pdfs.append(file_name) - else: - print(input_path, fname, file=logfile) - print("other", input_path, fname) - - # Extract the text from each (single-threaded) - # INPUT.pdf -> INPUT.pdf.txt - with timer("pdftotext", print=debug): - for pdf in pdfs: - input_pdf = os.path.join(td_in, pdf) - output_txt = os.path.join(td_out, pdf + ".txt") - subprocess.call(["pdftotext", input_pdf, output_txt]) - - # Put the results into a .tar.gz file - with timer("tar", print=debug): - subprocess.call( - ["tar", "czf", output_path, "."], # gzip = z, J = '.xz' - stderr=subprocess.DEVNULL, - cwd=td_out, - env={"XZ_OPT":"-9","GZIP":"-9",} - ) - -if __name__ == '__main__': - args = sys.argv - if len(args) == 2: - txt_extract_sm(args[1], args[2]) - else - print("Usage: sm-extract INPUT.zip OUTPUT.tar.gz") - sys.exit(1) diff --git a/sm.py b/sm.py new file mode 100644 index 0000000..008a878 --- /dev/null +++ b/sm.py @@ -0,0 +1,91 @@ +#!/usr/local/bin python3 +import tempfile, os, subprocess, sys +from lib import timer + +SM_URL='https://germinate.za3k.com{path}' +SM_OUTPUT_PATH='/jbod0/text-scimag/{path}' + +QUEUE='sm_zip' +QUEUE_RESP='sm_zip_resp' + +ERROR_FILE="/var/tmp/sm.nonpdfs" + +def extract_text(input_path, output_path, debug=False): + """Extract text from a .zip file of ~1000 PDFs. 
Single-threaded.""" + with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out: + if debug: + td_in = "tmp.in" + td_out = "tmp.out"; os.mkdir(td_out) + #print("working in temp dirs: ", td_in, td_out) + + # Extract .zip file + with timer("zip", print=debug): + subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in]) + + # Make a list of pdf files extracted + pdfs = [] + other = [] + with timer("ls", print=debug): + for dirname, subdirList, fileList in os.walk(td_in): + assert dirname.startswith(td_in) + short = dirname[len(td_in):].lstrip("/") + for dname in subdirList: + os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname)) + for fname in fileList: + file_name = os.path.join(short, fname) + if fname.lower().endswith(".pdf"): + pdfs.append(file_name) + else: + other.append(fname) + print(" other", input_path, fname) + + if len(other) > 0: + with open(ERROR_FILE, "a") as logfile: + print(input_path, fname, file=logfile) + + # Extract the text from each (single-threaded) + # INPUT.pdf -> INPUT.pdf.txt + with timer("pdftotext", print=debug): + for pdf in pdfs: + input_pdf = os.path.join(td_in, pdf) + output_txt = os.path.join(td_out, pdf + ".txt") + subprocess.call(["pdftotext", input_pdf, output_txt]) + + # Put the results into a .tar.gz file + with timer("tar", print=debug): + output_path = os.path.abspath(output_path) + subprocess.call( + ["tar", "czf", output_path, "."], # gzip = z, J = '.xz' + stderr=subprocess.DEVNULL, + cwd=td_out, + env={"XZ_OPT":"-9","GZIP":"-9",} + ) + +def generate_tasks(): + tasks = [] + for drive_num in range(1,9): + drive="/libgen12-{drive_num:02}".format(drive_num=drive_num) + for dirname, subdirList, fileList in os.walk(drive): + for filename in fileList: + if filename.endswith(".zip"): + fullpath = os.path.join(dirname, filename) + short_in = '/'.join(fullpath.split("/")[-2:]) + short_out = short_in[:-len(".zip")] + ".text.tar.gz" + tasks.append({ + "task_id": filename, + "remote_url": SM_URL.format(path=fullpath), + "local_input_path": fullpath, + "local_output_path": SM_OUTPUT_PATH.format(path=short_out), + "remote_output_path": SM_OUTPUT_PATH.format(path=short_out), + "dataset": "sm", + }) + tasks.sort(key=lambda x: x["remote_url"]) + return tasks + +if __name__ == '__main__': + if len(sys.argv) == 3: + extract_text(sys.argv[1], sys.argv[2]) + else: + print(sys.argv) + print("Usage: sm-extract INPUT.zip OUTPUT.tar.gz") + sys.exit(1)