0
|
1 import os.path
|
|
2 import logging
|
|
3 import datetime
|
|
4
|
|
5
|
|
6 logger = logging.getLogger(__name__)
|
|
7
|
|
8
|
|
def _published_sort_key(entry):
    """Return a timezone-aware datetime used to order `entry` by date.

    Entries without a usable `published` property sort as the Unix epoch.
    Naive datetimes and plain dates are treated as UTC so that every key
    is mutually comparable; Python refuses to compare naive and aware
    datetimes, or dates with datetimes.
    """
    utc = datetime.timezone.utc
    epoch = datetime.datetime.fromtimestamp(0, tz=utc)

    published = entry.get('published', epoch)
    # `datetime` is a subclass of `date`, so it has to be tested first.
    if isinstance(published, datetime.datetime):
        if published.tzinfo is None or published.utcoffset() is None:
            return published.replace(tzinfo=utc)
        return published
    if isinstance(published, datetime.date):
        return datetime.datetime(
            published.year, published.month, published.day, tzinfo=utc)
    return epoch


def parse_url(url_or_path):
    """Parse the microformats found at `url_or_path` into a `Feed`.

    The document is fetched and parsed by `parse_mf2`, every top-level
    item is wrapped in an `Entry` and interpreted, and entries raising
    `InvalidEntryException` are skipped. The returned feed has its
    `entries` sorted by ascending `published` date.
    """
    mf_obj = parse_mf2(url_or_path)
    matcher = EntryMatcher(mf_obj)

    feed = Feed(url_or_path, matcher.mf_dict)

    entries = []
    for mf_entry, bs_el in matcher.entries:
        entry = Entry(feed, mf_entry, bs_el)
        try:
            entry.interpret()
        except InvalidEntryException:
            logger.debug("Found invalid entry... skipping.")
            continue

        entries.append(entry)

    feed.entries = sorted(entries, key=_published_sort_key)
    return feed
|
|
37
|
|
38
|
|
def parse_mf2(url_or_path):
    """Build an `mf2py.Parser` for a local file or a remote URL.

    If `url_or_path` exists on the local filesystem it is opened as UTF-8
    text and handed to the parser as `doc`; otherwise it is passed as
    `url`. The parser object is returned.
    """
    import mf2py
    logger.debug("Fetching %s...", url_or_path)
    if os.path.exists(url_or_path):
        # NOTE(review): this relies on `mf2py.Parser` reading `doc` during
        # construction, so the file can be closed as soon as it returns.
        with open(url_or_path, 'r', encoding='utf8') as doc:
            return mf2py.Parser(html_parser='html5lib', doc=doc)
    return mf2py.Parser(html_parser='html5lib', url=url_or_path)
|
|
48
|
|
49
|
|
class InvalidEntryException(Exception):
    """Signals an entry that is not valid; `parse_url` skips such entries."""
|
|
52
|
|
53
|
|
class Feed:
    """Holds a feed's source location, its raw mf2 dictionary and entries."""

    def __init__(self, url, mf_dict):
        # The entry list starts out empty.
        self.entries = []
        self._mf_dict = mf_dict
        self.url = url
|
|
59
|
|
60
|
|
class Entry:
    """A single microformats entry belonging to a `Feed`.

    The entry's type and properties are computed lazily by `interpret()`
    using `mf2util`. Once interpreted, properties can be read with `get()`
    or through plain attribute access (e.g. `entry.published`).
    """

    def __init__(self, owner_feed, mf_entry, bs_obj):
        self._owner_feed = owner_feed
        self._mf_entry = mf_entry
        self._bs_obj = bs_obj

        # Both remain `None` until `interpret()` has run.
        self._type = None
        self._props = None

    @property
    def entry_type(self):
        """The discovered post type, or `None` before interpretation."""
        return self._type

    @property
    def html_element(self):
        """The HTML element object given at construction (may be `None`)."""
        return self._bs_obj

    @property
    def best_name(self):
        """The first non-empty value among title, name and content.

        Candidates are tried in the order `title`, `name`,
        `content-plain`, `content`; `None` is returned if all are empty.
        """
        self.interpret()

        for pn in ['title', 'name', 'content-plain', 'content']:
            pv = self._props.get(pn)
            if pv:
                return pv
        return None

    def __getattr__(self, name):
        """Expose interpreted properties as attributes.

        Raises `AttributeError` if the entry has no such property.
        """
        # `__getattr__` only runs when normal lookup failed. For private
        # names that means the instance is not initialised (bare
        # `__new__`, copy, unpickling); going through `interpret()` would
        # read `self._type` again and recurse without bound.
        if name.startswith('_'):
            raise AttributeError(
                "Entry does not have property '%s'." % name)
        try:
            return self._doGet(name)
        except KeyError:
            raise AttributeError(
                "Entry does not have property '%s'." % name) from None

    def get(self, name, default=None, *, force_list=False):
        """Return property `name`, or `default` if the entry lacks it.

        A one-element list is unwrapped to its single value unless
        `force_list` is true.
        """
        try:
            return self._doGet(name, force_list=force_list)
        except KeyError:
            return default

    def _doGet(self, name, force_list=False):
        """Look up `name` in the interpreted properties.

        Raises `KeyError` if the property does not exist.
        """
        self.interpret()

        values = self._props[name]
        if not force_list and isinstance(values, list) and len(values) == 1:
            return values[0]
        return values

    def htmlFind(self, *args, **kwargs):
        """Forward to `find()` on the entry's HTML element.

        Raises `Exception` if the entry has no HTML element.
        """
        if self._bs_obj is None:
            raise Exception("No HTML object is available for this entry.")

        return self._bs_obj.find(*args, **kwargs)

    def interpret(self):
        """Compute the entry's type and properties; no-op if already done."""
        if self._type is not None or self._props is not None:
            return

        import mf2util

        # Compute both values before storing either one: if `mf2util`
        # raises, the entry stays un-interpreted instead of being left
        # with a type but no properties.
        entry_type = mf2util.post_type_discovery(self._mf_entry)
        props = mf2util.interpret_entry(
            self._owner_feed._mf_dict, self._owner_feed.url,
            hentry=self._mf_entry)
        self._type = entry_type
        self._props = props

        # Adds a `is_micropost` property.
        self._detect_micropost()

        # mf2util only detects the first photo for a "photo"-type post,
        # but there might be several so we need to fix that.
        #
        # mf2util also apparently doesn't always bring "category" info.
        self._fix_interpreted_props('photo', 'category')

    def _detect_micropost(self):
        """Set the `is_micropost` property from `name` and `content-plain`.

        An entry is a micropost when only one of the two is present, or
        when one is a prefix of the other.
        """
        is_micro = False
        name = self.get('name')
        content = self.get('content-plain')
        if content and not name:
            is_micro = True
        elif name and not content:
            is_micro = True
        elif name and content:
            shortest = min(len(name), len(content))
            is_micro = (name[:shortest] == content[:shortest])
        self._props['is_micropost'] = is_micro

    def _fix_interpreted_props(self, *names):
        """Overwrite the given properties with the raw mf2 values.

        Each named property is replaced by the value list found in the
        raw entry's `properties`, or by an empty list when it is absent.
        """
        # Tolerate raw entries without a `properties` mapping.
        raw_props = self._mf_entry.get('properties') or {}
        for name in names:
            values = raw_props.get(name, [])
            if isinstance(values, str):
                values = [values]
            self._props[name] = values
|
|
153
|
|
154
|
|
class EntryMatcher:
    """ A class that matches `mf2util` results along with the original
        BeautifulSoup document, so we have HTML objects on hand if needed.

        After construction, `mf_dict` holds the parser's dictionary and
        `entries` is a list of `(item, element)` pairs. Items are paired,
        in order, with the document elements carrying the item's first
        type as a class; `element` is `None` once no such element is left.
    """
    def __init__(self, mf_obj):
        self.mf_dict = mf_obj.to_dict()
        self.entries = []

        # The parser object's `__doc__` attribute is used as the parsed
        # HTML document here.
        document = mf_obj.__doc__

        # One iterator per type over the not-yet-assigned elements.
        unassigned_by_type = {}
        for item in self.mf_dict.get('items', []):
            types = item.get('type')
            if not types:
                continue

            entry_type = types[0]
            if entry_type not in unassigned_by_type:
                unassigned_by_type[entry_type] = iter(
                    list(document.find_all(class_=entry_type)))

            # There can be fewer elements than items (or none at all);
            # fall back to `None` rather than failing with an IndexError.
            element = next(unassigned_by_type[entry_type], None)
            self.entries.append((item, element))
|