view piecrust/importing/wordpress.py @ 300:2daa05a21026

import: Add an XML-based Wordpress importer.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 11 Mar 2015 23:48:35 -0700
parents
children 103abb08755e
line wrap: on
line source

import os.path
import logging
import datetime
import yaml
from urllib.parse import urlparse
from piecrust import CONFIG_PATH
from piecrust.importing.base import Importer, create_page, download_asset
from piecrust.sources.base import MODE_CREATING


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class WordpressImporter(Importer):
    """Importer entry point for Wordpress blogs.

    The single positional argument is either the path to an exported XML
    archive or the URL of an SQL database; `importWebsite` picks the
    matching implementation (`_XmlImporter` or `_SqlImporter`).
    """
    name = 'wordpress'
    description = "Imports a Wordpress blog."

    def setupParser(self, parser, app):
        """Register this importer's command-line arguments on `parser`.

        `app` is accepted for interface compatibility and is not used here.
        """
        parser.add_argument(
                '--posts_fs',
                default="hierarchy",
                choices=['flat', 'shallow', 'hierarchy'],
                help="The blog file-system type to use.")
        parser.add_argument(
                '--prefix',
                default="wp_",
                help="The SQL table prefix. Defaults to `wp_`.")
        parser.add_argument(
                '--default-post-layout',
                help="The default layout to use for posts.")
        parser.add_argument(
                '--default-post-category',
                help="The default category to use for posts.")
        parser.add_argument(
                '--default-page-layout',
                help="The default layout to use for pages.")
        parser.add_argument(
                '--default-page-category',
                help="The default category to use for pages.")
        parser.add_argument(
                'xml_or_db_url',
                help=("The exported XML archive of the Wordpress site, or "
                      "the URL of the SQL database.\n"
                      "\n"
                      "If an SQL database URL, it should be of the "
                      "form:  type://user:password@server/database\n"
                      "\n"
                      "For example:\n"
                      "mysql://user:password@example.org/my_database"))

    def importWebsite(self, app, args):
        """Run the import described by `args`.

        Returns whatever the selected implementation's `importWebsite()`
        returns.
        """
        parsed_url = urlparse(args.xml_or_db_url)
        # `urlparse` reports a Windows drive letter (`C:\export.xml`) as a
        # one-character scheme; only longer schemes are real database URLs.
        is_db_url = len(parsed_url.scheme) > 1
        if is_db_url:
            impl = _SqlImporter(app, args)
        else:
            impl = _XmlImporter(app, args)
        return impl.importWebsite()


class _XmlImporter(object):
    """Imports a Wordpress site from an exported XML archive.

    Writes the site configuration, downloads attachments and creates one
    page file per blog post found in the archive.
    """
    # Prefix-to-URI maps handed to ElementTree's `find`/`findall`.
    ns_wp = {'wp': 'http://wordpress.org/export/1.2/'}
    ns_dc = {'dc': "http://purl.org/dc/elements/1.1/"}
    ns_excerpt = {'excerpt': "http://wordpress.org/export/1.2/excerpt/"}
    ns_content = {'content': "http://purl.org/rss/1.0/modules/content/"}

    def __init__(self, app, args):
        """Store the import settings and instantiate the posts source.

        Raises:
            Exception: if no registered source matches `args.posts_fs`.
        """
        self.app = app
        self.path = args.xml_or_db_url
        self.posts_fs = args.posts_fs
        # Initialized here and reset to `None` once the import is done;
        # nothing in this class reads them yet.
        self._cat_map = {}
        self._author_map = {}

        source_name = 'posts/%s' % self.posts_fs
        for cls in self.app.plugin_loader.getSources():
            if cls.SOURCE_NAME == source_name:
                src_config = {
                        'type': source_name,
                        'fs_endpoint': 'posts',
                        'data_type': 'blog'}
                self.posts_source = cls(app, 'posts', src_config)
                break
        else:
            # The loop ended without a `break`: no source matched.
            raise Exception(
                    "No such posts file-system: %s" % self.posts_fs)

    def importWebsite(self):
        """Import the whole archive into the website at `app.root_dir`.

        Returns:
            1 if ElementTree is unavailable, otherwise `None`.

        Raises:
            Exception: if the archive file does not exist, or if it has no
                `channel` element.
        """
        if not os.path.exists(self.path):
            raise Exception("No such file: %s" % self.path)

        try:
            import xml.etree.ElementTree as ET
        except ImportError:
            logger.error("You don't seem to have any support for ElementTree "
                         "XML parsing.")
            return 1

        with open(self.path, 'r', encoding='utf8') as fp:
            xml = fp.read()
        # Drop control characters that may be present in the export before
        # handing the text to the XML parser.
        xml = xml.replace(chr(0x1e), '')
        xml = xml.replace(chr(0x10), '')
        tree = ET.fromstring(xml)
        channel = tree.find('channel')
        if channel is None:
            raise Exception("No channel found in: %s" % self.path)

        # Get basic site information
        title = find_text(channel, 'title')
        description = find_text(channel, 'description')
        site_config = {
                'site': {
                    'title': title,
                    'description': description,
                    'posts_fs': self.posts_fs}
                }
        logger.info("Importing '%s'", title)

        # Get authors' names.
        authors = {}
        for a in channel.findall('wp:author', self.ns_wp):
            login = find_text(a, 'wp:author_login', self.ns_wp)
            authors[login] = {
                    'email': find_text(a, 'wp:author_email', self.ns_wp),
                    'display_name': find_text(a, 'wp:author_display_name',
                                              self.ns_wp),
                    'first_name': find_text(a, 'wp:author_first_name',
                                            self.ns_wp),
                    'last_name': find_text(a, 'wp:author_last_name',
                                           self.ns_wp),
                    'author_id': find_text(a, 'wp:author_id',
                                           self.ns_wp)}
        site_config['site']['authors'] = authors

        # Other stuff.
        site_config['site'].update({
                'post_url': '%year%/%month%/%slug%',
                'category_url': 'category/%category%'})

        logger.info("Generating site configuration...")
        site_config_path = os.path.join(self.app.root_dir, CONFIG_PATH)
        # `allow_unicode=True` emits raw non-ASCII characters, so the file
        # must be opened with an encoding that can represent them.
        with open(site_config_path, 'w', encoding='utf8') as fp:
            yaml.safe_dump(site_config, fp, default_flow_style=False,
                           allow_unicode=True)

        # Content.
        # NOTE(review): only attachments and posts are handled; items of
        # any other type (including 'page') are skipped and `_createPage`
        # is never called -- confirm whether that is intended.
        for i in channel.findall('item'):
            post_type = find_text(i, 'wp:post_type', self.ns_wp)
            if post_type == 'attachment':
                self._createAsset(i)
            elif post_type == 'post':
                self._createPost(i)

        self._cat_map = None
        self._author_map = None

    def _createAsset(self, node):
        """Download the attachment referenced by an `item` node."""
        url = find_text(node, 'wp:attachment_url', self.ns_wp)
        download_asset(self.app, url)

    def _getPageMetadata(self, node):
        """Build the metadata dictionary for an `item` node.

        Every `wp:postmeta` entry is copied in under its own key after the
        standard keys, so a meta key with the same name overrides them.
        """
        title = find_text(node, 'title')
        creator = find_text(node, 'dc:creator', self.ns_dc)
        status = find_text(node, 'wp:status', self.ns_wp)
        post_id = find_text(node, 'wp:post_id', self.ns_wp)
        guid = find_text(node, 'guid')
        description = find_text(node, 'description')
        # TODO: menu order, parent, password, sticky

        categories = []
        for c in node.findall('category'):
            nicename = str(c.attrib.get('nicename'))
            categories.append(nicename)

        metadata = {
                'title': title,
                'author': creator,
                'status': status,
                'post_id': post_id,
                'post_guid': guid,
                'description': description,
                'categories': categories}

        for m in node.findall('wp:postmeta', self.ns_wp):
            key = find_text(m, 'wp:meta_key', self.ns_wp)
            metadata[key] = find_text(m, 'wp:meta_value', self.ns_wp)

        return metadata

    def _getPageContents(self, node):
        """Return the text body for an `item` node.

        A non-blank excerpt is appended after the content, separated by
        an `---excerpt---` line.
        """
        content = find_text(node, 'content:encoded', self.ns_content)
        excerpt = find_text(node, 'excerpt:encoded', self.ns_excerpt)
        if not excerpt.strip():
            return content
        return "%s\n\n---excerpt---\n\n%s" % (content, excerpt)

    def _getPageInfo(self, node):
        """Return the link, slug and parsed post date of an `item` node.

        The date is read from `wp:post_date` in `YYYY-MM-DD HH:MM:SS` form;
        `strptime` raises `ValueError` if it does not match.
        """
        url = find_text(node, 'link')
        post_date_str = find_text(node, 'wp:post_date', self.ns_wp)
        post_date = datetime.datetime.strptime(post_date_str,
                                               '%Y-%m-%d %H:%M:%S')
        post_name = find_text(node, 'wp:post_name', self.ns_wp)
        return {
                'url': url,
                'slug': post_name,
                'datetime': post_date}

    def _createPage(self, node):
        """Create a page under `pages/<slug>` from an `item` node."""
        info = self._getPageInfo(node)
        rel_path = os.path.join('pages', info['slug'])
        metadata = self._getPageMetadata(node)
        contents = self._getPageContents(node)
        create_page(self.app, rel_path, metadata, contents)

    def _createPost(self, node):
        """Create a blog post from an `item` node.

        The path below `posts` is obtained from the posts source, based on
        the post's date and slug.
        """
        info = self._getPageInfo(node)
        post_dt = info['datetime']
        finder = {
                'year': post_dt.year,
                'month': post_dt.month,
                'day': post_dt.day,
                'slug': info['slug']}
        rel_path, fac_metadata = self.posts_source.findPagePath(
                finder, MODE_CREATING)
        rel_path = os.path.join('posts', rel_path)
        metadata = self._getPageMetadata(node)
        contents = self._getPageContents(node)
        create_page(self.app, rel_path, metadata, contents)


class _SqlImporter(object):
    """Placeholder for importing a Wordpress site from its SQL database.

    Only records the connection settings; the import itself is not
    implemented.
    """

    def __init__(self, app, args):
        """Keep the application, the database URL and the table prefix."""
        self.prefix = args.prefix
        self.db_url = args.xml_or_db_url
        self.app = app

    def importWebsite(self):
        """Always raises `NotImplementedError`."""
        raise NotImplementedError


def find_text(parent, child_name, namespaces=None):
    """Return the text of the first `child_name` element under `parent`.

    Args:
        parent: The element to search in.
        child_name: Tag name or path, passed to `parent.find`.
        namespaces: Optional prefix-to-URI map, passed to `parent.find`.

    Returns:
        The child's text as a string, or an empty string when the child is
        missing or has no text.
    """
    node = parent.find(child_name, namespaces)
    # An empty element has `text` set to `None`; `str(None)` would leak the
    # literal 'None' into the imported content.
    if node is None or node.text is None:
        return ''
    return str(node.text)