-#!/usr/local/bin python3
+#!/usr/bin/env python3
import pika, requests
-import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
+import base64, json, multiprocessing, socket, tempfile, time, os, shutil, subprocess, sys
+import sm_extract
SM_ZIP='sm_zip'
SM_ZIP_RESP='sm_zip_resp'
HOST='https://germinate.za3k.com'
-OUTPUT_DIR='/jbod0/text-scimag'
+SM_OUTPUT_DIR='/jbod0/text-scimag'
USER="sm"
PASS="McFnE9I4OdwTB"
IS_GERMINATE = socket.gethostname() == "germinate"
pool = None
-def pool():
+def make_pool():
global pool
if pool is None:
pool = multiprocessing.Pool()
return pool
class timer():
- def __init__(self, name):
+ def __init__(self, name, print=False):
self.name = name
+ self.print = print
def __enter__(self):
self._time = time.time()
def __exit__(self, *exec_info):
elapsed = time.time() - self._time
- print("timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
+ if self.print:
+ print(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
-def main(args):
- credentials = pika.PlainCredentials(USER, PASS)
- connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
- channel = connection.channel()
+class QueueConnection():
+ def __init__(self):
+ pass
+ def __enter__(self):
+ credentials = pika.PlainCredentials(USER, PASS)
+ self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
+ self.channel = self.connection.channel()
- channel.queue_declare(queue=SM_ZIP, durable=True)
- channel.queue_declare(queue=SM_ZIP_RESP, durable=True)
- if len(args) <= 1 or args[1] == "worker":
- worker_sm(channel)
- if args[1] == "listener":
- listener_sm(channel)
- elif args[1] == "enqueue":
- enqueuer_sm(channel)
- elif args[1] == "debug":
- txt_extract_sm(args[2], args[3], debug=True)
- else:
- print("Usage: rabbitmq.py worker|enqueue")
- sys.exit(1)
- connection.close()
-def worker_sm(channel):
- channel.basic_consume(
- queue=SM_ZIP,
- consumer_callback=do_work_sm,
- )
- channel.basic_qos(prefetch_count=1)
- print(' [*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
-def listener_sm(channel):
- channel.basic_consume(
- queue=SM_ZIP_RESP,
- consumer_callback=do_listen_sm,
- )
- channel.basic_qos(prefetch_count=1)
- print(' [*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
+ self.channel.queue_declare(queue=SM_ZIP, durable=True)
+ self.channel.queue_declare(queue=SM_ZIP_RESP, durable=True)
+ return self.channel
+ def __exit__(self, *exec_info):
+ self.connection.close()
+
+def threaded_consumer(queue, consumer_callback):
+ while True:
+ try:
+ with QueueConnection() as channel:
+ channel.basic_qos(prefetch_count=1)
+ channel.basic_consume(
+ queue=queue,
+ consumer_callback=consumer_callback,
+ )
+ print('[*] Waiting for messages. To exit press CTRL+C')
+ channel.start_consuming()
+ except pika.exceptions.ConnectionClosed:
+ pass
+
+def worker_sm():
+ threaded_consumer(SM_ZIP, do_work_sm)
+def listener_sm():
+ threaded_consumer(SM_ZIP_RESP, do_listen_sm)
# Library functions
def remove_file(path):
pass
# "Meat" -- actually specific to what's going on.
-def enqueuer_sm(channel):
+def enqueuer_sm():
"""Load up all the text extraction work to be done"""
- for drive_num in range(0,9):
- drive="/libgen12-{drive_num:02}".format(drive_num=drive_num)
- for dirname, subdirList, fileList in os.walk(drive):
- for fname in fileList:
- if fname.endswith(".zip"):
- fpath = os.path.join(dirname, fname)
- url=HOST + fpath
- print("enqueue", url)
- channel.basic_publish(
- exchange='',
- routing_key=SM_ZIP,
- body=url,
- )
- sys.exit(0)
+ with QueueConnection() as channel:
+ urls = []
+ for drive_num in range(1,9):
+ drive="/libgen12-{drive_num:02}".format(drive_num=drive_num)
+ for dirname, subdirList, fileList in os.walk(drive):
+ for fname in fileList:
+ if fname.endswith(".zip"):
+ fpath = os.path.join(dirname, fname)
+ url=HOST + fpath
+ urls.append(url)
+ urls.sort()
+ for url in urls:
+ channel.basic_publish(
+ exchange='',
+ routing_key=SM_ZIP,
+ body=url,
+ properties=pika.spec.BasicProperties(
+ delivery_mode=2,
+ )
+ )
def do_listen_sm(channel, method, properties, content):
"""Receive extracted text and write it to disk"""
response = json.loads(content)
- output_file = os.path.join(OUTPUT_DIR, response["filename"])
+ output_path = os.path.join(SM_OUTPUT_DIR, response["filename"])
-    print("saving", output_file)
+    print("saving", output_path)
content = base64.b64decode(response["content"].encode('ascii'))
- if os.path.exists(output_file):
- print("skipped", output_file)
+ if os.path.exists(output_path):
+ print("skipped", output_path)
else:
-        with open(output_file, "wb") as f:
+        with open(output_path, "wb") as f:
f.write(content)
- print("saved", output_file)
+ print("saved", output_path)
channel.basic_ack(delivery_tag=method.delivery_tag)
-def txt_extract_sm(input_path, output_path, debug=False):
- """Extract text from a .zip file of ~1000 PDFs"""
- # Step 1: Extract
- with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
- if debug:
- td_in = "tmp.in"
- td_out = "tmp.out"; os.mkdir(td_out)
- print("working in temp dirs: ", td_in, td_out)
- with timer("zip"):
- subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
- pdfs = []
- with timer("ls"):
- for dirname, subdirList, fileList in os.walk(td_in):
- assert dirname.startswith(td_in)
- short = dirname[len(td_in):].lstrip("/")
- for dname in subdirList:
- os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname))
- for fname in fileList:
- if fname.endswith(".pdf"):
- pdf = os.path.join(short, fname)
- pdfs.append(pdf)
- else:
- print(fname)
- with timer("pdftotext"):
- input_pdf = os.path.join(td_in, pdf)
- output_txt = os.path.join(td_out, pdf + ".txt")
- subprocess.call(["pdftotext", input_pdf, output_txt])
- with timer("tar"):
- subprocess.call(["tar", "czf", output_path, td_out])
-
def do_work_sm(channel, method, properties, url):
url = url.decode('ascii')
input_filename = url.split("/")[-1]
- output_filename = input_filename + ".out"
+ output_filename = input_filename[:-len(".zip")] + "-text.tar.gz"
+ output_path = url.split("/")[-2] + "/" + output_filename
if IS_GERMINATE:
source_file = url.replace(HOST, "")
- final_save_path = os.path.join(OUTPUT_DIR, output_filename)
+ final_save_path = os.path.join(SM_OUTPUT_DIR, output_path)
try:
if IS_GERMINATE:
print("local copy", source_file)
- with timer("copy"):
+ with timer("copy-input", print=True):
shutil.copyfile(source_file, input_filename)
else:
- print("downloading", url)
+        print(" downloading", url)
-        with timer("download"):
+        with timer("download", print=True):
-            response = requests.get(url)
with requests.get(url, stream=True) as r:
with open(input_filename, 'wb') as f:
shutil.copyfileobj(r.raw, f)
- print("processing", input_filename)
- with timer("process"):
- txt_extract_sm(input_filename, output_filename)
+ print(" processing", input_filename)
+ with timer("process", print=True):
+ sm_extract.txt_extract_sm(input_filename, output_filename)
if IS_GERMINATE:
- print("local copy of output", output_filename)
- with timer("copy"):
+ print(" local copy of output", final_save_path)
+ with timer("copy-output", print=True):
shutil.copyfile(output_filename, final_save_path)
else:
- print("sending-response", output_filename)
- with timer("send-response"):
+ print(" sending-response", output_path)
+ with timer("send-response", print=True):
with open(output_filename, 'rb') as f:
file_contents = base64.b64encode(f.read()).decode('ascii')
channel.basic_publish(
exchange='',
routing_key=SM_ZIP_RESP,
- body=json.dumps({"url": url, "filename": output_filename, "content": file_contents}),
+ body=json.dumps({"url": url, "filename": output_path, "content": file_contents}),
+                        properties=pika.spec.BasicProperties(delivery_mode=2),
)
finally:
remove_file(input_filename)
remove_file(output_filename)
channel.basic_ack(delivery_tag=method.delivery_tag)
- print("sent", output_filename)
-
+ print(" sent", output_filename)
if __name__ == '__main__':
- main(sys.argv)
+ args = sys.argv
+ if len(args) <= 1 or args[1] == "worker":
+ if True: # single-threaded
+ worker_sm()
+ else: # multi-threaded does not work
+ pool = make_pool()
+ pool.apply_async(worker_sm)
+ try:
+ time.sleep(10000000)
+ except KeyboardInterrupt:
+ print(' [*] Terminating...')
+ pool.terminate()
+ pool.join()
+ elif args[1] == "listener":
+ listener_sm()
+ elif args[1] == "enqueue":
+ enqueuer_sm()
+ elif args[1] == "debug":
+ sm_extract.txt_extract_sm(args[2], args[3], debug=True)
+ else:
+ print("Usage: rabbitmq.py worker|enqueue")
+ sys.exit(1)
--- /dev/null
+#!/usr/bin/env python3
+import pika, requests
+import base64, json, multiprocessing, tempfile, time, os, shutil, subprocess, sys
+
+# NOTE(review): this module uses OUTPUT_DIR and timer but defined neither;
+# importing them from rabbitmq.py would be circular (it imports sm_extract),
+# so both are defined here. Value mirrors SM_OUTPUT_DIR in rabbitmq.py.
+OUTPUT_DIR='/jbod0/text-scimag'
+
+class timer():
+    def __init__(self, name, print=False):
+        self.name = name
+        self.print = print
+    def __enter__(self):
+        self._time = time.time()
+    def __exit__(self, *exec_info):
+        elapsed = time.time() - self._time
+        if self.print:
+            print(" timer[{}] = {}ms".format(self.name, int(elapsed*1000)))
+
+def txt_extract_sm(input_path, output_path, debug=False):
+ """Extract text from a .zip file of ~1000 PDFs. Single-threaded."""
+ with tempfile.TemporaryDirectory() as td_in, tempfile.TemporaryDirectory() as td_out:
+ #if debug:
+ # td_in = "tmp.in"
+ # td_out = "tmp.out"; os.mkdir(td_out)
+ #print("working in temp dirs: ", td_in, td_out)
+
+ # Extract .zip file
+ with timer("zip", print=debug):
+ subprocess.call(["unzip", "-q", "-n", input_path, "-d", td_in])
+
+ # Make a list of pdf files extracted
+ pdfs = []
+ with timer("ls", print=debug):
+ with open(os.path.join(OUTPUT_DIR, "non-pdfs"), "a") as logfile:
+ for dirname, subdirList, fileList in os.walk(td_in):
+ assert dirname.startswith(td_in)
+ short = dirname[len(td_in):].lstrip("/")
+ for dname in subdirList:
+ os.mkdir(os.path.join(dirname.replace(td_in, td_out), dname))
+ for fname in fileList:
+ file_name = os.path.join(short, fname)
+ if fname.lower().endswith(".pdf"):
+ pdfs.append(file_name)
+ else:
+ print(input_path, fname, file=logfile)
+ print("other", input_path, fname)
+
+ # Extract the text from each (single-threaded)
+ # INPUT.pdf -> INPUT.pdf.txt
+ with timer("pdftotext", print=debug):
+ for pdf in pdfs:
+ input_pdf = os.path.join(td_in, pdf)
+ output_txt = os.path.join(td_out, pdf + ".txt")
+ subprocess.call(["pdftotext", input_pdf, output_txt])
+
+ # Put the results into a .tar.gz file
+ with timer("tar", print=debug):
+ subprocess.call(
+ ["tar", "czf", output_path, "."], # gzip = z, J = '.xz'
+ stderr=subprocess.DEVNULL,
+ cwd=td_out,
+ env={"XZ_OPT":"-9","GZIP":"-9",}
+ )
+
+if __name__ == '__main__':
+ args = sys.argv
+    if len(args) == 3:
+ txt_extract_sm(args[1], args[2])
+    else:
+ print("Usage: sm-extract INPUT.zip OUTPUT.tar.gz")
+ sys.exit(1)