search_schedule.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. from applications.search import *
  6. from applications.static.config import gh_id_dict
  7. from applications.functions.log import logging
  8. from applications.functions.video_item import VideoProducer
  9. from applications.functions.async_etl import AsyncETL
  10. from applications.functions.mysql import select_sensitive_words
  11. class SearchABTest(object):
  12. """
  13. 搜索策略实验方案
  14. """
  15. ori_title = None
  16. article_summary = None
  17. article_keys = None
  18. gh_id = None
  19. trace_id = None
  20. def __init__(self, info, gh_id):
  21. SearchABTest.set_class_properties(info, gh_id)
  22. @classmethod
  23. def set_class_properties(cls, info, gh_id):
  24. """
  25. 初始化搜索策略实验类
  26. :param info: kimi 挖掘的基本信息
  27. :param gh_id: 公众号账号 id
  28. :return:
  29. """
  30. cls.ori_title = info["ori_title"]
  31. cls.article_summary = info["content_title"]
  32. cls.article_keys = info["content_keys"]
  33. cls.trace_id = info["trace_id"]
  34. cls.gh_id = gh_id
  35. @classmethod
  36. async def base_line(cls):
  37. """
  38. 兜底策略
  39. """
  40. result = await SearchMethod().search_v0(
  41. text=cls.article_keys[0],
  42. trace_id=cls.trace_id
  43. )
  44. if result:
  45. return result
  46. else:
  47. sub_result = await SearchMethod().search_v0(
  48. text=cls.article_keys[1],
  49. trace_id=cls.trace_id)
  50. if sub_result:
  51. return sub_result
  52. else:
  53. return await SearchMethod().search_v0(
  54. text=cls.article_keys[2],
  55. trace_id=cls.trace_id
  56. )
  57. @classmethod
  58. async def ab_0(cls):
  59. """
  60. 默认原标题搜索
  61. :return:
  62. """
  63. search_result = await SearchMethod().search_v0(
  64. text=cls.ori_title,
  65. trace_id=cls.trace_id
  66. )
  67. if search_result:
  68. return search_result
  69. else:
  70. return await cls.base_line()
  71. @classmethod
  72. async def ab_1(cls):
  73. """
  74. 使用 content_summary搜索
  75. :return:
  76. """
  77. search_result = await SearchMethod().search_v0(
  78. text=cls.article_summary,
  79. trace_id=cls.trace_id
  80. )
  81. if search_result:
  82. return search_result
  83. else:
  84. return await cls.ab_0()
  85. @classmethod
  86. async def ab_2(cls):
  87. """
  88. 使用文本关键词搜索
  89. :return:
  90. """
  91. search_result = await SearchMethod().search_v0(
  92. text=",".join(cls.article_keys),
  93. trace_id=cls.trace_id
  94. )
  95. if search_result:
  96. return search_result
  97. else:
  98. return await cls.base_line()
  99. class SearchMethod(object):
  100. """
  101. 搜索召回模式
  102. """
  103. s_words = select_sensitive_words()
  104. @classmethod
  105. async def search_v0(cls, text, trace_id):
  106. """
  107. 搜索顺序-wx --> baidu --> xigua
  108. :return:
  109. """
  110. wx_result = wx_search(keys=text, sensitive_words=cls.s_words)
  111. if wx_result:
  112. return {"platform": "wx_search", "result": wx_result[0]}
  113. else:
  114. logging(
  115. code="7001",
  116. info="通过微信搜索失败---{}".format(text),
  117. trace_id=trace_id,
  118. )
  119. # 微信搜不到的话,采用好看视频搜索
  120. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
  121. if baidu_result:
  122. return {"platform": "baidu_search", "result": baidu_result[0]}
  123. else:
  124. # 若好看视频未搜到,则采用西瓜搜索
  125. logging(
  126. code="7001",
  127. info="通过baidu搜索失败---{}".format(text),
  128. trace_id=trace_id,
  129. )
  130. xigua_result = xigua_search(keyword=text, sensitive_words=cls.s_words)
  131. if xigua_result:
  132. return {"platform": "xg_search", "result": xigua_result[0]}
  133. else:
  134. logging(
  135. code="7001",
  136. info="通过西瓜搜索失败---{}, 启用兜底方式".format(text),
  137. trace_id=trace_id,
  138. )
  139. return None
  140. async def video_sender(video_obj, user, trace_id, platform):
  141. """
  142. 异步处理微信 video_obj
  143. 公众号和站内账号一一对应
  144. :param platform:
  145. :param user:
  146. :param trace_id:
  147. :param video_obj:
  148. :return:
  149. """
  150. # ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
  151. Video = VideoProducer()
  152. if platform == "xg_search":
  153. mq_obj = Video.xg_video_producer(
  154. video_obj=video_obj,
  155. user=user,
  156. trace_id=trace_id,
  157. )
  158. elif platform == "baidu_search":
  159. mq_obj = Video.baidu_video_producer(
  160. video_obj=video_obj,
  161. user=user,
  162. trace_id=trace_id,
  163. )
  164. elif platform == "wx_search":
  165. mq_obj = Video.wx_video_producer(
  166. video_obj=video_obj,
  167. user=user,
  168. trace_id=trace_id,
  169. )
  170. else:
  171. mq_obj = {}
  172. AE = AsyncETL(video_obj=mq_obj)
  173. video_id = await AE.etl_deal()
  174. logging(
  175. code="6002",
  176. info="视频下载完成",
  177. data=mq_obj,
  178. trace_id=trace_id
  179. )
  180. return video_id
  181. async def search_videos(kimi_info, trace_id, gh_id, mysql_client):
  182. """
  183. search and send msg to ETL
  184. :param mysql_client:
  185. :param kimi_info:
  186. :param gh_id: 通过账号 id 来控制实验策略
  187. :param trace_id:
  188. :return:
  189. """
  190. kimi_info["trace_id"] = trace_id
  191. SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
  192. # if ab_test_config.get(gh_id):
  193. # test_id = ab_test_config[gh_id]
  194. # if test_id == 0:
  195. # recall_obj = SearchAB.ab_0()
  196. # elif test_id == 1:
  197. # recall_obj = SearchAB.ab_1()
  198. # elif test_id == 2:
  199. # recall_obj = SearchAB.ab_2()
  200. # # elif test_id == 3:
  201. # # recall_obj = SearchAB.ab_3()
  202. # # elif test_id == 4:
  203. # # recall_obj = SearchAB.ab_4()
  204. # # elif test_id == 5:
  205. # # recall_obj = SearchAB.ab_5()
  206. # # elif test_id == 6:
  207. # # recall_obj = SearchAB.ab_6()
  208. # else:
  209. # recall_obj = {}
  210. # else:
  211. recall_obj = await SearchAB.ab_1()
  212. if recall_obj:
  213. platform = recall_obj["platform"]
  214. recall_video = recall_obj["result"]
  215. if recall_video:
  216. logging(
  217. code="7002",
  218. info="视频搜索成功, 搜索平台为--{}".format(platform),
  219. trace_id=trace_id,
  220. data=recall_video,
  221. )
  222. video_id = await video_sender(
  223. video_obj=recall_video,
  224. user=gh_id_dict.get(gh_id),
  225. trace_id=trace_id,
  226. platform=platform,
  227. )
  228. update_id_sql = f"""
  229. UPDATE long_articles_video
  230. SET
  231. recall_video_id1 = {video_id}
  232. WHERE
  233. trace_id = '{trace_id}'
  234. """
  235. await mysql_client.async_insert(update_id_sql)
  236. else:
  237. logging(code="7003", info="视频搜索失败", trace_id=trace_id)
  238. return None