search_schedule.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. import asyncio
  6. from applications.search import *
  7. from applications.static.config import gh_id_dict
  8. from applications.functions.log import logging
  9. from applications.functions.video_item import VideoProducer
  10. from applications.functions.async_etl import AsyncETL
  11. from applications.functions.mysql import select_sensitive_words
  12. class SearchABTest(object):
  13. """
  14. 搜索策略实验方案
  15. """
  16. ori_title = None
  17. article_summary = None
  18. article_keys = None
  19. gh_id = None
  20. trace_id = None
  21. def __init__(self, info, gh_id):
  22. SearchABTest.set_class_properties(info, gh_id)
  23. @classmethod
  24. def set_class_properties(cls, info, gh_id):
  25. """
  26. 初始化搜索策略实验类
  27. :param info: kimi 挖掘的基本信息
  28. :param gh_id: 公众号账号 id
  29. :return:
  30. """
  31. cls.ori_title = info["ori_title"]
  32. cls.article_summary = info["content_title"]
  33. cls.article_keys = info["content_keys"]
  34. cls.trace_id = info["trace_id"]
  35. cls.gh_id = gh_id
  36. @classmethod
  37. async def base_line(cls):
  38. """
  39. 兜底策略
  40. """
  41. result = await SearchMethod().search_v0(
  42. text=cls.article_keys[0],
  43. trace_id=cls.trace_id
  44. )
  45. if result:
  46. return result
  47. else:
  48. sub_result = await SearchMethod().search_v0(
  49. text=cls.article_keys[1],
  50. trace_id=cls.trace_id)
  51. if sub_result:
  52. return sub_result
  53. else:
  54. return await SearchMethod().search_v0(
  55. text=cls.article_keys[2],
  56. trace_id=cls.trace_id
  57. )
  58. @classmethod
  59. async def ab_0(cls):
  60. """
  61. 默认原标题搜索
  62. :return:
  63. """
  64. search_result = await SearchMethod().search_v0(
  65. text=cls.ori_title,
  66. trace_id=cls.trace_id
  67. )
  68. if search_result:
  69. return search_result
  70. else:
  71. return await cls.base_line()
  72. @classmethod
  73. async def ab_1(cls):
  74. """
  75. 使用 content_summary搜索
  76. :return:
  77. """
  78. search_result = await SearchMethod().search_v0(
  79. text=cls.article_summary,
  80. trace_id=cls.trace_id
  81. )
  82. if search_result:
  83. return search_result
  84. else:
  85. return await cls.ab_0()
  86. @classmethod
  87. async def ab_2(cls):
  88. """
  89. 使用文本关键词搜索
  90. :return:
  91. """
  92. search_result = await SearchMethod().search_v0(
  93. text=",".join(cls.article_keys[0]),
  94. trace_id=cls.trace_id
  95. )
  96. if search_result:
  97. return search_result
  98. else:
  99. return await cls.base_line()
  100. @classmethod
  101. async def ab_3(cls):
  102. """
  103. 使用文本关键词搜索
  104. :return:
  105. """
  106. search_result = await SearchMethod().search_v0(
  107. text=",".join(cls.article_keys[1]),
  108. trace_id=cls.trace_id
  109. )
  110. if search_result:
  111. return search_result
  112. else:
  113. return await cls.base_line()
  114. @classmethod
  115. async def ab_4(cls):
  116. """
  117. 使用文本关键词搜索
  118. :return:
  119. """
  120. search_result = await SearchMethod().search_v0(
  121. text=",".join(cls.article_keys[3]),
  122. trace_id=cls.trace_id
  123. )
  124. if search_result:
  125. return search_result
  126. else:
  127. return await cls.base_line()
  128. class SearchMethod(object):
  129. """
  130. 搜索召回模式
  131. """
  132. s_words = select_sensitive_words()
  133. @classmethod
  134. async def search_v0(cls, text, trace_id):
  135. """
  136. 搜索顺序-wx --> baidu --> xigua
  137. 一共需要返回三条视频
  138. :return:
  139. """
  140. wx_result = wx_search(keys=text, sensitive_words=cls.s_words)
  141. if wx_result:
  142. return {"platform": "wx_search", "result": wx_result[0]}
  143. else:
  144. logging(
  145. code="7001",
  146. info="通过微信搜索失败---{}".format(text),
  147. trace_id=trace_id,
  148. )
  149. # 微信搜不到的话,采用好看视频搜索
  150. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
  151. if baidu_result:
  152. return {"platform": "baidu_search", "result": baidu_result[0]}
  153. else:
  154. # 若好看视频未搜到,则采用西瓜搜索
  155. logging(
  156. code="7001",
  157. info="通过baidu搜索失败---{}".format(text),
  158. trace_id=trace_id,
  159. )
  160. xigua_result = xigua_search(keyword=text, sensitive_words=cls.s_words)
  161. if xigua_result:
  162. return {"platform": "xg_search", "result": xigua_result[0]}
  163. else:
  164. logging(
  165. code="7001",
  166. info="通过西瓜搜索失败---{}, 启用兜底方式".format(text),
  167. trace_id=trace_id,
  168. )
  169. return None
  170. async def video_sender(video_obj, user, trace_id, platform):
  171. """
  172. 异步处理微信 video_obj
  173. 公众号和站内账号一一对应
  174. :param platform:
  175. :param user:
  176. :param trace_id:
  177. :param video_obj:
  178. :return:
  179. """
  180. # ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
  181. Video = VideoProducer()
  182. if platform == "xg_search":
  183. mq_obj = Video.xg_video_producer(
  184. video_obj=video_obj,
  185. user=user,
  186. trace_id=trace_id,
  187. )
  188. elif platform == "baidu_search":
  189. mq_obj = Video.baidu_video_producer(
  190. video_obj=video_obj,
  191. user=user,
  192. trace_id=trace_id,
  193. )
  194. elif platform == "wx_search":
  195. mq_obj = Video.wx_video_producer(
  196. video_obj=video_obj,
  197. user=user,
  198. trace_id=trace_id,
  199. )
  200. else:
  201. mq_obj = {}
  202. AE = AsyncETL(video_obj=mq_obj)
  203. video_id = await AE.etl_deal()
  204. logging(
  205. code="6002",
  206. info="视频下载完成",
  207. data=mq_obj,
  208. trace_id=trace_id
  209. )
  210. return video_id
  211. async def search_videos(kimi_info, trace_id, gh_id, mysql_client):
  212. """
  213. search and send msg to ETL
  214. :param mysql_client:
  215. :param kimi_info:
  216. :param gh_id: 通过账号 id 来控制实验策略
  217. :param trace_id:
  218. :return:
  219. """
  220. kimi_info["trace_id"] = trace_id
  221. SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
  222. recall_obj_1 = await SearchAB.ab_1()
  223. await asyncio.sleep(3)
  224. recall_obj_2 = await SearchAB.ab_2()
  225. await asyncio.sleep(3)
  226. recall_obj_3 = await SearchAB.ab_3()
  227. recall_list = [recall_obj_1, recall_obj_2, recall_obj_3]
  228. un_empty_list = [i for i in recall_list if i]
  229. if len(un_empty_list) < 3:
  230. await asyncio.sleep(3)
  231. recall_obj_4 = await SearchAB.ab_4()
  232. un_empty_list.append(recall_obj_4)
  233. # 逐条下载,逐条写表
  234. if un_empty_list:
  235. for index, recall_obj in enumerate(un_empty_list, 1):
  236. platform = recall_obj["platform"]
  237. recall_video = recall_obj["result"]
  238. if recall_video:
  239. logging(
  240. code="7002",
  241. info="视频搜索成功, 搜索平台为--{}".format(platform),
  242. trace_id=trace_id,
  243. data=recall_video,
  244. )
  245. video_id = await video_sender(
  246. video_obj=recall_video,
  247. user=gh_id_dict.get(gh_id),
  248. trace_id=trace_id,
  249. platform=platform,
  250. )
  251. update_id_sql = f"""
  252. UPDATE long_articles_video_dev
  253. SET
  254. recall_video_id{index} = {video_id}
  255. WHERE
  256. trace_id = '{trace_id}'
  257. """
  258. print(update_id_sql)
  259. await mysql_client.async_insert(update_id_sql)
  260. else:
  261. logging(code="7003", info="视频搜索失败", trace_id=trace_id)
  262. return None