search.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. import json
  6. import time
  7. import requests
  8. from applications.mq import MQ
  9. from applications.log import logging
  10. from applications.config import gh_id_dict
  11. from applications.functions.item import VideoItem
  12. from applications.functions.common import sensitive_flag
  def wx_search(keys, timeout=120):
      """
      Search WeChat for videos matching a keyword via the crawler HTTP service.

      Sends a POST request with a JSON body containing the keyword, a cursor
      of "0" and the content type "video", then returns the decoded JSON reply.

      :param keys: keyword string to search for
      :param timeout: seconds to wait for the service before ``requests``
          raises a timeout error. Without a timeout ``requests`` waits
          indefinitely, so a stalled service would hang the caller.
          NOTE(review): 120 is an assumed default; tune to the service's
          real latency.
      :return: the JSON-decoded response body
      :raises requests.exceptions.RequestException: on connection failure
          or when the timeout elapses
      """
      url = "http://8.217.190.241:8888/crawler/wei_xin/keyword"
      payload = json.dumps({
          "keyword": keys,
          "cursor": "0",
          "content_type": "video",
      })
      headers = {
          'Content-Type': 'application/json',
      }
      response = requests.post(url, headers=headers, data=payload, timeout=timeout)
      return response.json()
  def process_weixin_video_obj(video_obj, user, trace_id, title):
      """
      Package one WeChat search hit as a VideoItem and publish it to the ETL queue.

      The item is filled field by field, turned into a message with
      ``produce_item()``, sent to the ``topic_crawler_etl_prod`` topic and
      finally logged with code 6002.

      :param video_obj: search-result dict; the keys ``pubTime``,
          ``hashDocID``, ``videoUrl`` and ``image`` are read
      :param user: account dict providing ``uid`` and ``nick_name``
          (per the original note, official accounts map one-to-one to
          in-site accounts)
      :param trace_id: stored on the item as ``out_user_id``
      :param title: stored on the item as ``video_title``
      :return: None
      """
      etl_queue = MQ(topic_name="topic_crawler_etl_prod")
      source = "weixin_search"
      published_at = int(video_obj['pubTime'])

      item = VideoItem()
      put = item.add_video_info
      put("user_id", user["uid"])
      put("user_name", user["nick_name"])
      put("video_id", video_obj['hashDocID'])
      put("video_title", title)
      put("publish_time_stamp", published_at)
      put("video_url", video_obj["videoUrl"])
      put("cover_url", video_obj["image"])
      # The external video id is the same hashDocID used for video_id.
      put("out_video_id", video_obj['hashDocID'])
      put("out_user_id", trace_id)
      put("platform", source)
      put("strategy", "search")
      # Session label is "<platform>-<current unix seconds>".
      put("session", f"{source}-{int(time.time())}")

      message = item.produce_item()
      etl_queue.send_msg(params=message)
      logging(code="6002", info="发送消息至 ETL", data=message)
  def _strip_highlight(raw_title):
      """Remove the search-highlight ``<em>`` markup and ``#`` characters from a title."""
      return raw_title.replace('<em class=\"highlight\">', '').replace('</em>', '').replace("#", "")


  def _first_accepted_result(keyword, fail_info, trace_id):
      """
      Run one WeChat search and return the first result accepted by ``sensitive_flag``.

      :param keyword: text passed to ``wx_search``
      :param fail_info: message logged with code 7001 when the service
          answers with the '未知错误' (unknown error) message
      :param trace_id: trace id attached to the failure log
      :return: the first result object whose cleaned title makes
          ``sensitive_flag`` return a truthy value, else None
      """
      result = wx_search(keys=keyword)
      if result['msg'] == '未知错误':
          logging(code="7001", info=fail_info, trace_id=trace_id)
          return None
      for candidate in result['data']['data']:
          # NOTE(review): a truthy sensitive_flag() is treated as "accept this
          # title" -- presumably it means the title passed the sensitivity
          # check; confirm against applications.functions.common.
          if sensitive_flag(_strip_highlight(candidate['items'][0]['title'])):
              return candidate
      return None


  def return_video(video_path, title, trace_id):
      """
      Find a WeChat video for a title, trying up to three keywords in order.

      The JSON file at ``video_path`` is loaded; if it is empty, code 7000 is
      logged and None is returned. Otherwise searches run in priority order:
      the title itself, the first entry of ``search_keys``, then ``theme``.
      The first accepted result of the first successful stage is returned.

      Failure logs always report the ``title`` argument: the candidate titles
      examined during a search no longer overwrite it. A stage whose keyword
      is absent from the file (no ``search_keys`` entries, no ``theme``) is
      skipped instead of raising.

      :param video_path: path to the JSON file holding the mined keywords
      :param title: video title, used as the first search keyword
      :param trace_id: trace id attached to every log entry
      :return: the accepted search-result object, or None if nothing matched
      """
      with open(video_path, encoding='utf-8') as f:
          my_obj = json.load(f)

      if not my_obj:
          logging(
              code="7000",
              info="标题--{}--kimi 挖掘数据失败".format(title),
              trace_id=trace_id
          )
          return None

      # All three keywords are tried; the title has the highest priority.
      stages = [(title, "通过标题搜索失败---{}".format(title))]
      search_keys = my_obj.get('search_keys') or []
      if search_keys:
          stages.append((search_keys[0], "通过搜索词搜索失败---{}".format(title)))
      theme = my_obj.get('theme')
      if theme:
          stages.append((theme, "通过主题搜索失败---{}".format(title)))

      for keyword, fail_info in stages:
          found = _first_accepted_result(keyword, fail_info, trace_id)
          if found is not None:
              return found
      return None
  def search_videos(video_path, title, trace_id, gh_id):
      """
      Look up a video for the title and, if one is found, forward it to ETL.

      Delegates the search to ``return_video``. On a hit, logs code 7002,
      strips the highlight markup and ``#`` from the first item's title and
      hands that item to ``process_weixin_video_obj``. On a miss, logs 7003.

      :param video_path: path to the JSON file with the mined keywords
      :param title: video title to search for
      :param trace_id: trace id attached to the logs and the produced item
      :param gh_id: key used to look up the publishing account in ``gh_id_dict``
      :return: None
      """
      hit = return_video(video_path, title, trace_id)
      if not hit:
          logging(code="7003", info="视频搜索失败", trace_id=trace_id)
          return

      logging(code="7002", info="视频搜索成功", trace_id=trace_id, data=hit)

      cleaned_title = hit['items'][0]['title']
      for fragment in ('<em class=\"highlight\">', '</em>', "#"):
          cleaned_title = cleaned_title.replace(fragment, '')

      # NOTE(review): gh_id_dict.get() yields None for an unknown gh_id, which
      # the callee then subscripts -- confirm callers always pass a known id.
      process_weixin_video_obj(
          video_obj=hit['items'][0],
          user=gh_id_dict.get(gh_id),
          trace_id=trace_id,
          title=cleaned_title,
      )