看这篇文章前推荐阅读相关的如下文章:
知乎Live全文搜索之让elasticsearch_dsl支持asyncio
知乎Live全文搜索之使用Elasticsearch全文搜索
知乎Live全文搜索之模型设计和爬虫实现
知乎Live全文搜索之使用Elasticsearch做聚合分析
知乎Live全文搜索之使用Elasticsearch做搜索建议
在 知乎Live全文搜索之让elasticsearch_dsl支持asyncio 一文中,我把后端工作分成了4步,今天是完成爬虫和模型接口这2步,接口返回的数据会被微信小程序使用。
详细的列一下接口需求:
搜索。搜索符合输入的关键字的Live和用户,按照之前提到的各种策略排序,也支持通过status状态过滤「已结束」和「未结束」2种类型的Live。支持分页。
搜索建议。提供符合输入的关键字的Live的建议。
发现。把全部的Live按照之前提到的各种策略排序,可以通过各种字段排序,可以选择Live开始的时间范围(默认是全部)。
获取热门话题。
获取某话题详细信息及话题下的Live,支持分页、排序、时间范围。
获取全部用户,并且可以按照举办的Live数量、更新Live时间等条件排序。
获取单个用户信息。
根据各种策略排序,获取7天热门Live,非知乎排序。
根据各种策略排序,获取30天热门Live,非知乎排序。
由于4和5的需求,我添加了Topic这个模型,上篇文章说过SQLite不支持并发,所以替换成了MySQL,要把config里面的DB_URI改成如下格式:
DB_URI = 'mysql+pymysql://localhost/test?charset=utf8mb4'
其中test是库的名字,charset要用utf8mb4,因为有些用户信息甚至Live的标题里面包含emoji。MySQL的客户端用的是PyMySQL,需要在schema上指出来。
Topic类和之前的User格式差不多,只是不同的字段,限于篇幅就不列出来了。
为了实现可以按照举办的Live数量、更新Live时间排序,我添加了2个字段,也改了字符集:
from config import SUGGEST_USER_LIMIT, PEOPLE_URL, LIVE_USER_URL
class User(Base):
__tablename__ = 'users'
__table_args__ = {
'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8mb4'
}
...
live_count = Column(Integer, default=0)
updated_time = Column(DateTime, default=datetime.now)
接着添加一些必要的方法:
class User(Base):
...
def incr_live_count(self):
self.live_count += 1
session.commit()
@property
def url(self):
return PEOPLE_URL.format(self.speaker_id)
@property
def lives_url(self):
return LIVE_USER_URL.format(self.speaker_id)
def to_dict(self):
d = {c.name: getattr(self, c.name, None)
for c in self.__table__.columns}
d.update({
'type': 'user',
'url': self.url,
'lives_url': self.lives_url
})
return d
我习惯给model添加一个to_dict方法,把需要的字段和值拼成一个dict返回。当然有些API实际不需要这么多的字段,在下一篇中我会介绍怎么处理schema的问题。
最后是3个接口方法:
class User(Base):
@classmethod
def add(cls, **kwargs):
speaker_id = kwargs.get('speaker_id', None)
r = None
if id is not None:
q = session.query(cls).filter_by(speaker_id=speaker_id)
r = q.first()
if r:
q.update(kwargs)
if r is None:
r = cls(**kwargs)
session.add(r)
try:
session.commit()
except:
session.rollback()
else:
return r
@classmethod
def suggest(cls, q, start=0, limit=SUGGEST_USER_LIMIT):
query = session.query(User)
users = query.filter(User.name.like('%{}%'.format(q))).offset(
start).limit(limit).all()
return [user.to_dict() for user in users]
@classmethod
def get_all(cls, order_by='id', start=0, limit=10, desc=False):
'''
:param order_by: One of ``'id'``, ``'live_count'`` or
``'updated_time'``
'''
query = session.query(User)
order_by = getattr(User, order_by)
if desc:
order_by = _desc(order_by)
users = query.order_by(order_by).offset(start).limit(limit).all()
return [user.to_dict() for user in users]
需要注意add方法,其实叫做add_or_update更合适,需要使用session一定要commit才能提交数据。
sqlalchemy没有自带的suggest功能,只能用Like来实现。get_all方法就是上面第6个需求接口。
首先道歉,之前我理解的自定义analyzer的用法是错误的,下面的才是正确的姿势:
from elasticsearch_dsl.analysis import CustomAnalyzer ik_analyzer = CustomAnalyzer( 'ik_analyzer', tokenizer='ik_max_word', filter=['lowercase'] )
tokenizer字段是必选的,这里使用ik分词插件提供的ik_max_word。我还给Live添加了2个字段:
class Live(DocType): cover = Text(index='not_analyzed') # 对应专栏头图(如果有) zhuanlan_url = Text(index='not_analyzed') # 对应专栏地址
加上参数 index='not_analyzed' 是因为这2个字段不用于搜索和聚合,没必要分词,就当成数据库使用了。
也给Live添加一些属性和方法,方便最后用to_dict()生成需要的全部数据:
from .speaker import User, session
class Live(DocType):
@property
def id(self):
return self._id
@property
def speaker(self):
return session.query(User).get(self.speaker_id)
@property
def url(self):
return LIVE_URL.format(self.id)
class Meta:
index = 'live130'
def to_dict(self, include_extended=True):
d = super().to_dict()
if include_extended:
d.update({
'id': self._id,
'type': 'live',
'speaker': self.speaker.to_dict(),
'url': self.url
})
return d
其中speaker属性是常见的关联多个model的快捷方式,但是需要注意,竟然不要设计成A的model里面某个方法返回了B的model数据,B的model里面也返回了A的model的数据而造成只能进行方法内import。
用 super().to_dict() 的原因是DocType内置了to_dict方法,随便提一下,而且接收include meta参数,为True会包含index和doc type的元数据。
这个是今天的重点,昨天说的「让elasticsearch_dsl支持asyncio」就是给今天做准备。换汤不换药,说白了就是在合适的地方添加async/await关键字,先看个add的:
class Live(DocType):
...
@classmethod
async def add(cls, **kwargs):
id = kwargs.pop('id', None)
if id is None:
return False
live = cls(meta={'id': int(id)}, **kwargs)
await live.save()
return live
现在我们挨个实现需求,首先是搜索接口,由于DocType包含了search方法,得换个名字了:
class Live(DocType):
...
async def _execute(cls, s, order_by=None):
# 可以选择字段的排序,前面加-表示desc,不加就是默认的asc
if order_by is not None:
s = s.sort(order_by)
lives = await s.execute() # 执行,要在这步之前拼好查询条件
return [live.to_dict() for live in lives]
@classmethod
def apply_weight(cls, s, start, limit):
return s.query(Q('function_score', functions=[gauss_sf, log_sf])).extra(
**{'from': start, 'size': limit})
@classmethod
async def ik_search(cls, query, status=None, start=0, limit=10):
s = cls.search()
# 多字段匹配要搜索的内容,SEARCH_FIELDS中不同字段权重不同
s = s.query('multi_match', query=query,
fields=SEARCH_FIELDS)
if status is not None: # 根据结束状态过滤
s = s.query('match', status=status)
# 搜索是带权重的,按照之前的设计做了时间衰减和归一化
s = cls.apply_weight(s, start, limit)
return await cls._execute(s)
就是根据需求,按照DSL的方式来拼。我添加了些注释,看不懂的话可以按照文章开始的链接去找找答案。
然后是发现接口,7/30天热门都是基于这个接口,只不过划定了时间:
class Live(DocType):
...
@classmethod
async def explore(cls, from_date=None, to_date=None, order_by=None,
start=0, limit=10, topic=None):
s = cls.search()
if topic is not None:
s = s.query(Q('term', topic_names=topic))
starts_at = {}
if from_date is not None:
starts_at['from'] = from_date
if to_date is not None:
starts_at['to'] = to_date
if starts_at:
s = s.query(Q('range', starts_at=starts_at))
if order_by is None:
s = cls.apply_weight(s, start, limit)
return await cls._execute(s, order_by)
@classmethod
async def get_hot_weekly(cls):
today = date.today()
return await cls.explore(from_date=today - timedelta(days=7),
to_date=today, limit=20)
@classmethod
async def get_hot_monthly(cls):
today = date.today()
return await cls.explore(from_date=today - timedelta(days=30),
to_date=today, limit=50)
注意,explore方法如果指定了排序方案,就不会添加时间衰减和归一化的处理了。
然后是获取用户举报的全部Live的方法:
class Live(DocType):
...
@classmethod
async def ik_search_by_speaker_id(cls, speaker_id, order_by='-starts_at'):
s = cls.search()
s = s.query(Q('bool', should=Q('match', speaker_id=speaker_id)))
return await cls._execute(s, order_by)
可以看到_execute方法抽象后被重复利用了。
再然后是suggest接口:
class Live(DocType):
...
@classmethod
async def ik_suggest(cls, query, size=10):
s = cls.search()
s = s.suggest('live_suggestion', query, completion={
'field': 'live_suggest', 'fuzzy': {'fuzziness': 2}, 'size': size
})
suggestions = await s.execute_suggest()
matches = suggestions.live_suggestion[0].options
ids = [match._id for match in matches]
lives = await Live.mget(ids)
return [live.to_dict() for live in lives]
其中支持2个编辑距离的模糊搜索。这个实现的比较简单,没有考虑拼音,也没有考虑搜索用户。值得一提的是DocType提供了mget这个获取多个id的接口,请善用减少网络请求,也就是给ES后端减压。
第4个获得热门话题的需求是本项目唯一用到聚合功能的地方了:
from .topic import Topic
class Live(DocType):
@classmethod
async def get_hot_topics(cls, size=50):
s = cls.search()
s.aggs.bucket('topics', A('terms', field='topics', size=size))
rs = await s.execute()
buckets = rs.aggregations.topics.buckets
topic_names = [r['key'] for r in buckets]
topics = session.query(Topic).filter(Topic.name.in_(topic_names)).all()
topics = sorted(topics, key=lambda t: topic_names.index(t.name))
return [topic.to_dict() for topic in topics]
每个Live都会打话题标签,越多的live打这个话题就说明它越热门。
最后要说的是init()方法:
async def init(): await Live.init()
原来import模块的时候直接就init了,现在由于异步化了,直接init没人所以要在loop中用,比如在爬虫中:
from models.live import init as live_init
if __name__ == '__main__':
loop = asyncio.get_event_loop()
crawler = Crawler()
loop.run_until_complete(live_init())
loop.run_until_complete(crawler.crawl())
print('Finished in {:.3f} secs'.format(crawler.t1 - crawler.t0))
crawler.close()
loop.close()
es.transport.close()
理解了嘛?
好了全部接口都完成了,但是大家有木有感觉,异步编程调试起来很麻烦,我来教一个好用的方法.
asyncio要求把需要协程化的函数都放进一个loop,通过run_until_complete方法让它执行完成。
但是现在非常不好玩:
In : from models import Live In : live = Live.get(789840559912009728) In : live Out: <coroutine object DocType.get at 0x10a0d1fc0> In : live.subject --------------------------------------------------------------------------- AttributeError Traceback (most recent call last) <ipython-input-4-8c237874146c> in <module>() ----> 1 live.subject AttributeError: 'coroutine' object has no attribute 'subject'
异步化的函数(方法)用起来很不直观。一开始可以写个脚本把要调试的东西放进去用(test_es.py):
import asyncio
from elasticsearch_dsl.connections import connections
from models.live import Live, SEARCH_FIELDS, init as live_init
s = Live.search()
es = connections.get_connection(Live._doc_type.using)
async def print_info():
rs = await s.query('multi_match', query='python',
fields=SEARCH_FIELDS).execute()
print(rs)
loop = asyncio.get_event_loop()
loop.run_until_complete(live_init())
loop.run_until_complete(print_info())
loop.close()
es.transport.close()
这样也是可以调试的,很麻烦,对吧?
抽象一下,其实写个函数就好了:
import asyncio def execute(coro): loop = asyncio.get_event_loop() rs = loop.run_until_complete(coro) return rs
OK, 再用:
In : from models import Live, execute In : live = Live.get(789840559912009728) In : live = execute(live) In : live.subject Out: 'Python 工程师的入门和进阶'
这样就方便多了。