format='[mq-lg] %(threadName)s %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO)
+printed = logging.StreamHandler()
# Queues
FF_TAR="ff_tar"
LG_TAR="lg_tar"
LG_TAR_RESP="lg_tar_resp"
-# Treat germinate specially
-IS_GERMINATE = socket.gethostname() == "germinate"
-
# Login credentials
-MQ_HOST = os.getenv("MQ_HOST", "localhost" if IS_GERMINATE else "germinate.za3k.com")
+MQ_HOST = os.getenv("MQ_HOST", "germinate.za3k.com")
USER="sm"
PASS="McFnE9I4OdwTB"
task_id = message["task_id"]
remote_url = message["remote_url"]
remote_output_path = message["remote_output_path"]
- local_input_path = message.get("local_input_path")
- local_output_path = message.get("local_output_path")
with tempfile.TemporaryDirectory(prefix="mqlg.") as d:
logging.error("received {}".format(task_id))
- input_path = os.path.join(d, "INPUT.zip")
output_path = os.path.join(d, "OUTPUT.tar.gz")
- if IS_GERMINATE and local_input_path is not None:
- logging.warning(" local copy {}".format(local_input_path))
- with lib.timer("copy-input", logging.info):
- shutil.copyfile(local_input_path, input_path)
- else:
- logging.warning(" downloading {}".format(remote_url))
- with lib.timer("download", logging.warning):
- with requests.get(remote_url, stream=True) as r:
- if lib.tqdm_avail:
- # A nice progress bar but read/write instead of copyfileobj slows it down a little.
- total_size = int(r.headers.get('content-length', 0))
- progress_bar = lib.tqdm(desc="download", total=total_size, unit='iB', unit_scale=True, leave=False)
- with open(input_path, 'wb') as f:
- for data in r.iter_content(100000):
- f.write(data)
- progress_bar.update(len(data))
- progress_bar.close()
- else:
- with open(input_path, 'wb') as f:
- shutil.copyfileobj(r.raw, f)
-
logging.warning(" processing {}".format(task_id))
with lib.timer("process", logging.warning):
- file_processor(input_path, output_path)
+ file_processor(remote_url, output_path)
- if IS_GERMINATE and local_output_path is not None:
- logging.warning(" local copy of output {}".format(local_output_path))
- with lib.timer("copy-output", logging.warning):
- shutil.copyfile(output_path, local_output_path)
- return None
- else:
- logging.warning(" sending-response {}".format(output_path))
- with lib.timer("send-response 1/2", logging.warning):
- with open(output_path, 'rb') as f:
- file_contents = base64.b64encode(f.read()).decode('ascii')
- return json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents})
+ logging.warning(" sending-response {}".format(output_path))
+ with lib.timer("send-response 1/2", logging.warning):
+ with open(output_path, 'rb') as f:
+ file_contents = base64.b64encode(f.read()).decode('ascii')
+ return json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents})
def do_listen(message):
"""Receive extracted text and write it to disk"""
while len(args) > 0:
arg, *args = args
if arg == "--debug":
- printed = logging.StreamHandler()
printed.setLevel(logging.INFO)
- logging.getLogger().addHandler(printed)
elif arg == "--threads":
if len(args) < 1:
raise InvalidArguments()
if command == "worker":
if len(args) != 0:
raise InvalidArguments()
+ logging.getLogger().addHandler(printed)
threaded_consumer(module.QUEUE, module.QUEUE_RESP, functools.partial(do_work, module.extract_text), threads=threads)
elif command == "listener":
if len(args) != 0:
raise InvalidArguments()
+ logging.getLogger().addHandler(printed)
threaded_consumer(module.QUEUE_RESP, do_listen, threads=threads)
elif command == "enqueue":
if len(args) != 0:
raise InvalidArguments()
+ logging.getLogger().addHandler(printed)
enqueue_all(queue, module.generate_tasks())
elif command == "debug":
if len(args) != 2:
raise InvalidArguments()
printed.setLevel(logging.DEBUG)
+ logging.getLogger().addHandler(printed)
module.extract_text(*args, debug=True)
except InvalidArguments:
program_name = sys.argv[0].split("/")[-1]
ERROR_FILE=os.path.expanduser("~/sm.errors")
-def extract_text(input_path, output_path, debug=False):
+def extract_text(input_url, output_path, debug=False):
"""Extract text from a .zip file of ~1000 PDFs. Single-threaded."""
with tempfile.TemporaryDirectory(prefix="mqlg-in.") as td_in, tempfile.TemporaryDirectory(prefix="mqlg-out.") as td_out:
if debug:
td_in = "tmp.in"
td_out = "tmp.out"; os.mkdir(td_out)
logging.info("working in temp dirs. in:{} out:{}".format(td_in, td_out))
+ input_path = os.path.join(td_in, "INPUT.zip")
+
+ # Do the download
+ with lib.timer("wget-download", logging.info):
+ subprocess.call(["wget", "-nv", "-O", input_path, input_url])
# Extract .zip file
with lib.timer("zip", logging.info):
# Extract the text from each (single-threaded)
# INPUT.pdf -> INPUT.pdf.txt
with lib.timer("pdftotext", logging.warning):
- for pdf in lib.tqdm(pdfs, desc="pdftotext", leave=False):
+ #for pdf in lib.tqdm(pdfs, desc="pdftotext", leave=False):
+ for pdf in pdfs:
input_pdf = os.path.join(td_in, pdf)
output_txt = os.path.join(td_out, pdf + ".txt")
#logging.info("converting {} -> {}".format(input_pdf, output_txt))
tasks.append({
"task_id": filename,
"remote_url": SM_URL.format(path=fullpath),
- "local_input_path": fullpath,
- "local_output_path": SM_OUTPUT_PATH.format(path=short_out),
+ #"local_input_path": fullpath,
+ #"local_output_path": SM_OUTPUT_PATH.format(path=short_out),
"remote_output_path": SM_OUTPUT_PATH.format(path=short_out),
"dataset": "sm",
})