|
- """
- @author: luojunhui
- 调用接口在微信内搜索视频
- """
- import json
- import time
- from applications.match_algorithm.rank import title_similarity_rank
- from applications.search import *
- from static.config import gh_id_dict, db_article
- from applications.functions.log import logging
- from applications.functions.video_item import VideoProducer
- from applications.functions.kimi import KimiServer
- from applications.functions.common import request_etl
- class SearchABTest(object):
- """
- 搜索策略实验方案
- """
- ori_title = None
- article_summary = None
- article_keys = None
- gh_id = None
- trace_id = None
- def __init__(self, info, gh_id):
- SearchABTest.set_class_properties(info, gh_id)
- @classmethod
- def set_class_properties(cls, info, gh_id):
- """
- 初始化搜索策略实验类
- :param info: kimi 挖掘的基本信息
- :param gh_id: 公众号账号 id
- :return:
- """
- cls.ori_title = info["ori_title"]
- cls.article_summary = info["content_title"]
- cls.article_keys = info["content_keys"]
- cls.trace_id = info["trace_id"]
- cls.gh_id = gh_id
- @classmethod
- async def base_line(cls):
- """
- 兜底策略
- """
- result = await SearchMethod().search_v1(
- text=cls.article_keys[0],
- trace_id=cls.trace_id
- )
- if result:
- return result
- else:
- sub_result = await SearchMethod().search_v1(
- text=cls.article_keys[1],
- trace_id=cls.trace_id)
- if sub_result:
- return sub_result
- else:
- return await SearchMethod().search_v1(
- text=cls.article_keys[2],
- trace_id=cls.trace_id
- )
- @classmethod
- async def ab_0(cls):
- """
- 默认原标题搜索
- :return:
- """
- search_result = await SearchMethod().search_v1(
- text=cls.ori_title,
- trace_id=cls.trace_id
- )
- if search_result:
- return search_result
- else:
- return await cls.base_line()
- @classmethod
- async def ab_1(cls):
- """
- 使用 content_summary搜索
- :return:
- """
- search_result = await SearchMethod().search_v1(
- text=cls.article_summary,
- trace_id=cls.trace_id
- )
- if search_result:
- return search_result
- else:
- return await cls.ab_0()
- @classmethod
- async def ab_2(cls):
- """
- 使用文本关键词搜索
- :return:
- """
- search_result = await SearchMethod().search_v1(
- text=cls.article_keys[0],
- trace_id=cls.trace_id
- )
- if search_result:
- return search_result
- else:
- return await cls.base_line()
- @classmethod
- async def ab_3(cls):
- """
- 使用文本关键词搜索
- :return:
- """
- search_result = await SearchMethod().search_v1(
- text=cls.article_keys[1],
- trace_id=cls.trace_id
- )
- if search_result:
- return search_result
- else:
- return await cls.base_line()
- @classmethod
- async def ab_4(cls):
- """
- 使用文本关键词搜索
- :return:
- """
- search_result = await SearchMethod().search_v1(
- text=cls.article_keys[2],
- trace_id=cls.trace_id
- )
- if search_result:
- return search_result
- else:
- return await cls.base_line()
- @classmethod
- async def ab_5(cls):
- """
- 增量搜索, 返回result_list
- :return:
- """
- result_list = await SearchMethod().search_v2(
- text=cls.article_summary[:15],
- trace_id=cls.trace_id
- )
- if len(result_list) > 3:
- return result_list
- else:
- result_list += await SearchMethod().search_v2(
- text=cls.ori_title[:15],
- trace_id=cls.trace_id
- )
- if len(result_list) > 3:
- return result_list
- else:
- result_list += await SearchMethod().search_v2(
- text=cls.article_keys[0],
- trace_id=cls.trace_id
- )
- if len(result_list) > 3:
- return result_list
- else:
- result_list += await SearchMethod().search_v2(
- text=cls.article_keys[1],
- trace_id=cls.trace_id
- )
- if result_list:
- return result_list
- else:
- result_list += await SearchMethod().search_v2(
- text=cls.article_keys[2],
- trace_id=cls.trace_id
- )
- return result_list
- class SearchMethod(object):
- """
- 搜索召回模式
- """
- s_words = []
- @classmethod
- async def search_v1(cls, text, trace_id):
- """
- dy ---> baidu ---> xigua
- :param text:
- :param trace_id:
- :return:
- """
- douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
- if douyin_result:
- return {"platform": "dy_search", "result": douyin_result[0]}
- else:
- time.sleep(1)
- baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
- if baidu_result:
- return {"platform": "baidu_search", "result": baidu_result[0]}
- else:
- xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
- if xigua_result:
- return {"platform": "xg_search", "result": xigua_result[0]}
- else:
- return None
- @classmethod
- async def search_v2(cls, text, trace_id):
- """
- dy ---> baidu ---> xigua
- :param trace_id:
- :param text:
- :return:
- """
- L = []
- douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
- for vid_obj in douyin_result:
- L.append({"platform": "dy_search", "result": vid_obj})
- if len(L) >= 3:
- return L
- else:
- baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
- if baidu_result:
- L.append({"platform": "baidu_search", "result": baidu_result[0]})
- xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
- if xigua_result:
- L.append({"platform": "xg_search", "result": xigua_result[0]})
- return L
- async def video_sender(video_obj, user, trace_id, platform, content_id):
- """
- 异步处理微信 video_obj
- 公众号和站内账号一一对应
- :param content_id:
- :param platform:
- :param user:
- :param trace_id:
- :param video_obj:
- :return:
- """
- Video = VideoProducer()
- if platform == "xg_search":
- mq_obj = Video.xg_video_producer(
- video_obj=video_obj,
- user=user,
- trace_id=trace_id,
- )
- elif platform == "baidu_search":
- mq_obj = Video.baidu_video_producer(
- video_obj=video_obj,
- user=user,
- trace_id=trace_id,
- )
- elif platform == "wx_search":
- mq_obj = Video.wx_video_producer(
- video_obj=video_obj,
- user=user,
- trace_id=trace_id,
- )
- elif platform == "dy_search":
- mq_obj = Video.dy_video_producer(
- video_obj=video_obj,
- user=user,
- trace_id=trace_id,
- )
- else:
- mq_obj = {}
- mq_obj['trace_id'] = trace_id
- mq_obj['content_id'] = content_id
- header = {
- "Content-Type": "application/json",
- }
- response = await request_etl(
- url="http://192.168.203.137:4612/etl",
- headers=header,
- json_data=mq_obj
- )
- return response
- # response = await request_etl(
- # url="http://localhost:4612/etl",
- # headers=header,
- # json_data=mq_obj
- # )
- # return response
- async def search_videos(params, trace_id, gh_id, mysql_client):
- """
- search and send msg to ETL
- :param mysql_client:
- :param params:
- :param gh_id: 通过账号 id 来控制实验策略
- :param trace_id:
- :return:
- """
- K = KimiServer()
- kimi_info = await K.search_kimi_schedule(params=params)
- kimi_title = kimi_info['k_title']
- content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
- content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
- update_kimi_sql = f"""
- UPDATE {db_article} SET
- kimi_title = %s,
- kimi_summary = %s,
- kimi_keys = %s
- WHERE trace_id = %s;
- """
- await mysql_client.async_insert(
- sql=update_kimi_sql,
- params=(kimi_title, content_title, content_keys, trace_id)
- )
- kimi_info["trace_id"] = trace_id
- SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
- # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
- recall_list = await SearchAB.ab_5()
- logging(
- code="1006",
- info="搜索到{}条视频".format(len(recall_list)),
- data=recall_list,
- trace_id=trace_id
- )
- # 按照标题相似度排序
- ranked_list = title_similarity_rank(content_title=params['title'].split("@@")[-1], recall_list=recall_list)
- index = 0
- for recall_obj in ranked_list:
- if recall_obj:
- platform = recall_obj['platform']
- recall_video = recall_obj['result']
- if recall_video:
- response = await video_sender(
- video_obj=recall_video,
- user=gh_id_dict.get(gh_id),
- trace_id=trace_id,
- platform=platform,
- content_id=params['content_id']
- )
- if response['status'] == "success":
- index += 1
- logging(
- code="1007",
- info="成功请求etl",
- data=recall_video,
- trace_id=trace_id
- )
- if index >= 3:
- print("already downloaded 3 videos")
- logging(
- code="1008",
- info="成功下载三条视频",
- trace_id=trace_id
- )
- return index
- return index
|