123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- """
- @author: luojunhui
- """
- from datetime import datetime
- from applications.functions.video_item import VideoProducer
- from applications.log import logging
- from applications.match_algorithm.rank import title_similarity_rank
- from .spiderAB import SearchABTest
- from .spiderSchedule import SearchMethod
async def save_video_to_mysql(video_obj, user, trace_id, platform, content_id, crawler_video_table, db_client, similarity_score):
    """
    Normalise one crawled video and insert it as a row into the crawler table.

    The raw ``video_obj`` is converted by the ``VideoProducer`` method that
    matches ``platform``; the resulting record is then written through
    ``db_client.async_insert``.
    (Original author's note: official accounts and in-site accounts map
    one-to-one.)

    :param video_obj: raw video payload returned by the search for ``platform``
    :param user: account id forwarded to the producer method
    :param trace_id: trace id stored on the record and in the row
    :param platform: one of "xg_search", "baidu_search", "wx_search",
        "dy_search"; any other value yields an empty record, so the field
        lookups below raise KeyError
    :param content_id: content id stored on the record and in the row
    :param crawler_video_table: name of the crawler table to insert into
    :param db_client: MySQL client exposing an awaitable ``async_insert``
    :param similarity_score: value written to the ``score`` column
    :return: None
    """
    producer = VideoProducer()

    # Platform name -> name of the VideoProducer method that builds its record.
    producer_method_by_platform = {
        "xg_search": "xg_video_produce",
        "baidu_search": "baidu_video_produce",
        "wx_search": "wx_video_produce",
        "dy_search": "dy_video_produce",
    }
    method_name = producer_method_by_platform.get(platform)
    if method_name is None:
        record = {}
    else:
        build_record = getattr(producer, method_name)
        record = build_record(
            video_obj=video_obj,
            user=user,
            trace_id=trace_id,
        )

    record['trace_id'] = trace_id
    record['content_id'] = content_id

    # The table name cannot be bound as a query parameter, so it is
    # interpolated; every column value goes through %s placeholders.
    # NOTE(review): confirm crawler_video_table only ever comes from trusted config.
    insert_sql = f"""
        INSERT INTO {crawler_video_table}
        (content_id, out_video_id, platform, video_title, play_count, like_count, publish_time, crawler_time, duration, video_url, cover_url, user_id, trace_id, score)
        values
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """

    # Values are listed in the same order as the column list above.
    row_values = (
        content_id,
        record['video_id'],
        platform,
        record['video_title'],
        record['play_cnt'],
        record['like_cnt'],
        datetime.fromtimestamp(record['publish_timestamp']).strftime('%Y-%m-%d %H:%M:%S'),
        str(datetime.now()),
        record['duration'],
        record['video_url'],
        record['cover_url'],
        record['user_id'],
        trace_id,
        similarity_score,
    )
    await db_client.async_insert(sql=insert_sql, params=row_values)
async def search_videos_from_web(info, gh_id_map, db_client):
    """
    Search for candidate videos, rank them by title similarity and save each
    usable hit to MySQL.

    :param info: task dict; the keys read here are 'trace_id', 'gh_id',
        'content_id', 'ori_title' and 'crawler_video_table'
    :param gh_id_map: mapping from gh_id to the account id passed on as
        ``user``; a default account id is used when gh_id is absent
    :param db_client: MySQL client forwarded to ``save_video_to_mysql``
    :return: number of entries in the ranked list (skipped entries included)
    """
    default_account_id = 69637498
    search_AB = SearchABTest(info=info, searchMethod=SearchMethod())
    # Original note: start three searches, each expected to return results,
    # using key1, key2 and key3 respectively.
    trace_id = info['trace_id']
    gh_id = info['gh_id']
    content_id = info['content_id']
    recall_list = await search_AB.ab_5()
    logging(
        code="1006",
        info="搜索到{}条视频".format(len(recall_list)),
        data=recall_list,
        trace_id=trace_id
    )
    # Rank by title similarity; only the part of the title after the last
    # "@@" separator is compared.
    ranked_list = title_similarity_rank(
        content_title=info['ori_title'].split("@@")[-1],
        recall_list=recall_list
    )
    for recall_obj in ranked_list:
        if not recall_obj:
            continue
        platform = recall_obj['platform']
        recall_video = recall_obj['result']
        # Skip empty results BEFORE touching their fields: reading 'score'
        # ahead of this check raised on a None/empty result instead of
        # skipping it.
        if not recall_video:
            continue
        await save_video_to_mysql(
            video_obj=recall_video,
            user=gh_id_map.get(gh_id, default_account_id),
            trace_id=trace_id,
            platform=platform,
            content_id=content_id,
            crawler_video_table=info['crawler_video_table'],
            db_client=db_client,
            similarity_score=recall_video['score']
        )
    return len(ranked_list)
|