#!/usr/bin/python
#
# Copyright (C) 2004, Scott Hadfield
# Distributed under the terms of the GNU General Public License v2
#
# $Id: shrubxery.py,v 1.0 2004/10/03 19:57:12 hadfield Exp $
#

"""A python implementation of the blosxom blogger.

This script is a python translation of the blosxom blogger. It attempts to
capture the "zen" of blosxom by keeping the script as simple as possible,
yet very powerful via plugins.

The code runs unchanged on Python 2.6+ and Python 3.
"""

__author__ = "Scott Hadfield "
__module__ = "shrubxery"
__version__ = "0.8"
__revision__ = "0"

import os
import re
import sys
import time

try:
    import cgi
except ImportError:
    # The cgi module no longer ships with Python 3.13+; _query_reader()
    # then parses QUERY_STRING itself.
    cgi = None


def config():
    """All of the user configurable variables are set here.

    Returns a fresh dict; Shrubxery and PluginManager share and mutate it.
    """
    env = {}

    # What's this blog's title?
    env["blog_title"] = "Shrubxery"

    # What's this blog's description (for outgoing RSS feed)?
    env["blog_description"] = "The Shrubxery Weblog."

    # What's this blog's primary language (for outgoing RSS feed)?
    env["blog_language"] = "en"

    # Where are this blog's entries kept?
    env["datadir"] = "./data"

    # What's my preferred base URL for this blog (leave blank for automatic)?
    env["url"] = "http://www.buffmuthers.com/projects/shrubxery/shrubxery.cgi"

    # Should I stick only to the datadir for items or travel down the
    # directory hierarchy looking for items? If so, to what depth?
    # 0 = infinite depth (aka grab everything), 1 = datadir only,
    # n = n levels down
    env["depth"] = 0

    # How many entries should I show on the home page?
    env["num_entries"] = 40

    # What file extension signifies a shrubxery entry?
    env["file_extension"] = "entry"

    # What is the default flavour?
    env["default_flavour"] = "news"

    # Should I show entries from the future (i.e. dated after now)?
    env["show_future_entries"] = False

    # --- Plugins (Optional) -----

    # Where are my plugins kept?
    env["plugin_dir"] = "./plugins"

    # Where should my modules keep their state information?
    env["plugin_state_dir"] = env["plugin_dir"] + "/state"

    # --- Static Rendering -----

    # Where are this blog's static files to be created?
    env["static_dir"] = "/Library/WebServer/Documents/blog"

    # What's my administrative password (you must set this for
    # static rendering)?
    env["static_password"] = ""

    # What flavours should I generate statically?
    env["static_flavours"] = ("html", "rss")

    # Should I statically generate individual entries?
    # False = no, True = yes
    env["static_entries"] = False

    # --------------------------------

    return env


class DoNotUse(Exception):
    """Raised by an overriding plugin hook to decline handling a call.

    PluginManager._get_function() then tries the next plugin and finally
    the built-in default.
    """


def _query_reader():
    """Return a ``getvalue(name, default=None)`` callable for this request.

    Uses cgi.FieldStorage when the cgi module is importable, otherwise the
    first value found for ``name`` in the QUERY_STRING environment variable.
    """
    if cgi is not None:
        return cgi.FieldStorage().getvalue

    # Only reached on interpreters without cgi, i.e. Python 3.13+.
    from urllib.parse import parse_qs
    fields = parse_qs(os.environ.get("QUERY_STRING", ""))

    def getvalue(name, default=None):
        values = fields.get(name)
        if values:
            return values[0]
        return default

    return getvalue


class Shrubxery:
    """The controller class that gives life to the script."""

    def __init__(self, env):
        """Store the shared ``env`` dict and create the plugin manager."""
        self.env = env
        self.env["version"] = __version__
        self.env["template_data"] = {}
        self.plugins = PluginManager(self.env)

    def execute(self):
        """Sets up all of the variables and prints the output."""
        env = self.env
        getvalue = _query_reader()

        # Guess the url if it's not set already.
        if env["url"] in ["", None]:
            env["url"] = self._geturl()

        # Drop any ending / from dir settings
        for key in ("datadir", "plugin_dir", "static_dir"):
            env[key] = env[key].rstrip("/")

        self._parse_request_path(getvalue)
        self._parse_request_date()

        # Load the default templates into self.env["template_data"]
        env["template_data"] = template_data

        # Plugins: Start
        self.plugins.start()
        env["plugins"] = self.plugins.list_plugins()

        # Grab our list of entries and non-entry files.
        (files, others) = self.plugins.entries()
        env["files"] = files
        env["others"] = others

        # Plugins: Filter
        self.plugins.filter(files, others)

        content_type = self.plugins.template(
            env["path_info"], "content_type", env["flavour"])
        sys.stdout.write(self.generate(files, others, content_type) + "\n")

        # Plugins: End
        self.plugins.end()

    def _parse_request_path(self, getvalue):
        """Split the request path into category, date segments and flavour.

        Sets env["path_info"] (category path, plus "name.flavour" for a
        single-entry request), env["path_info_list"] (the remaining,
        date-like segments) and env["flavour"].
        """
        env = self.env

        # Optional blog name / archive yr/mo/day come from PATH_INFO or,
        # failing that, from the "path" form field.
        if "PATH_INFO" in os.environ:
            segments = os.environ["PATH_INFO"].split("/")
        else:
            segments = getvalue("path", "").split("/")
        # Whatever precedes the first "/" is not part of the path.
        segments.pop(0)

        # Leading segments that start with a letter and carry no dot are
        # category directories; move them into path_info.
        path_info = ""
        while (segments
               and re.match(r"^[a-zA-Z].*$", segments[0])
               and "." not in segments[0]):
            path_info += "/" + segments.pop(0)

        # Flavour specified by ?flav={flav} or index.{flav}.  Only the last
        # remaining segment can hold "name.flavour": path_info is built from
        # dot-free segments and so never matches.
        match = None
        if segments:
            match = re.search(r"(.+)\.(.+)$", segments[-1])

        if match:
            env["flavour"] = match.group(2)
            if match.group(1) != "index":
                path_info += "/%s.%s" % (match.group(1), match.group(2))
            segments.pop()
        elif getvalue("flav"):
            env["flavour"] = getvalue("flav")
        else:
            env["flavour"] = env["default_flavour"]

        # Strip any starting or ending slashes
        env["path_info"] = path_info.strip("/")
        env["path_info_list"] = segments

    def _parse_request_date(self):
        """Derive the path_info_yr/mo/da/mo_num variables from the path.

        Missing trailing parts become "" so that "/2004" and "/2004/10"
        still select a year or a month.
        """
        env = self.env
        parts = (env["path_info_list"] + ["", "", ""])[:3]
        (env["path_info_yr"], env["path_info_mo"],
         env["path_info_da"]) = parts

        # Set the month number if the month was a name.
        month = env["path_info_mo"]
        month_num = ""
        if re.match(r"\d{2}", month):
            month_num = month
        elif month:
            try:
                month_num = time.strftime("%m", time.strptime(month, "%b"))
            except ValueError:
                month_num = ""
        env["path_info_mo_num"] = month_num

    def generate(self, files, others, content_type):
        """Generates and returns the html content of the page.

        ``files`` maps entry paths to mtimes, ``others`` is handed to the
        sort hook unchanged, and ``content_type`` goes into the response
        header.  Returns "" when a plugin's skip hook asks for that.
        """
        env = self.env
        plugins = self.plugins
        currentdir = env["path_info"]

        # Plugins: Skip
        # Allow plugins to decide if we can cut short story generation
        if plugins.skip():
            return ""

        env["output"] = ""

        # Head
        head = plugins.template(env["path_info"], "head", env["flavour"])
        # Plugins: Head
        head = plugins.head(currentdir, head)
        env["output"] += plugins.interpolate(head)

        # Stories
        curdate = ""
        remaining = env["num_entries"]

        match = re.search(r"(.*?)([^\/]+)\.(.+)$", env["path_info"])
        if match and match.group(2) != "index":
            # A single entry was requested: point at its file on disk.
            currentdir = "%s%s.%s" % (match.group(1), match.group(2),
                                      env["file_extension"])
        elif match:
            # Remove the '/index.ext' from the path
            currentdir = currentdir[:-len("/%s.%s" % (match.group(2),
                                                      match.group(3)))]

        # datadir and the extension are literal text, not patterns.
        entry_re = re.compile(r"%s/(?:(.*)/)?(.*)\.%s"
                              % (re.escape(env["datadir"]),
                                 re.escape(env["file_extension"])))
        wanted_file = "%s/%s" % (env["datadir"], currentdir)

        for path_file_info in plugins.sort(files, others):
            # Have we exceeded our max num_entries?
            if remaining <= 0:
                break

            path_file = next(iter(path_file_info))
            mtime = float(files[path_file])

            match = entry_re.search(path_file)
            if match is None:
                # Not laid out like an entry below datadir; nothing to show.
                continue
            path = match.group(1) or ""
            filename = match.group(2)

            # Only read stories in the correct hierarchy.  currentdir comes
            # from the URL, so compare it as plain text.
            if (not path.startswith(currentdir)
                    and path_file != wanted_file):
                continue

            if path != "":
                path = "/" + path
            env["path"] = path

            # Make sure the file isn't a broken link or something.
            if (not os.path.exists(path_file)
                    or not os.access(path_file, os.R_OK)):
                continue

            # Set all of the date variables
            (dw, da, mo, mo_num, ti, hr, minute, hr12, ampm, yr) = \
                nice_date(mtime)

            # Only print stories of the right date
            if ((env["path_info_yr"] and yr != env["path_info_yr"])
                    or (env["path_info_mo_num"]
                        and mo_num != env["path_info_mo_num"])
                    or (env["path_info_da"] and da != env["path_info_da"])):
                continue

            date_vars = {"dw": dw, "mo": mo, "mo_num": mo_num, "da": da,
                         "ti": ti, "yr": yr, "hr": hr, "min": minute,
                         "hr12": hr12, "ampm": ampm, "fn": filename}
            env.update(date_vars)

            date_txt = plugins.template(env["path_info"], "date",
                                        env["flavour"])
            # Plugins: Date
            date_txt = plugins.date(currentdir, date_txt, files[path_file],
                                    (dw, mo, mo_num, da, ti, yr))
            date_txt = plugins.interpolate(date_txt, date_vars)

            # If we found a new date then we can print out the date again.
            if curdate != date_txt:
                curdate = date_txt
                env["output"] += date_txt

            title = ""
            body = ""
            if os.path.isfile(path_file):
                try:
                    with open(path_file) as entry:
                        title = entry.readline().strip()
                        body = entry.read()
                except IOError:
                    # Ignore any IOErrors and continue with processing
                    pass

            story = plugins.template(env["path_info"], "story",
                                     env["flavour"])
            # Plugins: Story
            (story, title, body) = plugins.story(path, story, title, body)
            story_vars = {"title": title, "body": body, "path": path}
            env["output"] += plugins.interpolate(story, story_vars)

            remaining -= 1

        # Foot
        foot = plugins.template(env["path_info"], "foot", env["flavour"])
        # Plugins: Foot
        foot = plugins.foot(currentdir, foot)
        env["output"] += plugins.interpolate(foot)

        # Plugins: Last
        plugins.last()

        env["output"] = self._header(content_type) + env["output"]
        return env["output"]

    def _header(self, content_type):
        """Builds and returns the header."""
        return "Content-Type: %s; charset=ISO-8859-1\n\n" % content_type

    def _geturl(self):
        """Guesses and returns the current URL.

        Built from HTTP_HOST and REQUEST_URI with the query string and a
        trailing PATH_INFO removed; "" when either variable is missing.
        This probably doesn't work for ssl hosts.
        """
        environ = os.environ
        if "HTTP_HOST" not in environ or "REQUEST_URI" not in environ:
            return ""

        request_path = environ["REQUEST_URI"].split("?", 1)[0]
        url = "http://%s%s" % (environ["HTTP_HOST"], request_path)

        # Slicing with an empty PATH_INFO (url[:-0]) would wipe the URL.
        extra = environ.get("PATH_INFO", "")
        if extra and url.endswith(extra):
            url = url[:-len(extra)]
        return url


class PluginManager:
    """Defines all of the hooks and controls the hook overrides.

    self.plugins is a list of single-key dicts, {name: [plugin object,
    on_off]}, in load order; on_off is -1 for a switched-off plugin.
    """

    def __init__(self, env):
        """Store the shared ``env`` dict; no plugins are loaded yet."""
        self.env = env
        self.plugins = []

    def _enabled_plugins(self):
        """Yield (name, plugin object) for every switched-on plugin."""
        for entry in self.plugins:
            for name, (plugin_obj, on_off) in entry.items():
                if on_off != -1:
                    yield name, plugin_obj

    def start(self):
        """Loads all plugins and executes their start() function.

        The start function should return 1 if the plugin is active. This is
        the only required function. Note that even plugins that are turned
        off (i.e. suffixed with _) will have their start() function
        executed.

        A plugin that fails to load is reported on stderr and skipped;
        writing to stdout here would land in front of the CGI header.
        """
        plugin_dir = self.env["plugin_dir"]
        if not plugin_dir or not os.path.isdir(plugin_dir):
            return

        sys.path.insert(0, plugin_dir)

        for plugin_name in sorted(os.listdir(plugin_dir)):
            plugin_path = plugin_dir + "/" + plugin_name
            if not os.path.isfile(plugin_path):
                continue
            # <digits><name><optional _>.py -- the class is called <name>.
            match = re.match(r"^(\d*(\w+?)(_?))\.py$", plugin_name)
            if match is None:
                continue

            plugin_mod = match.group(1)
            plugin = match.group(2)
            on_off = -1 if match.group(3) == "_" else 1

            try:
                # __import__ also accepts module names with a numeric
                # prefix, which an "import" statement cannot spell.
                module = __import__(plugin_mod)
                plugin_obj = getattr(module, plugin)(self.env)
                if plugin_obj.start():
                    self.plugins.append({plugin: [plugin_obj, on_off]})
            except Exception as errmsg:
                sys.stderr.write(
                    "\nCould not load plugin '%s'\n" % plugin_name)
                sys.stderr.write("\nReason: %s\n\n" % errmsg)

    def template(self, path, chunk, flavour):
        """Loads and executes the template function.

        Allows for the first encountered plugin.template function to
        override the default built-in template (default_template) function.
        The template function loads and returns the template content.
        """
        def default_template(path, chunk, flavour):
            """Read datadir/path/chunk.flavour, else use a built-in."""
            template_path = "%s/%s/%s.%s" % (self.env["datadir"], path,
                                             chunk, flavour)
            try:
                with open(template_path) as handle:
                    return handle.read().strip()
            except IOError:
                builtin = self.env["template_data"]
                if chunk in builtin.get(flavour, {}):
                    return builtin[flavour][chunk]
                return builtin["error"][chunk]

        return self._get_function("template", default_template,
                                  path, chunk, flavour)

    def entries(self):
        """Loads and executes the entries function.

        Allows for the first encountered plugin.entries function to
        override the default built-in entries (default_entries) function.
        The entries function finds and returns the list of weblog entries.
        """
        def default_entries():
            """Walk datadir with _find(), starting at depth 1."""
            return self._find(self.env["datadir"], 1)

        return self._get_function("entries", default_entries)

    def filter(self, files, others):
        """Executes all plugin.filter hooks.

        Filter alters the list of entries found.
        """
        for function in self._list_functions("filter"):
            function(files, others)

    def skip(self):
        """Executes all plugin.skip's and stops at the first to return True.

        If True is returned by any plugin then we won't output any entries.
        """
        for function in self._list_functions("skip"):
            if function():
                return True
        return False

    def interpolate(self, template_content, var_dict=None):
        """Loads and executes the interpolate function.

        Allows for the first encountered plugin.interpolate function to
        override the default built-in interpolate (default_interpolate)
        function. The interpolate function replaces the variables in the
        templates with their corresponding values in self.env.
        """
        def default_interpolate(template_content, var_dict=None):
            """Expand $name and $plugin::name in a single pass.

            Values from var_dict take precedence over self.env.  Unknown
            variables are left untouched, values are inserted verbatim
            (never parsed as regex replacement syntax), and inserted text
            is not scanned for further variables.
            """
            all_vars = self.env.copy()
            all_vars.update(var_dict or {})

            def expand(match):
                name, member = match.group(1), match.group(2)
                if member is None:
                    if name in all_vars:
                        return "%s" % (all_vars[name],)
                    return match.group(0)
                # $plugin::name reads the plugin's output_env mapping.
                output_env = getattr(self._get_plugin(name),
                                     "output_env", None)
                if output_env is not None and member in output_env:
                    return "%s" % (output_env[member],)
                return match.group(0)

            return re.sub(r"\$(\w+)(?:::(\w*))?", expand, template_content)

        return self._get_function("interpolate", default_interpolate,
                                  template_content, var_dict)

    def _get_plugin(self, plugin_name):
        """Returns the switched-on plugin object called plugin_name.

        Returns None when there is no such plugin.
        """
        for name, plugin_obj in self._enabled_plugins():
            if name == plugin_name:
                return plugin_obj
        return None

    def head(self, currentdir, head):
        """Executes all plugin.head hooks."""
        for function in self._list_functions("head"):
            head = function(currentdir, head)
        return head

    def sort(self, files, others):
        """Loads and executes the sort function.

        Allows for the first encountered plugin.sort function to override
        the default built-in sort (default_sort) function.
        Determines the order that the weblogs are displayed.
        """
        def default_sort(files, others):
            """Return [{path: mtime}, ...] ordered newest first."""
            newest_first = sorted(files.items(),
                                  key=lambda item: item[1], reverse=True)
            return [{path: mtime} for path, mtime in newest_first]

        return self._get_function("sort", default_sort, files, others)

    def date(self, currentdir, date_txt, mtime, date_vars):
        """Executes all plugin.date hooks.

        date_vars is a tuple (dw, mo, mo_num, da, ti, yr)
        """
        for function in self._list_functions("date"):
            date_txt = function(currentdir, date_txt, mtime, date_vars)
        return date_txt

    def story(self, path, story, title, body):
        """Executes all plugin.story hooks."""
        for function in self._list_functions("story"):
            (story, title, body) = function(path, story, title, body)
        return (story, title, body)

    def foot(self, currentdir, foot):
        """Executes all plugin.foot hooks."""
        for function in self._list_functions("foot"):
            foot = function(currentdir, foot)
        return foot

    def last(self):
        """Executes all plugin.last hooks.

        The last hook executed before we return everything from generate.
        """
        for function in self._list_functions("last"):
            function()

    def end(self):
        """Executes all plugin.end hooks.

        Executed after everything has been printed.
        """
        for function in self._list_functions("end"):
            function()

    def list_plugins(self):
        """Returns a list of (plugin_name, on_off) pairs."""
        return [(name, on_off)
                for entry in self.plugins
                for name, (_plugin_obj, on_off) in entry.items()]

    def _list_functions(self, function_name):
        """Returns a list of function references to all plugin.function_name.

        This is used by all of the hooks so that they can execute each
        plugin that contains the function 'function_name' however they
        like. Switched-off plugins are left out.
        """
        functions = []
        for _name, plugin_obj in self._enabled_plugins():
            hook = getattr(plugin_obj, function_name, None)
            if callable(hook):
                functions.append(hook)
        return functions

    def _get_function(self, function_name, default_func, *args):
        """Returns the return value of plugin.function_name.

        The function is called with *args and falls back on default_func if
        no suitable plugin.function_name is found.
        """
        for _name, plugin_obj in self._enabled_plugins():
            hook = getattr(plugin_obj, function_name, None)
            if not callable(hook):
                continue
            try:
                return hook(*args)
            except (AttributeError, DoNotUse):
                # DoNotUse: the plugin declined.  AttributeError raised
                # inside the hook has always been treated the same way.
                continue
        return default_func(*args)

    def _find(self, curr_path, curr_depth):
        """A simple find function that returns valid entries.

        Returns two dicts, the first containing (filename, timestamp) pairs
        for the entries, the second containing (filename, timestamp) pairs
        for all non-matching files.
        Used only by the default_entries() function.
        """
        files = {}
        others = {}
        if self.env["depth"] and curr_depth > int(self.env["depth"]):
            return (files, others)

        entry_re = re.compile(r"^(.+)\.%s$"
                              % re.escape(self.env["file_extension"]))

        for file_name in os.listdir(curr_path):
            file_path = curr_path + "/" + file_name
            if not os.path.exists(file_path):
                continue
            if os.path.isdir(file_path):
                files_tmp, others_tmp = self._find(file_path, curr_depth + 1)
                files.update(files_tmp)
                others.update(others_tmp)
                continue

            match = entry_re.match(file_name)
            mtime = os.path.getmtime(file_path)
            if match and match.group(1) != "index":
                # A future-dated entry is listed in neither dict unless
                # show_future_entries is set.
                if (self.env["show_future_entries"]
                        or mtime < time.time()):
                    files[file_path] = mtime
            elif os.path.isfile(file_path):
                others[file_path] = mtime
        return files, others


# strftime directives behind the values nice_date() returns, in order.
_DATE_FIELD_FORMATS = ("%a", "%d", "%b", "%m", "%H:%M",
                       "%H", "%M", "%I", "%p", "%Y")


def nice_date(timestamp):
    """Returns a set of date variables from 'timestamp', used by the script.

    The result is the tuple (day_of_week, day, month, month_num, fulltime,
    hour, minute, hour12, ampm, year) in local time.  Every field is
    formatted on its own so an empty one cannot shift the others.
    """
    time_obj = time.localtime(timestamp)
    return tuple([time.strftime(fmt, time_obj)
                  for fmt in _DATE_FIELD_FORMATS])


# Default HTML and RSS template bits
#
# NOTE(review): these literals hold no markup at all although they are
# served as text/html and text/xml ("permanent link to this entry" is not
# a link, the RSS head is mostly blank lines).  The tags look like they
# were lost from this copy of the file -- TODO restore them from the
# original release before deploying.
html = {}
html["content_type"] = "text/html"
html["head"] = """$blog_title $path_info_da $path_info_mo $path_info_yr
$blog_title
$path_info_da $path_info_mo $path_info_yr

"""
html["story"] = """

$title
$body

posted at: $ti | path: $path | permanent link to this entry

\n"""
html["date"] = """

$dw, $da $mo $yr

\n"""
html["foot"] = """

Powered by Shrubxery"""

rss = {}
rss["content_type"] = """text/xml"""
rss["head"] = """\n\n\n\n\n \n $blog_title $path_info_da $path_info_mo $path_info_yr\n $url\n $blog_description\n $blog_language\n"""
rss["story"] = """ \n $title\n $url/$yr/$mo_num/$da#$fn\n $body\n \n"""
rss["date"] = "\n"
rss["foot"] = "\n"

error = {}
error["content_type"] = "text/html"
error["head"] = """

Error: I'm afraid this is the first I've heard of a "$flavour" flavoured Shrubxery. Try dropping the "/+$flavour" bit from the end of the URL.\n\n"""
error["story"] = """

$title
$body #

\n"""
error["date"] = "\n\n$dw, $da $mo $yr\n\n\n"
error["foot"] = ""

template_data = {"html": html, "rss": rss, "error": error}


if __name__ == "__main__":
    shrubxery = Shrubxery(config())
    shrubxery.execute()