--- a/rabbitmq.py
+++ b/rabbitmq.py
-#!/usr/local/bin python3
+#!/usr/bin/env python3
import pika, requests
-import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
-import sm_extract
+import base64, json, tempfile, os, shutil, sys
-SM_ZIP='sm_zip'
-SM_ZIP_RESP='sm_zip_resp'
-HOST='https://germinate.za3k.com'
-SM_OUTPUT_DIR='/jbod0/text-scimag'
+import sm
+from lib import timer
+# Queues
+FF_TAR="ff_tar"
+FF_TAR_RESP="ff_tar_resp"
+LG_TAR="lg_tar"
+LG_TAR_RESP="lg_tar_resp"
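+# (FF_* and LG_* are presumably for other datasets; this file only wires up "sm".)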
+
+# Login credentials
+MQ_HOST = "localhost"
USER="sm"
PASS="McFnE9I4OdwTB"
import socket
IS_GERMINATE = socket.gethostname() == "germinate"
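+# On "germinate" the input drives and the output directory are mounted locally,
+# so workers there copy files directly instead of using HTTP and the response queue.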
-pool = None
-def make_pool():
- global pool
- if pool is None:
- pool = multiprocessing.Pool()
- return pool
-
-class timer():
- def __init__(self, name, print=False):
- self.name = name
- self.print = print
- def __enter__(self):
- self._time = time.time()
- def __exit__(self, *exec_info):
- elapsed = time.time() - self._time
- if self.print:
- print(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
-
+# Library functions
class QueueConnection():
- def __init__(self):
- pass
+ def __init__(self, queues):
+ self.queues = queues
def __enter__(self):
credentials = pika.PlainCredentials(USER, PASS)
- self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
+ self.connection = pika.BlockingConnection(pika.ConnectionParameters(MQ_HOST, 5672, '/', credentials))
self.channel = self.connection.channel()
- self.channel.queue_declare(queue=SM_ZIP, durable=True)
- self.channel.queue_declare(queue=SM_ZIP_RESP, durable=True)
+ for queue in self.queues:
+ self.channel.queue_declare(queue=queue, durable=True)
return self.channel
def __exit__(self, *exec_info):
self.connection.close()
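+# Example (sketch, with an assumed queue name):
+#   with QueueConnection(["some_queue"]) as channel:
+#       channel.basic_publish(exchange='', routing_key="some_queue", body="hello")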
+
def threaded_consumer(queue, consumer_callback):
- while True:
+ _threaded_consumer(queue, consumer_callback)
+
+def _threaded_consumer(queue, consumer_callback):
+ with QueueConnection([queue]) as channel:
+ channel.basic_qos(prefetch_count=1)
+ channel.basic_consume(
+ queue=queue,
+ consumer_callback=consumer_callback,
+ )
try:
- with QueueConnection() as channel:
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(
- queue=queue,
- consumer_callback=consumer_callback,
- )
- print('[*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
- except pika.exceptions.ConnectionClosed:
- pass
-
-def worker_sm():
- threaded_consumer(SM_ZIP, do_work_sm)
-def listener_sm():
- threaded_consumer(SM_ZIP_RESP, do_listen_sm)
+            print('[*] Waiting for messages. To exit press CTRL+C')
+            channel.start_consuming()
+        except KeyboardInterrupt:
+            channel.stop_consuming()
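+# Note: consumer_callback is the pika 0.x keyword argument; pika 1.x renamed it
+# to on_message_callback and reordered basic_consume's parameters.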
-# Library functions
def remove_file(path):
try:
os.remove(path)
except FileNotFoundError:
pass
-# "Meat" -- actually specific to what's going on.
-def enqueuer_sm():
+def enqueue_all(queue, objects):
"""Load up all the text extraction work to be done"""
- with QueueConnection() as channel:
- urls = []
- for drive_num in range(1,9):
- drive="/libgen12-{drive_num:02}".format(drive_num=drive_num)
- for dirname, subdirList, fileList in os.walk(drive):
- for fname in fileList:
- if fname.endswith(".zip"):
- fpath = os.path.join(dirname, fname)
- url=HOST + fpath
- urls.append(url)
- urls.sort()
- for url in urls:
+ with QueueConnection([queue]) as channel:
+ for o in objects:
channel.basic_publish(
exchange='',
- routing_key=SM_ZIP,
- body=url,
+ routing_key=queue,
+ body=json.dumps(o),
properties=pika.spec.BasicProperties(
delivery_mode=2,
)
)
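+# delivery_mode=2 marks each message persistent, so together with the durable
+# queue declaration above, queued tasks survive a broker restart.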
-def do_listen_sm(channel, method, properties, content):
+def do_work(response_queue, file_processor):
+ def _inner_do_work(channel, method, properties, message):
+ message = json.loads(message.decode('ascii'))
+ task_id = message["task_id"]
+ remote_url = message["remote_url"]
+ remote_output_path = message["remote_output_path"]
+ local_input_path = message.get("local_input_path")
+ local_output_path = message.get("local_output_path")
+
+ with tempfile.TemporaryDirectory() as d:
+ print("received", task_id)
+ input_path = os.path.join(d, "INPUT.zip")
+ output_path = os.path.join(d, "OUTPUT.tar.gz")
+
+ if IS_GERMINATE and local_input_path is not None:
+ print(" local copy", local_input_path)
+ with timer("copy-input", print=True):
+ shutil.copyfile(local_input_path, input_path)
+ else:
+ print(" downloading", remote_url)
+ with timer("download", print=True):
+ with requests.get(remote_url, stream=True) as r:
+ with open(input_path, 'wb') as f:
+ shutil.copyfileobj(r.raw, f)
+
+ print(" processing", task_id)
+ with timer("process", print=True):
+ file_processor(input_path, output_path)
+
+            if IS_GERMINATE and local_output_path is not None:
+                print(" local copy of output", local_output_path)
+                with timer("copy-output", print=True):
+                    shutil.copyfile(output_path, local_output_path)
+            else:
+                print(" sending-response", output_path)
+                with timer("send-response", print=True):
+                    with open(output_path, 'rb') as f:
+                        file_contents = base64.b64encode(f.read()).decode('ascii')
+                    channel.basic_publish(
+                        exchange='',
+                        routing_key=response_queue,
+                        body=json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents}),
+                        properties=pika.BasicProperties(delivery_mode=2),
+                    )
+
+        channel.basic_ack(delivery_tag=method.delivery_tag)
+        print(" sent", task_id)
+ return _inner_do_work
+
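+# Example (as wired up in __main__ below):
+#   threaded_consumer(sm.QUEUE, do_work(sm.QUEUE_RESP, sm.extract_text))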
+def do_listen(channel, method, properties, message):
"""Receive extracted text and write it to disk"""
- response = json.loads(content)
- output_path = os.path.join(SM_OUTPUT_DIR, response["filename"])
+ response = json.loads(message)
+ response_id = response["response_id"]
+ output_path = response["local_output_path"]
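+    # The worker publishes its remote_output_path under the key
+    # "local_output_path": it is local from this listener's point of view.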
- print("saving", output_file)
- content = base64.b64decode(response["content"].encode('ascii'))
+ print("received", response_id)
if os.path.exists(output_path):
print("skipped", output_path)
else:
+ content = base64.b64decode(response["content"].encode('ascii'))
-        with open(output_file, "wb") as f:
+        with open(output_path, "wb") as f:
f.write(content)
print("saved", output_path)
channel.basic_ack(delivery_tag=method.delivery_tag)
-def do_work_sm(channel, method, properties, url):
- url = url.decode('ascii')
- input_filename = url.split("/")[-1]
- output_filename = input_filename[:-len(".zip")] + "-text.tar.gz"
- output_path = url.split("/")[-2] + "/" + output_filename
-
- if IS_GERMINATE:
- source_file = url.replace(HOST, "")
- final_save_path = os.path.join(SM_OUTPUT_DIR, output_path)
-
- try:
- if IS_GERMINATE:
- print("local copy", source_file)
- with timer("copy-input", print=True):
- shutil.copyfile(source_file, input_filename)
- else:
- print("downloading", url, print=True)
- with timer("download"):
- response = requests.get(url)
- with requests.get(url, stream=True) as r:
- with open(input_filename, 'wb') as f:
- shutil.copyfileobj(r.raw, f)
-
- print(" processing", input_filename)
- with timer("process", print=True):
- sm_extract.txt_extract_sm(input_filename, output_filename)
-
- if IS_GERMINATE:
- print(" local copy of output", final_save_path)
- with timer("copy-output", print=True):
- shutil.copyfile(output_filename, final_save_path)
- else:
- print(" sending-response", output_path)
- with timer("send-response", print=True):
- with open(output_filename, 'rb') as f:
- file_contents = base64.b64encode(f.read()).decode('ascii')
- channel.basic_publish(
- exchange='',
- routing_key=SM_ZIP_RESP,
- body=json.dumps({"url": url, "filename": output_path, "content": file_contents}),
- delivery_mode=2,
- )
- finally:
- remove_file(input_filename)
- remove_file(output_filename)
-
- channel.basic_ack(delivery_tag=method.delivery_tag)
- print(" sent", output_filename)
-
if __name__ == '__main__':
args = sys.argv
- if len(args) <= 1 or args[1] == "worker":
- if True: # single-threaded
- worker_sm()
- else: # multi-threaded does not work
- pool = make_pool()
- pool.apply_async(worker_sm)
- try:
- time.sleep(10000000)
- except KeyboardInterrupt:
- print(' [*] Terminating...')
- pool.terminate()
- pool.join()
- elif args[1] == "listener":
- listener_sm()
- elif args[1] == "enqueue":
- enqueuer_sm()
- elif args[1] == "debug":
- sm_extract.txt_extract_sm(args[2], args[3], debug=True)
+    if len(args) == 3 and args[1] == "sm" and args[2] == "worker":
+        threaded_consumer(sm.QUEUE, do_work(sm.QUEUE_RESP, sm.extract_text))
+    elif len(args) == 3 and args[1] == "sm" and args[2] == "listener":
+        threaded_consumer(sm.QUEUE_RESP, do_listen)
+    elif len(args) == 3 and args[1] == "sm" and args[2] == "enqueue":
+        enqueue_all(sm.QUEUE, sm.generate_tasks())
+    elif len(args) == 5 and args[1] == "sm" and args[2] == "debug":
+        sm.extract_text(args[3], args[4], debug=True)
    else:
-        print("Usage: rabbitmq.py worker|enqueue")
+        print("Usage: {} sm worker|listener|enqueue | sm debug INPUT.zip OUTPUT.tar.gz".format(args[0].split("/")[-1]))
sys.exit(1)
--- a/sm_extract.py
+++ /dev/null
-#!/usr/local/bin python3
-import pika, requests
-import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
-
-def txt_extract_sm(input_path, output_path, debug=False):
- """Extract text from a .zip file of ~1000 PDFs. Single-threaded."""
- with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
- #if debug:
- # td_in = "tmp.in"
- # td_out = "tmp.out"; os.mkdir(td_out)
- #print("working in temp dirs: ", td_in, td_out)
-
- # Extract .zip file
- with timer("zip", print=debug):
- subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
-
- # Make a list of pdf files extracted
- pdfs = []
- with timer("ls", print=debug):
- with open(os.path.join(OUTPUT_DIR, "non-pdfs"), "a") as logfile:
- for dirname, subdirList, fileList in os.walk(td_in):
- assert dirname.startswith(td_in)
- short = dirname[len(td_in):].lstrip("/")
- for dname in subdirList:
- os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname))
- for fname in fileList:
- file_name = os.path.join(short, fname)
- if fname.lower().endswith(".pdf"):
- pdfs.append(file_name)
- else:
- print(input_path, fname, file=logfile)
- print("other", input_path, fname)
-
- # Extract the text from each (single-threaded)
- # INPUT.pdf -> INPUT.pdf.txt
- with timer("pdftotext", print=debug):
- for pdf in pdfs:
- input_pdf = os.path.join(td_in, pdf)
- output_txt = os.path.join(td_out, pdf + ".txt")
- subprocess.call(["pdftotext", input_pdf, output_txt])
-
- # Put the results into a .tar.gz file
- with timer("tar", print=debug):
- subprocess.call(
- ["tar", "czf", output_path, "."], # gzip = z, J = '.xz'
- stderr=subprocess.DEVNULL,
- cwd=td_out,
- env={"XZ_OPT":"-9","GZIP":"-9",}
- )
-
-if __name__ == '__main__':
- args = sys.argv
- if len(args) == 2:
- txt_extract_sm(args[1], args[2])
- else
- print("Usage: sm-extract INPUT.zip OUTPUT.tar.gz")
- sys.exit(1)
--- /dev/null
+++ b/sm.py
+#!/usr/bin/env python3
+import tempfile, os, subprocess, sys
+from lib import timer
+
+SM_URL='https://germinate.za3k.com{path}'
+SM_OUTPUT_PATH='/jbod0/text-scimag/{path}'
+
+QUEUE='sm_zip'
+QUEUE_RESP='sm_zip_resp'
+
+ERROR_FILE="/var/tmp/sm.nonpdfs"
+
+def extract_text(input_path, output_path, debug=False):
+ """Extract text from a .zip file of ~1000 PDFs. Single-threaded."""
+ with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
+        if debug:
+            # Use stable directories so intermediate files can be inspected afterwards.
+            td_in = "tmp.in"
+            td_out = "tmp.out"
+            os.mkdir(td_out)
+            print("working in dirs:", td_in, td_out)
+
+ # Extract .zip file
+ with timer("zip", print=debug):
+ subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
+
+ # Make a list of pdf files extracted
+ pdfs = []
+ other = []
+ with timer("ls", print=debug):
+ for dirname, subdirList, fileList in os.walk(td_in):
+ assert dirname.startswith(td_in)
+ short = dirname[len(td_in):].lstrip("/")
+ for dname in subdirList:
+ os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname))
+ for fname in fileList:
+ file_name = os.path.join(short, fname)
+ if fname.lower().endswith(".pdf"):
+ pdfs.append(file_name)
+ else:
+ other.append(fname)
+ print(" other", input_path, fname)
+
+        if len(other) > 0:
+            with open(ERROR_FILE, "a") as logfile:
+                for fname in other:
+                    print(input_path, fname, file=logfile)
+
+ # Extract the text from each (single-threaded)
+ # INPUT.pdf -> INPUT.pdf.txt
+ with timer("pdftotext", print=debug):
+ for pdf in pdfs:
+ input_pdf = os.path.join(td_in, pdf)
+ output_txt = os.path.join(td_out, pdf + ".txt")
+ subprocess.call(["pdftotext", input_pdf, output_txt])
+
+ # Put the results into a .tar.gz file
+ with timer("tar", print=debug):
+ output_path = os.path.abspath(output_path)
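+            # Note: passing env= replaces the child's entire environment; only
+            # max-compression settings for xz/gzip are provided here.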
+ subprocess.call(
+ ["tar", "czf", output_path, "."], # gzip = z, J = '.xz'
+ stderr=subprocess.DEVNULL,
+ cwd=td_out,
+ env={"XZ_OPT":"-9","GZIP":"-9",}
+ )
+
+def generate_tasks():
+ tasks = []
+ for drive_num in range(1,9):
+ drive="/libgen12-{drive_num:02}".format(drive_num=drive_num)
+ for dirname, subdirList, fileList in os.walk(drive):
+ for filename in fileList:
+ if filename.endswith(".zip"):
+ fullpath = os.path.join(dirname, filename)
+ short_in = '/'.join(fullpath.split("/")[-2:])
+ short_out = short_in[:-len(".zip")] + ".text.tar.gz"
+ tasks.append({
+ "task_id": filename,
+ "remote_url": SM_URL.format(path=fullpath),
+ "local_input_path": fullpath,
+ "local_output_path": SM_OUTPUT_PATH.format(path=short_out),
+ "remote_output_path": SM_OUTPUT_PATH.format(path=short_out),
+ "dataset": "sm",
+ })
+ tasks.sort(key=lambda x: x["remote_url"])
+ return tasks
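+# A task dict looks roughly like this (values illustrative):
+#   {"task_id": "12345000.zip",
+#    "remote_url": "https://germinate.za3k.com/libgen12-01/12345000/12345000.zip",
+#    "local_input_path": "/libgen12-01/12345000/12345000.zip",
+#    "local_output_path": "/jbod0/text-scimag/12345000/12345000.text.tar.gz",
+#    "remote_output_path": "/jbod0/text-scimag/12345000/12345000.text.tar.gz",
+#    "dataset": "sm"}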
+
+if __name__ == '__main__':
+ if len(sys.argv) == 3:
+ extract_text(sys.argv[1], sys.argv[2])
+ else:
+        print("Usage: {} INPUT.zip OUTPUT.tar.gz".format(sys.argv[0].split("/")[-1]))
+ sys.exit(1)