"""
@author: luojunhui
Call the search interfaces to look for videos inside WeChat.
"""
import asyncio
import json
import time

from applications.match_algorithm.rank import title_similarity_rank
from applications.search import *
from static.config import gh_id_dict, db_article
from applications.functions.log import logging
from applications.functions.video_item import VideoProducer
from applications.functions.kimi import KimiServer
from applications.functions.common import request_etl

# Endpoint that receives the message built for every recalled video.
# For local debugging the same service is reachable at
# "http://localhost:4612/etl".
ETL_URL = "http://192.168.203.137:4612/etl"

# Maps a search platform tag to the VideoProducer method that builds the
# ETL message for a video coming from that platform.
_PRODUCER_BY_PLATFORM = {
    "xg_search": "xg_video_producer",
    "baidu_search": "baidu_video_producer",
    "wx_search": "wx_video_producer",
    "dy_search": "dy_video_producer",
}


class SearchABTest(object):
    """
    Search strategy experiments.

    NOTE: the search inputs are stored on the *class*, not on the instance,
    so constructing a new ``SearchABTest`` overwrites the inputs seen by
    every other user of the class. Each strategy therefore snapshots the
    values it needs before its first ``await``.
    """
    ori_title = None
    article_summary = None
    article_keys = None
    gh_id = None
    trace_id = None

    def __init__(self, info, gh_id):
        SearchABTest.set_class_properties(info, gh_id)

    @classmethod
    def set_class_properties(cls, info, gh_id):
        """
        Initialise the search strategy experiment class.

        :param info: basic information mined by kimi; must contain the keys
            "ori_title", "content_title", "content_keys" and "trace_id"
        :param gh_id: official account id
        :return: None
        """
        cls.ori_title = info["ori_title"]
        cls.article_summary = info["content_title"]
        cls.article_keys = info["content_keys"]
        cls.trace_id = info["trace_id"]
        cls.gh_id = gh_id

    @classmethod
    def _key(cls, index):
        """
        Return the keyword at ``index`` or None when it does not exist.

        :param index: position inside ``article_keys``
        :return: the keyword, or None if there are not enough keywords
        """
        keys = cls.article_keys or []
        return keys[index] if index < len(keys) else None

    @classmethod
    async def _first_hit(cls, *lead_texts):
        """
        Search ``lead_texts`` and then the three keywords, in that order,
        and return the first non-empty ``search_v1`` result.

        The candidate list and the trace id are captured before the first
        await so a concurrent re-initialisation of the class cannot change
        them half-way through. Missing / empty texts are skipped.

        :param lead_texts: texts to try before falling back to the keywords
        :return: the first truthy search result, or None if nothing matched
        """
        trace_id = cls.trace_id
        candidates = [*lead_texts, cls._key(0), cls._key(1), cls._key(2)]
        for text in candidates:
            if not text:
                continue
            hit = await SearchMethod.search_v1(text=text, trace_id=trace_id)
            if hit:
                return hit
        return None

    @classmethod
    async def base_line(cls):
        """
        Fallback strategy: try the keywords one after another.

        :return: first search hit, or None
        """
        return await cls._first_hit()

    @classmethod
    async def ab_0(cls):
        """
        Default strategy: search by the original title, then fall back to
        the keywords.

        :return: first search hit, or None
        """
        return await cls._first_hit(cls.ori_title)

    @classmethod
    async def ab_1(cls):
        """
        Search by the content summary, then by the original title, then
        fall back to the keywords.

        :return: first search hit, or None
        """
        return await cls._first_hit(cls.article_summary, cls.ori_title)

    @classmethod
    async def ab_2(cls):
        """
        Search by the first keyword, then run the keyword fallback (which
        tries the first keyword once more before the others).

        :return: first search hit, or None
        """
        return await cls._first_hit(cls._key(0))

    @classmethod
    async def ab_3(cls):
        """
        Search by the second keyword, then run the keyword fallback.

        :return: first search hit, or None
        """
        return await cls._first_hit(cls._key(1))

    @classmethod
    async def ab_4(cls):
        """
        Search by the third keyword, then run the keyword fallback.

        :return: first search hit, or None
        """
        return await cls._first_hit(cls._key(2))

    @classmethod
    async def ab_5(cls):
        """
        Incremental search that accumulates results across several queries.

        Queries, in order: first 15 characters of the summary, first 15
        characters of the original title, keyword 1, keyword 2, keyword 3.
        After each of the first three queries the search stops once more
        than 3 results were collected; after the fourth query it stops as
        soon as there is any result at all.

        :return: list of ``{"platform": ..., "result": ...}`` dicts
        """
        trace_id = cls.trace_id
        # (query text, stop once len(results) is greater than this value)
        stages = (
            ((cls.article_summary or "")[:15], 3),
            ((cls.ori_title or "")[:15], 3),
            (cls._key(0), 3),
            (cls._key(1), 0),
            (cls._key(2), 0),
        )
        results = []
        for text, enough_above in stages:
            if text:
                results += await SearchMethod.search_v2(
                    text=text, trace_id=trace_id
                )
            if len(results) > enough_above:
                return results
        return results


class SearchMethod(object):
    """
    Search recall modes.
    """
    # Sensitive words forwarded to every platform search.
    s_words = []

    @classmethod
    async def search_v1(cls, text, trace_id):
        """
        Return the first video found, trying dy ---> baidu ---> xigua.

        :param text: search keyword
        :param trace_id: trace id forwarded to the platform searches
        :return: ``{"platform": ..., "result": ...}`` or None
        """
        douyin_result = douyin_search(
            keyword=text, sensitive_words=cls.s_words, trace_id=trace_id
        )
        if douyin_result:
            return {"platform": "dy_search", "result": douyin_result[0]}

        # Pause before the next platform without blocking the event loop.
        await asyncio.sleep(1)
        baidu_result = hksp_search(
            key=text, sensitive_words=cls.s_words, trace_id=trace_id
        )
        if baidu_result:
            return {"platform": "baidu_search", "result": baidu_result[0]}

        xigua_result = xigua_search_v2(
            keyword=text, sensitive_words=cls.s_words
        )
        if xigua_result:
            return {"platform": "xg_search", "result": xigua_result[0]}
        return None

    @classmethod
    async def search_v2(cls, text, trace_id):
        """
        Collect several videos, trying dy ---> baidu ---> xigua.

        All douyin results are kept; when they number fewer than three, the
        first baidu result and the first xigua result are appended too.

        :param text: search keyword
        :param trace_id: trace id forwarded to the platform searches
        :return: list of ``{"platform": ..., "result": ...}`` dicts
        """
        douyin_result = douyin_search(
            keyword=text, sensitive_words=cls.s_words, trace_id=trace_id
        )
        # A failed search may yield a falsy value instead of a list.
        recalled = [
            {"platform": "dy_search", "result": vid_obj}
            for vid_obj in douyin_result or []
        ]
        if len(recalled) >= 3:
            return recalled

        baidu_result = hksp_search(
            key=text, sensitive_words=cls.s_words, trace_id=trace_id
        )
        if baidu_result:
            recalled.append(
                {"platform": "baidu_search", "result": baidu_result[0]}
            )
        xigua_result = xigua_search_v2(
            keyword=text, sensitive_words=cls.s_words
        )
        if xigua_result:
            recalled.append(
                {"platform": "xg_search", "result": xigua_result[0]}
            )
        return recalled


async def video_sender(video_obj, user, trace_id, platform, content_id):
    """
    Build the message for one recalled video and post it to the ETL service.

    Official accounts map one-to-one to in-site accounts.

    :param video_obj: video object returned by a platform search
    :param user: in-site account that corresponds to the official account
    :param trace_id: trace id attached to the message
    :param platform: one of "xg_search", "baidu_search", "wx_search",
        "dy_search"; any other value sends a message with only the ids
    :param content_id: content id attached to the message
    :return: whatever ``request_etl`` returns
    """
    producer = VideoProducer()
    producer_name = _PRODUCER_BY_PLATFORM.get(platform)
    if producer_name is None:
        mq_obj = {}
    else:
        mq_obj = getattr(producer, producer_name)(
            video_obj=video_obj,
            user=user,
            trace_id=trace_id,
        )
    mq_obj['trace_id'] = trace_id
    mq_obj['content_id'] = content_id
    header = {
        "Content-Type": "application/json",
    }
    return await request_etl(
        url=ETL_URL,
        headers=header,
        json_data=mq_obj
    )


async def search_videos(params, trace_id, gh_id, mysql_client):
    """
    Search videos for an article and send the best matches to ETL.

    Steps: mine title / summary / keywords with kimi, persist them, recall
    videos with the incremental strategy, rank them by title similarity and
    send them to ETL until three requests succeeded.

    :param params: request parameters; "title" and "content_id" are read
        here and the whole dict is passed to kimi
    :param trace_id: trace id of this request
    :param gh_id: official account id, used to look up the in-site user
    :param mysql_client: client exposing ``async_insert(sql=..., params=...)``
    :return: number of videos whose ETL request succeeded (at most 3)
    """
    kimi_server = KimiServer()
    kimi_info = await kimi_server.search_kimi_schedule(params=params)
    kimi_title = kimi_info['k_title']
    content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
    content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
    update_kimi_sql = f"""
        UPDATE {db_article}
        SET kimi_title = %s, kimi_summary = %s, kimi_keys = %s
        WHERE trace_id = %s;
    """
    await mysql_client.async_insert(
        sql=update_kimi_sql,
        params=(kimi_title, content_title, content_keys, trace_id)
    )
    kimi_info["trace_id"] = trace_id
    search_ab = SearchABTest(info=kimi_info, gh_id=gh_id)
    # Incremental recall over summary, title and the three keywords.
    recall_list = await search_ab.ab_5()
    logging(
        code="1006",
        info="搜索到{}条视频".format(len(recall_list)),
        data=recall_list,
        trace_id=trace_id
    )
    # Rank by similarity to the article title.
    ranked_list = title_similarity_rank(
        content_title=params['title'].split("@@")[-1],
        recall_list=recall_list
    )
    index = 0
    for recall_obj in ranked_list:
        if not recall_obj:
            continue
        platform = recall_obj['platform']
        recall_video = recall_obj['result']
        if not recall_video:
            continue
        response = await video_sender(
            video_obj=recall_video,
            user=gh_id_dict.get(gh_id),
            trace_id=trace_id,
            platform=platform,
            content_id=params['content_id']
        )
        # A missing or malformed ETL response is simply not counted.
        status = response.get('status') if isinstance(response, dict) else None
        if status != "success":
            continue
        index += 1
        logging(
            code="1007",
            info="成功请求etl",
            data=recall_video,
            trace_id=trace_id
        )
        if index >= 3:
            print("already downloaded 3 videos")
            logging(
                code="1008",
                info="成功下载三条视频",
                trace_id=trace_id
            )
            return index
    return index