""" @author: luojunhui 调用接口在微信内搜索视频 """ import json import asyncio import time from applications.search import * from applications.static.config import gh_id_dict from applications.functions.log import logging from applications.functions.video_item import VideoProducer from applications.functions.async_etl import AsyncETL from applications.functions.mysql import select_sensitive_words from applications.functions.kimi import KimiServer class SearchABTest(object): """ 搜索策略实验方案 """ ori_title = None article_summary = None article_keys = None gh_id = None trace_id = None def __init__(self, info, gh_id): SearchABTest.set_class_properties(info, gh_id) @classmethod def set_class_properties(cls, info, gh_id): """ 初始化搜索策略实验类 :param info: kimi 挖掘的基本信息 :param gh_id: 公众号账号 id :return: """ cls.ori_title = info["ori_title"] cls.article_summary = info["content_title"] cls.article_keys = info["content_keys"] cls.trace_id = info["trace_id"] cls.gh_id = gh_id @classmethod async def base_line(cls): """ 兜底策略 """ result = await SearchMethod().search_v1( text=cls.article_keys[0], trace_id=cls.trace_id ) if result: return result else: sub_result = await SearchMethod().search_v1( text=cls.article_keys[1], trace_id=cls.trace_id) if sub_result: return sub_result else: return await SearchMethod().search_v1( text=cls.article_keys[2], trace_id=cls.trace_id ) @classmethod async def ab_0(cls): """ 默认原标题搜索 :return: """ search_result = await SearchMethod().search_v1( text=cls.ori_title, trace_id=cls.trace_id ) if search_result: return search_result else: return await cls.base_line() @classmethod async def ab_1(cls): """ 使用 content_summary搜索 :return: """ search_result = await SearchMethod().search_v1( text=cls.article_summary, trace_id=cls.trace_id ) if search_result: return search_result else: return await cls.ab_0() @classmethod async def ab_2(cls): """ 使用文本关键词搜索 :return: """ search_result = await SearchMethod().search_v1( text=cls.article_keys[0], trace_id=cls.trace_id ) if search_result: return search_result else: return await cls.base_line() @classmethod async def ab_3(cls): """ 使用文本关键词搜索 :return: """ search_result = await SearchMethod().search_v1( text=cls.article_keys[1], trace_id=cls.trace_id ) if search_result: return search_result else: return await cls.base_line() @classmethod async def ab_4(cls): """ 使用文本关键词搜索 :return: """ search_result = await SearchMethod().search_v1( text=cls.article_keys[2], trace_id=cls.trace_id ) if search_result: return search_result else: return await cls.base_line() @classmethod async def ab_5(cls): """ 增量搜索, 返回result_list :return: """ result_list = await SearchMethod().search_v2( text=cls.article_summary[:15], trace_id=cls.trace_id ) if len(result_list) > 3: return result_list else: result_list += await SearchMethod().search_v2( text=cls.ori_title[:15], trace_id=cls.trace_id ) if len(result_list) > 3: return result_list else: result_list += await SearchMethod().search_v2( text=cls.article_keys[0], trace_id=cls.trace_id ) if len(result_list) > 3: return result_list else: result_list += await SearchMethod().search_v2( text=cls.article_keys[1], trace_id=cls.trace_id ) if result_list: return result_list else: result_list += await SearchMethod().search_v2( text=cls.article_keys[2], trace_id=cls.trace_id ) return result_list class SearchMethod(object): """ 搜索召回模式 """ s_words = select_sensitive_words() @classmethod async def search_v0(cls, text, trace_id): """ 搜索顺序-wx --> baidu --> xigua 一共需要返回三条视频 :return: """ wx_result = [] if wx_result: return {"platform": "wx_search", "result": wx_result[0]} else: logging( code="7001", info="通过微信搜索失败---{}".format(text), trace_id=trace_id, ) # 微信搜不到的话,采用好看视频搜索 time.sleep(1) baidu_result = hksp_search(key=text, sensitive_words=cls.s_words) if baidu_result: return {"platform": "baidu_search", "result": baidu_result[0]} else: # 若好看视频未搜到,则采用西瓜搜索 logging( code="7001", info="通过baidu搜索失败---{}".format(text), trace_id=trace_id, ) # return None xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words) if xigua_result: return {"platform": "xg_search", "result": xigua_result[0]} else: logging( code="7001", info="通过西瓜搜索失败---{}, 启用兜底方式".format(text), trace_id=trace_id, ) return None @classmethod async def search_v1(cls, text, trace_id): """ dy ---> baidu ---> xigua :param text: :param trace_id: :return: """ douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words) if douyin_result: return {"platform": "dy_search", "result": douyin_result[0]} else: logging( code="7001", info="抖音搜索失败--{}".format(text), trace_id=trace_id ) time.sleep(1) baidu_result = hksp_search(key=text, sensitive_words=cls.s_words) if baidu_result: return {"platform": "baidu_search", "result": baidu_result[0]} else: # 若好看视频未搜到,则采用西瓜搜索 logging( code="7001", info="通过baidu搜索失败---{}".format(text), trace_id=trace_id, ) # return None xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words) if xigua_result: return {"platform": "xg_search", "result": xigua_result[0]} else: logging( code="7001", info="通过西瓜搜索失败---{}, 启用兜底方式".format(text), trace_id=trace_id, ) return None @classmethod async def search_v2(cls, text, trace_id): """ dy ---> baidu ---> xigua :param trace_id: :param text: :return: """ L = [] print(trace_id) douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words) for vid_obj in douyin_result: L.append({"platform": "dy_search", "result": vid_obj}) if len(L) >= 3: return L else: baidu_result = hksp_search(key=text, sensitive_words=cls.s_words) if baidu_result: L.append({"platform": "baidu_search", "result": baidu_result[0]}) xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words) if xigua_result: L.append({"platform": "xg_search", "result": xigua_result[0]}) return L async def video_sender(video_obj, user, trace_id, platform): """ 异步处理微信 video_obj 公众号和站内账号一一对应 :param platform: :param user: :param trace_id: :param video_obj: :return: """ # ETL_MQ = MQ(topic_name="topic_crawler_etl_prod") Video = VideoProducer() if platform == "xg_search": mq_obj = Video.xg_video_producer( video_obj=video_obj, user=user, trace_id=trace_id, ) elif platform == "baidu_search": mq_obj = Video.baidu_video_producer( video_obj=video_obj, user=user, trace_id=trace_id, ) elif platform == "wx_search": mq_obj = Video.wx_video_producer( video_obj=video_obj, user=user, trace_id=trace_id, ) elif platform == "dy_search": mq_obj = Video.dy_video_producer( video_obj=video_obj, user=user, trace_id=trace_id, ) else: mq_obj = {} AE = AsyncETL(video_obj=mq_obj) video_id = await AE.etl_deal() logging( code="6002", info="视频下载完成, 平台是---{}".format(platform), data=mq_obj, trace_id=trace_id, ) return video_id async def search_videos(params, trace_id, gh_id, mysql_client): """ search and send msg to ETL :param mysql_client: :param params: :param gh_id: 通过账号 id 来控制实验策略 :param trace_id: :return: """ K = KimiServer() kimi_info = await K.search_kimi_schedule(params=params) print("{}---kimi 挖掘正常".format(trace_id)) kimi_title = kimi_info['k_title'] content_title = kimi_info['content_title'].replace("'", "").replace('"', "") content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False) update_kimi_sql = f""" UPDATE long_articles_video SET kimi_title = '{kimi_title}', kimi_summary = '{content_title}', kimi_keys = '{content_keys}' WHERE trace_id = '{trace_id}'; """ await mysql_client.async_insert(update_kimi_sql) kimi_info["trace_id"] = trace_id SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id) recall_obj_1 = await SearchAB.ab_1() # recall_obj_1 = await SearchAB.ab_0() await asyncio.sleep(3) recall_obj_2 = await SearchAB.ab_2() await asyncio.sleep(3) recall_obj_3 = await SearchAB.ab_3() print("{}---视频搜索正常".format(trace_id)) recall_list = [recall_obj_1, recall_obj_2, recall_obj_3] un_empty_list = [i for i in recall_list if i] if len(un_empty_list) < 3: await asyncio.sleep(3) recall_obj_4 = await SearchAB.ab_4() if recall_obj_4: un_empty_list.append(recall_obj_4) # 逐条下载,逐条写表 if un_empty_list: for index, recall_obj in enumerate(un_empty_list, 1): platform = recall_obj["platform"] recall_video = recall_obj["result"] if recall_video: logging( code="7002", info="视频搜索成功, 搜索平台为--{}".format(platform), trace_id=trace_id, data=recall_video, ) video_id = await video_sender( video_obj=recall_video, user=gh_id_dict.get(gh_id), trace_id=trace_id, platform=platform, ) update_id_sql = f""" UPDATE long_articles_video SET recall_video_id{index} = {video_id} WHERE trace_id = '{trace_id}' """ await mysql_client.async_insert(update_id_sql) else: logging( code="7003", info="视频搜索失败, 被敏感词过滤", trace_id=trace_id ) async def insert_into_mysql(index, mysql_client, recall_video, gh_id, trace_id, platform): """ :param platform: :param trace_id: :param gh_id: :param index: :param mysql_client: :param recall_video: """ video_id = await video_sender( video_obj=recall_video, user=gh_id_dict.get(gh_id), trace_id=trace_id, platform=platform, ) update_id_sql = f""" UPDATE long_articles_video SET recall_video_id{index} = {video_id} WHERE trace_id = '{trace_id}' """ await mysql_client.async_insert(update_id_sql) async def re_search_videos(params, trace_id, gh_id, mysql_client): """ 重新搜索接口 :param params: :param trace_id: :param gh_id: :param mysql_client: :return: """ obj = { "ori_title": params['title'], "content_title": params['kimi_summary'], "content_keys": params['kimi_keys'], "trace_id": params['trace_id'] } SearchAB = SearchABTest(info=obj, gh_id=gh_id) # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索 recall_list = await SearchAB.ab_5() print("一共搜索到{}条视频".format(len(recall_list))) index = 0 for recall_obj in recall_list: if recall_obj: platform = recall_obj['platform'] recall_video = recall_obj['result'] if recall_video: index += 1 await insert_into_mysql( index=index, mysql_client=mysql_client, recall_video=recall_video, gh_id=gh_id, trace_id=trace_id, platform=platform ) if index >= 3: print("already downloaded 3 videos") break print("一个匹配到{}条文章".format(index))