123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- """
- @author: luojunhui
- 调用接口在微信内搜索视频
- """
- import json
- import asyncio
- from applications.search import *
- from applications.static.config import gh_id_dict
- from applications.functions.log import logging
- from applications.functions.video_item import VideoProducer
- from applications.functions.async_etl import AsyncETL
- from applications.functions.mysql import select_sensitive_words
- from applications.functions.kimi import KimiServer
class SearchABTest(object):
    """
    Search-strategy experiment (A/B) variants.

    Every ``ab_*`` coroutine tries one query built from the Kimi output and
    falls back to a broader strategy when that query recalls nothing.

    NOTE(review): the query inputs are stored on the *class*, not on the
    instance, so building a second ``SearchABTest`` overwrites the values
    every existing one reads. Requests that interleave across an ``await``
    could therefore see each other's title/keys -- confirm with the callers.
    """

    ori_title = None
    article_summary = None
    article_keys = None
    gh_id = None
    trace_id = None

    def __init__(self, info, gh_id):
        SearchABTest.set_class_properties(info, gh_id)

    @classmethod
    def set_class_properties(cls, info, gh_id):
        """
        Store the inputs of the search strategies on the class.

        :param info: basic information mined by Kimi; must contain the keys
            ``ori_title``, ``content_title``, ``content_keys`` and ``trace_id``
        :param gh_id: official-account id
        :return: None
        """
        cls.ori_title = info["ori_title"]
        cls.article_summary = info["content_title"]
        cls.article_keys = info["content_keys"]
        cls.trace_id = info["trace_id"]
        cls.gh_id = gh_id

    @classmethod
    async def base_line(cls):
        """
        Fallback strategy: search the first three keywords one at a time.

        Only the keywords that actually exist are tried, so a keyword list
        shorter than three no longer raises ``IndexError``.

        :return: the first non-empty search result, or None when every
            available keyword fails (or there are no keywords)
        """
        for keyword in (cls.article_keys or [])[:3]:
            result = await SearchMethod().search_v0(
                text=keyword,
                trace_id=cls.trace_id,
            )
            if result:
                return result
        return None

    @classmethod
    async def _search_by_key(cls, index):
        """
        Search with the keyword at ``index``; fall back to ``base_line``.

        A missing keyword is treated like an empty search result instead of
        raising ``IndexError``.

        :param index: position of the keyword inside ``article_keys``
        :return: a search result, or None when the fallback fails as well
        """
        keys = cls.article_keys or []
        if index < len(keys):
            # NOTE(review): base_line passes the same element as plain text,
            # which suggests each key is a string; joining a string puts a
            # comma between every character. Kept unchanged -- confirm intent.
            search_result = await SearchMethod().search_v0(
                text=",".join(keys[index]),
                trace_id=cls.trace_id,
            )
            if search_result:
                return search_result
        return await cls.base_line()

    @classmethod
    async def ab_0(cls):
        """
        Search with the original title, then fall back to ``base_line``.

        :return: a search result, or None when the fallback fails as well
        """
        search_result = await SearchMethod().search_v0(
            text=cls.ori_title,
            trace_id=cls.trace_id,
        )
        if search_result:
            return search_result
        return await cls.base_line()

    @classmethod
    async def ab_1(cls):
        """
        Search with the article summary, then fall back to ``ab_0``.

        :return: a search result, or None when every fallback fails
        """
        search_result = await SearchMethod().search_v0(
            text=cls.article_summary,
            trace_id=cls.trace_id,
        )
        if search_result:
            return search_result
        return await cls.ab_0()

    @classmethod
    async def ab_2(cls):
        """
        Search with the keyword at index 0, then fall back to ``base_line``.

        :return: a search result, or None when the fallback fails as well
        """
        return await cls._search_by_key(0)

    @classmethod
    async def ab_3(cls):
        """
        Search with the keyword at index 1, then fall back to ``base_line``.

        :return: a search result, or None when the fallback fails as well
        """
        return await cls._search_by_key(1)

    @classmethod
    async def ab_4(cls):
        """
        Search with the keyword at index 3, then fall back to ``base_line``.

        NOTE(review): index 2 is skipped here (ab_2 and ab_3 use 0 and 1);
        kept as in the original -- confirm this is intended.

        :return: a search result, or None when the fallback fails as well
        """
        return await cls._search_by_key(3)
class SearchMethod(object):
    """
    Video recall across search platforms.
    """

    # Sensitive-word list, loaded once when the class body is executed.
    s_words = select_sensitive_words()

    @classmethod
    async def search_v0(cls, text, trace_id):
        """
        Search WeChat first, then fall back to the haokan (Baidu) search.

        :param text: query text
        :param trace_id: id attached to the failure logs
        :return: ``{"platform": ..., "result": <first hit>}`` from the first
            platform that returns anything, or None when both come back empty
        """
        wx_hits = wx_search(keys=text, sensitive_words=cls.s_words)
        if wx_hits:
            return {"platform": "wx_search", "result": wx_hits[0]}
        logging(
            code="7001",
            info="通过微信搜索失败---{}".format(text),
            trace_id=trace_id,
        )

        # WeChat found nothing, so try the haokan video search next.
        baidu_hits = hksp_search(key=text, sensitive_words=cls.s_words)
        if baidu_hits:
            return {"platform": "baidu_search", "result": baidu_hits[0]}
        logging(
            code="7001",
            info="通过baidu搜索失败---{}".format(text),
            trace_id=trace_id,
        )

        # The xigua search that used to follow is disabled; give up here.
        return None
async def video_sender(video_obj, user, trace_id, platform):
    """
    Build the ETL message for a recalled video and run the ETL on it.

    :param video_obj: video record returned by the search platform
    :param user: account handed to the producer (the original note says
        official accounts and in-site accounts map one-to-one)
    :param trace_id: id attached to the message and to the log line
    :param platform: ``"xg_search"``, ``"baidu_search"`` or ``"wx_search"``;
        any other value results in an empty message
    :return: the value returned by ``AsyncETL.etl_deal``
    """
    # Ordered (platform, VideoProducer method name) routes.
    routes = (
        ("xg_search", "xg_video_producer"),
        ("baidu_search", "baidu_video_producer"),
        ("wx_search", "wx_video_producer"),
    )
    producer = VideoProducer()
    method_name = next(
        (name for known, name in routes if platform == known),
        None,
    )
    if method_name is None:
        # Unknown platform: the ETL still runs, with an empty message.
        message = {}
    else:
        build_message = getattr(producer, method_name)
        message = build_message(
            video_obj=video_obj,
            user=user,
            trace_id=trace_id,
        )

    etl = AsyncETL(video_obj=message)
    video_id = await etl.etl_deal()
    logging(
        code="6002",
        info="视频下载完成",
        data=message,
        trace_id=trace_id
    )
    return video_id
# Characters that need a backslash escape inside a quoted MySQL string literal.
_SQL_TEXT_ESCAPES = str.maketrans({
    "\0": "\\0",
    "\n": "\\n",
    "\r": "\\r",
    "\x1a": "\\Z",
    '"': '\\"',
    "'": "\\'",
    "\\": "\\\\",
})


def _sql_text(value):
    """Return ``value`` as text that is safe between single quotes in SQL."""
    return str(value).translate(_SQL_TEXT_ESCAPES)


async def search_videos(params, trace_id, gh_id, mysql_client):
    """
    Mine the article with Kimi, search matching videos and send them to ETL.

    The Kimi fields are written to ``long_articles_video_dev`` first. Up to
    three recalled videos are then downloaded one by one, and each resulting
    id is written to ``recall_video_id<N>`` of the same row.

    :param params: request payload forwarded to ``search_kimi_schedule``
    :param trace_id: id of the row to update; also attached to every log
    :param gh_id: official-account id; selects the user via ``gh_id_dict``
    :param mysql_client: client exposing the coroutine ``async_insert(sql)``
    :return: None in every case
    """
    kimi_server = KimiServer()
    try:
        kimi_info = await kimi_server.search_kimi_schedule(params=params)
        print(json.dumps(kimi_info, ensure_ascii=False, indent=4))

        # The values are generated text and may contain quotes/backslashes,
        # so they are escaped before being placed inside the SQL literals.
        # NOTE(review): switch to bound parameters if async_insert accepts
        # them -- its signature is not visible from here.
        kimi_title = _sql_text(kimi_info["k_title"])
        content_title = _sql_text(kimi_info["content_title"])
        content_keys = _sql_text(
            json.dumps(kimi_info["content_keys"], ensure_ascii=False)
        )
        safe_trace_id = _sql_text(trace_id)
        update_kimi_sql = f"""
            UPDATE long_articles_video_dev SET
                kimi_title = '{kimi_title}',
                kimi_summary = '{content_title}',
                kimi_keys = '{content_keys}'
            WHERE trace_id = '{safe_trace_id}';
        """
        await mysql_client.async_insert(update_kimi_sql)

        kimi_info["trace_id"] = trace_id
        search_ab = SearchABTest(info=kimi_info, gh_id=gh_id)
        # The searches are spaced 3 seconds apart (presumably to stay under
        # the upstream rate limits -- TODO confirm).
        recall_obj_1 = await search_ab.ab_1()
        await asyncio.sleep(3)
        recall_obj_2 = await search_ab.ab_2()
        await asyncio.sleep(3)
        recall_obj_3 = await search_ab.ab_3()
        recalled = [
            obj for obj in (recall_obj_1, recall_obj_2, recall_obj_3) if obj
        ]
        if len(recalled) < 3:
            await asyncio.sleep(3)
            recall_obj_4 = await search_ab.ab_4()
            # ab_4 can come back empty as well; an appended None used to make
            # the loop below fail on recall_obj["platform"].
            if recall_obj_4:
                recalled.append(recall_obj_4)

        if not recalled:
            logging(code="7003", info="视频搜索失败", trace_id=trace_id)
            return None

        # Download one video at a time and write each id as soon as it exists.
        for index, recall_obj in enumerate(recalled, 1):
            platform = recall_obj["platform"]
            recall_video = recall_obj["result"]
            if not recall_video:
                continue
            logging(
                code="7002",
                info="视频搜索成功, 搜索平台为--{}".format(platform),
                trace_id=trace_id,
                data=recall_video,
            )
            video_id = await video_sender(
                video_obj=recall_video,
                user=gh_id_dict.get(gh_id),
                trace_id=trace_id,
                platform=platform,
            )
            update_id_sql = f"""
                UPDATE long_articles_video_dev
                SET
                    recall_video_id{index} = {video_id}
                WHERE
                    trace_id = '{safe_trace_id}'
            """
            print(update_id_sql)
            await mysql_client.async_insert(update_id_sql)
        return None
    except Exception as e:
        # Best-effort boundary: any failure in the pipeline above (not only
        # the Kimi call) is logged with this message and swallowed.
        logging(
            code="9000",
            info="kimi挖掘失败, 原因是-{}".format(e),
            trace_id=trace_id
        )
        return None
|