Mercurial > silorider
view silorider/parse.py @ 18:a921cc2306bc
Do our own HTML parsing/stripping of micropost contents.
- This lets us properly handle various forms of linking.
- Add tests for processing posts with links.
- Fix configuration in tests.
- Basic error handling for processing posts.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 16 Sep 2018 21:16:20 -0700 |
parents | a1b7a459326a |
children | 69a6a8c9d33d |
line wrap: on
line source
import os.path import logging import datetime logger = logging.getLogger(__name__) def parse_url(url_or_path): mf_obj = parse_mf2(url_or_path) matcher = EntryMatcher(mf_obj) feed = Feed(url_or_path, matcher.mf_dict) entries = [] for pair in matcher.entries: mf_entry, bs_el = pair try: entry = Entry(feed, mf_entry, bs_el) entry.interpret() except InvalidEntryException: logger.debug("Found invalid entry... skipping.") continue entries.append(entry) sorted_entries = sorted( entries, key=lambda e: e.get( 'published', datetime.datetime.fromtimestamp( 0, tz=datetime.timezone(datetime.timedelta(0)))), reverse=False) feed.entries = sorted_entries return feed def parse_mf2(url_or_path): import mf2py logger.debug("Fetching %s..." % url_or_path) if os.path.exists(url_or_path): obj = open(url_or_path, 'r', encoding='utf8') params = {'doc': obj} else: params = {'url': url_or_path} return mf2py.Parser(html_parser='html5lib', **params) class InvalidEntryException(Exception): pass class Feed: def __init__(self, url, mf_dict): self.url = url self._mf_dict = mf_dict self.entries = [] class Entry: def __init__(self, owner_feed, mf_entry, bs_obj): self._owner_feed = owner_feed self._mf_entry = mf_entry self._bs_obj = bs_obj self._type = None self._props = None @property def entry_type(self): return self._type @property def html_element(self): return self._bs_obj def __getattr__(self, name): try: return self._doGet(name) except KeyError: raise AttributeError("Entry does not have property '%s'." % name) def get(self, name, default=None, *, force_list=False): try: return self._doGet(name, force_list=force_list) except KeyError: return default def _doGet(self, name, force_list=False): self.interpret() values = self._props[name] if not force_list and isinstance(values, list) and len(values) == 1: return values[0] return values def htmlFind(self, *args, **kwargs): if self._bs_obj is None: raise Exception("No HTML object is available for this entry.") return self._bs_obj.find(*args, **kwargs) def interpret(self): if self._type is not None or self._props is not None: return import mf2util self._type = mf2util.post_type_discovery(self._mf_entry) self._props = mf2util.interpret_entry( self._owner_feed._mf_dict, self._owner_feed.url, hentry=self._mf_entry) # Adds a `is_micropost` property. self._detect_micropost() # mf2util only detects the first photo for a "photo"-type post, # but there might be several so we need to fix that. # # mf2util also apparently doesn't always bring "category" info. self._fix_interpreted_props('photo', 'category') def _detect_micropost(self): is_micro = False name = self.get('name') content = self.get('content-plain') if content and not name: is_micro = True elif name and not content: is_micro = True elif name and content: shortest = min(len(name), len(content)) is_micro = (name[:shortest] == content[:shortest]) self._props['is_micropost'] = is_micro def _fix_interpreted_props(self, *names): for name in names: values = self._mf_entry['properties'].get(name, []) if isinstance(values, str): values = [values] self._props[name] = values class EntryMatcher: """ A class that matches `mf2util` results along with the original BeautifulSoup document, so we have HTML objects on hand if needed. """ def __init__(self, mf_obj): self.mf_dict = mf_obj.to_dict() self.entries = [] els_by_type = {} next_el = {} bf = mf_obj.__doc__ for e in self.mf_dict.get('items', []): types = e.get('type') if not types: continue entry_type = types[0] if entry_type not in els_by_type: ebt = list(bf.find_all(class_=entry_type)) els_by_type[entry_type] = ebt next_el[entry_type] = 0 els = els_by_type[entry_type] e_and_el = (e, els[next_el[entry_type]]) self.entries.append(e_and_el) next_el[entry_type] += 1