xiaoniangaotuijianliu.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. import os
  2. import random
  3. import sys
  4. import time
  5. import uuid
  6. import json
  7. import cv2
  8. import requests
  9. from application.common.mysql.sql import Sql
  10. from application.common.redis.xng_redis import xng_in_video_data
  11. sys.path.append(os.getcwd())
  12. from application.items import VideoItem
  13. from application.pipeline import PiaoQuanPipeline
  14. from application.common.messageQueue import MQ
  15. from application.common.log import AliyunLogger
  16. from application.common.mysql import MysqlHelper
  17. class XNGTJLRecommend(object):
  18. """
  19. 小年糕推荐流
  20. """
  21. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  22. self.limit_flag = False
  23. self.platform = platform
  24. self.mode = mode
  25. self.rule_dict = rule_dict
  26. self.user_list = user_list
  27. self.env = env
  28. self.download_cnt = 0
  29. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  30. self.expire_flag = False
  31. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  32. self.mysql = MysqlHelper(mode=self.mode, platform=self)
  33. def get_video_duration(self, video_link: str) -> int:
  34. cap = cv2.VideoCapture(video_link)
  35. if cap.isOpened():
  36. rate = cap.get(5)
  37. frame_num = cap.get(7)
  38. duration = int(frame_num / rate)
  39. return duration
  40. return 0
  41. def get_recommend_list(self):
  42. print("小年糕推荐流开始")
  43. """
  44. 获取推荐页视频
  45. """
  46. headers = {
  47. 'Content-Type': 'application/json'
  48. }
  49. for i in range(3):
  50. url = "http://8.217.192.46:8889/crawler/xiao_nian_gao_plus/recommend"
  51. payload = json.dumps({})
  52. response = requests.request("POST", url, headers=headers, data=payload)
  53. response = response.json()
  54. if response['code'] != 0:
  55. self.aliyun_log.logging(
  56. code="3000",
  57. message="抓取单条视频失败,请求失败"
  58. ),
  59. return
  60. for index, video_obj in enumerate(response['data']['data'], 1):
  61. try:
  62. self.aliyun_log.logging(
  63. code="1001", message="扫描到一条视频", data=video_obj
  64. )
  65. self.process_video_obj(video_obj)
  66. except Exception as e:
  67. self.aliyun_log.logging(
  68. code="3000",
  69. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
  70. 1, index, e
  71. ),
  72. )
  73. if self.limit_flag:
  74. return
  75. time.sleep(random.randint(5, 10))
  76. def process_video_obj(self, video_obj):
  77. """
  78. 处理视频
  79. :param video_obj:
  80. """
  81. time.sleep(random.randint(3, 8))
  82. trace_id = self.platform + str(uuid.uuid1())
  83. our_user = random.choice(self.user_list)
  84. item = VideoItem()
  85. try:
  86. mid = int(video_obj['user']['mid'])
  87. print(f"id:{mid}")
  88. user_name = video_obj['user']['nick']
  89. avatar_url = video_obj['user']['hurl']
  90. sql = Sql()
  91. max_id = sql.select_id(mid)
  92. if max_id:
  93. sql.update_name_url(mid, avatar_url, user_name)
  94. else:
  95. time.sleep(1)
  96. link = sql.select_id_status(mid)
  97. if link:
  98. sql.insert_name_url(mid, avatar_url, user_name)
  99. print(f"开始写入{mid}")
  100. xng_in_video_data(json.dumps({"mid": mid}))
  101. except Exception as e:
  102. print(f"写入异常{e}")
  103. pass
  104. url = video_obj["v_url"]
  105. duration = self.get_video_duration(url)
  106. item.add_video_info("video_id", video_obj["id"])
  107. item.add_video_info("video_title", video_obj["title"])
  108. item.add_video_info("play_cnt", int(video_obj["play_pv"]))
  109. item.add_video_info("publish_time_stamp", int(int(video_obj["t"]) / 1000))
  110. item.add_video_info("out_user_id", video_obj["id"])
  111. item.add_video_info("cover_url", video_obj["url"])
  112. item.add_video_info("like_cnt", 0)
  113. item.add_video_info("share_cnt", int(video_obj["share"]))
  114. item.add_video_info("comment_cnt", int(video_obj["comment_count"]))
  115. item.add_video_info("video_url", video_obj["v_url"])
  116. item.add_video_info("out_video_id", video_obj["id"])
  117. item.add_video_info("duration", int(duration))
  118. item.add_video_info("platform", self.platform)
  119. item.add_video_info("strategy", self.mode)
  120. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  121. item.add_video_info("user_id", our_user["uid"])
  122. item.add_video_info("user_name", our_user["nick_name"])
  123. mq_obj = item.produce_item()
  124. pipeline = PiaoQuanPipeline(
  125. platform=self.platform,
  126. mode=self.mode,
  127. rule_dict=self.rule_dict,
  128. env=self.env,
  129. item=mq_obj,
  130. trace_id=trace_id,
  131. )
  132. if pipeline.process_item():
  133. self.download_cnt += 1
  134. self.mq.send_msg(mq_obj)
  135. self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
  136. if self.download_cnt >= int(
  137. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  138. ):
  139. self.limit_flag = True
  140. """
  141. 查询用户id是否存在
  142. """
  143. def select_id(self, uid):
  144. sql = f""" select uid from xng_uid where uid = "{uid}"; """
  145. db = MysqlHelper()
  146. repeat_video = db.select(sql=sql)
  147. if repeat_video:
  148. return True
  149. return False
  150. """
  151. 查询用户id是否之前已添加过
  152. """
  153. def select_id_status(self, uid):
  154. sql = f""" select uid from crawler_user_v3 where link = "{uid}"; """
  155. db = MysqlHelper()
  156. repeat_video = db.select(sql=sql)
  157. if repeat_video:
  158. return False
  159. return True
  160. def run(self):
  161. self.get_recommend_list()
  162. if __name__ == '__main__':
  163. J = XNGTJLRecommend(
  164. platform="xiaonianggaotuijianliu",
  165. mode="recommend",
  166. rule_dict={},
  167. user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
  168. )
  169. J.get_recommend_list()
  170. # J.logic()