search_schedule.py 15 KB


  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. import json
  6. import time
  7. from applications.search import *
  8. from applications.static.config import gh_id_dict
  9. from applications.functions.log import logging
  10. from applications.functions.video_item import VideoProducer
  11. from applications.functions.mysql import select_sensitive_words
  12. from applications.functions.kimi import KimiServer
  13. from applications.functions.common import request_etl
  14. class SearchABTest(object):
  15. """
  16. 搜索策略实验方案
  17. """
  18. ori_title = None
  19. article_summary = None
  20. article_keys = None
  21. gh_id = None
  22. trace_id = None
  23. def __init__(self, info, gh_id):
  24. SearchABTest.set_class_properties(info, gh_id)
  25. @classmethod
  26. def set_class_properties(cls, info, gh_id):
  27. """
  28. 初始化搜索策略实验类
  29. :param info: kimi 挖掘的基本信息
  30. :param gh_id: 公众号账号 id
  31. :return:
  32. """
  33. cls.ori_title = info["ori_title"]
  34. cls.article_summary = info["content_title"]
  35. cls.article_keys = info["content_keys"]
  36. cls.trace_id = info["trace_id"]
  37. cls.gh_id = gh_id
  38. @classmethod
  39. async def base_line(cls):
  40. """
  41. 兜底策略
  42. """
  43. result = await SearchMethod().search_v1(
  44. text=cls.article_keys[0],
  45. trace_id=cls.trace_id
  46. )
  47. if result:
  48. return result
  49. else:
  50. sub_result = await SearchMethod().search_v1(
  51. text=cls.article_keys[1],
  52. trace_id=cls.trace_id)
  53. if sub_result:
  54. return sub_result
  55. else:
  56. return await SearchMethod().search_v1(
  57. text=cls.article_keys[2],
  58. trace_id=cls.trace_id
  59. )
  60. @classmethod
  61. async def ab_0(cls):
  62. """
  63. 默认原标题搜索
  64. :return:
  65. """
  66. search_result = await SearchMethod().search_v1(
  67. text=cls.ori_title,
  68. trace_id=cls.trace_id
  69. )
  70. if search_result:
  71. return search_result
  72. else:
  73. return await cls.base_line()
  74. @classmethod
  75. async def ab_1(cls):
  76. """
  77. 使用 content_summary搜索
  78. :return:
  79. """
  80. search_result = await SearchMethod().search_v1(
  81. text=cls.article_summary,
  82. trace_id=cls.trace_id
  83. )
  84. if search_result:
  85. return search_result
  86. else:
  87. return await cls.ab_0()
  88. @classmethod
  89. async def ab_2(cls):
  90. """
  91. 使用文本关键词搜索
  92. :return:
  93. """
  94. search_result = await SearchMethod().search_v1(
  95. text=cls.article_keys[0],
  96. trace_id=cls.trace_id
  97. )
  98. if search_result:
  99. return search_result
  100. else:
  101. return await cls.base_line()
  102. @classmethod
  103. async def ab_3(cls):
  104. """
  105. 使用文本关键词搜索
  106. :return:
  107. """
  108. search_result = await SearchMethod().search_v1(
  109. text=cls.article_keys[1],
  110. trace_id=cls.trace_id
  111. )
  112. if search_result:
  113. return search_result
  114. else:
  115. return await cls.base_line()
  116. @classmethod
  117. async def ab_4(cls):
  118. """
  119. 使用文本关键词搜索
  120. :return:
  121. """
  122. search_result = await SearchMethod().search_v1(
  123. text=cls.article_keys[2],
  124. trace_id=cls.trace_id
  125. )
  126. if search_result:
  127. return search_result
  128. else:
  129. return await cls.base_line()
  130. @classmethod
  131. async def ab_5(cls):
  132. """
  133. 增量搜索, 返回result_list
  134. :return:
  135. """
  136. result_list = await SearchMethod().search_v2(
  137. text=cls.article_summary[:15],
  138. trace_id=cls.trace_id
  139. )
  140. if len(result_list) > 3:
  141. return result_list
  142. else:
  143. result_list += await SearchMethod().search_v2(
  144. text=cls.ori_title[:15],
  145. trace_id=cls.trace_id
  146. )
  147. if len(result_list) > 3:
  148. return result_list
  149. else:
  150. result_list += await SearchMethod().search_v2(
  151. text=cls.article_keys[0],
  152. trace_id=cls.trace_id
  153. )
  154. if len(result_list) > 3:
  155. return result_list
  156. else:
  157. result_list += await SearchMethod().search_v2(
  158. text=cls.article_keys[1],
  159. trace_id=cls.trace_id
  160. )
  161. if result_list:
  162. return result_list
  163. else:
  164. result_list += await SearchMethod().search_v2(
  165. text=cls.article_keys[2],
  166. trace_id=cls.trace_id
  167. )
  168. return result_list
  169. class SearchMethod(object):
  170. """
  171. 搜索召回模式
  172. """
  173. s_words = select_sensitive_words()
  174. @classmethod
  175. async def search_v0(cls, text, trace_id):
  176. """
  177. 搜索顺序-wx --> baidu --> xigua
  178. 一共需要返回三条视频
  179. :return:
  180. """
  181. wx_result = []
  182. if wx_result:
  183. return {"platform": "wx_search", "result": wx_result[0]}
  184. else:
  185. logging(
  186. code="7001",
  187. info="通过微信搜索失败---{}".format(text),
  188. trace_id=trace_id,
  189. )
  190. # 微信搜不到的话,采用好看视频搜索
  191. time.sleep(1)
  192. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
  193. if baidu_result:
  194. return {"platform": "baidu_search", "result": baidu_result[0]}
  195. else:
  196. # 若好看视频未搜到,则采用西瓜搜索
  197. logging(
  198. code="7001",
  199. info="通过baidu搜索失败---{}".format(text),
  200. trace_id=trace_id,
  201. )
  202. # return None
  203. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  204. if xigua_result:
  205. return {"platform": "xg_search", "result": xigua_result[0]}
  206. else:
  207. logging(
  208. code="7001",
  209. info="通过西瓜搜索失败---{}, 启用兜底方式".format(text),
  210. trace_id=trace_id,
  211. )
  212. return None
  213. @classmethod
  214. async def search_v1(cls, text, trace_id):
  215. """
  216. dy ---> baidu ---> xigua
  217. :param text:
  218. :param trace_id:
  219. :return:
  220. """
  221. douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words)
  222. if douyin_result:
  223. return {"platform": "dy_search", "result": douyin_result[0]}
  224. else:
  225. logging(
  226. code="7001",
  227. info="抖音搜索失败--{}".format(text),
  228. trace_id=trace_id
  229. )
  230. time.sleep(1)
  231. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
  232. if baidu_result:
  233. return {"platform": "baidu_search", "result": baidu_result[0]}
  234. else:
  235. # 若好看视频未搜到,则采用西瓜搜索
  236. logging(
  237. code="7001",
  238. info="通过baidu搜索失败---{}".format(text),
  239. trace_id=trace_id,
  240. )
  241. # return None
  242. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  243. if xigua_result:
  244. return {"platform": "xg_search", "result": xigua_result[0]}
  245. else:
  246. logging(
  247. code="7001",
  248. info="通过西瓜搜索失败---{}, 启用兜底方式".format(text),
  249. trace_id=trace_id,
  250. )
  251. return None
  252. @classmethod
  253. async def search_v2(cls, text, trace_id):
  254. """
  255. dy ---> baidu ---> xigua
  256. :param trace_id:
  257. :param text:
  258. :return:
  259. """
  260. L = []
  261. print(trace_id)
  262. douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words)
  263. for vid_obj in douyin_result:
  264. L.append({"platform": "dy_search", "result": vid_obj})
  265. if len(L) >= 3:
  266. return L
  267. else:
  268. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
  269. if baidu_result:
  270. L.append({"platform": "baidu_search", "result": baidu_result[0]})
  271. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  272. if xigua_result:
  273. L.append({"platform": "xg_search", "result": xigua_result[0]})
  274. return L
  275. async def video_sender(video_obj, user, trace_id, platform, index):
  276. """
  277. 异步处理微信 video_obj
  278. 公众号和站内账号一一对应
  279. :param index:
  280. :param platform:
  281. :param user:
  282. :param trace_id:
  283. :param video_obj:
  284. :return:
  285. """
  286. Video = VideoProducer()
  287. if platform == "xg_search":
  288. mq_obj = Video.xg_video_producer(
  289. video_obj=video_obj,
  290. user=user,
  291. trace_id=trace_id,
  292. )
  293. elif platform == "baidu_search":
  294. mq_obj = Video.baidu_video_producer(
  295. video_obj=video_obj,
  296. user=user,
  297. trace_id=trace_id,
  298. )
  299. elif platform == "wx_search":
  300. mq_obj = Video.wx_video_producer(
  301. video_obj=video_obj,
  302. user=user,
  303. trace_id=trace_id,
  304. )
  305. elif platform == "dy_search":
  306. mq_obj = Video.dy_video_producer(
  307. video_obj=video_obj,
  308. user=user,
  309. trace_id=trace_id,
  310. )
  311. else:
  312. mq_obj = {}
  313. mq_obj['index'] = index
  314. mq_obj['trace_id'] = trace_id
  315. header = {
  316. "Content-Type": "application/json",
  317. }
  318. await request_etl(
  319. url="http://192.168.203.137:4612/etl",
  320. headers=header,
  321. json_data=mq_obj
  322. )
  323. async def search_videos(params, trace_id, gh_id, mysql_client):
  324. """
  325. search and send msg to ETL
  326. :param mysql_client:
  327. :param params:
  328. :param gh_id: 通过账号 id 来控制实验策略
  329. :param trace_id:
  330. :return:
  331. """
  332. K = KimiServer()
  333. kimi_info = await K.search_kimi_schedule(params=params)
  334. print("{}---kimi 挖掘正常".format(trace_id))
  335. kimi_title = kimi_info['k_title']
  336. content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
  337. content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
  338. update_kimi_sql = f"""
  339. UPDATE long_articles_video_dev SET
  340. kimi_title = '{kimi_title}',
  341. kimi_summary = '{content_title}',
  342. kimi_keys = '{content_keys}'
  343. WHERE trace_id = '{trace_id}';
  344. """
  345. await mysql_client.async_insert(update_kimi_sql)
  346. kimi_info["trace_id"] = trace_id
  347. SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
  348. # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
  349. recall_list = await SearchAB.ab_5()
  350. print("一共搜索到{}条视频".format(len(recall_list)))
  351. index = 0
  352. for recall_obj in recall_list:
  353. if recall_obj:
  354. platform = recall_obj['platform']
  355. recall_video = recall_obj['result']
  356. if recall_video:
  357. index += 1
  358. await video_sender(
  359. video_obj=recall_video,
  360. user=gh_id_dict.get(gh_id),
  361. trace_id=trace_id,
  362. platform=platform,
  363. index=index
  364. )
  365. logging(
  366. code="7004",
  367. info="成功请求etl",
  368. trace_id=trace_id
  369. )
  370. if index >= 3:
  371. print("already downloaded 3 videos")
  372. break
  373. # SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
  374. # recall_obj_1 = await SearchAB.ab_1()
  375. # # recall_obj_1 = await SearchAB.ab_0()
  376. # await asyncio.sleep(3)
  377. # recall_obj_2 = await SearchAB.ab_2()
  378. # await asyncio.sleep(3)
  379. # recall_obj_3 = await SearchAB.ab_3()
  380. # print("{}---视频搜索正常".format(trace_id))
  381. # recall_list = [recall_obj_1, recall_obj_2, recall_obj_3]
  382. # un_empty_list = [i for i in recall_list if i]
  383. # if len(un_empty_list) < 3:
  384. # await asyncio.sleep(3)
  385. # recall_obj_4 = await SearchAB.ab_4()
  386. # if recall_obj_4:
  387. # un_empty_list.append(recall_obj_4)
  388. #
  389. # # 逐条下载,逐条写表
  390. # if un_empty_list:
  391. # for index, recall_obj in enumerate(un_empty_list, 1):
  392. # platform = recall_obj["platform"]
  393. # recall_video = recall_obj["result"]
  394. # if recall_video:
  395. # logging(
  396. # code="7002",
  397. # info="视频搜索成功, 搜索平台为--{}".format(platform),
  398. # trace_id=trace_id,
  399. # data=recall_video,
  400. # )
  401. # response = await video_sender(
  402. # video_obj=recall_video,
  403. # user=gh_id_dict.get(gh_id),
  404. # trace_id=trace_id,
  405. # platform=platform,
  406. # index=index
  407. # )
  408. # logging(
  409. # code="7004",
  410. # info="成功请求etl",
  411. # trace_id=trace_id,
  412. # data=response
  413. # )
  414. # else:
  415. # logging(
  416. # code="7003",
  417. # info="视频搜索失败, 被敏感词过滤",
  418. # trace_id=trace_id
  419. # )
  420. async def re_search_videos(params, trace_id, gh_id):
  421. """
  422. 重新搜索接口
  423. :param params:
  424. :param trace_id:
  425. :param gh_id:
  426. :return:
  427. """
  428. obj = {
  429. "ori_title": params['title'],
  430. "content_title": params['kimi_summary'],
  431. "content_keys": params['kimi_keys'],
  432. "trace_id": params['trace_id']
  433. }
  434. SearchAB = SearchABTest(info=obj, gh_id=gh_id)
  435. # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
  436. recall_list = await SearchAB.ab_5()
  437. print("一共搜索到{}条视频".format(len(recall_list)))
  438. index = 0
  439. for recall_obj in recall_list:
  440. if recall_obj:
  441. platform = recall_obj['platform']
  442. recall_video = recall_obj['result']
  443. if recall_video:
  444. index += 1
  445. await video_sender(
  446. video_obj=recall_video,
  447. user=gh_id_dict.get(gh_id),
  448. trace_id=trace_id,
  449. platform=platform,
  450. index=index
  451. )
  452. logging(
  453. code="7004",
  454. info="成功请求etl",
  455. trace_id=trace_id
  456. )
  457. if index >= 3:
  458. print("already downloaded 3 videos")
  459. break
  460. print("一个匹配到{}条".format(index))