view piecrust/importing/wordpress.py @ 1145:e94737572542

serve: Fix an issue where false-positive matches were rendered as the requested page. Now we try to render the page, but also try to detect the most common "empty" pages.
author Ludovic Chabant <ludovic@chabant.com>
date Tue, 05 Jun 2018 22:08:51 -0700
parents 4850f8c21b6e
children
line wrap: on
line source

import os.path
import logging
import datetime
import yaml
from collections import OrderedDict
from piecrust import CONFIG_PATH
from piecrust.configuration import (
    ConfigurationLoader, ConfigurationDumper, merge_dicts)
from piecrust.importing.base import Importer, create_page, download_asset


logger = logging.getLogger(__name__)


class WordpressImporterBase(Importer):
    """Shared command-line front end for the Wordpress importers.

    Registers the options common to the Wordpress importers and delegates
    the import itself to the object returned by `_getImplementation`.
    """

    def setupParser(self, parser, app):
        """Register the source and default layout/category options."""
        # (flag, keyword arguments) pairs, registered in this order.  Only
        # the two source options carry an explicit default.
        option_specs = [
            ('--pages-source', {
                'default': "pages",
                'help': "The source to store pages in."}),
            ('--posts-source', {
                'default': "posts",
                'help': "The source to store posts in."}),
            ('--default-post-layout', {
                'help': "The default layout to use for posts."}),
            ('--default-post-category', {
                'help': "The default category to use for posts."}),
            ('--default-page-layout', {
                'help': "The default layout to use for pages."}),
            ('--default-page-category', {
                'help': "The default category to use for pages."}),
        ]
        for flag, kwargs in option_specs:
            parser.add_argument(flag, **kwargs)

    def importWebsite(self, app, args):
        """Run the import and return the implementation's result."""
        return self._getImplementation(app, args).importWebsite()

    def _getImplementation(self, app, args):
        """Return the object doing the import; subclasses must override."""
        raise NotImplementedError()


class _ImporterBase(object):
    """Common driver for Wordpress imports.

    `importWebsite` merges the imported site settings into the website's
    configuration file, then creates a page or downloads an asset for every
    item yielded by `_getPosts`.  Subclasses provide `_open`,
    `_getSiteConfig` and `_getPosts`, and may override `_close`.
    """

    def __init__(self, app, args):
        """Remember the app and resolve the pages/posts sources by name."""
        self.app = app
        self._cat_map = {}
        self._author_map = {}
        self._pages_source = app.getSource(args.pages_source)
        self._posts_source = app.getSource(args.posts_source)

    def importWebsite(self):
        """Import the site configuration, then every post/page/attachment.

        The context returned by `_open` is always handed to `_close`, even
        when the import fails part-way through.
        """
        ctx = self._open()
        try:
            # Site configuration.
            logger.info("Generating site configuration...")
            site_config = self._getSiteConfig(ctx)
            site_config.setdefault('site', {})
            site_config['site'].update({
                'post_url': '%year%/%month%/%slug%',
                'category_url': 'category/%category%'})

            # Explicit UTF-8: the dump below uses `allow_unicode=True`, so
            # non-ASCII characters are written as-is and the platform's
            # default encoding may not be able to represent them.
            site_config_path = os.path.join(self.app.root_dir, CONFIG_PATH)
            with open(site_config_path, 'r', encoding='utf8') as fp:
                cfg_data = yaml.load(fp, Loader=ConfigurationLoader)

            # An empty configuration file loads as None.
            cfg_data = cfg_data or {}
            merge_dicts(cfg_data, site_config)

            with open(site_config_path, 'w', encoding='utf8') as fp:
                yaml.dump(cfg_data, fp, default_flow_style=False,
                          allow_unicode=True,
                          Dumper=ConfigurationDumper)

            # Content
            for p in self._getPosts(ctx):
                if p['type'] == 'attachment':
                    self._createAsset(p)
                else:
                    self._createPost(p)
        finally:
            self._close(ctx)

    def _open(self):
        """Open the import source and return a context object."""
        raise NotImplementedError()

    def _close(self, ctx):
        """Release whatever `_open` acquired; nothing to do by default."""
        pass

    def _getSiteConfig(self, ctx):
        """Return a dictionary of site settings to merge into the config."""
        raise NotImplementedError()

    def _getPosts(self, ctx):
        """Return an iterable of post/page/attachment info dictionaries."""
        raise NotImplementedError()

    def _createAsset(self, asset_info):
        """Download the attachment found at `asset_info['url']`."""
        download_asset(self.app, asset_info['url'])

    def _createPost(self, post_info):
        """Write one post or page to disk.

        Published items go to the path given by their source's page
        factory, drafts go to a `drafts` directory under the website root,
        and any other status is skipped with a warning.

        Raises:
            Exception: if `post_info['type']` is neither 'post' nor 'page'.
        """
        post_dt = post_info['datetime']
        finder = {
            'year': post_dt.year,
            'month': post_dt.month,
            'day': post_dt.day,
            'slug': post_info['slug']}
        if post_info['type'] == 'post':
            source = self._posts_source
        elif post_info['type'] == 'page':
            source = self._pages_source
        else:
            raise Exception("Unknown post type: %s" % post_info['type'])
        # NOTE(review): `MODE_CREATING` is not imported anywhere in the
        # visible part of this module -- confirm where it is defined and
        # import it, otherwise this line raises NameError.
        factory = source.findPageFactory(finder, MODE_CREATING)

        # Start from the custom metadata and overlay the standard fields
        # that actually have a value.
        metadata = post_info['metadata'].copy()
        for name in ['title', 'author', 'status', 'post_id', 'post_guid',
                     'description', 'categories']:
            val = post_info.get(name)
            if val is not None:
                metadata[name] = val

        content = post_info['content']
        excerpt = post_info['excerpt']
        text = content
        if excerpt is not None and excerpt.strip() != '':
            text = "%s\n\n---excerpt---\n\n%s" % (content, excerpt)

        status = metadata.get('status')
        if status == 'publish':
            path = factory.path
            create_page(self.app, path, metadata, text)
        elif status == 'draft':
            # The title is only in `metadata` when one was supplied; fall
            # back to the slug instead of failing with a KeyError.
            title = metadata.get('title') or post_info['slug']
            filename = '-'.join(str(title).split(' ')) + '.html'
            path = os.path.join(self.app.root_dir, 'drafts', filename)
            create_page(self.app, path, metadata, text)
        else:
            logger.warning("Ignoring post with status: %s", status)


class _XmlImporter(_ImporterBase):
    """Import implementation that reads a Wordpress XML export file.

    The context object passed between `_open`, `_getSiteConfig` and
    `_getPosts` is the export's `<channel>` element.
    """

    # Prefix -> namespace URI maps used when looking up elements.
    ns_wp = {'wp': 'http://wordpress.org/export/1.2/'}
    ns_dc = {'dc': "http://purl.org/dc/elements/1.1/"}
    ns_excerpt = {'excerpt': "http://wordpress.org/export/1.2/excerpt/"}
    ns_content = {'content': "http://purl.org/rss/1.0/modules/content/"}

    def __init__(self, app, args):
        """Set up the base importer and remember the XML file path."""
        super(_XmlImporter, self).__init__(app, args)
        self.path = args.xml_path

    def _open(self):
        """Parse the export file and return its `<channel>` element.

        Raises:
            Exception: if the file does not exist, ElementTree cannot be
                imported, or the document has no `<channel>` element.
        """
        if not os.path.exists(self.path):
            raise Exception("No such file: %s" % self.path)

        try:
            import xml.etree.ElementTree as ET
        except ImportError:
            # Raise instead of returning an error code: the return value
            # of this method is used as the channel element by the caller.
            logger.error("You don't seem to have any support for ElementTree "
                         "XML parsing.")
            raise Exception("ElementTree XML parsing is not available.")

        with open(self.path, 'r', encoding='utf8') as fp:
            xml = fp.read()
        # Drop the 0x1E and 0x10 control characters before parsing --
        # presumably because they make the parser reject the document.
        xml = xml.replace(chr(0x1e), '')
        xml = xml.replace(chr(0x10), '')
        tree = ET.fromstring(xml)
        channel = tree.find('channel')
        if channel is None:
            raise Exception("No <channel> element found in: %s" % self.path)

        return channel

    def _getSiteConfig(self, channel):
        """Build the `site` settings (title, description, authors)."""
        # Get basic site information
        title = find_text(channel, 'title')
        description = find_text(channel, 'description')
        site_config = OrderedDict({
            'site': {
                'title': title,
                'description': description}
        })

        # Get authors' names, keyed by login.
        authors = {}
        for a in channel.findall('wp:author', self.ns_wp):
            login = find_text(a, 'wp:author_login', self.ns_wp)
            authors[login] = {
                'email': find_text(a, 'wp:author_email', self.ns_wp),
                'display_name': find_text(a, 'wp:author_display_name',
                                          self.ns_wp),
                'first_name': find_text(a, 'wp:author_first_name',
                                        self.ns_wp),
                'last_name': find_text(a, 'wp:author_last_name',
                                       self.ns_wp),
                'author_id': find_text(a, 'wp:author_id',
                                       self.ns_wp)}
        site_config['site']['authors'] = authors

        return site_config

    def _getPosts(self, channel):
        """Yield an info dictionary for every `<item>` of the channel."""
        for i in channel.findall('item'):
            post_type = find_text(i, 'wp:post_type', self.ns_wp)
            if post_type == 'attachment':
                yield self._getAssetInfo(i)
            else:
                yield self._getPostInfo(i)

    def _getAssetInfo(self, node):
        """Return the type and URL of an attachment item."""
        url = find_text(node, 'wp:attachment_url', self.ns_wp)
        return {'type': 'attachment', 'url': url}

    def _getPostInfo(self, node):
        """Return the info dictionary describing a post or page item.

        The result holds the item's type, slug, datetime, standard fields,
        categories, custom metadata, content and excerpt.
        """
        post_date_str = find_text(node, 'wp:post_date', self.ns_wp)
        post_date = datetime.datetime.strptime(post_date_str,
                                               '%Y-%m-%d %H:%M:%S')
        post_name = find_text(node, 'wp:post_name', self.ns_wp)
        post_type = find_text(node, 'wp:post_type', self.ns_wp)
        post_info = {
            'type': post_type,
            'slug': post_name,
            'datetime': post_date}

        title = find_text(node, 'title')
        creator = find_text(node, 'dc:creator', self.ns_dc)
        status = find_text(node, 'wp:status', self.ns_wp)
        post_id = find_text(node, 'wp:post_id', self.ns_wp)
        guid = find_text(node, 'guid')
        description = find_text(node, 'description')
        # TODO: menu order, parent, password, sticky
        post_info.update({
            'title': title,
            'author': creator,
            'status': status,
            'post_id': post_id,
            'post_guid': guid,
            'description': description})

        # Skip <category> elements without a `nicename` attribute:
        # stringifying the missing value would add a bogus 'None' category.
        categories = []
        for c in node.findall('category'):
            nicename = c.attrib.get('nicename')
            if nicename is not None:
                categories.append(str(nicename))
        post_info['categories'] = categories

        # Custom fields, as key/value pairs.
        metadata = {}
        for m in node.findall('wp:postmeta', self.ns_wp):
            key = find_text(m, 'wp:meta_key', self.ns_wp)
            metadata[key] = find_text(m, 'wp:meta_value', self.ns_wp)
        post_info['metadata'] = metadata

        content = find_text(node, 'content:encoded', self.ns_content)
        excerpt = find_text(node, 'excerpt:encoded', self.ns_excerpt)
        post_info.update({
            'content': content,
            'excerpt': excerpt})

        return post_info


class WordpressXmlImporter(WordpressImporterBase):
    """Importer for Wordpress XML export files."""
    name = 'wordpress-xml'
    description = "Imports a Wordpress blog from an exported XML archive."

    def setupParser(self, parser, app):
        """Add the positional `xml_path` argument after the shared options."""
        super().setupParser(parser, app)
        xml_path_help = "The path to the exported XML archive file."
        parser.add_argument('xml_path', help=xml_path_help)

    def _getImplementation(self, app, args):
        """Create the XML-backed implementation for this run."""
        implementation = _XmlImporter(app, args)
        return implementation


def find_text(parent, child_name, namespaces=None):
    """Return the text of the first `child_name` element under `parent`.

    Args:
        parent: the element to search in.
        child_name: tag name or path, passed to `parent.find`.
        namespaces: optional prefix -> namespace URI mapping.

    Returns:
        The element's text, an empty string when the element exists but
        has no text, or None when no such element exists.
    """
    child = parent.find(child_name, namespaces)
    # Compare against None explicitly rather than relying on the
    # truthiness of the element object.
    if child is None:
        return None
    # An element without text has `text` set to None; stringifying that
    # would produce the literal 'None'.
    return child.text or ''