search_schedule.py 10.0 KB


  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. from applications.search import *
  6. from applications.static.config import gh_id_dict, ab_test_config
  7. from applications.functions.log import logging
  8. from applications.functions.video_item import VideoProducer
  9. from applications.functions.async_etl import AsyncETL
  10. class SearchABTest(object):
  11. """
  12. 搜索策略实验方案
  13. """
  14. ori_title = None
  15. article_summary = None
  16. article_keys = None
  17. gh_id = None
  18. trace_id = None
  19. def __init__(self, info, gh_id):
  20. SearchABTest.set_class_properties(info, gh_id)
  21. @classmethod
  22. def set_class_properties(cls, info, gh_id):
  23. """
  24. 初始化搜索策略实验类
  25. :param info: kimi 挖掘的基本信息
  26. :param gh_id: 公众号账号 id
  27. :return:
  28. """
  29. cls.ori_title = info["ori_title"]
  30. cls.article_summary = info["content_title"]
  31. cls.article_keys = info["content_keys"]
  32. cls.trace_id = info["trace_id"]
  33. cls.gh_id = gh_id
  34. @classmethod
  35. def dd(cls):
  36. """
  37. 兜底
  38. :return:
  39. """
  40. wx_result_ = wx_search(keys=cls.article_keys[0])
  41. if wx_result_:
  42. logging(
  43. code="7011",
  44. info="微信兜底搜索成功",
  45. trace_id=cls.trace_id,
  46. )
  47. return {"platform": "wx_search", "result": wx_result_[0]}
  48. else:
  49. baidu_result_ = hksp_search(key=cls.article_keys[0])
  50. if baidu_result_:
  51. logging(
  52. code="7011",
  53. info="百度兜底搜索成功",
  54. trace_id=cls.trace_id,
  55. )
  56. return {"platform": "baidu_search", "result": baidu_result_[0]}
  57. else:
  58. return None
  59. @classmethod
  60. def ab_0(cls):
  61. """
  62. 默认搜索逻辑
  63. :return:
  64. """
  65. wx_result = wx_search(keys=cls.ori_title)
  66. if wx_result:
  67. return {"platform": "wx_search", "result": wx_result[0]}
  68. else:
  69. logging(
  70. code="7001",
  71. info="通过微信搜索失败---{}".format(cls.ori_title),
  72. trace_id=cls.trace_id,
  73. )
  74. # 微信搜不到的话,采用好看视频搜索
  75. baidu_result = hksp_search(key=cls.ori_title)
  76. if baidu_result:
  77. return {"platform": "baidu_search", "result": baidu_result[0]}
  78. else:
  79. # 若好看视频未搜到,则采用西瓜搜索
  80. logging(
  81. code="7001",
  82. info="通过baidu搜索失败---{}".format(cls.ori_title),
  83. trace_id=cls.trace_id,
  84. )
  85. xigua_result = xigua_search(keyword=cls.ori_title)
  86. if xigua_result:
  87. return {"platform": "xg_search", "result": xigua_result[0]}
  88. else:
  89. logging(
  90. code="7001",
  91. info="通过西瓜搜索失败---{}, 启用兜底方式".format(cls.ori_title),
  92. trace_id=cls.trace_id,
  93. )
  94. return cls.dd()
  95. @classmethod
  96. def ab_1(cls):
  97. """
  98. :return:
  99. """
  100. wx_result = wx_search(keys=cls.article_summary)
  101. if wx_result:
  102. return {"platform": "wx_search", "result": wx_result[0]}
  103. else:
  104. logging(
  105. code="7001",
  106. info="通过微信搜索失败---{}".format(cls.article_summary),
  107. trace_id=cls.trace_id,
  108. )
  109. # 微信搜不到的话,采用好看视频搜索
  110. baidu_result = hksp_search(key=cls.article_summary)
  111. if baidu_result:
  112. return {"platform": "baidu_search", "result": baidu_result[0]}
  113. else:
  114. # 若好看视频未搜到,则采用西瓜搜索
  115. logging(
  116. code="7001",
  117. info="通过baidu搜索失败---{}".format(cls.article_summary),
  118. trace_id=cls.trace_id,
  119. )
  120. xigua_result = xigua_search(keyword=cls.article_summary)
  121. if xigua_result:
  122. return {"platform": "xg_search", "result": xigua_result[0]}
  123. else:
  124. logging(
  125. code="7001",
  126. info="通过西瓜搜索失败---{},启用兜底方式".format(cls.article_summary),
  127. trace_id=cls.trace_id,
  128. )
  129. return cls.dd()
  130. @classmethod
  131. def ab_2(cls):
  132. """
  133. ori_title + wx
  134. :return:
  135. """
  136. wx_result = wx_search(keys=",".join(cls.article_keys))
  137. if wx_result:
  138. return {"platform": "wx_search", "result": wx_result[0]}
  139. else:
  140. logging(
  141. code="7001",
  142. info="通过微信搜索失败---{}".format(",".join(cls.article_keys)),
  143. trace_id=cls.trace_id,
  144. )
  145. # 微信搜不到的话,采用好看视频搜索
  146. baidu_result = hksp_search(key=",".join(cls.article_keys))
  147. if baidu_result:
  148. return {"platform": "baidu_search", "result": baidu_result[0]}
  149. else:
  150. # 若好看视频未搜到,则采用西瓜搜索
  151. logging(
  152. code="7001",
  153. info="通过baidu搜索失败---{}".format(",".join(cls.article_keys)),
  154. trace_id=cls.trace_id,
  155. )
  156. xigua_result = xigua_search(keyword=",".join(cls.article_keys))
  157. if xigua_result:
  158. return {"platform": "xg_search", "result": xigua_result[0]}
  159. else:
  160. logging(
  161. code="7001",
  162. info="通过西瓜搜索失败---{},启用兜底".format(",".join(cls.article_keys)),
  163. trace_id=cls.trace_id,
  164. )
  165. return cls.dd()
  166. @classmethod
  167. def ab_3(cls):
  168. """
  169. article_summary + baidu
  170. :return:
  171. """
  172. result = hksp_search(key=cls.article_summary)
  173. return {"platform": "baidu_search", "result": result[0] if result else []}
  174. @classmethod
  175. def ab_4(cls):
  176. """
  177. article_summary + weixin
  178. :return:
  179. """
  180. result = wx_search(keys=cls.article_summary)
  181. return {"platform": "wx_search", "result": result[0] if result else []}
  182. @classmethod
  183. def ab_5(cls):
  184. """
  185. article_keys + weixin
  186. :return:
  187. """
  188. result = wx_search(keys=",".join(cls.article_keys))
  189. return {"platform": "wx_search", "result": result[0] if result else []}
  190. @classmethod
  191. def ab_6(cls):
  192. """
  193. article_keys + baidu
  194. :return:
  195. """
  196. result = hksp_search(key=",".join(cls.article_keys))
  197. return {"platform": "baidu_search", "result": result[0] if result else []}
  198. async def video_sender(video_obj, user, trace_id, platform):
  199. """
  200. 异步处理微信 video_obj
  201. 公众号和站内账号一一对应
  202. :param platform:
  203. :param user:
  204. :param trace_id:
  205. :param video_obj:
  206. :return:
  207. """
  208. # ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
  209. Video = VideoProducer()
  210. if platform == "xg_search":
  211. mq_obj = Video.xg_video_producer(
  212. video_obj=video_obj,
  213. user=user,
  214. trace_id=trace_id,
  215. )
  216. elif platform == "baidu_search":
  217. mq_obj = Video.baidu_video_producer(
  218. video_obj=video_obj,
  219. user=user,
  220. trace_id=trace_id,
  221. )
  222. elif platform == "wx_search":
  223. mq_obj = Video.wx_video_producer(
  224. video_obj=video_obj,
  225. user=user,
  226. trace_id=trace_id,
  227. )
  228. else:
  229. mq_obj = {}
  230. AE = AsyncETL(video_obj=mq_obj)
  231. video_id = await AE.etl_deal()
  232. logging(
  233. code="6002",
  234. info="视频下载完成",
  235. data=mq_obj,
  236. trace_id=trace_id
  237. )
  238. return video_id
  239. async def search_videos(kimi_info, trace_id, gh_id, mysql_client):
  240. """
  241. search and send msg to ETL
  242. :param mysql_client:
  243. :param kimi_info:
  244. :param gh_id: 通过账号 id 来控制实验策略
  245. :param trace_id:
  246. :return:
  247. """
  248. kimi_info["trace_id"] = trace_id
  249. SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
  250. if ab_test_config.get(gh_id):
  251. test_id = ab_test_config[gh_id]
  252. if test_id == 0:
  253. recall_obj = SearchAB.ab_0()
  254. elif test_id == 1:
  255. recall_obj = SearchAB.ab_1()
  256. elif test_id == 2:
  257. recall_obj = SearchAB.ab_2()
  258. # elif test_id == 3:
  259. # recall_obj = SearchAB.ab_3()
  260. # elif test_id == 4:
  261. # recall_obj = SearchAB.ab_4()
  262. # elif test_id == 5:
  263. # recall_obj = SearchAB.ab_5()
  264. # elif test_id == 6:
  265. # recall_obj = SearchAB.ab_6()
  266. else:
  267. recall_obj = {}
  268. else:
  269. recall_obj = SearchAB.ab_0()
  270. if recall_obj:
  271. platform = recall_obj["platform"]
  272. recall_video = recall_obj["result"]
  273. if recall_video:
  274. logging(
  275. code="7002",
  276. info="视频搜索成功, 搜索平台为--{}".format(platform),
  277. trace_id=trace_id,
  278. data=recall_video,
  279. )
  280. video_id = await video_sender(
  281. video_obj=recall_video,
  282. user=gh_id_dict.get(gh_id),
  283. trace_id=trace_id,
  284. platform=platform,
  285. )
  286. update_id_sql = f"""
  287. UPDATE long_articles_video_dev
  288. SET
  289. recall_video_id1 = {video_id}
  290. WHERE
  291. trace_id = '{trace_id}'
  292. """
  293. await mysql_client.async_insert(update_id_sql)
  294. else:
  295. logging(code="7003", info="视频搜索失败", trace_id=trace_id)
  296. return None