search_schedule.py 11 KB


  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. import json
  6. import time
  7. from applications.match_algorithm.rank import title_similarity_rank
  8. from applications.search import *
  9. from static.config import gh_id_dict, db_article
  10. from applications.functions.log import logging
  11. from applications.functions.video_item import VideoProducer
  12. from applications.functions.kimi import KimiServer
  13. from applications.functions.common import request_etl
  14. DEFAULT_ACCOUNT_ID_DICT = {
  15. "uid": 69637498,
  16. "nick_name": "望长安"
  17. }
  18. class SearchABTest(object):
  19. """
  20. 搜索策略实验方案
  21. """
  22. ori_title = None
  23. article_summary = None
  24. article_keys = None
  25. gh_id = None
  26. trace_id = None
  27. def __init__(self, info, gh_id):
  28. SearchABTest.set_class_properties(info, gh_id)
  29. @classmethod
  30. def set_class_properties(cls, info, gh_id):
  31. """
  32. 初始化搜索策略实验类
  33. :param info: kimi 挖掘的基本信息
  34. :param gh_id: 公众号账号 id
  35. :return:
  36. """
  37. cls.ori_title = info["ori_title"]
  38. cls.article_summary = info["content_title"]
  39. cls.article_keys = info["content_keys"]
  40. cls.trace_id = info["trace_id"]
  41. cls.gh_id = gh_id
  42. @classmethod
  43. async def base_line(cls):
  44. """
  45. 兜底策略
  46. """
  47. result = await SearchMethod().search_v1(
  48. text=cls.article_keys[0],
  49. trace_id=cls.trace_id
  50. )
  51. if result:
  52. return result
  53. else:
  54. sub_result = await SearchMethod().search_v1(
  55. text=cls.article_keys[1],
  56. trace_id=cls.trace_id)
  57. if sub_result:
  58. return sub_result
  59. else:
  60. return await SearchMethod().search_v1(
  61. text=cls.article_keys[2],
  62. trace_id=cls.trace_id
  63. )
  64. @classmethod
  65. async def ab_0(cls):
  66. """
  67. 默认原标题搜索
  68. :return:
  69. """
  70. search_result = await SearchMethod().search_v1(
  71. text=cls.ori_title,
  72. trace_id=cls.trace_id
  73. )
  74. if search_result:
  75. return search_result
  76. else:
  77. return await cls.base_line()
  78. @classmethod
  79. async def ab_1(cls):
  80. """
  81. 使用 content_summary搜索
  82. :return:
  83. """
  84. search_result = await SearchMethod().search_v1(
  85. text=cls.article_summary,
  86. trace_id=cls.trace_id
  87. )
  88. if search_result:
  89. return search_result
  90. else:
  91. return await cls.ab_0()
  92. @classmethod
  93. async def ab_2(cls):
  94. """
  95. 使用文本关键词搜索
  96. :return:
  97. """
  98. search_result = await SearchMethod().search_v1(
  99. text=cls.article_keys[0],
  100. trace_id=cls.trace_id
  101. )
  102. if search_result:
  103. return search_result
  104. else:
  105. return await cls.base_line()
  106. @classmethod
  107. async def ab_3(cls):
  108. """
  109. 使用文本关键词搜索
  110. :return:
  111. """
  112. search_result = await SearchMethod().search_v1(
  113. text=cls.article_keys[1],
  114. trace_id=cls.trace_id
  115. )
  116. if search_result:
  117. return search_result
  118. else:
  119. return await cls.base_line()
  120. @classmethod
  121. async def ab_4(cls):
  122. """
  123. 使用文本关键词搜索
  124. :return:
  125. """
  126. search_result = await SearchMethod().search_v1(
  127. text=cls.article_keys[2],
  128. trace_id=cls.trace_id
  129. )
  130. if search_result:
  131. return search_result
  132. else:
  133. return await cls.base_line()
  134. @classmethod
  135. async def ab_5(cls):
  136. """
  137. 增量搜索, 返回result_list
  138. :return:
  139. """
  140. result_list = await SearchMethod().search_v2(
  141. text=cls.article_summary[:15],
  142. trace_id=cls.trace_id
  143. )
  144. if len(result_list) > 3:
  145. return result_list
  146. else:
  147. result_list += await SearchMethod().search_v2(
  148. text=cls.ori_title[:15],
  149. trace_id=cls.trace_id
  150. )
  151. if len(result_list) > 3:
  152. return result_list
  153. else:
  154. result_list += await SearchMethod().search_v2(
  155. text=cls.article_keys[0],
  156. trace_id=cls.trace_id
  157. )
  158. if len(result_list) > 3:
  159. return result_list
  160. else:
  161. result_list += await SearchMethod().search_v2(
  162. text=cls.article_keys[1],
  163. trace_id=cls.trace_id
  164. )
  165. if result_list:
  166. return result_list
  167. else:
  168. result_list += await SearchMethod().search_v2(
  169. text=cls.article_keys[2],
  170. trace_id=cls.trace_id
  171. )
  172. return result_list
  173. class SearchMethod(object):
  174. """
  175. 搜索召回模式
  176. """
  177. s_words = []
  178. @classmethod
  179. async def search_v1(cls, text, trace_id):
  180. """
  181. dy ---> baidu ---> xigua
  182. :param text:
  183. :param trace_id:
  184. :return:
  185. """
  186. douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
  187. if douyin_result:
  188. return {"platform": "dy_search", "result": douyin_result[0]}
  189. else:
  190. time.sleep(1)
  191. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
  192. if baidu_result:
  193. return {"platform": "baidu_search", "result": baidu_result[0]}
  194. else:
  195. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  196. if xigua_result:
  197. return {"platform": "xg_search", "result": xigua_result[0]}
  198. else:
  199. return None
  200. @classmethod
  201. async def search_v2(cls, text, trace_id):
  202. """
  203. dy ---> baidu ---> xigua
  204. :param trace_id:
  205. :param text:
  206. :return:
  207. """
  208. L = []
  209. douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words, trace_id=trace_id)
  210. for vid_obj in douyin_result:
  211. L.append({"platform": "dy_search", "result": vid_obj})
  212. if len(L) >= 3:
  213. return L
  214. else:
  215. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words, trace_id=trace_id)
  216. if baidu_result:
  217. L.append({"platform": "baidu_search", "result": baidu_result[0]})
  218. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  219. if xigua_result:
  220. L.append({"platform": "xg_search", "result": xigua_result[0]})
  221. return L
  222. async def video_sender(video_obj, user, trace_id, platform, content_id):
  223. """
  224. 异步处理微信 video_obj
  225. 公众号和站内账号一一对应
  226. :param content_id:
  227. :param platform:
  228. :param user:
  229. :param trace_id:
  230. :param video_obj:
  231. :return:
  232. """
  233. Video = VideoProducer()
  234. if platform == "xg_search":
  235. mq_obj = Video.xg_video_producer(
  236. video_obj=video_obj,
  237. user=user,
  238. trace_id=trace_id,
  239. )
  240. elif platform == "baidu_search":
  241. mq_obj = Video.baidu_video_producer(
  242. video_obj=video_obj,
  243. user=user,
  244. trace_id=trace_id,
  245. )
  246. elif platform == "wx_search":
  247. mq_obj = Video.wx_video_producer(
  248. video_obj=video_obj,
  249. user=user,
  250. trace_id=trace_id,
  251. )
  252. elif platform == "dy_search":
  253. mq_obj = Video.dy_video_producer(
  254. video_obj=video_obj,
  255. user=user,
  256. trace_id=trace_id,
  257. )
  258. else:
  259. mq_obj = {}
  260. mq_obj['trace_id'] = trace_id
  261. mq_obj['content_id'] = content_id
  262. header = {
  263. "Content-Type": "application/json",
  264. }
  265. response = await request_etl(
  266. url="http://192.168.203.137:4612/etl",
  267. headers=header,
  268. json_data=mq_obj
  269. )
  270. return response
  271. # response = await request_etl(
  272. # url="http://localhost:4612/etl",
  273. # headers=header,
  274. # json_data=mq_obj
  275. # )
  276. # return response
  277. async def search_videos(params, trace_id, gh_id, mysql_client):
  278. """
  279. search and send msg to ETL
  280. :param mysql_client:
  281. :param params:
  282. :param gh_id: 通过账号 id 来控制实验策略
  283. :param trace_id:
  284. :return:
  285. """
  286. K = KimiServer()
  287. kimi_info = await K.search_kimi_schedule(params=params)
  288. kimi_title = kimi_info['k_title']
  289. content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
  290. content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
  291. update_kimi_sql = f"""
  292. UPDATE {db_article} SET
  293. kimi_title = %s,
  294. kimi_summary = %s,
  295. kimi_keys = %s
  296. WHERE trace_id = %s;
  297. """
  298. await mysql_client.async_insert(
  299. sql=update_kimi_sql,
  300. params=(kimi_title, content_title, content_keys, trace_id)
  301. )
  302. kimi_info["trace_id"] = trace_id
  303. SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
  304. # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
  305. recall_list = await SearchAB.ab_5()
  306. logging(
  307. code="1006",
  308. info="搜索到{}条视频".format(len(recall_list)),
  309. data=recall_list,
  310. trace_id=trace_id
  311. )
  312. # 按照标题相似度排序
  313. ranked_list = title_similarity_rank(content_title=params['title'].split("@@")[-1], recall_list=recall_list)
  314. index = 0
  315. for recall_obj in ranked_list:
  316. if recall_obj:
  317. platform = recall_obj['platform']
  318. recall_video = recall_obj['result']
  319. if recall_video:
  320. response = await video_sender(
  321. video_obj=recall_video,
  322. user=gh_id_dict.get(gh_id, DEFAULT_ACCOUNT_ID_DICT),
  323. trace_id=trace_id,
  324. platform=platform,
  325. content_id=params['content_id']
  326. )
  327. if response['status'] == "success":
  328. index += 1
  329. logging(
  330. code="1007",
  331. info="成功请求etl",
  332. data=recall_video,
  333. trace_id=trace_id
  334. )
  335. if index >= 3:
  336. print("already downloaded 3 videos")
  337. logging(
  338. code="1008",
  339. info="成功下载三条视频",
  340. trace_id=trace_id
  341. )
  342. return index
  343. return index