-#!/usr/local/bin python3
+#!/usr/local/bin/python3
import pika, requests
-import base64, json, tempfile, time, os, shutil, sys
-
-import sm
-from lib import timer
+import base64, concurrent.futures, functools, logging, json, tempfile, time, multiprocessing, os, shutil, sys
+import lib, sm
+
+logging.basicConfig(filename='/var/log/mqlg.log',
+ filemode='a',
+ format='[mq-lg] %(threadName)s %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
+ datefmt='%H:%M:%S',
+ level=logging.INFO)
+printed = logging.StreamHandler()
+printed.setLevel(logging.INFO)
+logging.getLogger().addHandler(printed)
# Queues
FF_TAR="ff_tar"
PASS="McFnE9I4OdwTB"
import socket
-IS_GERMINATE = socket.gethostname() == "germinate"
+IS_GERMINATE = False #socket.gethostname() == "germinate"
# Library functions
class QueueConnection():
def __exit__(self, *exec_info):
self.connection.close()
-#def threaded_consumer_multiple(queue, consumer_callback):
- #while True:
- #try:
- #except pika.exceptions.ConnectionClosed:
- # print("connection close")
- # pass
-
-def threaded_consumer(queue, consumer_callback):
- _threaded_consumer(queue, consumer_callback)
+def threaded_consumer(queue, response_queue, consumer_callback, threads=None):
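+    """Consume `queue` on a thread pool of `threads` workers.
+
+    Any non-None value returned by consumer_callback is published to
+    `response_queue`; the ack is issued back on the connection's thread
+    once the callback completes.
+    """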
+ if threads is None:
+ threads = multiprocessing.cpu_count()
+ executor = concurrent.futures.ThreadPoolExecutor(max_workers=threads)
+ def _inner_cb(channel, method, properties, message):
+ # Queue async work on another thread
+ future = executor.submit(consumer_callback, message)
+ mq_conn = channel.connection
+        # Once the worker thread completes, hop back to the connection's I/O
+        # thread to publish and ack: pika channels are not thread-safe, and
+        # add_callback_threadsafe is the only connection method that is safe
+        # to call from another thread (add_timeout is not).
+        future.add_done_callback(lambda future: mq_conn.add_callback_threadsafe(functools.partial(_done_cb, channel, method, future)))
+
+ def _done_cb(channel, method, future):
+ # Always executed on main thread
+ response = future.result()
+ if response is not None:
+ with lib.timer("send-response 1/2", logging.warning):
+ channel.basic_publish(
+ exchange='',
+ routing_key=response_queue,
+ body=response,
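+                    # delivery_mode=2 marks the message persistent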
+ properties=pika.BasicProperties(delivery_mode=2),
+ )
+ logging.warning(" sent")
+ #logging.error("skipping ack during debug test")
+ #sys.exit(0)
+ channel.basic_ack(delivery_tag=method.delivery_tag)
+ logging.info(" done")
-def _threaded_consumer(queue, consumer_callback):
with QueueConnection([queue]) as channel:
- channel.basic_qos(prefetch_count=1)
+ mq_conn = channel.connection
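+        # Let the broker deliver one unacked message per worker thread so the
+        # pool stays saturated.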
+ channel.basic_qos(prefetch_count=threads)
channel.basic_consume(
queue=queue,
- consumer_callback=consumer_callback,
+ consumer_callback=_inner_cb,
)
try:
- print('[*] Waiting for messages. To exit press CTRL+C')
+ logging.error('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
except KeyboardInterrupt:
+ executor.shutdown()
channel.stop_consuming()
def remove_file(path):
)
)
-def do_work(response_queue, file_processor):
- def _inner_do_work(channel, method, properties, message):
- message = json.loads(message.decode('ascii'))
- task_id = message["task_id"]
- remote_url = message["remote_url"]
- remote_output_path = message["remote_output_path"]
- local_input_path = message.get("local_input_path")
- local_output_path = message.get("local_output_path")
-
- with tempfile.TemporaryDirectory() as d:
- print("received", task_id)
- input_path = os.path.join(d, "INPUT.zip")
- output_path = os.path.join(d, "OUTPUT.tar.gz")
-
- if IS_GERMINATE and local_input_path is not None:
- print(" local copy", local_input_path)
- with timer("copy-input", print=True):
- shutil.copyfile(local_input_path, input_path)
- else:
- print(" downloading", remote_url)
- with timer("download", print=True):
- with requests.get(remote_url, stream=True) as r:
+def do_work(file_processor, message):
+ """
+ File processing RPC.
+    - Takes in JSON describing the file to work on and where to put it.
+    - Either writes the output file directly and returns None, or returns a
+      JSON response.
+
+    Any non-None return value is published on the response queue.
+ """
+ message = json.loads(message.decode('ascii'))
+ task_id = message["task_id"]
+ remote_url = message["remote_url"]
+ remote_output_path = message["remote_output_path"]
+ local_input_path = message.get("local_input_path")
+ local_output_path = message.get("local_output_path")
+
+ with tempfile.TemporaryDirectory(prefix="mqlg.") as d:
+ logging.error("received {}".format(task_id))
+ input_path = os.path.join(d, "INPUT.zip")
+ output_path = os.path.join(d, "OUTPUT.tar.gz")
+
+ if IS_GERMINATE and local_input_path is not None:
+ logging.warning(" local copy {}".format(local_input_path))
+ with lib.timer("copy-input", logging.info):
+ shutil.copyfile(local_input_path, input_path)
+ else:
+ logging.warning(" downloading {}".format(remote_url))
+ with lib.timer("download", logging.warning):
+ with requests.get(remote_url, stream=True) as r:
+ if lib.tqdm_avail:
+                        # Chunked read/write with a progress bar; slightly
+                        # slower than shutil.copyfileobj.
+ total_size = int(r.headers.get('content-length', 0))
+ progress_bar = lib.tqdm(desc="download", total=total_size, unit='iB', unit_scale=True)
+ with open(input_path, 'wb') as f:
+ for data in r.iter_content(100000):
+ f.write(data)
+ progress_bar.update(len(data))
+ progress_bar.close()
+ else:
with open(input_path, 'wb') as f:
shutil.copyfileobj(r.raw, f)
- print(" processing", task_id)
- with timer("process", print=True):
- file_processor(input_path, output_path)
-
- if IS_GERMINATE and local_output_path is not None:
- print(" local copy of output", local_output_path)
- with timer("copy-output", print=True):
- shutil.copyfile(output_path, local_output_path)
- else:
- print(" sending-response", output_path)
- with timer("send-response", print=True):
- with open(output_path, 'rb') as f:
- file_contents = base64.b64encode(f.read()).decode('ascii')
- channel.basic_publish(
- exchange='',
- routing_key=response_queue,
- body=json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents}),
- properties=pika.BasicProperties(delivery_mode=2),
- )
-
- channel.basic_ack(delivery_tag=method.delivery_tag)
- print(" sent", output_path)
- return _inner_do_work
-
-def do_listen(channel, method, properties, message):
+ logging.warning(" processing {}".format(task_id))
+ with lib.timer("process", logging.warning):
+ file_processor(input_path, output_path)
+
+ if IS_GERMINATE and local_output_path is not None:
+ logging.warning(" local copy of output {}".format(local_output_path))
+ with lib.timer("copy-output", logging.warning):
+ shutil.copyfile(output_path, local_output_path)
+ return None
+ else:
+ logging.warning(" sending-response {}".format(output_path))
+ with lib.timer("send-response 1/2", logging.warning):
+ with open(output_path, 'rb') as f:
+ file_contents = base64.b64encode(f.read()).decode('ascii')
+ return json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents})
+
+def do_listen(message):
"""Receive extracted text and write it to disk"""
response = json.loads(message)
response_id = response["response_id"]
output_path = response["local_output_path"]
- print("received", response_id)
+ logging.error("received {}".format(response_id))
if os.path.exists(output_path):
- print("skipped", output_path)
+ logging.error("skipped {}".format(output_path))
else:
content = base64.b64decode(response["content"].encode('ascii'))
-        with open(output_file, "wb") as f:
+        with open(output_path, "wb") as f:
f.write(content)
- print("saved", output_path)
- channel.basic_ack(delivery_tag=method.delivery_tag)
+ logging.warning("saved {}".format(output_path))
if __name__ == '__main__':
args = sys.argv
if len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "worker":
- threaded_consumer(sm.QUEUE, do_work(sm.QUEUE_RESP, sm.extract_text))
+ threaded_consumer(sm.QUEUE, sm.QUEUE_RESP, functools.partial(do_work, sm.extract_text))
elif len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "listener":
-        threaded_consumer(sm.QUEUE_RESP, do_listen)
+        threaded_consumer(sm.QUEUE_RESP, None, do_listen)
elif len(args) == 3 and sys.argv[1] == "sm" and sys.argv[2] == "enqueue":
enqueue_all(sm.QUEUE, sm.generate_tasks())
elif len(args) == 5 and sys.argv[1] == "sm" and sys.argv[2] == "debug":
+        logging.getLogger().setLevel(logging.DEBUG)
+        printed.setLevel(logging.DEBUG)
sm.extract_text(args[3], args[4], debug=True)
else:
print("Usage: {} sm worker|listener|enqueue".format(sys.argv[0].split("/")[-1]))
-#!/usr/local/bin python3
+#!/usr/local/bin/python3
-import tempfile, os, subprocess, sys
-from lib import timer
+import logging, tempfile, os, subprocess, sys
+import lib
SM_URL='https://germinate.za3k.com{path}'
SM_OUTPUT_PATH='/jbod0/text-scimag/{path}'
+PDF_CONVERSION_TIMEOUT = 30  # seconds, per PDF
QUEUE='sm_zip'
QUEUE_RESP='sm_zip_resp'
def extract_text(input_path, output_path, debug=False):
"""Extract text from a .zip file of ~1000 PDFs. Single-threaded."""
- with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
+ with tempfile.TemporaryDirectory(prefix="mqlg-in.") as td_in, tempfile.TemporaryDirectory(prefix="mqlg-out.") as td_out:
if debug:
td_in = "tmp.in"
td_out = "tmp.out"; os.mkdir(td_out)
- #print("working in temp dirs: ", td_in, td_out)
+ logging.info("working in temp dirs. in:{} out:{}".format(td_in, td_out))
# Extract .zip file
- with timer("zip", print=debug):
+ with lib.timer("zip", logging.info):
subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
# Make a list of pdf files extracted
pdfs = []
other = []
- with timer("ls", print=debug):
+ with lib.timer("ls", logging.info):
for dirname, subdirList, fileList in os.walk(td_in):
assert dirname.startswith(td_in)
short = dirname[len(td_in):].lstrip("/")
pdfs.append(file_name)
else:
other.append(fname)
- print(" other", input_path, fname)
+ logging.warning(" other {} {}".format(input_path, fname))
if len(other) > 0:
with open(ERROR_FILE, "a") as logfile:
- print(input_path, fname, file=logfile)
+ print("unrecognized-file", input_path, fname, file=logfile)
# Extract the text from each (single-threaded)
# INPUT.pdf -> INPUT.pdf.txt
- with timer("pdftotext", print=debug):
- for pdf in pdfs:
+ with lib.timer("pdftotext", logging.warning):
+            for pdf in (lib.tqdm(pdfs, desc="pdftotext") if lib.tqdm_avail else pdfs):
input_pdf = os.path.join(td_in, pdf)
output_txt = os.path.join(td_out, pdf + ".txt")
- subprocess.call(["pdftotext", input_pdf, output_txt])
+ #logging.info("converting {} -> {}".format(input_pdf, output_txt))
+ try:
+ subprocess.call(
+ ["pdftotext", input_pdf, output_txt],
+ timeout=PDF_CONVERSION_TIMEOUT,
+ stderr=subprocess.DEVNULL,
+ )
+ except subprocess.TimeoutExpired:
+ with open(ERROR_FILE, "a") as logfile:
+ print("timeout pdftotext", PDF_CONVERSION_TIMEOUT, input_path, input_pdf, output_txt, file=logfile)
+ logging.warning(" timeout-pdftotext {}s {}".format(PDF_CONVERSION_TIMEOUT, input_pdf))
# Put the results into a .tar.gz file
- with timer("tar", print=debug):
+ with lib.timer("tar", logging.warning):
output_path = os.path.abspath(output_path)
subprocess.call(
["tar", "czf", output_path, "."], # gzip = z, J = '.xz'
if len(sys.argv) == 3:
extract_text(sys.argv[1], sys.argv[2])
else:
- print(sys.argv)
+ print(sys.argv, file=sys.stderr)
print("Usage: sm-extract INPUT.zip OUTPUT.tar.gz")
sys.exit(1)