Mercurial > piecrust2
view piecrust/importing/wordpress.py @ 411:e7b865f8f335
bake: Enable multiprocess baking.
Baking is now done by running a worker per CPU, and sending jobs to them.
This changes several things across the codebase:
* Ability to not cache things related to pages other than the 'main' page
(i.e. the page at the bottom of the execution stack).
* Decouple the baking process from the bake records, so only the main process
keeps track of (and modifies) the bake record.
* Remove the need for 'batch page getters' and loading a page directly from
the page factories.
Various smaller changes are also included here, including support for
scope performance timers that are saved with the bake record and can be
printed out to the console. Yes I got carried away.
For testing, the in-memory 'mock' file-system doesn't work anymore, since
we're spawning processes, so this is replaced by a 'tmpfs' file-system which
is saved in temporary files on disk and deleted after tests have run.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Fri, 12 Jun 2015 17:09:19 -0700 |
parents | dd25bd3ce1f9 |
children | 4850f8c21b6e |
line wrap: on
line source
import os.path
import logging
import datetime
import yaml
from collections import OrderedDict
from piecrust import CONFIG_PATH
from piecrust.configuration import (
    ConfigurationLoader, ConfigurationDumper, merge_dicts)
from piecrust.importing.base import Importer, create_page, download_asset
from piecrust.sources.base import MODE_CREATING


logger = logging.getLogger(__name__)


class WordpressImporterBase(Importer):
    """Base class for Wordpress importers.

    Registers the command-line options shared by the Wordpress importers
    and delegates the actual import to the object returned by
    `_getImplementation`.
    """

    def setupParser(self, parser, app):
        """Add the common Wordpress import options to `parser`."""
        parser.add_argument(
            '--pages-source',
            default="pages",
            help="The source to store pages in.")
        parser.add_argument(
            '--posts-source',
            default="posts",
            help="The source to store posts in.")
        parser.add_argument(
            '--default-post-layout',
            help="The default layout to use for posts.")
        parser.add_argument(
            '--default-post-category',
            help="The default category to use for posts.")
        parser.add_argument(
            '--default-page-layout',
            help="The default layout to use for pages.")
        parser.add_argument(
            '--default-page-category',
            help="The default category to use for pages.")

    def importWebsite(self, app, args):
        """Run the import and return whatever the implementation returns."""
        impl = self._getImplementation(app, args)
        return impl.importWebsite()

    def _getImplementation(self, app, args):
        """Return the object doing the import; must be overridden."""
        raise NotImplementedError()


class _ImporterBase(object):
    """Format-independent part of a Wordpress import.

    Sub-classes provide `_open`, `_getSiteConfig` and `_getPosts` (and
    optionally `_close`); this class merges the site configuration into
    the website's config file and creates the pages and assets.
    """

    def __init__(self, app, args):
        self.app = app
        self._cat_map = {}
        self._author_map = {}
        self._pages_source = app.getSource(args.pages_source)
        self._posts_source = app.getSource(args.posts_source)

    def importWebsite(self):
        """Import the site configuration, then every post/page/asset.

        `_close` is always called on the context returned by `_open`,
        even when one of the import steps raises.
        """
        ctx = self._open()
        try:
            # Site configuration.
            self._writeSiteConfig(ctx)

            # Content
            for p in self._getPosts(ctx):
                if p['type'] == 'attachment':
                    self._createAsset(p)
                else:
                    self._createPost(p)
        finally:
            self._close(ctx)

    def _writeSiteConfig(self, ctx):
        """Merge the imported site settings into the website config file."""
        logger.info("Generating site configuration...")
        site_config = self._getSiteConfig(ctx)
        site_config.setdefault('site', {})
        site_config['site'].update({
            'post_url': '%year%/%month%/%slug%',
            'category_url': 'category/%category%'})

        site_config_path = os.path.join(self.app.root_dir, CONFIG_PATH)
        # The file is dumped with `allow_unicode=True`, so the encoding is
        # made explicit instead of relying on the platform default.
        # NOTE(review): `ConfigurationLoader` comes from
        # `piecrust.configuration`; presumably a safe loader -- confirm.
        with open(site_config_path, 'r', encoding='utf8') as fp:
            cfg_data = yaml.load(fp, Loader=ConfigurationLoader)

        cfg_data = cfg_data or {}
        merge_dicts(cfg_data, site_config)

        with open(site_config_path, 'w', encoding='utf8') as fp:
            yaml.dump(cfg_data, fp, default_flow_style=False,
                      allow_unicode=True,
                      Dumper=ConfigurationDumper)

    def _open(self):
        """Open the import data and return a context object."""
        raise NotImplementedError()

    def _close(self, ctx):
        """Release the context returned by `_open` (no-op by default)."""
        pass

    def _getSiteConfig(self, ctx):
        """Return a dictionary of site configuration settings."""
        raise NotImplementedError()

    def _getPosts(self, ctx):
        """Return an iterable of post/page/attachment info dictionaries."""
        raise NotImplementedError()

    def _createAsset(self, asset_info):
        """Hand the attachment's URL over to `download_asset`."""
        download_asset(self.app, asset_info['url'])

    def _createPost(self, post_info):
        """Create the page file for one imported post or page.

        Items with status 'publish' are written at the path given by the
        source's page factory, items with status 'draft' are written
        under the `drafts` directory of the website root, and any other
        status is logged and skipped.

        Raises `Exception` if the item's type is neither 'post' nor
        'page'.
        """
        post_dt = post_info['datetime']
        finder = {
            'year': post_dt.year,
            'month': post_dt.month,
            'day': post_dt.day,
            'slug': post_info['slug']}
        if post_info['type'] == 'post':
            source = self._posts_source
        elif post_info['type'] == 'page':
            source = self._pages_source
        else:
            raise Exception("Unknown post type: %s" % post_info['type'])
        factory = source.findPageFactory(finder, MODE_CREATING)

        metadata = post_info['metadata'].copy()
        for name in ['title', 'author', 'status', 'post_id', 'post_guid',
                     'description', 'categories']:
            val = post_info.get(name)
            # Skip missing/empty values instead of writing placeholder
            # entries in the page's metadata.
            if val is None or val == '':
                continue
            metadata[name] = val

        content = post_info['content']
        excerpt = post_info['excerpt']
        text = content
        if excerpt is not None and excerpt.strip() != '':
            text = "%s\n\n---excerpt---\n\n%s" % (content, excerpt)

        status = metadata.get('status')
        if status == 'publish':
            path = factory.path
            create_page(self.app, path, metadata, text)
        elif status == 'draft':
            # A draft may have no title; fall back to its slug so the
            # file name is still meaningful.
            title = metadata.get('title') or post_info['slug'] or 'untitled'
            filename = '-'.join(str(title).split(' ')) + '.html'
            path = os.path.join(self.app.root_dir, 'drafts', filename)
            create_page(self.app, path, metadata, text)
        else:
            logger.warning("Ignoring post with status: %s", status)


class _XmlImporter(_ImporterBase):
    """Imports a website from a Wordpress XML export file."""

    ns_wp = {'wp': 'http://wordpress.org/export/1.2/'}
    ns_dc = {'dc': "http://purl.org/dc/elements/1.1/"}
    ns_excerpt = {'excerpt': "http://wordpress.org/export/1.2/excerpt/"}
    ns_content = {'content': "http://purl.org/rss/1.0/modules/content/"}

    def __init__(self, app, args):
        super().__init__(app, args)
        self.path = args.xml_path

    def _open(self):
        """Parse the XML file and return its `channel` element.

        Raises `Exception` if the file doesn't exist or has no `channel`
        element, and `ImportError` if ElementTree is unavailable.
        """
        if not os.path.exists(self.path):
            raise Exception("No such file: %s" % self.path)

        try:
            import xml.etree.ElementTree as ET
        except ImportError:
            logger.error("You don't seem to have any support for ElementTree "
                         "XML parsing.")
            # The caller uses the return value as the channel element, so
            # there is nothing sensible to return here.
            raise

        with open(self.path, 'r', encoding='utf8') as fp:
            xml_text = fp.read()
        # U+001E and U+0010 are control characters that are not legal in
        # XML 1.0; strip them so the parser doesn't reject the document.
        xml_text = xml_text.replace(chr(0x1e), '')
        xml_text = xml_text.replace(chr(0x10), '')

        tree = ET.fromstring(xml_text)
        channel = tree.find('channel')
        if channel is None:
            raise Exception("No <channel> element found in: %s" % self.path)
        return channel

    def _getSiteConfig(self, channel):
        """Build the `site` settings (title, description, authors)."""
        # Get basic site information
        title = find_text(channel, 'title')
        description = find_text(channel, 'description')
        site_config = OrderedDict({
            'site': {
                'title': title,
                'description': description}
        })

        # Get authors' names.
        authors = {}
        for a in channel.findall('wp:author', self.ns_wp):
            login = find_text(a, 'wp:author_login', self.ns_wp)
            authors[login] = {
                'email': find_text(a, 'wp:author_email', self.ns_wp),
                'display_name': find_text(a, 'wp:author_display_name',
                                          self.ns_wp),
                'first_name': find_text(a, 'wp:author_first_name',
                                        self.ns_wp),
                'last_name': find_text(a, 'wp:author_last_name',
                                       self.ns_wp),
                'author_id': find_text(a, 'wp:author_id',
                                       self.ns_wp)}
        site_config['site']['authors'] = authors

        return site_config

    def _getPosts(self, channel):
        """Yield one info dictionary per `item` element of the channel."""
        for i in channel.findall('item'):
            post_type = find_text(i, 'wp:post_type', self.ns_wp)
            if post_type == 'attachment':
                yield self._getAssetInfo(i)
            else:
                yield self._getPostInfo(i)

    def _getAssetInfo(self, node):
        """Return the info dictionary (type and URL) of an attachment."""
        url = find_text(node, 'wp:attachment_url', self.ns_wp)
        return {'type': 'attachment', 'url': url}

    def _getPostInfo(self, node):
        """Return the info dictionary of a post or page `item` element.

        Raises `ValueError` if the item's `wp:post_date` is missing or
        isn't formatted as `%Y-%m-%d %H:%M:%S`.
        """
        post_date_str = find_text(node, 'wp:post_date', self.ns_wp)
        post_date = datetime.datetime.strptime(post_date_str,
                                               '%Y-%m-%d %H:%M:%S')
        post_name = find_text(node, 'wp:post_name', self.ns_wp)
        post_type = find_text(node, 'wp:post_type', self.ns_wp)
        post_info = {
            'type': post_type,
            'slug': post_name,
            'datetime': post_date}

        title = find_text(node, 'title')
        creator = find_text(node, 'dc:creator', self.ns_dc)
        status = find_text(node, 'wp:status', self.ns_wp)
        post_id = find_text(node, 'wp:post_id', self.ns_wp)
        guid = find_text(node, 'guid')
        description = find_text(node, 'description')
        # TODO: menu order, parent, password, sticky
        post_info.update({
            'title': title,
            'author': creator,
            'status': status,
            'post_id': post_id,
            'post_guid': guid,
            'description': description})

        categories = []
        for c in node.findall('category'):
            nicename = c.attrib.get('nicename')
            # Elements without a `nicename` attribute would otherwise end
            # up as a category literally called "None".
            if nicename:
                categories.append(str(nicename))
        post_info['categories'] = categories

        metadata = {}
        for m in node.findall('wp:postmeta', self.ns_wp):
            key = find_text(m, 'wp:meta_key', self.ns_wp)
            if not key:
                continue
            metadata[key] = find_text(m, 'wp:meta_value', self.ns_wp)
        post_info['metadata'] = metadata

        content = find_text(node, 'content:encoded', self.ns_content)
        excerpt = find_text(node, 'excerpt:encoded', self.ns_excerpt)
        post_info.update({
            'content': content,
            'excerpt': excerpt})

        return post_info


class WordpressXmlImporter(WordpressImporterBase):
    """Importer front-end for Wordpress XML export files."""

    name = 'wordpress-xml'
    description = "Imports a Wordpress blog from an exported XML archive."

    def setupParser(self, parser, app):
        """Add the common options plus the `xml_path` positional."""
        super().setupParser(parser, app)
        parser.add_argument(
            'xml_path',
            help="The path to the exported XML archive file.")

    def _getImplementation(self, app, args):
        return _XmlImporter(app, args)


def find_text(parent, child_name, namespaces=None):
    """Return the text of the first `child_name` child of `parent`.

    Always returns a string: the empty string is returned when the child
    element doesn't exist or has no text, instead of the string 'None'.
    """
    node = parent.find(child_name, namespaces)
    if node is None or node.text is None:
        return ''
    return str(node.text)