video_item.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. from applications.functions.mq import MQ
  6. from applications.functions.log import logging
  7. from applications.functions.common import Functions
  8. from applications.functions.async_etl import AsyncETL
  9. class VideoItem(object):
  10. """
  11. function: 当扫描进一条视频的时候,对该视频的基本信息进行处理,保证发送给 pipeline和 etl 的 video_dict 是正确的
  12. __init__: 初始化空json 对象,用来存储视频信息
  13. add_video_info: 把视频信息存储到 item 对象中
  14. check_item: 检查 item 对象中的各个元素以及处理
  15. """
  16. def __init__(self):
  17. self.item = {}
  18. def add_video_info(self, key, value):
  19. """
  20. insert or update video info
  21. :param key:
  22. :param value:
  23. """
  24. self.item[key] = value
  25. def check_item(self):
  26. """
  27. 判断item 里面的字段,是否符合要求
  28. 字段分为 3 类:
  29. 1. 必须存在数据的字段: ["video_id", "user_id", "user_name", "out_user_id", "out_video_id", "session", "video_url", "cover_url", "platform", "strategy"]
  30. 2. 不存在默认为 0 的字段 :["duration", "play_cnt", "like_cnt", "comment_cnt", "share_cnt", "width", "height"]
  31. 3. 需要后出理的字段: video_title, publish_time
  32. """
  33. if self.item.get("video_title"):
  34. self.item["video_title"] = Functions().clean_title(self.item["video_title"])
  35. else:
  36. return False
  37. if self.item.get("publish_time_stamp"):
  38. publish_time_str = time.strftime(
  39. "%Y-%m-%d %H:%M:%S", time.localtime(self.item["publish_time_stamp"])
  40. )
  41. self.add_video_info("publish_time_str", publish_time_str)
  42. else:
  43. publish_time_stamp = int(time.time())
  44. publish_time_str = time.strftime(
  45. "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
  46. )
  47. self.add_video_info("publish_time_stamp", publish_time_stamp)
  48. self.add_video_info("publish_time_str", publish_time_str)
  49. self.add_video_info("publish_time", publish_time_str)
  50. if not self.item.get("update_time_stamp"):
  51. self.add_video_info("update_time_stamp", int(time.time()))
  52. # 如果不存在,默认值为 0
  53. config_keys = [
  54. "duration",
  55. "play_cnt",
  56. "like_cnt",
  57. "comment_cnt",
  58. "share_cnt",
  59. "width",
  60. "height",
  61. ]
  62. for config_key in config_keys:
  63. if self.item.get(config_key):
  64. continue
  65. else:
  66. self.add_video_info(config_key, 0)
  67. # 必须存在的元素,若不存在则会报错
  68. must_keys = [
  69. "video_id",
  70. "user_id",
  71. "user_name",
  72. "out_video_id",
  73. "session",
  74. "video_url",
  75. "cover_url",
  76. "platform",
  77. "strategy",
  78. ]
  79. """
  80. video_id, out_video_id 均为站外视频 id
  81. usr_id: 站内用户 id
  82. out_user_id: 站外用户 id
  83. user_name: 站外用户名称
  84. """
  85. for m_key in must_keys:
  86. if self.item.get(m_key):
  87. continue
  88. else:
  89. # print(m_key)
  90. return False
  91. return True
  92. def produce_item(self):
  93. """
  94. item producer
  95. :return:
  96. """
  97. flag = self.check_item()
  98. if flag:
  99. return self.item
  100. else:
  101. return False
  102. class VideoProducer(object):
  103. """
  104. 处理视频
  105. """
  106. @classmethod
  107. def wx_video_producer(cls, video_obj, user, trace_id):
  108. """
  109. 异步处理微信 video_obj
  110. 公众号和站内账号一一对应
  111. :param trace_id:
  112. :param user:
  113. :param video_obj:
  114. :return:
  115. """
  116. platform = "weixin_search"
  117. publish_time_stamp = int(video_obj['pubTime'])
  118. item = VideoItem()
  119. item.add_video_info("user_id", user["uid"])
  120. item.add_video_info("user_name", user["nick_name"])
  121. item.add_video_info("video_id", video_obj['hashDocID'])
  122. item.add_video_info("video_title", trace_id)
  123. item.add_video_info("publish_time_stamp", int(publish_time_stamp))
  124. item.add_video_info("video_url", video_obj["videoUrl"])
  125. item.add_video_info("cover_url", video_obj["image"])
  126. item.add_video_info("out_video_id", video_obj['hashDocID'])
  127. item.add_video_info("out_user_id", trace_id)
  128. item.add_video_info("platform", platform)
  129. item.add_video_info("strategy", "search")
  130. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  131. mq_obj = item.produce_item()
  132. return mq_obj
  133. @classmethod
  134. def baidu_video_producer(cls, video_obj, user, trace_id):
  135. """
  136. 处理好看视频的 video_info
  137. :param video_obj:
  138. :param user:
  139. :param trace_id:
  140. :return:
  141. """
  142. platform = "baidu_search"
  143. publish_time_stamp = int(video_obj['publish_time'])
  144. item = VideoItem()
  145. item.add_video_info("user_id", user["uid"])
  146. item.add_video_info("user_name", user["nick_name"])
  147. item.add_video_info("video_id", video_obj['id'])
  148. item.add_video_info("video_title", trace_id)
  149. item.add_video_info("publish_time_stamp", publish_time_stamp)
  150. item.add_video_info("video_url", video_obj["playurl"])
  151. item.add_video_info("cover_url", video_obj["poster"])
  152. item.add_video_info("out_video_id", video_obj['id'])
  153. item.add_video_info("out_user_id", trace_id)
  154. item.add_video_info("like_cnt", video_obj['like'] if video_obj.get('like') else 0)
  155. item.add_video_info("play_cnt", video_obj['playcnt'])
  156. item.add_video_info("duration", video_obj['duration'])
  157. item.add_video_info("platform", platform)
  158. item.add_video_info("strategy", "search")
  159. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  160. mq_obj = item.produce_item()
  161. return mq_obj
  162. @classmethod
  163. def xg_video_producer(cls, video_obj, user, trace_id):
  164. """
  165. 西瓜搜索
  166. :param video_obj:
  167. :param user:
  168. :param trace_id:
  169. :return:
  170. """
  171. platform = "xg_search"
  172. publish_time_stamp = int(video_obj['publish_time'])
  173. item = VideoItem()
  174. item.add_video_info("user_id", user["uid"])
  175. item.add_video_info("user_name", user["nick_name"])
  176. item.add_video_info("video_id", video_obj['video_id'])
  177. item.add_video_info("video_title", trace_id)
  178. item.add_video_info("publish_time_stamp", int(publish_time_stamp))
  179. item.add_video_info("video_url", video_obj["video_url"])
  180. item.add_video_info("cover_url", video_obj["cover_url"])
  181. item.add_video_info("out_video_id", video_obj['video_id'])
  182. item.add_video_info("play_cnt", video_obj['play_cnt'])
  183. item.add_video_info("duration", video_obj['duration'])
  184. item.add_video_info("like_cnt", video_obj['like_cnt'])
  185. item.add_video_info("out_user_id", trace_id)
  186. item.add_video_info("platform", platform)
  187. item.add_video_info("strategy", "search")
  188. item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
  189. mq_obj = item.produce_item()
  190. return mq_obj
  191. async def video_mq_sender(video_obj, user, trace_id, platform):
  192. """
  193. 异步处理微信 video_obj
  194. 公众号和站内账号一一对应
  195. :param platform:
  196. :param user:
  197. :param trace_id:
  198. :param video_obj:
  199. :return:
  200. """
  201. # ETL_MQ = MQ(topic_name="topic_crawler_etl_prod")
  202. Video = VideoProducer()
  203. if platform == "xg_search":
  204. mq_obj = Video.xg_video_producer(
  205. video_obj=video_obj,
  206. user=user,
  207. trace_id=trace_id,
  208. )
  209. elif platform == "baidu_search":
  210. mq_obj = Video.baidu_video_producer(
  211. video_obj=video_obj,
  212. user=user,
  213. trace_id=trace_id,
  214. )
  215. elif platform == "wx_search":
  216. mq_obj = Video.wx_video_producer(
  217. video_obj=video_obj,
  218. user=user,
  219. trace_id=trace_id,
  220. )
  221. else:
  222. mq_obj = {}
  223. AE = AsyncETL(video_obj=mq_obj)
  224. video_id = await AE.etl_deal()
  225. # ETL_MQ.send_msg(params=mq_obj)
  226. logging(
  227. code="6002",
  228. info="视频下载完成",
  229. data=mq_obj,
  230. trace_id=trace_id
  231. )
  232. return video_id