search_schedule.py 15 KB


  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. import json
  6. import asyncio
  7. import time
  8. from applications.search import *
  9. from applications.static.config import gh_id_dict
  10. from applications.functions.log import logging
  11. from applications.functions.video_item import VideoProducer
  12. from applications.functions.async_etl import AsyncETL
  13. from applications.functions.mysql import select_sensitive_words
  14. from applications.functions.kimi import KimiServer
  15. class SearchABTest(object):
  16. """
  17. 搜索策略实验方案
  18. """
  19. ori_title = None
  20. article_summary = None
  21. article_keys = None
  22. gh_id = None
  23. trace_id = None
  24. def __init__(self, info, gh_id):
  25. SearchABTest.set_class_properties(info, gh_id)
  26. @classmethod
  27. def set_class_properties(cls, info, gh_id):
  28. """
  29. 初始化搜索策略实验类
  30. :param info: kimi 挖掘的基本信息
  31. :param gh_id: 公众号账号 id
  32. :return:
  33. """
  34. cls.ori_title = info["ori_title"]
  35. cls.article_summary = info["content_title"]
  36. cls.article_keys = info["content_keys"]
  37. cls.trace_id = info["trace_id"]
  38. cls.gh_id = gh_id
  39. @classmethod
  40. async def base_line(cls):
  41. """
  42. 兜底策略
  43. """
  44. result = await SearchMethod().search_v1(
  45. text=cls.article_keys[0],
  46. trace_id=cls.trace_id
  47. )
  48. if result:
  49. return result
  50. else:
  51. sub_result = await SearchMethod().search_v1(
  52. text=cls.article_keys[1],
  53. trace_id=cls.trace_id)
  54. if sub_result:
  55. return sub_result
  56. else:
  57. return await SearchMethod().search_v1(
  58. text=cls.article_keys[2],
  59. trace_id=cls.trace_id
  60. )
  61. @classmethod
  62. async def ab_0(cls):
  63. """
  64. 默认原标题搜索
  65. :return:
  66. """
  67. search_result = await SearchMethod().search_v1(
  68. text=cls.ori_title,
  69. trace_id=cls.trace_id
  70. )
  71. if search_result:
  72. return search_result
  73. else:
  74. return await cls.base_line()
  75. @classmethod
  76. async def ab_1(cls):
  77. """
  78. 使用 content_summary搜索
  79. :return:
  80. """
  81. search_result = await SearchMethod().search_v1(
  82. text=cls.article_summary,
  83. trace_id=cls.trace_id
  84. )
  85. if search_result:
  86. return search_result
  87. else:
  88. return await cls.ab_0()
  89. @classmethod
  90. async def ab_2(cls):
  91. """
  92. 使用文本关键词搜索
  93. :return:
  94. """
  95. search_result = await SearchMethod().search_v1(
  96. text=cls.article_keys[0],
  97. trace_id=cls.trace_id
  98. )
  99. if search_result:
  100. return search_result
  101. else:
  102. return await cls.base_line()
  103. @classmethod
  104. async def ab_3(cls):
  105. """
  106. 使用文本关键词搜索
  107. :return:
  108. """
  109. search_result = await SearchMethod().search_v1(
  110. text=cls.article_keys[1],
  111. trace_id=cls.trace_id
  112. )
  113. if search_result:
  114. return search_result
  115. else:
  116. return await cls.base_line()
  117. @classmethod
  118. async def ab_4(cls):
  119. """
  120. 使用文本关键词搜索
  121. :return:
  122. """
  123. search_result = await SearchMethod().search_v1(
  124. text=cls.article_keys[2],
  125. trace_id=cls.trace_id
  126. )
  127. if search_result:
  128. return search_result
  129. else:
  130. return await cls.base_line()
  131. @classmethod
  132. async def ab_5(cls):
  133. """
  134. 增量搜索, 返回result_list
  135. :return:
  136. """
  137. result_list = await SearchMethod().search_v2(
  138. text=cls.article_summary[:15],
  139. trace_id=cls.trace_id
  140. )
  141. if len(result_list) > 3:
  142. return result_list
  143. else:
  144. result_list += await SearchMethod().search_v2(
  145. text=cls.ori_title[:15],
  146. trace_id=cls.trace_id
  147. )
  148. if len(result_list) > 3:
  149. return result_list
  150. else:
  151. result_list += await SearchMethod().search_v2(
  152. text=cls.article_keys[0],
  153. trace_id=cls.trace_id
  154. )
  155. if len(result_list) > 3:
  156. return result_list
  157. else:
  158. result_list += await SearchMethod().search_v2(
  159. text=cls.article_keys[1],
  160. trace_id=cls.trace_id
  161. )
  162. if result_list:
  163. return result_list
  164. else:
  165. result_list += await SearchMethod().search_v2(
  166. text=cls.article_keys[2],
  167. trace_id=cls.trace_id
  168. )
  169. return result_list
  170. class SearchMethod(object):
  171. """
  172. 搜索召回模式
  173. """
  174. s_words = select_sensitive_words()
  175. @classmethod
  176. async def search_v0(cls, text, trace_id):
  177. """
  178. 搜索顺序-wx --> baidu --> xigua
  179. 一共需要返回三条视频
  180. :return:
  181. """
  182. wx_result = []
  183. if wx_result:
  184. return {"platform": "wx_search", "result": wx_result[0]}
  185. else:
  186. logging(
  187. code="7001",
  188. info="通过微信搜索失败---{}".format(text),
  189. trace_id=trace_id,
  190. )
  191. # 微信搜不到的话,采用好看视频搜索
  192. time.sleep(1)
  193. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
  194. if baidu_result:
  195. return {"platform": "baidu_search", "result": baidu_result[0]}
  196. else:
  197. # 若好看视频未搜到,则采用西瓜搜索
  198. logging(
  199. code="7001",
  200. info="通过baidu搜索失败---{}".format(text),
  201. trace_id=trace_id,
  202. )
  203. # return None
  204. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  205. if xigua_result:
  206. return {"platform": "xg_search", "result": xigua_result[0]}
  207. else:
  208. logging(
  209. code="7001",
  210. info="通过西瓜搜索失败---{}, 启用兜底方式".format(text),
  211. trace_id=trace_id,
  212. )
  213. return None
  214. @classmethod
  215. async def search_v1(cls, text, trace_id):
  216. """
  217. dy ---> baidu ---> xigua
  218. :param text:
  219. :param trace_id:
  220. :return:
  221. """
  222. douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words)
  223. if douyin_result:
  224. return {"platform": "dy_search", "result": douyin_result[0]}
  225. else:
  226. logging(
  227. code="7001",
  228. info="抖音搜索失败--{}".format(text),
  229. trace_id=trace_id
  230. )
  231. time.sleep(1)
  232. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
  233. if baidu_result:
  234. return {"platform": "baidu_search", "result": baidu_result[0]}
  235. else:
  236. # 若好看视频未搜到,则采用西瓜搜索
  237. logging(
  238. code="7001",
  239. info="通过baidu搜索失败---{}".format(text),
  240. trace_id=trace_id,
  241. )
  242. # return None
  243. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  244. if xigua_result:
  245. return {"platform": "xg_search", "result": xigua_result[0]}
  246. else:
  247. logging(
  248. code="7001",
  249. info="通过西瓜搜索失败---{}, 启用兜底方式".format(text),
  250. trace_id=trace_id,
  251. )
  252. return None
  253. @classmethod
  254. async def search_v2(cls, text, trace_id):
  255. """
  256. dy ---> baidu ---> xigua
  257. :param trace_id:
  258. :param text:
  259. :return:
  260. """
  261. L = []
  262. print(trace_id)
  263. douyin_result = douyin_search(keyword=text, sensitive_words=cls.s_words)
  264. for vid_obj in douyin_result:
  265. L.append({"platform": "dy_search", "result": vid_obj})
  266. if len(L) >= 3:
  267. return L
  268. else:
  269. baidu_result = hksp_search(key=text, sensitive_words=cls.s_words)
  270. if baidu_result:
  271. L.append({"platform": "baidu_search", "result": baidu_result[0]})
  272. xigua_result = xigua_search_v2(keyword=text, sensitive_words=cls.s_words)
  273. if xigua_result:
  274. L.append({"platform": "xg_search", "result": xigua_result[0]})
  275. return L
  276. async def video_sender(video_obj, user, trace_id, platform):
  277. """
  278. 异步处理微信 video_obj
  279. 公众号和站内账号一一对应
  280. :param platform:
  281. :param user:
  282. :param trace_id:
  283. :param video_obj:
  284. :return:
  285. """
  286. # ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
  287. Video = VideoProducer()
  288. if platform == "xg_search":
  289. mq_obj = Video.xg_video_producer(
  290. video_obj=video_obj,
  291. user=user,
  292. trace_id=trace_id,
  293. )
  294. elif platform == "baidu_search":
  295. mq_obj = Video.baidu_video_producer(
  296. video_obj=video_obj,
  297. user=user,
  298. trace_id=trace_id,
  299. )
  300. elif platform == "wx_search":
  301. mq_obj = Video.wx_video_producer(
  302. video_obj=video_obj,
  303. user=user,
  304. trace_id=trace_id,
  305. )
  306. elif platform == "dy_search":
  307. mq_obj = Video.dy_video_producer(
  308. video_obj=video_obj,
  309. user=user,
  310. trace_id=trace_id,
  311. )
  312. else:
  313. mq_obj = {}
  314. AE = AsyncETL(video_obj=mq_obj)
  315. video_id = await AE.etl_deal()
  316. logging(
  317. code="6002",
  318. info="视频下载完成, 平台是---{}".format(platform),
  319. data=mq_obj,
  320. trace_id=trace_id,
  321. )
  322. return video_id
  323. async def search_videos(params, trace_id, gh_id, mysql_client):
  324. """
  325. search and send msg to ETL
  326. :param mysql_client:
  327. :param params:
  328. :param gh_id: 通过账号 id 来控制实验策略
  329. :param trace_id:
  330. :return:
  331. """
  332. K = KimiServer()
  333. kimi_info = await K.search_kimi_schedule(params=params)
  334. print("{}---kimi 挖掘正常".format(trace_id))
  335. kimi_title = kimi_info['k_title']
  336. content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
  337. content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
  338. update_kimi_sql = f"""
  339. UPDATE long_articles_video SET
  340. kimi_title = '{kimi_title}',
  341. kimi_summary = '{content_title}',
  342. kimi_keys = '{content_keys}'
  343. WHERE trace_id = '{trace_id}';
  344. """
  345. await mysql_client.async_insert(update_kimi_sql)
  346. kimi_info["trace_id"] = trace_id
  347. SearchAB = SearchABTest(info=kimi_info, gh_id=gh_id)
  348. recall_obj_1 = await SearchAB.ab_1()
  349. # recall_obj_1 = await SearchAB.ab_0()
  350. await asyncio.sleep(3)
  351. recall_obj_2 = await SearchAB.ab_2()
  352. await asyncio.sleep(3)
  353. recall_obj_3 = await SearchAB.ab_3()
  354. print("{}---视频搜索正常".format(trace_id))
  355. recall_list = [recall_obj_1, recall_obj_2, recall_obj_3]
  356. un_empty_list = [i for i in recall_list if i]
  357. if len(un_empty_list) < 3:
  358. await asyncio.sleep(3)
  359. recall_obj_4 = await SearchAB.ab_4()
  360. if recall_obj_4:
  361. un_empty_list.append(recall_obj_4)
  362. # 逐条下载,逐条写表
  363. if un_empty_list:
  364. for index, recall_obj in enumerate(un_empty_list, 1):
  365. platform = recall_obj["platform"]
  366. recall_video = recall_obj["result"]
  367. if recall_video:
  368. logging(
  369. code="7002",
  370. info="视频搜索成功, 搜索平台为--{}".format(platform),
  371. trace_id=trace_id,
  372. data=recall_video,
  373. )
  374. video_id = await video_sender(
  375. video_obj=recall_video,
  376. user=gh_id_dict.get(gh_id),
  377. trace_id=trace_id,
  378. platform=platform,
  379. )
  380. update_id_sql = f"""
  381. UPDATE long_articles_video
  382. SET
  383. recall_video_id{index} = {video_id}
  384. WHERE
  385. trace_id = '{trace_id}'
  386. """
  387. await mysql_client.async_insert(update_id_sql)
  388. else:
  389. logging(
  390. code="7003",
  391. info="视频搜索失败, 被敏感词过滤",
  392. trace_id=trace_id
  393. )
  394. async def insert_into_mysql(index, mysql_client, recall_video, gh_id, trace_id, platform):
  395. """
  396. :param platform:
  397. :param trace_id:
  398. :param gh_id:
  399. :param index:
  400. :param mysql_client:
  401. :param recall_video:
  402. """
  403. video_id = await video_sender(
  404. video_obj=recall_video,
  405. user=gh_id_dict.get(gh_id),
  406. trace_id=trace_id,
  407. platform=platform,
  408. )
  409. update_id_sql = f"""
  410. UPDATE long_articles_video
  411. SET
  412. recall_video_id{index} = {video_id}
  413. WHERE
  414. trace_id = '{trace_id}'
  415. """
  416. await mysql_client.async_insert(update_id_sql)
  417. async def re_search_videos(params, trace_id, gh_id, mysql_client):
  418. """
  419. 重新搜索接口
  420. :param params:
  421. :param trace_id:
  422. :param gh_id:
  423. :param mysql_client:
  424. :return:
  425. """
  426. obj = {
  427. "ori_title": params['title'],
  428. "content_title": params['kimi_summary'],
  429. "content_keys": params['kimi_keys'],
  430. "trace_id": params['trace_id']
  431. }
  432. SearchAB = SearchABTest(info=obj, gh_id=gh_id)
  433. # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
  434. recall_list = await SearchAB.ab_5()
  435. print("一共搜索到{}条视频".format(len(recall_list)))
  436. index = 0
  437. for recall_obj in recall_list:
  438. if recall_obj:
  439. platform = recall_obj['platform']
  440. recall_video = recall_obj['result']
  441. if recall_video:
  442. index += 1
  443. await insert_into_mysql(
  444. index=index,
  445. mysql_client=mysql_client,
  446. recall_video=recall_video,
  447. gh_id=gh_id,
  448. trace_id=trace_id,
  449. platform=platform
  450. )
  451. if index >= 3:
  452. print("already downloaded 3 videos")
  453. break
  454. print("一个匹配到{}条文章".format(index))