search_schedule.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. from applications.search import *
  6. from applications.static.config import gh_id_dict
  7. from applications.functions.log import logging
  8. from applications.functions.video_item import VideoProducer
  9. from applications.functions.async_etl import AsyncETL
  10. from applications.functions.common import MySQLServer
  11. class SearchABTest(object):
  12. """
  13. 搜索策略实验方案
  14. """
  15. ori_title = None
  16. article_summary = None
  17. article_keys = None
  18. gh_id = None
  19. trace_id = None
  20. def __init__(self, info, gh_id):
  21. SearchABTest.set_class_properties(info, gh_id)
  22. @classmethod
  23. def set_class_properties(cls, info, gh_id):
  24. """
  25. 初始化搜索策略实验类
  26. :param info: kimi 挖掘的基本信息
  27. :param gh_id: 公众号账号 id
  28. :return:
  29. """
  30. cls.ori_title = info["ori_title"]
  31. cls.article_summary = info["content_title"]
  32. cls.article_keys = info["content_keys"]
  33. cls.trace_id = info["trace_id"]
  34. cls.gh_id = gh_id
  35. @classmethod
  36. async def base_line(cls):
  37. """
  38. 兜底策略
  39. """
  40. return await SearchMethod().search_v0(
  41. text=cls.article_keys[0],
  42. trace_id=cls.trace_id
  43. )
  44. @classmethod
  45. async def ab_0(cls):
  46. """
  47. 默认原标题搜索
  48. :return:
  49. """
  50. search_result = await SearchMethod().search_v0(
  51. text=cls.ori_title,
  52. trace_id=cls.trace_id
  53. )
  54. if search_result:
  55. return search_result
  56. else:
  57. return await cls.base_line()
  58. @classmethod
  59. async def ab_1(cls):
  60. """
  61. 使用 content_summary搜索
  62. :return:
  63. """
  64. search_result = await SearchMethod().search_v0(
  65. text=cls.article_summary,
  66. trace_id=cls.trace_id
  67. )
  68. if search_result:
  69. return search_result
  70. else:
  71. return await cls.base_line()
  72. @classmethod
  73. async def ab_2(cls):
  74. """
  75. 使用文本关键词搜索
  76. :return:
  77. """
  78. search_result = await SearchMethod().search_v0(
  79. text=",".join(cls.article_keys),
  80. trace_id=cls.trace_id
  81. )
  82. if search_result:
  83. return search_result
  84. else:
  85. return await cls.base_line()
  86. class SearchMethod(object):
  87. """
  88. 搜索召回模式
  89. """
  90. s_words = MySQLServer().select_sensitive_words()
  91. @classmethod
  92. async def search_v0(cls, text, trace_id):
  93. """
  94. 搜索顺序-wx --> baidu --> xigua
  95. :return:
  96. """
  97. wx_result = wx_search(keys=text, sensitive_words=cls.s_words)
  98. if wx_result:
  99. return {"platform": "wx_search", "result": wx_result[0]}
  100. else:
  101. logging(
  102. code="7001",
  103. info="通过微信搜索失败---{}".format(text),
  104. trace_id=trace_id,
  105. )
  106. # 微信搜不到的话,采用好看视频搜索
  107. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
  108. if baidu_result:
  109. return {"platform": "baidu_search", "result": baidu_result[0]}
  110. else:
  111. # 若好看视频未搜到,则采用西瓜搜索
  112. logging(
  113. code="7001",
  114. info="通过baidu搜索失败---{}".format(text),
  115. trace_id=trace_id,
  116. )
  117. xigua_result = xigua_search(keyword=text, sensitive_words=cls.s_words)
  118. if xigua_result:
  119. return {"platform": "xg_search", "result": xigua_result[0]}
  120. else:
  121. logging(
  122. code="7001",
  123. info="通过西瓜搜索失败---{}, 启用兜底方式".format(text),
  124. trace_id=trace_id,
  125. )
  126. return None
  127. async def video_sender(video_obj, user, trace_id, platform):
  128. """
  129. 异步处理微信 video_obj
  130. 公众号和站内账号一一对应
  131. :param platform:
  132. :param user:
  133. :param trace_id:
  134. :param video_obj:
  135. :return:
  136. """
  137. # ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
  138. Video = VideoProducer()
  139. if platform == "xg_search":
  140. mq_obj = Video.xg_video_producer(
  141. video_obj=video_obj,
  142. user=user,
  143. trace_id=trace_id,
  144. )
  145. elif platform == "baidu_search":
  146. mq_obj = Video.baidu_video_producer(
  147. video_obj=video_obj,
  148. user=user,
  149. trace_id=trace_id,
  150. )
  151. elif platform == "wx_search":
  152. mq_obj = Video.wx_video_producer(
  153. video_obj=video_obj,
  154. user=user,
  155. trace_id=trace_id,
  156. )
  157. else:
  158. mq_obj = {}
  159. AE = AsyncETL(video_obj=mq_obj)
  160. video_id = await AE.etl_deal()
  161. logging(
  162. code="6002",
  163. info="视频下载完成",
  164. data=mq_obj,
  165. trace_id=trace_id
  166. )
  167. return video_id
  168. async def search_videos(kimi_info, trace_id, gh_id, mysql_client):
  169. """
  170. search and send msg to ETL
  171. :param mysql_client:
  172. :param kimi_info:
  173. :param gh_id: 通过账号 id 来控制实验策略
  174. :param trace_id:
  175. :return:
  176. """
  177. kimi_info["trace_id"] = trace_id
  178. SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
  179. # if ab_test_config.get(gh_id):
  180. # test_id = ab_test_config[gh_id]
  181. # if test_id == 0:
  182. # recall_obj = SearchAB.ab_0()
  183. # elif test_id == 1:
  184. # recall_obj = SearchAB.ab_1()
  185. # elif test_id == 2:
  186. # recall_obj = SearchAB.ab_2()
  187. # # elif test_id == 3:
  188. # # recall_obj = SearchAB.ab_3()
  189. # # elif test_id == 4:
  190. # # recall_obj = SearchAB.ab_4()
  191. # # elif test_id == 5:
  192. # # recall_obj = SearchAB.ab_5()
  193. # # elif test_id == 6:
  194. # # recall_obj = SearchAB.ab_6()
  195. # else:
  196. # recall_obj = {}
  197. # else:
  198. recall_obj = await SearchAB.ab_1()
  199. if recall_obj:
  200. platform = recall_obj["platform"]
  201. recall_video = recall_obj["result"]
  202. if recall_video:
  203. logging(
  204. code="7002",
  205. info="视频搜索成功, 搜索平台为--{}".format(platform),
  206. trace_id=trace_id,
  207. data=recall_video,
  208. )
  209. video_id = await video_sender(
  210. video_obj=recall_video,
  211. user=gh_id_dict.get(gh_id),
  212. trace_id=trace_id,
  213. platform=platform,
  214. )
  215. update_id_sql = f"""
  216. UPDATE long_articles_video_dev
  217. SET
  218. recall_video_id1 = {video_id}
  219. WHERE
  220. trace_id = '{trace_id}'
  221. """
  222. await mysql_client.async_insert(update_id_sql)
  223. else:
  224. logging(code="7003", info="视频搜索失败", trace_id=trace_id)
  225. return None