"""
@author: luojunhui

Search for videos (WeChat first, then Baidu Haokan) through the search
interfaces and hand the hits over to the ETL pipeline.
"""
import json
import asyncio

from applications.search import *
from applications.static.config import gh_id_dict
from applications.functions.log import logging
from applications.functions.video_item import VideoProducer
from applications.functions.async_etl import AsyncETL
from applications.functions.mysql import select_sensitive_words
from applications.functions.kimi import KimiServer


class SearchABTest(object):
    """
    Search-strategy experiments.

    Every ``ab_*`` strategy runs one primary query and, when that query
    returns nothing, delegates to a fallback strategy.

    NOTE(review): the request state below is stored on the *class*, not on
    the instance, so constructing a second ``SearchABTest`` overwrites the
    values used by the first one.  ``search_videos`` awaits between strategy
    calls, so overlapping requests can observe each other's data.  Left
    unchanged here because the strategies are public classmethods.
    """

    ori_title = None
    article_summary = None
    article_keys = None
    gh_id = None
    trace_id = None

    def __init__(self, info, gh_id):
        SearchABTest.set_class_properties(info, gh_id)

    @classmethod
    def set_class_properties(cls, info, gh_id):
        """
        Initialise the shared experiment state.

        :param info: dict with the keys ``ori_title``, ``content_title``,
            ``content_keys`` and ``trace_id``
        :param gh_id: official-account id
        :return: None
        """
        cls.ori_title = info["ori_title"]
        cls.article_summary = info["content_title"]
        cls.article_keys = info["content_keys"]
        cls.trace_id = info["trace_id"]
        cls.gh_id = gh_id

    @classmethod
    async def _search(cls, text):
        """Run a single query through ``SearchMethod.search_v0``."""
        return await SearchMethod().search_v0(text=text, trace_id=cls.trace_id)

    @classmethod
    def _keyword_text(cls, index):
        """
        Return the query text for keyword ``index``, or None when the keyword
        list is too short.

        A keyword that is already a string is used verbatim: joining a string
        with "," would insert a comma between every character.  Any other
        iterable of strings is joined with ",".
        """
        keys = cls.article_keys or []
        if index >= len(keys):
            return None
        keyword = keys[index]
        if isinstance(keyword, str):
            return keyword
        return ",".join(keyword)

    @classmethod
    async def _keyword_search(cls, index):
        """Search by keyword ``index``; fall back to ``base_line`` on a miss."""
        text = cls._keyword_text(index)
        if text is not None:
            search_result = await cls._search(text)
            if search_result:
                return search_result
        return await cls.base_line()

    @classmethod
    async def base_line(cls):
        """
        Fallback strategy: try the first three keywords in order.

        :return: the first non-empty search result, otherwise the result of
            the last attempt (None when there are no keywords at all)
        """
        result = None
        for keyword in (cls.article_keys or [])[:3]:
            result = await cls._search(keyword)
            if result:
                break
        return result

    @classmethod
    async def ab_0(cls):
        """
        Search by the original title; fall back to ``base_line``.

        :return: search result dict or None
        """
        search_result = await cls._search(cls.ori_title)
        if search_result:
            return search_result
        return await cls.base_line()

    @classmethod
    async def ab_1(cls):
        """
        Search by the content summary; fall back to ``ab_0``.

        :return: search result dict or None
        """
        search_result = await cls._search(cls.article_summary)
        if search_result:
            return search_result
        return await cls.ab_0()

    @classmethod
    async def ab_2(cls):
        """
        Search by keyword 0; fall back to ``base_line``.

        :return: search result dict or None
        """
        return await cls._keyword_search(0)

    @classmethod
    async def ab_3(cls):
        """
        Search by keyword 1; fall back to ``base_line``.

        :return: search result dict or None
        """
        return await cls._keyword_search(1)

    @classmethod
    async def ab_4(cls):
        """
        Search by keyword 3; fall back to ``base_line``.

        When fewer than four keywords are available the primary query is
        skipped and only the fallback runs.

        :return: search result dict or None
        """
        return await cls._keyword_search(3)


class SearchMethod(object):
    """
    Search recall modes.
    """

    # Loaded once, when the class body is executed.
    s_words = select_sensitive_words()

    @classmethod
    async def search_v0(cls, text, trace_id):
        """
        Search order: wx --> baidu (Haokan).

        The Xigua fallback (``xigua_search(keyword=..., sensitive_words=...)``,
        platform tag "xg_search") is currently disabled.

        :param text: query text
        :param trace_id: id attached to the log records
        :return: ``{"platform": ..., "result": <first hit>}`` or None when
            neither platform returns anything
        """
        wx_result = wx_search(keys=text, sensitive_words=cls.s_words)
        if wx_result:
            return {"platform": "wx_search", "result": wx_result[0]}
        logging(
            code="7001",
            info="通过微信搜索失败---{}".format(text),
            trace_id=trace_id,
        )

        # Nothing from WeChat: try Baidu Haokan video search.
        baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
        if baidu_result:
            return {"platform": "baidu_search", "result": baidu_result[0]}
        logging(
            code="7001",
            info="通过baidu搜索失败---{}".format(text),
            trace_id=trace_id,
        )
        return None


async def video_sender(video_obj, user, trace_id, platform):
    """
    Build the ETL message for a recalled video and run the ETL step.

    :param video_obj: raw video object returned by the search
    :param user: in-site account mapped to the official account
    :param trace_id: id attached to the log records
    :param platform: "xg_search", "baidu_search" or "wx_search"; any other
        value results in an empty message being passed to the ETL
    :return: whatever ``AsyncETL.etl_deal`` returns (used as the video id)
    """
    producer = VideoProducer()
    if platform == "xg_search":
        mq_obj = producer.xg_video_producer(
            video_obj=video_obj,
            user=user,
            trace_id=trace_id,
        )
    elif platform == "baidu_search":
        mq_obj = producer.baidu_video_producer(
            video_obj=video_obj,
            user=user,
            trace_id=trace_id,
        )
    elif platform == "wx_search":
        mq_obj = producer.wx_video_producer(
            video_obj=video_obj,
            user=user,
            trace_id=trace_id,
        )
    else:
        mq_obj = {}
    etl = AsyncETL(video_obj=mq_obj)
    video_id = await etl.etl_deal()
    logging(
        code="6002",
        info="视频下载完成",
        data=mq_obj,
        trace_id=trace_id
    )
    return video_id


async def search_videos(params, trace_id, gh_id, mysql_client):
    """
    Search for videos and send the hits to the ETL.

    Steps: ask Kimi for title/summary/keywords, store them, run the search
    strategies (with a pause between them), then download each hit and write
    its video id to ``long_articles_video_dev`` one row update at a time.

    :param params: request parameters forwarded to Kimi
    :param trace_id: id of the row to update and of the log records
    :param gh_id: official-account id, used to look up the in-site user
    :param mysql_client: client exposing an awaitable ``async_insert(sql)``
    :return: None
    """
    kimi_server = KimiServer()
    try:
        kimi_info = await kimi_server.search_kimi_schedule(params=params)
        print(json.dumps(kimi_info, ensure_ascii=False, indent=4))
        kimi_title = kimi_info['k_title']
        content_title = kimi_info['content_title']
        content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
        # NOTE(review): the statement is built by string interpolation from
        # model output; a quote in any value breaks (or alters) the SQL.
        # Switch to a parameterised call if ``async_insert`` supports one.
        update_kimi_sql = f"""
            UPDATE long_articles_video_dev
            SET kimi_title = '{kimi_title}', kimi_summary = '{content_title}', kimi_keys = '{content_keys}'
            WHERE trace_id = '{trace_id}';
        """
        await mysql_client.async_insert(update_kimi_sql)
        kimi_info["trace_id"] = trace_id
        search_ab = SearchABTest(info=kimi_info, gh_id=gh_id)
        recall_obj_1 = await search_ab.ab_1()
        await asyncio.sleep(3)
        recall_obj_2 = await search_ab.ab_2()
        await asyncio.sleep(3)
        recall_obj_3 = await search_ab.ab_3()
        recall_list = [recall_obj_1, recall_obj_2, recall_obj_3]
        un_empty_list = [i for i in recall_list if i]
        if len(un_empty_list) < 3:
            await asyncio.sleep(3)
            recall_obj_4 = await search_ab.ab_4()
            # ab_4 may also come back empty; appending None would make the
            # loop below fail on ``recall_obj["platform"]``.
            if recall_obj_4:
                un_empty_list.append(recall_obj_4)
        # Download one by one, write to the table one by one.
        if un_empty_list:
            for index, recall_obj in enumerate(un_empty_list, 1):
                platform = recall_obj["platform"]
                recall_video = recall_obj["result"]
                if recall_video:
                    logging(
                        code="7002",
                        info="视频搜索成功, 搜索平台为--{}".format(platform),
                        trace_id=trace_id,
                        data=recall_video,
                    )
                    video_id = await video_sender(
                        video_obj=recall_video,
                        user=gh_id_dict.get(gh_id),
                        trace_id=trace_id,
                        platform=platform,
                    )
                    update_id_sql = f"""
                        UPDATE long_articles_video_dev
                        SET recall_video_id{index} = {video_id}
                        WHERE trace_id = '{trace_id}'
                    """
                    print(update_id_sql)
                    await mysql_client.async_insert(update_id_sql)
        else:
            logging(code="7003", info="视频搜索失败", trace_id=trace_id)
            return None
    except Exception as e:
        # Top-level boundary: any failure in the pipeline is logged here.
        logging(
            code="9000",
            info="kimi挖掘失败, 原因是-{}".format(e),
            trace_id=trace_id
        )
        return None