""" @author: luojunhui """ import json from datetime import datetime from applications.feishu import bot from applications.const import server_const from applications.functions.video_item import VideoProducer from applications.log import logging from applications.match_algorithm import title_similarity_with_nlp from .spiderAB import SearchABTest from .spiderSchedule import SearchMethod async def save_video_to_mysql(video_obj, user, trace_id, platform, content_id, crawler_video_table, db_client, similarity_score): """ 异步处理微信 video_obj 公众号和站内账号一一对应 :param similarity_score: :param crawler_video_table: 爬虫表 :param db_client: mysql :param content_id: :param platform: :param user: :param trace_id: :param video_obj: :return: """ Video = VideoProducer() if platform == "xg_search": mq_obj = Video.xg_video_produce( video_obj=video_obj, user=user, trace_id=trace_id, ) elif platform == "baidu_search": mq_obj = Video.baidu_video_produce( video_obj=video_obj, user=user, trace_id=trace_id, ) elif platform == "wx_search": mq_obj = Video.wx_video_produce( video_obj=video_obj, user=user, trace_id=trace_id, ) elif platform == "dy_search": mq_obj = Video.dy_video_produce( video_obj=video_obj, user=user, trace_id=trace_id, ) else: mq_obj = {} mq_obj['trace_id'] = trace_id mq_obj['content_id'] = content_id out_video_id = mq_obj['video_id'] # 先查询此content_id下是否有out_video_id了 select_sql = f""" SELECT id FROM {crawler_video_table} WHERE out_video_id = '{out_video_id}' and content_id = '{content_id}'; """ result = await db_client.async_select(select_sql) if result: return insert_sql = f""" INSERT INTO {crawler_video_table} (content_id, out_video_id, platform, video_title, play_count, like_count, publish_time, crawler_time, duration, video_url, cover_url, user_id, trace_id, score, score_version) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ await db_client.async_insert( sql=insert_sql, params=( content_id, mq_obj['video_id'], platform, mq_obj['video_title'], mq_obj['play_cnt'], mq_obj['like_cnt'], datetime.fromtimestamp(mq_obj['publish_timestamp']).strftime('%Y-%m-%d %H:%M:%S'), datetime.now().__str__(), mq_obj['duration'], mq_obj['video_url'], mq_obj['cover_url'], mq_obj['user_id'], trace_id, similarity_score, server_const.NLP_VERSION ) ) async def search_videos_from_web(info, gh_id_map, db_client): """ search and send msg to ETL :param db_client: :param gh_id_map: :param info: :return: """ default_account_id = 69637498 search_AB = SearchABTest(info=info, searchMethod=SearchMethod()) # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索 trace_id = info['trace_id'] gh_id = info['gh_id'] content_id = info['content_id'] recall_list = await search_AB.ab_6() logging( code="1006", info="搜索到{}条视频".format(len(recall_list)), data=recall_list, trace_id=info['trace_id'] ) # 按照标题相似度排序 ranked_result = await title_similarity_with_nlp(content_title=info['ori_title'].split("@@")[-1], recall_list=recall_list) ranked_list = ranked_result['result'] if recall_list and not ranked_list: bot( title="NLP服务请求失败", detail={ "trace_id": info['trace_id'] }, mention=False ) success_match_video_count = 0 for recall_obj in ranked_list: if recall_obj: platform = recall_obj['platform'] recall_video = recall_obj['result'] score = recall_obj['score'] # 过滤掉nlp分低于0.55的 if score < server_const.NLP_SIMILARITY_THRESHOLD: continue if recall_video: await save_video_to_mysql( video_obj=recall_video, user=gh_id_map.get(gh_id, default_account_id), trace_id=trace_id, platform=platform, content_id=content_id, 
crawler_video_table=info['crawler_video_table'], db_client=db_client, similarity_score=score ) success_match_video_count += 1 return success_match_video_count
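

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, kept as a comment so the module has no
# extra runtime dependencies). It shows how search_videos_from_web might be
# driven from an async task runner. The `info` payload shape is inferred from
# the fields read above; `AsyncMySQLClient` and its import path are
# assumptions for illustration, not a confirmed API of this repo.
#
#   import asyncio
#   from applications.db import AsyncMySQLClient  # hypothetical import path
#
#   async def demo():
#       db_client = AsyncMySQLClient()
#       info = {
#           "trace_id": "trace-123",
#           "gh_id": "gh_abc",
#           "content_id": "content-1",
#           "ori_title": "prefix@@Some article title",
#           "crawler_video_table": "crawler_video",
#       }
#       matched = await search_videos_from_web(
#           info=info,
#           gh_id_map={"gh_abc": 10001},
#           db_client=db_client,
#       )
#       print(f"saved {matched} matched videos")
#
#   asyncio.run(demo())
# ---------------------------------------------------------------------------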