#!/bin/python3
"""
Generates blog.za3k.com from source files.
-Source files consist of a YAML "front material", and markdown or HTML body.
+Source files consist of a YAML "front material", and markdown body.
Mostly they have been exported from wordpress by wordpress2frontmaterial.py.
This is one big honking violation of:
from pathlib import Path
import frontmatter
-import monitor
-NO_RESTART_EXIT_CODE = 27
-RESTART_EXIT_CODE = 28
RFC822="%a, %d %b %Y %H:%M:%S %Z"
FRONTMATTER_DT="%Y-%m-%d %H:%M:%S%:z"
i2 = frac2 * (range2.stop - range2.start) + range2.start
return i2
-
-RELOAD_HTML = b"""
-<script>
-document.addEventListener("DOMContentLoaded", function (event) {
- setTimeout(function() {
- var scrollpos = sessionStorage.getItem('scrollpos');
- if (scrollpos) {
- window.scrollTo(0, scrollpos);
- sessionStorage.removeItem('scrollpos');
- }
- }, 10)
-});
-setTimeout(function() {
- sessionStorage.setItem('scrollpos', window.scrollY);
- document.location.reload(true);
-}, 10000);
-</script>
-"""
-
class Templatable(PseudoMap):
use_layout = True
def __init__(self, blog):
def output(self):
output = self.content()
- if self.blog.reload:
- output += RELOAD_HTML
self.output_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.output_path, "wb") as f:
f.write(output)
pass # TODO
class Blog(PseudoMap):
- def __init__(self, config="config.yaml", reload=False):
+ def __init__(self, config="config.yaml"):
self.tags = {} # Tag -> str
self.categories = {}
self.authors = {}
self._posts = []
- self.reload = reload
self.now = datetime.datetime.now(datetime.timezone.utc)
self.now_rfc822 = self.now.strftime(RFC822)
for page in blog.pages:
page.output()
- def updates_happened(self, paths):
- for path in paths:
- self._update_happened(path)
-
- @staticmethod
- def reboot():
- #os.execl(sys.argv[0], *sys.argv)
- sys.exit(RESTART_EXIT_CODE)
-
-
@property
def tagcloud(self, font_sizes=range(8, 22), limit=45):
top_tags = self.tags.values()
return Templatable.render_template(Templatable, blog, "tagcloud", self)
- def _update_happened(self, path):
- path = Path(path)
- reload_update = [
- os.path.abspath(__file__),
- self.config,
- ]
- templates = [
- self[x] for x in self.__dict__.keys() if x.endswith("_template")
- ]
- global_update = templates + [
- Path(self.static_dir) / "wp-includes",
- Path(self.static_dir) / "wp-content/themes",
- self.page_dir,
- ]
- local_update = [
- self.post_dir,
- self.static_dir, # aside from those two subdirs
- self.image_dir,
- ]
- if str(path) in reload_update:
- print(path, "updated, restarting 'blog'...")
- self.reboot()
- if any(path.is_relative_to(top) for top in global_update):
- print(path, "updated, generating all", file=sys.stderr)
- self.generate_all()
- elif any(path.is_relative_to(top) for top in local_update):
- print(path, "updated, generating all (needlessly)", file=sys.stderr)
- self.generate_all() # TODO: Don't generate everything
- else:
- #print(path, "updated, ignoring...", file=sys.stderr)
- pass
-
def clean(self):
assert self.destination
os.system("rm -rf \"{}\"/*".format(self.destination))
def publish(self):
os.system("rsync -r --delete {destination}/ germinate:/var/www/blog".format(destination=self.destination))
-
-def supervisor():
- while True:
- result = subprocess.run([sys.argv[0], "--supervised"] + sys.argv[1:])
- if result.returncode == NO_RESTART_EXIT_CODE:
- break
- sys.exit(0)
-
if __name__ == "__main__":
- if "--supervised" not in sys.argv:
- supervisor()
-
parser = argparse.ArgumentParser(
prog="blog",
- description="Generate za3k's blog from HTML/markdown files with YAML frontmatter and some templates",
+ description="Generate za3k's blog from markdown files with YAML frontmatter and some templates",
)
- parser.add_argument("-a", "--all", action='store_true')
- parser.add_argument('changed_files', metavar="FILE", type=argparse.FileType('r'), nargs="*", help='blog posts to re-generate (source paths)')
- parser.add_argument("-f", "--follow", action='store_true', help="continue running and monitoring for file changes")
parser.add_argument("-l", "--local", action='store_true', help="use relative paths for links")
- parser.add_argument("-r", "--reload", action='store_true', help="reload the page automatically")
- parser.add_argument("--supervised", action='store_true')
+ parser.add_argument("-p", "--publish", action='store_true', help="publish to blog2.za3k.com")
args = parser.parse_args()
- assert args.supervised
- if len(args.changed_files) == 0:
- args.all = True
-
- blog = Blog(reload=args.reload)
+ blog = Blog()
blog.local = args.local
if args.local:
blog.web_root = "file://" + blog.destination
- if args.all:
- blog.clean()
- blog.generate_all()
- if not args.local:
- blog.publish()
- else:
- blog.updates_happened(args.changed_files)
-
- if args.follow:
- print("monitoring for changes...", file=sys.stderr)
- # Discard updates within 5s of one another
- try:
- for changed_file in monitor.Monitor(blog.source, discard_rapid=5):
- blog.updates_happened([changed_file])
- except KeyboardInterrupt:
- pass
- sys.exit(NO_RESTART_EXIT_CODE)
+
+ blog.clean()
+ blog.generate_all()
+ if not args.local and args.publish:
+ blog.publish()
+++ /dev/null
-"""
-Yield each changed source file, as a pathlib.Path
-
-Runs indefinitely. Exit with Ctrl-C
-"""
-
-import collections
-import queue
-import watchdog.observers
-import watchdog.events
-import time
-
-class FunctionEventHandler(watchdog.events.FileSystemEventHandler):
- def __init__(self, f):
- self.f = f
- def on_any_event(self, event):
- self.f(event)
-
-class Monitor():
- def __init__(self, path, discard_rapid=None):
- self.observer = watchdog.observers.Observer()
- self.updates = queue.Queue()
- self.observer.schedule(
- FunctionEventHandler(self.updates.put),
- path,
- recursive=True
- )
- self.last_update = collections.defaultdict(int)
- self.discard_rapid = discard_rapid
- self.observer.start()
-
- @staticmethod
- def time():
- return time.time()
-
- def is_rapid(self, path):
- now = self.time()
- last = self.last_update[path]
- elapsed = now - last
- return (self.discard_rapid is not None and
- elapsed < self.discard_rapid)
-
- ignore_events = [
- watchdog.events.FileOpenedEvent,
- watchdog.events.FileClosedEvent,
- watchdog.events.DirModifiedEvent,
- ]
- def _iter(self):
- try:
- while True:
- event = self.updates.get()
- if not any(isinstance(event, t) for t in self.ignore_events):
- yield event.src_path
- if hasattr(event, "dest_path"):
- yield event.dest_path
- self.updates.task_done()
- except KeyboardInterrupt:
- self.observer.stop()
- self.observer.join()
-
- def __iter__(self):
- for x in self._iter():
- if not self.is_rapid(x):
- yield x