search_schedule.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. """
  2. @author: luojunhui
  3. 调用接口在微信内搜索视频
  4. """
  5. import json
  6. import time
  7. import requests
  8. from applications.functions.mq import MQ
  9. from applications.functions.log import logging
  10. from applications.static.config import gh_id_dict
  11. from applications.functions.item import VideoItem
  12. def wx_search(keys):
  13. """
  14. WeChat search
  15. :param keys:
  16. :return:
  17. """
  18. url = "http://8.217.190.241:8888/crawler/wei_xin/keyword"
  19. payload = json.dumps({
  20. "keyword": keys,
  21. "cursor": "0",
  22. "content_type": "video"
  23. })
  24. headers = {
  25. 'Content-Type': 'application/json'
  26. }
  27. response = requests.request("POST", url, headers=headers, data=payload)
  28. return response.json()
  29. def process_weixin_video_obj(video_obj, user, trace_id):
  30. """
  31. 异步处理微信 video_obj
  32. 公众号和站内账号一一对应
  33. :param trace_id:
  34. :param user:
  35. :param video_obj:
  36. :return:
  37. """
  38. ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
  39. platform = "weixin_search"
  40. publish_time_stamp = int(video_obj['pubTime'])
  41. item = VideoItem()
  42. item.add_video_info("user_id", user["uid"])
  43. item.add_video_info("user_name", user["nick_name"])
  44. item.add_video_info("video_id", video_obj['hashDocID'])
  45. item.add_video_info("video_title", trace_id)
  46. item.add_video_info("publish_time_stamp", int(publish_time_stamp))
  47. item.add_video_info("video_url", video_obj["videoUrl"])
  48. item.add_video_info("cover_url", video_obj["image"])
  49. item.add_video_info("out_video_id", video_obj['hashDocID'])
  50. item.add_video_info("out_user_id", trace_id)
  51. item.add_video_info("platform", platform)
  52. item.add_video_info("strategy", "search")
  53. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  54. mq_obj = item.produce_item()
  55. ETL_MQ.send_msg(params=mq_obj)
  56. logging(
  57. code="6002",
  58. info="发送消息至 ETL",
  59. data=mq_obj
  60. )
  61. def return_video(video_path, title, trace_id):
  62. """
  63. search and send msg to ETL
  64. :param trace_id:
  65. :param title: 视频标题
  66. :param video_path: 视频路径
  67. :return:
  68. """
  69. with open(video_path, encoding='utf-8') as f:
  70. my_obj = json.loads(f.read())
  71. if my_obj:
  72. # 三者都搜索,优先搜索 title
  73. title_result = wx_search(keys=title)
  74. if title_result['msg'] == '未知错误':
  75. logging(
  76. code="7001",
  77. info="通过标题搜索失败---{}".format(title),
  78. trace_id=trace_id
  79. )
  80. else:
  81. obj_list = title_result['data']['data']
  82. if obj_list:
  83. return obj_list[0]
  84. # for obj in obj_list:
  85. # try:
  86. # title = obj['items'][0]['title'].replace('<em class=\"highlight\">', '').replace('</em>',
  87. # '').replace("#",
  88. # "")
  89. # if Functions().sensitive_flag(title):
  90. # return obj
  91. # else:
  92. # continue
  93. # except Exception as e:
  94. # print(e)
  95. # continue
  96. # # search_keys
  97. search_keys_result = wx_search(keys=my_obj['search_keys'][0])
  98. if search_keys_result['msg'] == '未知错误':
  99. logging(
  100. code="7001",
  101. info="通过搜索词搜索失败---{}".format(title),
  102. trace_id=trace_id
  103. )
  104. else:
  105. obj_list = search_keys_result['data']['data']
  106. if obj_list:
  107. return obj_list[0]
  108. # for obj in obj_list:
  109. # try:
  110. # title = obj['items'][0]['title'].replace('<em class=\"highlight\">', '').replace('</em>',
  111. # '').replace("#",
  112. # "")
  113. # if Functions().sensitive_flag(title):
  114. # return obj
  115. # else:
  116. # continue
  117. # except Exception as e:
  118. # print(e)
  119. # continue
  120. # theme
  121. theme_result = wx_search(keys=my_obj['theme'])
  122. if theme_result['msg'] == '未知错误':
  123. logging(
  124. code="7001",
  125. info="通过主题搜索失败---{}".format(title),
  126. trace_id=trace_id
  127. )
  128. else:
  129. obj_list = theme_result['data']['data']
  130. if obj_list:
  131. return obj_list[0]
  132. # for obj in obj_list:
  133. # try:
  134. # title = obj['items'][0]['title'].replace('<em class=\"highlight\">', '').replace('</em>',
  135. # '').replace("#",
  136. # "")
  137. # if Functions().sensitive_flag(title):
  138. # return obj
  139. # else:
  140. # continue
  141. # except Exception as e:
  142. # print(e)
  143. # continue
  144. return None
  145. else:
  146. logging(
  147. code="7000",
  148. info="标题--{}--kimi 挖掘数据失败".format(title),
  149. trace_id=trace_id
  150. )
  151. return None
  152. def search_videos(video_path, title, trace_id, gh_id):
  153. """
  154. search and send msg to ETL
  155. :param gh_id:
  156. :param video_path:
  157. :param title:
  158. :param trace_id:
  159. :return:
  160. """
  161. video_obj = return_video(video_path, title, trace_id)
  162. if video_obj:
  163. logging(
  164. code="7002",
  165. info="视频搜索成功",
  166. trace_id=trace_id,
  167. data=video_obj
  168. )
  169. process_weixin_video_obj(
  170. video_obj=video_obj['items'][0],
  171. user=gh_id_dict.get(gh_id),
  172. trace_id=trace_id
  173. )
  174. else:
  175. logging(
  176. code="7003",
  177. info="视频搜索失败",
  178. trace_id=trace_id
  179. )