From b31eb81ad610c67eb4a4bb03965ee15c669d3428 Mon Sep 17 00:00:00 2001 From: Zachary Vance Date: Sat, 14 Aug 2021 01:48:57 -0700 Subject: [PATCH] Remove local support. Switch to url processing instead of file processing because python-driven 'requests' was occupying 600% CPU. Remove progress bars as another possible culprit --- main.py | 54 ++++++++++++------------------------------------------ sm.py | 14 ++++++++++---- 2 files changed, 22 insertions(+), 46 deletions(-) diff --git a/main.py b/main.py index c31d896..2908af2 100755 --- a/main.py +++ b/main.py @@ -8,6 +8,7 @@ logging.basicConfig(filename=os.path.expanduser('~/sm.log'), format='[mq-lg] %(threadName)s %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', datefmt='%H:%M:%S', level=logging.INFO) +printed = logging.StreamHandler() # Queues FF_TAR="ff_tar" @@ -15,11 +16,8 @@ FF_TAR_RESP="ff_tar_resp" LG_TAR="lg_tar" LG_TAR_RESP="lg_tar_resp" -# Treat germinate specially -IS_GERMINATE = socket.gethostname() == "germinate" - # Login credentials -MQ_HOST = os.getenv("MQ_HOST", "localhost" if IS_GERMINATE else "germinate.za3k.com") +MQ_HOST = os.getenv("MQ_HOST", "germinate.za3k.com") USER="sm" PASS="McFnE9I4OdwTB" @@ -112,50 +110,20 @@ def do_work(file_processor, message): task_id = message["task_id"] remote_url = message["remote_url"] remote_output_path = message["remote_output_path"] - local_input_path = message.get("local_input_path") - local_output_path = message.get("local_output_path") with tempfile.TemporaryDirectory(prefix="mqlg.") as d: logging.error("received {}".format(task_id)) - input_path = os.path.join(d, "INPUT.zip") output_path = os.path.join(d, "OUTPUT.tar.gz") - if IS_GERMINATE and local_input_path is not None: - logging.warning(" local copy {}".format(local_input_path)) - with lib.timer("copy-input", logging.info): - shutil.copyfile(local_input_path, input_path) - else: - logging.warning(" downloading {}".format(remote_url)) - with lib.timer("download", logging.warning): - with requests.get(remote_url, stream=True) as r: - if lib.tqdm_avail: - # A nice progress bar but read/write instead of copyfileobj slows it down a little. - total_size = int(r.headers.get('content-length', 0)) - progress_bar = lib.tqdm(desc="download", total=total_size, unit='iB', unit_scale=True, leave=False) - with open(input_path, 'wb') as f: - for data in r.iter_content(100000): - f.write(data) - progress_bar.update(len(data)) - progress_bar.close() - else: - with open(input_path, 'wb') as f: - shutil.copyfileobj(r.raw, f) - logging.warning(" processing {}".format(task_id)) with lib.timer("process", logging.warning): - file_processor(input_path, output_path) + file_processor(remote_url, output_path) - if IS_GERMINATE and local_output_path is not None: - logging.warning(" local copy of output {}".format(local_output_path)) - with lib.timer("copy-output", logging.warning): - shutil.copyfile(output_path, local_output_path) - return None - else: - logging.warning(" sending-response {}".format(output_path)) - with lib.timer("send-response 1/2", logging.warning): - with open(output_path, 'rb') as f: - file_contents = base64.b64encode(f.read()).decode('ascii') - return json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents}) + logging.warning(" sending-response {}".format(output_path)) + with lib.timer("send-response 1/2", logging.warning): + with open(output_path, 'rb') as f: + file_contents = base64.b64encode(f.read()).decode('ascii') + return json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents}) def do_listen(message): """Receive extracted text and write it to disk""" @@ -186,9 +154,7 @@ if __name__ == '__main__': while len(args) > 0: arg, *args = args if arg == "--debug": - printed = logging.StreamHandler() printed.setLevel(logging.INFO) - logging.getLogger().addHandler(printed) elif arg == "--threads": if len(args) < 1: raise InvalidArguments() @@ -211,19 +177,23 @@ if __name__ == '__main__': if command == "worker": if len(args) != 0: raise InvalidArguments() + logging.getLogger().addHandler(printed) threaded_consumer(module.QUEUE, module.QUEUE_RESP, functools.partial(do_work, module.extract_text), threads=threads) elif command == "listener": if len(args) != 0: raise InvalidArguments() + logging.getLogger().addHandler(printed) threaded_consumer(module.QUEUE_RESP, do_listen, threads=threads) elif command == "enqueue": if len(args) != 0: raise InvalidArguments() + logging.getLogger().addHandler(printed) enqueue_all(queue, module.generate_tasks()) elif command == "debug": if len(args) != 2: raise InvalidArguments() printed.setLevel(logging.DEBUG) + logging.getLogger().addHandler(printed) module.extract_text(*args, debug=True) except InvalidArguments: program_name = sys.argv[0].split("/")[-1] diff --git a/sm.py b/sm.py index 417225d..e168b54 100644 --- a/sm.py +++ b/sm.py @@ -11,13 +11,18 @@ QUEUE_RESP='sm_zip_resp' ERROR_FILE=os.path.expanduser("~/sm.errors") -def extract_text(input_path, output_path, debug=False): +def extract_text(input_url, output_path, debug=False): """Extract text from a .zip file of ~1000 PDFs. Single-threaded.""" with tempfile.TemporaryDirectory(prefix="mqlg-in.") as td_in, tempfile.TemporaryDirectory(prefix="mqlg-out.") as td_out: if debug: td_in = "tmp.in" td_out = "tmp.out"; os.mkdir(td_out) logging.info("working in temp dirs. in:{} out:{}".format(td_in, td_out)) + input_path = os.path.join(td_in, "INPUT.zip") + + # Do the download + with lib.timer("wget-download", logging.info): + subprocess.call(["wget", "-nv", "-O", input_path, input_url]) # Extract .zip file with lib.timer("zip", logging.info): @@ -47,7 +52,8 @@ def extract_text(input_path, output_path, debug=False): # Extract the text from each (single-threaded) # INPUT.pdf -> INPUT.pdf.txt with lib.timer("pdftotext", logging.warning): - for pdf in lib.tqdm(pdfs, desc="pdftotext", leave=False): + #for pdf in lib.tqdm(pdfs, desc="pdftotext", leave=False): + for pdf in pdfs: input_pdf = os.path.join(td_in, pdf) output_txt = os.path.join(td_out, pdf + ".txt") #logging.info("converting {} -> {}".format(input_pdf, output_txt)) @@ -85,8 +91,8 @@ def generate_tasks(): tasks.append({ "task_id": filename, "remote_url": SM_URL.format(path=fullpath), - "local_input_path": fullpath, - "local_output_path": SM_OUTPUT_PATH.format(path=short_out), + #"local_input_path": fullpath, + #"local_output_path": SM_OUTPUT_PATH.format(path=short_out), "remote_output_path": SM_OUTPUT_PATH.format(path=short_out), "dataset": "sm", }) -- 2.47.3