]> git.za3k.com Git - mqlg.git/commitdiff
Remove local support. Switch to url processing instead of file processing because...
authorZachary Vance <za3k@za3k.com>
Sat, 14 Aug 2021 08:48:57 +0000 (01:48 -0700)
committerZachary Vance <za3k@za3k.com>
Sat, 14 Aug 2021 08:48:57 +0000 (01:48 -0700)
main.py
sm.py

diff --git a/main.py b/main.py
index c31d8967af67d95bd9e01a1ead4fb90f64ec6018..2908af2432c6af641b6034f433e76051bd28c707 100755 (executable)
--- a/main.py
+++ b/main.py
@@ -8,6 +8,7 @@ logging.basicConfig(filename=os.path.expanduser('~/sm.log'),
                                        format='[mq-lg] %(threadName)s %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                                        datefmt='%H:%M:%S',
                                        level=logging.INFO)
+printed = logging.StreamHandler()
 
 # Queues
 FF_TAR="ff_tar"
@@ -15,11 +16,8 @@ FF_TAR_RESP="ff_tar_resp"
 LG_TAR="lg_tar"
 LG_TAR_RESP="lg_tar_resp"
 
-# Treat germinate specially
-IS_GERMINATE = socket.gethostname() == "germinate"
-
 # Login credentials
-MQ_HOST = os.getenv("MQ_HOST", "localhost" if IS_GERMINATE else "germinate.za3k.com")
+MQ_HOST = os.getenv("MQ_HOST", "germinate.za3k.com")
 USER="sm"
 PASS="McFnE9I4OdwTB"
 
@@ -112,50 +110,20 @@ def do_work(file_processor, message):
     task_id = message["task_id"]
     remote_url = message["remote_url"]
     remote_output_path = message["remote_output_path"]
-    local_input_path = message.get("local_input_path")
-    local_output_path = message.get("local_output_path")
 
     with tempfile.TemporaryDirectory(prefix="mqlg.") as d:
         logging.error("received {}".format(task_id))
-        input_path = os.path.join(d, "INPUT.zip")
         output_path = os.path.join(d, "OUTPUT.tar.gz")
 
-        if IS_GERMINATE and local_input_path is not None:
-            logging.warning(" local copy {}".format(local_input_path))
-            with lib.timer("copy-input", logging.info):
-                shutil.copyfile(local_input_path, input_path)
-        else:
-            logging.warning(" downloading {}".format(remote_url))
-            with lib.timer("download", logging.warning):
-                with requests.get(remote_url, stream=True) as r:
-                    if lib.tqdm_avail:
-                        # A nice progress bar but read/write instead of copyfileobj slows it down a little.
-                        total_size = int(r.headers.get('content-length', 0))
-                        progress_bar = lib.tqdm(desc="download", total=total_size, unit='iB', unit_scale=True, leave=False)
-                        with open(input_path, 'wb') as f:
-                            for data in r.iter_content(100000):
-                                f.write(data)
-                                progress_bar.update(len(data))
-                            progress_bar.close()
-                    else:
-                        with open(input_path, 'wb') as f:
-                            shutil.copyfileobj(r.raw, f)
-
         logging.warning(" processing {}".format(task_id))
         with lib.timer("process", logging.warning):
-            file_processor(input_path, output_path)
+            file_processor(remote_url, output_path)
 
-        if IS_GERMINATE and local_output_path is not None:
-            logging.warning(" local copy of output {}".format(local_output_path))
-            with lib.timer("copy-output", logging.warning):
-                shutil.copyfile(output_path, local_output_path)
-            return None
-        else:
-            logging.warning(" sending-response {}".format(output_path))
-            with lib.timer("send-response 1/2", logging.warning):
-                with open(output_path, 'rb') as f:
-                    file_contents = base64.b64encode(f.read()).decode('ascii')
-                    return json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents})
+        logging.warning(" sending-response {}".format(output_path))
+        with lib.timer("send-response 1/2", logging.warning):
+            with open(output_path, 'rb') as f:
+                file_contents = base64.b64encode(f.read()).decode('ascii')
+                return json.dumps({"response_id": task_id, "local_output_path": remote_output_path, "content": file_contents})
 
 def do_listen(message):
     """Receive extracted text and write it to disk"""
@@ -186,9 +154,7 @@ if __name__ == '__main__':
         while len(args) > 0:
             arg, *args = args
             if arg == "--debug":
-                printed = logging.StreamHandler()
                 printed.setLevel(logging.INFO)
-                logging.getLogger().addHandler(printed)
             elif arg == "--threads":
                 if len(args) < 1:
                     raise InvalidArguments()
@@ -211,19 +177,23 @@ if __name__ == '__main__':
         if command == "worker":
             if len(args) != 0:
                 raise InvalidArguments()
+            logging.getLogger().addHandler(printed)
             threaded_consumer(module.QUEUE, module.QUEUE_RESP, functools.partial(do_work, module.extract_text), threads=threads)
         elif command == "listener":
             if len(args) != 0:
                 raise InvalidArguments()
+            logging.getLogger().addHandler(printed)
             threaded_consumer(module.QUEUE_RESP, do_listen, threads=threads)
         elif command == "enqueue":
             if len(args) != 0:
                 raise InvalidArguments()
+            logging.getLogger().addHandler(printed)
             enqueue_all(queue, module.generate_tasks())
         elif command == "debug":
             if len(args) != 2:
                 raise InvalidArguments()
             printed.setLevel(logging.DEBUG)
+            logging.getLogger().addHandler(printed)
             module.extract_text(*args, debug=True)
     except InvalidArguments:
         program_name = sys.argv[0].split("/")[-1]
diff --git a/sm.py b/sm.py
index 417225dbfc70bf131b0cc8d49a59d52925ba75fd..e168b54ee2bacd2bfa2ff09d40ff4424b204c742 100644 (file)
--- a/sm.py
+++ b/sm.py
@@ -11,13 +11,18 @@ QUEUE_RESP='sm_zip_resp'
 
 ERROR_FILE=os.path.expanduser("~/sm.errors")
 
-def extract_text(input_path, output_path, debug=False):
+def extract_text(input_url, output_path, debug=False):
     """Extract text from a .zip file of ~1000 PDFs. Single-threaded."""
     with tempfile.TemporaryDirectory(prefix="mqlg-in.") as td_in, tempfile.TemporaryDirectory(prefix="mqlg-out.") as td_out:
         if debug:
             td_in = "tmp.in"
             td_out = "tmp.out"; os.mkdir(td_out)
         logging.info("working in temp dirs. in:{} out:{}".format(td_in, td_out))
+        input_path = os.path.join(td_in, "INPUT.zip")
+
+        # Do the download
+        with lib.timer("wget-download", logging.info):
+            subprocess.call(["wget", "-nv", "-O", input_path, input_url])
 
         # Extract .zip file
         with lib.timer("zip", logging.info):
@@ -47,7 +52,8 @@ def extract_text(input_path, output_path, debug=False):
         # Extract the text from each (single-threaded)
         # INPUT.pdf -> INPUT.pdf.txt
         with lib.timer("pdftotext", logging.warning):
-            for pdf in lib.tqdm(pdfs, desc="pdftotext", leave=False):
+            #for pdf in lib.tqdm(pdfs, desc="pdftotext", leave=False):
+            for pdf in pdfs:
                 input_pdf = os.path.join(td_in, pdf)
                 output_txt = os.path.join(td_out, pdf + ".txt")
                 #logging.info("converting {} -> {}".format(input_pdf, output_txt))
@@ -85,8 +91,8 @@ def generate_tasks():
                     tasks.append({
                         "task_id": filename,
                         "remote_url": SM_URL.format(path=fullpath),
-                        "local_input_path": fullpath,
-                        "local_output_path": SM_OUTPUT_PATH.format(path=short_out),
+                        #"local_input_path": fullpath,
+                        #"local_output_path": SM_OUTPUT_PATH.format(path=short_out),
                         "remote_output_path": SM_OUTPUT_PATH.format(path=short_out),
                         "dataset": "sm",
                     })