Mercurial > piecrust2
view piecrust/importing/wordpress.py @ 415:0e9a94b7fdfa
bake: Improve bake record information.
* Store things in the bake record that require less interaction between the
master process and the workers. For instance, don't store the paginator
object in the render pass info -- instead, just store whether pagination
was used, and whether it had more items.
* Simplify information passing between workers and bake passes by saving the
rendering info to the JSON cache. This means the "render first sub" job
doesn't have to return anything except errors now.
* Add more performance counter info.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sat, 20 Jun 2015 19:23:16 -0700 |
parents | dd25bd3ce1f9 |
children | 4850f8c21b6e |
line wrap: on
line source
import os.path
import logging
import datetime
import yaml
from collections import OrderedDict
from piecrust import CONFIG_PATH
from piecrust.configuration import (
        ConfigurationLoader, ConfigurationDumper, merge_dicts)
from piecrust.importing.base import Importer, create_page, download_asset
from piecrust.sources.base import MODE_CREATING


logger = logging.getLogger(__name__)


# Translation table (for `str.translate`) that deletes the C0 control
# characters XML 1.0 does not allow in a document, i.e. everything below
# 0x20 except tab, line feed and carriage return.
_INVALID_XML_CHARS = dict.fromkeys(
        c for c in range(0x20) if c not in (0x09, 0x0A, 0x0D))


class WordpressImporterBase(Importer):
    """ Base class for Wordpress importers.

        It registers the command-line options shared by the importers and
        delegates the actual work to an implementation object that
        sub-classes return from `_getImplementation`.
    """
    def setupParser(self, parser, app):
        """ Adds the options common to all Wordpress importers to
            `parser`.
        """
        parser.add_argument(
                '--pages-source',
                default="pages",
                help="The source to store pages in.")
        parser.add_argument(
                '--posts-source',
                default="posts",
                help="The source to store posts in.")
        parser.add_argument(
                '--default-post-layout',
                help="The default layout to use for posts.")
        parser.add_argument(
                '--default-post-category',
                help="The default category to use for posts.")
        parser.add_argument(
                '--default-page-layout',
                help="The default layout to use for pages.")
        parser.add_argument(
                '--default-page-category',
                help="The default category to use for pages.")

    def importWebsite(self, app, args):
        """ Runs the import using the implementation object, and returns
            whatever that object's `importWebsite` returns.
        """
        impl = self._getImplementation(app, args)
        return impl.importWebsite()

    def _getImplementation(self, app, args):
        """ Returns the object doing the import. Must be overridden. """
        raise NotImplementedError()


class _ImporterBase(object):
    """ Format-agnostic import logic.

        Sub-classes provide `_open`, `_getSiteConfig` and `_getPosts` (and
        optionally `_close`); this class merges the site configuration into
        the website's config file and creates the pages and assets.
    """
    def __init__(self, app, args):
        self.app = app
        self._cat_map = {}
        self._author_map = {}
        self._pages_source = app.getSource(args.pages_source)
        self._posts_source = app.getSource(args.posts_source)

    def importWebsite(self):
        """ Imports the site configuration, then every post, page and
            attachment. The context returned by `_open` is always passed
            to `_close`, even when the import fails half-way.
        """
        ctx = self._open()
        try:
            self._importSiteConfig(ctx)
            self._importContent(ctx)
        finally:
            self._close(ctx)

    def _importSiteConfig(self, ctx):
        """ Merges the imported site configuration into the website's
            existing configuration file and writes it back.
        """
        logger.info("Generating site configuration...")
        site_config = self._getSiteConfig(ctx)
        site_config.setdefault('site', {})
        site_config['site'].update({
            'post_url': '%year%/%month%/%slug%',
            'category_url': 'category/%category%'})

        site_config_path = os.path.join(self.app.root_dir, CONFIG_PATH)
        # Be explicit about the encoding: the file is dumped with
        # `allow_unicode=True`, which would fail to write (or later to
        # read back) non-ASCII text with a non-UTF8 platform default.
        with open(site_config_path, 'r', encoding='utf8') as fp:
            cfg_data = yaml.load(fp, Loader=ConfigurationLoader)

        # An empty config file loads as `None`.
        cfg_data = cfg_data or {}
        merge_dicts(cfg_data, site_config)

        with open(site_config_path, 'w', encoding='utf8') as fp:
            yaml.dump(cfg_data, fp, default_flow_style=False,
                      allow_unicode=True,
                      Dumper=ConfigurationDumper)

    def _importContent(self, ctx):
        """ Creates an asset or a page for each item from `_getPosts`. """
        for p in self._getPosts(ctx):
            if p['type'] == 'attachment':
                self._createAsset(p)
            else:
                self._createPost(p)

    def _open(self):
        """ Opens the thing to import, and returns a context object that
            is passed to the other methods. Must be overridden.
        """
        raise NotImplementedError()

    def _close(self, ctx):
        """ Releases the context returned by `_open`. Does nothing by
            default.
        """
        pass

    def _getSiteConfig(self, ctx):
        """ Returns a dictionary of site configuration to merge into the
            website's configuration. Must be overridden.
        """
        raise NotImplementedError()

    def _getPosts(self, ctx):
        """ Returns an iterable of dictionaries describing the posts,
            pages and attachments to import. Must be overridden.
        """
        raise NotImplementedError()

    def _createAsset(self, asset_info):
        """ Downloads the asset found at `asset_info['url']`. """
        download_asset(self.app, asset_info['url'])

    def _createPost(self, post_info):
        """ Creates the page file for a post or a page.

            Published items are created at the path given by the page
            factory of the matching source. Drafts are created in a
            `drafts` directory at the root of the website. Items with any
            other status are ignored, with a warning.

            Raises an `Exception` if the item's type is neither `post` nor
            `page`.
        """
        post_type = post_info['type']
        if post_type == 'post':
            source = self._posts_source
        elif post_type == 'page':
            source = self._pages_source
        else:
            raise Exception("Unknown post type: %s" % post_type)

        metadata = post_info['metadata'].copy()
        for name in ['title', 'author', 'status', 'post_id', 'post_guid',
                     'description', 'categories']:
            val = post_info.get(name)
            if val is not None:
                metadata[name] = val

        # Either of those can be `None` when the item has no text.
        text = post_info.get('content') or ''
        excerpt = post_info.get('excerpt')
        if excerpt is not None and excerpt.strip() != '':
            text = "%s\n\n---excerpt---\n\n%s" % (text, excerpt)

        status = metadata.get('status')
        if status == 'publish':
            # The page factory is only needed here, so don't ask for it
            # for drafts and ignored items, whose slug may be missing.
            post_dt = post_info['datetime']
            finder = {
                    'year': post_dt.year,
                    'month': post_dt.month,
                    'day': post_dt.day,
                    'slug': post_info['slug']}
            factory = source.findPageFactory(finder, MODE_CREATING)
            create_page(self.app, factory.path, metadata, text)
        elif status == 'draft':
            path = os.path.join(self.app.root_dir, 'drafts',
                                self._getDraftFilename(post_info, metadata))
            create_page(self.app, path, metadata, text)
        else:
            logger.warning("Ignoring post with status: %s", status)

    def _getDraftFilename(self, post_info, metadata):
        """ Returns the file name to use for a draft, based on its title,
            or on its slug if it has no title.
        """
        title = metadata.get('title') or post_info.get('slug') or 'untitled'
        name = '-'.join(title.split(' '))
        # The title comes straight from the imported data: make sure it
        # can't point to some other directory than the drafts one.
        for sep in ('/', '\\'):
            name = name.replace(sep, '-')
        return name + '.html'


class _XmlImporter(_ImporterBase):
    """ Imports a website from a Wordpress XML export file. The context
        object passed around is the `channel` element of that file.
    """
    ns_wp = {'wp': 'http://wordpress.org/export/1.2/'}
    ns_dc = {'dc': "http://purl.org/dc/elements/1.1/"}
    ns_excerpt = {'excerpt': "http://wordpress.org/export/1.2/excerpt/"}
    ns_content = {'content': "http://purl.org/rss/1.0/modules/content/"}

    def __init__(self, app, args):
        super(_XmlImporter, self).__init__(app, args)
        self.path = args.xml_path

    def _open(self):
        """ Parses the XML file and returns its `channel` element.

            Raises an `Exception` if the file doesn't exist or has no
            `channel` element.
        """
        if not os.path.exists(self.path):
            raise Exception("No such file: %s" % self.path)

        try:
            import xml.etree.ElementTree as ET
        except ImportError:
            logger.error("You don't seem to have any support for ElementTree "
                         "XML parsing.")
            # There's nothing to return that the rest of the import could
            # work with, so let the error through.
            raise

        with open(self.path, 'r', encoding='utf8') as fp:
            xml = fp.read()
        # Control characters in the text would make the parser reject the
        # whole file.
        xml = xml.translate(_INVALID_XML_CHARS)

        tree = ET.fromstring(xml)
        channel = tree.find('channel')
        if channel is None:
            raise Exception("No 'channel' element found in: %s" % self.path)
        return channel

    def _getSiteConfig(self, channel):
        """ Returns the site's title, description and authors, as found in
            the `channel` element.
        """
        # Get basic site information. Leave out what the file doesn't
        # specify instead of writing empty values in the configuration.
        site = {}
        for name in ('title', 'description'):
            val = find_text(channel, name)
            if val is not None:
                site[name] = val
        site_config = OrderedDict({'site': site})

        # Get authors' names.
        authors = {}
        for a in channel.findall('wp:author', self.ns_wp):
            login = find_text(a, 'wp:author_login', self.ns_wp)
            if login is None:
                continue
            authors[login] = {
                    'email': find_text(a, 'wp:author_email', self.ns_wp),
                    'display_name': find_text(a, 'wp:author_display_name',
                                              self.ns_wp),
                    'first_name': find_text(a, 'wp:author_first_name',
                                            self.ns_wp),
                    'last_name': find_text(a, 'wp:author_last_name',
                                           self.ns_wp),
                    'author_id': find_text(a, 'wp:author_id',
                                           self.ns_wp)}
        site_config['site']['authors'] = authors

        return site_config

    def _getPosts(self, channel):
        """ Yields one info dictionary per `item` element. """
        for i in channel.findall('item'):
            post_type = find_text(i, 'wp:post_type', self.ns_wp)
            if post_type == 'attachment':
                yield self._getAssetInfo(i)
            else:
                yield self._getPostInfo(i)

    def _getAssetInfo(self, node):
        """ Returns the info dictionary of an attachment item. """
        url = find_text(node, 'wp:attachment_url', self.ns_wp)
        return {'type': 'attachment', 'url': url}

    def _getPostInfo(self, node):
        """ Returns the info dictionary of a post or page item: its type,
            slug, date, title, author, status, IDs, description,
            categories, custom metadata, content and excerpt. Values that
            are missing or empty in the file are `None`.
        """
        post_date_str = find_text(node, 'wp:post_date', self.ns_wp)
        post_date = datetime.datetime.strptime(post_date_str,
                                               '%Y-%m-%d %H:%M:%S')
        post_name = find_text(node, 'wp:post_name', self.ns_wp)
        post_type = find_text(node, 'wp:post_type', self.ns_wp)
        post_info = {
                'type': post_type,
                'slug': post_name,
                'datetime': post_date}

        title = find_text(node, 'title')
        creator = find_text(node, 'dc:creator', self.ns_dc)
        status = find_text(node, 'wp:status', self.ns_wp)
        post_id = find_text(node, 'wp:post_id', self.ns_wp)
        guid = find_text(node, 'guid')
        description = find_text(node, 'description')
        # TODO: menu order, parent, password, sticky
        post_info.update({
            'title': title,
            'author': creator,
            'status': status,
            'post_id': post_id,
            'post_guid': guid,
            'description': description})

        categories = []
        for c in node.findall('category'):
            nicename = c.attrib.get('nicename')
            if nicename is not None:
                categories.append(str(nicename))
        post_info['categories'] = categories

        metadata = {}
        for m in node.findall('wp:postmeta', self.ns_wp):
            key = find_text(m, 'wp:meta_key', self.ns_wp)
            if key is None:
                continue
            metadata[key] = find_text(m, 'wp:meta_value', self.ns_wp)
        post_info['metadata'] = metadata

        content = find_text(node, 'content:encoded', self.ns_content)
        excerpt = find_text(node, 'excerpt:encoded', self.ns_excerpt)
        post_info.update({
            'content': content,
            'excerpt': excerpt})

        return post_info


class WordpressXmlImporter(WordpressImporterBase):
    name = 'wordpress-xml'
    description = "Imports a Wordpress blog from an exported XML archive."

    def setupParser(self, parser, app):
        """ Adds the path to the XML file to the common options. """
        super(WordpressXmlImporter, self).setupParser(parser, app)
        parser.add_argument(
                'xml_path',
                help="The path to the exported XML archive file.")

    def _getImplementation(self, app, args):
        return _XmlImporter(app, args)


def find_text(parent, child_name, namespaces=None):
    """ Returns the text of the first child of `parent` matching
        `child_name`, or `None` if there's no such child or if it has no
        text.
    """
    node = parent.find(child_name, namespaces)
    if node is None or node.text is None:
        # Don't return `str(None)`: callers would mistake the resulting
        # 'None' string for actual text.
        return None
    return str(node.text)