view piecrust/importing/wordpress.py @ 1193:032920568ba7 draft

cm: Update requirements.
author Ludovic Chabant <ludovic@chabant.com>
date Fri, 30 Dec 2022 16:49:04 -0800
parents 4850f8c21b6e
children
line wrap: on
line source

import os.path
import logging
import datetime
import yaml
from collections import OrderedDict
from piecrust import CONFIG_PATH
from piecrust.configuration import (
    ConfigurationLoader, ConfigurationDumper, merge_dicts)
from piecrust.importing.base import Importer, create_page, download_asset


logger = logging.getLogger(__name__)


class WordpressImporterBase(Importer):
    def setupParser(self, parser, app):
        parser.add_argument(
            '--pages-source',
            default="pages",
            help="The source to store pages in.")
        parser.add_argument(
            '--posts-source',
            default="posts",
            help="The source to store posts in.")
        parser.add_argument(
            '--default-post-layout',
            help="The default layout to use for posts.")
        parser.add_argument(
            '--default-post-category',
            help="The default category to use for posts.")
        parser.add_argument(
            '--default-page-layout',
            help="The default layout to use for pages.")
        parser.add_argument(
            '--default-page-category',
            help="The default category to use for pages.")

    def importWebsite(self, app, args):
        impl = self._getImplementation(app, args)
        return impl.importWebsite()

    def _getImplementation(self, app, args):
        raise NotImplementedError()


class _ImporterBase(object):
    def __init__(self, app, args):
        self.app = app
        self._cat_map = {}
        self._author_map = {}
        self._pages_source = app.getSource(args.pages_source)
        self._posts_source = app.getSource(args.posts_source)

    def importWebsite(self):
        ctx = self._open()

        # Site configuration.
        logger.info("Generating site configuration...")
        site_config = self._getSiteConfig(ctx)
        site_config.setdefault('site', {})
        site_config['site'].update({
            'post_url': '%year%/%month%/%slug%',
            'category_url': 'category/%category%'})

        site_config_path = os.path.join(self.app.root_dir, CONFIG_PATH)
        with open(site_config_path, 'r') as fp:
            cfg_data = yaml.load(fp, Loader=ConfigurationLoader)

        cfg_data = cfg_data or {}
        merge_dicts(cfg_data, site_config)

        with open(site_config_path, 'w') as fp:
            yaml.dump(cfg_data, fp, default_flow_style=False,
                      allow_unicode=True,
                      Dumper=ConfigurationDumper)

        # Content
        for p in self._getPosts(ctx):
            if p['type'] == 'attachment':
                self._createAsset(p)
            else:
                self._createPost(p)

        self._close(ctx)

    def _open(self):
        raise NotImplementedError()

    def _close(self, ctx):
        pass

    def _getSiteConfig(self, ctx):
        raise NotImplementedError()

    def _getPosts(self, ctx):
        raise NotImplementedError()

    def _createAsset(self, asset_info):
        download_asset(self.app, asset_info['url'])

    def _createPost(self, post_info):
        post_dt = post_info['datetime']
        finder = {
            'year': post_dt.year,
            'month': post_dt.month,
            'day': post_dt.day,
            'slug': post_info['slug']}
        if post_info['type'] == 'post':
            source = self._posts_source
        elif post_info['type'] == 'page':
            source = self._pages_source
        else:
            raise Exception("Unknown post type: %s" % post_info['type'])
        factory = source.findPageFactory(finder, MODE_CREATING)

        metadata = post_info['metadata'].copy()
        for name in ['title', 'author', 'status', 'post_id', 'post_guid',
                     'description', 'categories']:
            val = post_info.get(name)
            if val is not None:
                metadata[name] = val

        content = post_info['content']
        excerpt = post_info['excerpt']
        text = content
        if excerpt is not None and excerpt.strip() != '':
            text = "%s\n\n---excerpt---\n\n%s" % (content, excerpt)

        status = metadata.get('status')
        if status == 'publish':
            path = factory.path
            create_page(self.app, path, metadata, text)
        elif status == 'draft':
            filename = '-'.join(metadata['title'].split(' ')) + '.html'
            path = os.path.join(self.app.root_dir, 'drafts', filename)
            create_page(self.app, path, metadata, text)
        else:
            logger.warning("Ignoring post with status: %s" % status)


class _XmlImporter(_ImporterBase):
    ns_wp = {'wp': 'http://wordpress.org/export/1.2/'}
    ns_dc = {'dc': "http://purl.org/dc/elements/1.1/"}
    ns_excerpt = {'excerpt': "http://wordpress.org/export/1.2/excerpt/"}
    ns_content = {'content': "http://purl.org/rss/1.0/modules/content/"}

    def __init__(self, app, args):
        super(_XmlImporter, self).__init__(app, args)
        self.path = args.xml_path

    def _open(self):
        if not os.path.exists(self.path):
            raise Exception("No such file: %s" % self.path)

        try:
            import xml.etree.ElementTree as ET
        except ImportError:
            logger.error("You don't seem to have any support for ElementTree "
                         "XML parsing.")
            return 1

        with open(self.path, 'r', encoding='utf8') as fp:
            xml = fp.read()
        xml = xml.replace(chr(0x1e), '')
        xml = xml.replace(chr(0x10), '')
        tree = ET.fromstring(xml)
        channel = tree.find('channel')

        return channel

    def _getSiteConfig(self, channel):
        # Get basic site information
        title = find_text(channel, 'title')
        description = find_text(channel, 'description')
        site_config = OrderedDict({
            'site': {
                'title': title,
                'description': description}
        })

        # Get authors' names.
        authors = {}
        for a in channel.findall('wp:author', self.ns_wp):
            login = find_text(a, 'wp:author_login', self.ns_wp)
            authors[login] = {
                'email': find_text(a, 'wp:author_email', self.ns_wp),
                'display_name': find_text(a, 'wp:author_display_name',
                                          self.ns_wp),
                'first_name': find_text(a, 'wp:author_first_name',
                                        self.ns_wp),
                'last_name': find_text(a, 'wp:author_last_name',
                                       self.ns_wp),
                'author_id': find_text(a, 'wp:author_id',
                                       self.ns_wp)}
        site_config['site']['authors'] = authors

        return site_config

    def _getPosts(self, channel):
        for i in channel.findall('item'):
            post_type = find_text(i, 'wp:post_type', self.ns_wp)
            if post_type == 'attachment':
                yield self._getAssetInfo(i)
            else:
                yield self._getPostInfo(i)

    def _getAssetInfo(self, node):
        url = find_text(node, 'wp:attachment_url', self.ns_wp)
        return {'type': 'attachment', 'url': url}

    def _getPostInfo(self, node):
        post_date_str = find_text(node, 'wp:post_date', self.ns_wp)
        post_date = datetime.datetime.strptime(post_date_str,
                                               '%Y-%m-%d %H:%M:%S')
        post_name = find_text(node, 'wp:post_name', self.ns_wp)
        post_type = find_text(node, 'wp:post_type', self.ns_wp)
        post_info = {
            'type': post_type,
            'slug': post_name,
            'datetime': post_date}

        title = find_text(node, 'title')
        creator = find_text(node, 'dc:creator', self.ns_dc)
        status = find_text(node, 'wp:status', self.ns_wp)
        post_id = find_text(node, 'wp:post_id', self.ns_wp)
        guid = find_text(node, 'guid')
        description = find_text(node, 'description')
        # TODO: menu order, parent, password, sticky
        post_info.update({
            'title': title,
            'author': creator,
            'status': status,
            'post_id': post_id,
            'post_guid': guid,
            'description': description})

        categories = []
        for c in node.findall('category'):
            nicename = str(c.attrib.get('nicename'))
            categories.append(nicename)
        post_info['categories'] = categories

        metadata = {}
        for m in node.findall('wp:postmeta', self.ns_wp):
            key = find_text(m, 'wp:meta_key', self.ns_wp)
            metadata[key] = find_text(m, 'wp:meta_value', self.ns_wp)
        post_info['metadata'] = metadata

        content = find_text(node, 'content:encoded', self.ns_content)
        excerpt = find_text(node, 'excerpt:encoded', self.ns_excerpt)
        post_info.update({
            'content': content,
            'excerpt': excerpt})

        return post_info


class WordpressXmlImporter(WordpressImporterBase):
    name = 'wordpress-xml'
    description = "Imports a Wordpress blog from an exported XML archive."

    def setupParser(self, parser, app):
        super(WordpressXmlImporter, self).setupParser(parser, app)
        parser.add_argument(
                'xml_path',
                help="The path to the exported XML archive file.")

    def _getImplementation(self, app, args):
        return _XmlImporter(app, args)


def find_text(parent, child_name, namespaces=None):
    return str(parent.find(child_name, namespaces).text)