search_schedule.py 11 KB


  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. import json
  6. import time
  7. from applications.match_algorithm.rank import title_similarity_rank
  8. from applications.search import *
  9. from static.config import gh_id_dict, db_article
  10. from applications.functions.log import logging
  11. from applications.functions.video_item import VideoProducer
  12. from applications.functions.kimi import KimiServer
  13. from applications.functions.common import request_etl
  14. DEFAULT_ACCOUNT_ID = "69637498"
  15. class SearchABTest(object):
  16. """
  17. 搜索策略实验方案
  18. """
  19. ori_title = None
  20. article_summary = None
  21. article_keys = None
  22. gh_id = None
  23. trace_id = None
  24. def __init__(self, info, gh_id):
  25. SearchABTest.set_class_properties(info, gh_id)
  26. @classmethod
  27. def set_class_properties(cls, info, gh_id):
  28. """
  29. 初始化搜索策略实验类
  30. :param info: kimi 挖掘的基本信息
  31. :param gh_id: 公众号账号 id
  32. :return:
  33. """
  34. cls.ori_title = info["ori_title"]
  35. cls.article_summary = info["content_title"]
  36. cls.article_keys = info["content_keys"]
  37. cls.trace_id = info["trace_id"]
  38. cls.gh_id = gh_id
  39. @classmethod
  40. async def base_line(cls):
  41. """
  42. 兜底策略
  43. """
  44. result = await SearchMethod().search_v1(
  45. text=cls.article_keys[0],
  46. trace_id=cls.trace_id
  47. )
  48. if result:
  49. return result
  50. else:
  51. sub_result = await SearchMethod().search_v1(
  52. text=cls.article_keys[1],
  53. trace_id=cls.trace_id)
  54. if sub_result:
  55. return sub_result
  56. else:
  57. return await SearchMethod().search_v1(
  58. text=cls.article_keys[2],
  59. trace_id=cls.trace_id
  60. )
  61. @classmethod
  62. async def ab_0(cls):
  63. """
  64. 默认原标题搜索
  65. :return:
  66. """
  67. search_result = await SearchMethod().search_v1(
  68. text=cls.ori_title,
  69. trace_id=cls.trace_id
  70. )
  71. if search_result:
  72. return search_result
  73. else:
  74. return await cls.base_line()
  75. @classmethod
  76. async def ab_1(cls):
  77. """
  78. 使用 content_summary搜索
  79. :return:
  80. """
  81. search_result = await SearchMethod().search_v1(
  82. text=cls.article_summary,
  83. trace_id=cls.trace_id
  84. )
  85. if search_result:
  86. return search_result
  87. else:
  88. return await cls.ab_0()
  89. @classmethod
  90. async def ab_2(cls):
  91. """
  92. 使用文本关键词搜索
  93. :return:
  94. """
  95. search_result = await SearchMethod().search_v1(
  96. text=cls.article_keys[0],
  97. trace_id=cls.trace_id
  98. )
  99. if search_result:
  100. return search_result
  101. else:
  102. return await cls.base_line()
  103. @classmethod
  104. async def ab_3(cls):
  105. """
  106. 使用文本关键词搜索
  107. :return:
  108. """
  109. search_result = await SearchMethod().search_v1(
  110. text=cls.article_keys[1],
  111. trace_id=cls.trace_id
  112. )
  113. if search_result:
  114. return search_result
  115. else:
  116. return await cls.base_line()
  117. @classmethod
  118. async def ab_4(cls):
  119. """
  120. 使用文本关键词搜索
  121. :return:
  122. """
  123. search_result = await SearchMethod().search_v1(
  124. text=cls.article_keys[2],
  125. trace_id=cls.trace_id
  126. )
  127. if search_result:
  128. return search_result
  129. else:
  130. return await cls.base_line()
  131. @classmethod
  132. async def ab_5(cls):
  133. """
  134. 增量搜索, 返回result_list
  135. :return:
  136. """
  137. result_list = await SearchMethod().search_v2(
  138. text=cls.article_summary[:15],
  139. trace_id=cls.trace_id
  140. )
  141. if len(result_list) > 3:
  142. return result_list
  143. else:
  144. result_list += await SearchMethod().search_v2(
  145. text=cls.ori_title[:15],
  146. trace_id=cls.trace_id
  147. )
  148. if len(result_list) > 3:
  149. return result_list
  150. else:
  151. result_list += await SearchMethod().search_v2(
  152. text=cls.article_keys[0],
  153. trace_id=cls.trace_id
  154. )
  155. if len(result_list) > 3:
  156. return result_list
  157. else:
  158. result_list += await SearchMethod().search_v2(
  159. text=cls.article_keys[1],
  160. trace_id=cls.trace_id
  161. )
  162. if result_list:
  163. return result_list
  164. else:
  165. result_list += await SearchMethod().search_v2(
  166. text=cls.article_keys[2],
  167. trace_id=cls.trace_id
  168. )
  169. return result_list
  170. class SearchMethod(object):
  171. """
  172. 搜索召回模式
  173. """
  174. s_words = []
  175. @classmethod
  176. async def search_v1(cls, text, trace_id):
  177. """
  178. dy ---> baidu ---> xigua
  179. :param text:
  180. :param trace_id:
  181. :return:
  182. """
  183. douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
  184. if douyin_result:
  185. return {"platform": "dy_search", "result": douyin_result[0]}
  186. else:
  187. time.sleep(1)
  188. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
  189. if baidu_result:
  190. return {"platform": "baidu_search", "result": baidu_result[0]}
  191. else:
  192. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  193. if xigua_result:
  194. return {"platform": "xg_search", "result": xigua_result[0]}
  195. else:
  196. return None
  197. @classmethod
  198. async def search_v2(cls, text, trace_id):
  199. """
  200. dy ---> baidu ---> xigua
  201. :param trace_id:
  202. :param text:
  203. :return:
  204. """
  205. L = []
  206. douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
  207. for vid_obj in douyin_result:
  208. L.append({"platform": "dy_search", "result": vid_obj})
  209. if len(L) >= 3:
  210. return L
  211. else:
  212. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
  213. if baidu_result:
  214. L.append({"platform": "baidu_search", "result": baidu_result[0]})
  215. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  216. if xigua_result:
  217. L.append({"platform": "xg_search", "result": xigua_result[0]})
  218. return L
  219. async def video_sender(video_obj, user, trace_id, platform, content_id):
  220. """
  221. 异步处理微信 video_obj
  222. 公众号和站内账号一一对应
  223. :param content_id:
  224. :param platform:
  225. :param user:
  226. :param trace_id:
  227. :param video_obj:
  228. :return:
  229. """
  230. Video = VideoProducer()
  231. if platform == "xg_search":
  232. mq_obj = Video.xg_video_producer(
  233. video_obj=video_obj,
  234. user=user,
  235. trace_id=trace_id,
  236. )
  237. elif platform == "baidu_search":
  238. mq_obj = Video.baidu_video_producer(
  239. video_obj=video_obj,
  240. user=user,
  241. trace_id=trace_id,
  242. )
  243. elif platform == "wx_search":
  244. mq_obj = Video.wx_video_producer(
  245. video_obj=video_obj,
  246. user=user,
  247. trace_id=trace_id,
  248. )
  249. elif platform == "dy_search":
  250. mq_obj = Video.dy_video_producer(
  251. video_obj=video_obj,
  252. user=user,
  253. trace_id=trace_id,
  254. )
  255. else:
  256. mq_obj = {}
  257. mq_obj['trace_id'] = trace_id
  258. mq_obj['content_id'] = content_id
  259. header = {
  260. "Content-Type": "application/json",
  261. }
  262. response = await request_etl(
  263. url="http://192.168.203.137:4612/etl",
  264. headers=header,
  265. json_data=mq_obj
  266. )
  267. return response
  268. # response = await request_etl(
  269. # url="http://localhost:4612/etl",
  270. # headers=header,
  271. # json_data=mq_obj
  272. # )
  273. # return response
  274. async def search_videos(params, trace_id, gh_id, mysql_client):
  275. """
  276. search and send msg to ETL
  277. :param mysql_client:
  278. :param params:
  279. :param gh_id: 通过账号 id 来控制实验策略
  280. :param trace_id:
  281. :return:
  282. """
  283. K = KimiServer()
  284. kimi_info = await K.search_kimi_schedule(params=params)
  285. kimi_title = kimi_info['k_title']
  286. content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
  287. content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
  288. update_kimi_sql = f"""
  289. UPDATE {db_article} SET
  290. kimi_title = %s,
  291. kimi_summary = %s,
  292. kimi_keys = %s
  293. WHERE trace_id = %s;
  294. """
  295. await mysql_client.async_insert(
  296. sql=update_kimi_sql,
  297. params=(kimi_title, content_title, content_keys, trace_id)
  298. )
  299. kimi_info["trace_id"] = trace_id
  300. SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
  301. # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
  302. recall_list = await SearchAB.ab_5()
  303. logging(
  304. code="1006",
  305. info="搜索到{}条视频".format(len(recall_list)),
  306. data=recall_list,
  307. trace_id=trace_id
  308. )
  309. # 按照标题相似度排序
  310. ranked_list = title_similarity_rank(content_title=params['title'].split("@@")[-1], recall_list=recall_list)
  311. index = 0
  312. for recall_obj in ranked_list:
  313. if recall_obj:
  314. platform = recall_obj['platform']
  315. recall_video = recall_obj['result']
  316. if recall_video:
  317. response = await video_sender(
  318. video_obj=recall_video,
  319. user=gh_id_dict.get(gh_id, DEFAULT_ACCOUNT_ID),
  320. trace_id=trace_id,
  321. platform=platform,
  322. content_id=params['content_id']
  323. )
  324. if response['status'] == "success":
  325. index += 1
  326. logging(
  327. code="1007",
  328. info="成功请求etl",
  329. data=recall_video,
  330. trace_id=trace_id
  331. )
  332. if index >= 3:
  333. print("already downloaded 3 videos")
  334. logging(
  335. code="1008",
  336. info="成功下载三条视频",
  337. trace_id=trace_id
  338. )
  339. return index
  340. return index