benshanzhufu.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. import os
  2. import random
  3. import sys
  4. import time
  5. import uuid
  6. import json
  7. from datetime import datetime
  8. import cv2
  9. import requests
  10. from application.common import Feishu
  11. from application.common.feishu import FsData
  12. from application.common.feishu.feishu_utils import FeishuUtils
  13. from application.common.gpt import GPT4oMini
  14. sys.path.append(os.getcwd())
  15. from application.items import VideoItem
  16. from application.pipeline import PiaoQuanPipeline
  17. from application.common.messageQueue import MQ
  18. from application.common.log import AliyunLogger
  19. from application.common.mysql import MysqlHelper
  20. class BSZHRecommend(object):
  21. """
  22. 本山祝福推荐流
  23. """
  24. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  25. self.limit_flag = False
  26. self.platform = platform
  27. self.mode = mode
  28. self.rule_dict = rule_dict
  29. self.user_list = user_list
  30. self.env = env
  31. self.download_cnt = 0
  32. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  33. self.expire_flag = False
  34. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  35. self.mysql = MysqlHelper(mode=self.mode, platform=self)
  36. def get_video_duration(self, video_link: str) -> int:
  37. cap = cv2.VideoCapture(video_link)
  38. if cap.isOpened():
  39. rate = cap.get(5)
  40. frame_num = cap.get(7)
  41. duration = int(frame_num / rate)
  42. return duration
  43. return 0
  44. def get_recommend_list(self):
  45. print("本山祝福开始")
  46. """
  47. 获取推荐页视频
  48. """
  49. url = "http://8.217.192.46:8889/crawler/ben_shan_zhu_fu/recommend"
  50. next_cursor = 1
  51. data_rule = FsData()
  52. title_rule = data_rule.get_title_rule()
  53. for i in range(1, 200):
  54. payload = json.dumps({
  55. "cursor": f"{next_cursor}"
  56. })
  57. headers = {
  58. 'Content-Type': 'application/json'
  59. }
  60. for j in range(3):
  61. response = requests.request("POST", url, headers=headers, data=payload)
  62. response = response.json()
  63. if response['code'] != 0:
  64. time.sleep(2)
  65. continue
  66. else:
  67. break
  68. if response['code'] != 0:
  69. self.aliyun_log.logging(
  70. code="3000",
  71. message="抓取单条视频失败,请求失败"
  72. ),
  73. return
  74. for index, video_obj in enumerate(response['data']['data'], i):
  75. try:
  76. self.aliyun_log.logging(
  77. code="1001", message="扫描到一条视频", data=video_obj
  78. )
  79. next_cursor = response['data']['next_cursor']
  80. self.process_video_obj(video_obj, title_rule)
  81. except Exception as e:
  82. self.aliyun_log.logging(
  83. code="3000",
  84. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
  85. i, index, e
  86. ),
  87. )
  88. if self.limit_flag:
  89. return
  90. time.sleep(random.randint(5, 10))
  91. def process_video_obj(self, video_obj, title_rule):
  92. """
  93. 处理视频
  94. :param video_obj:
  95. """
  96. time.sleep(random.randint(3, 8))
  97. trace_id = self.platform + str(uuid.uuid1())
  98. our_user = random.choice(self.user_list)
  99. item = VideoItem()
  100. # id = uuid.uuid4()
  101. item.add_video_info("video_id", video_obj["nid"])
  102. item.add_video_info("video_title", video_obj["title"])
  103. item.add_video_info("play_cnt", 0)
  104. item.add_video_info("publish_time_stamp", int(video_obj["update_time"]))
  105. item.add_video_info("out_user_id", video_obj["nid"])
  106. item.add_video_info("cover_url", video_obj["video_cover"])
  107. item.add_video_info("like_cnt", 0)
  108. item.add_video_info("video_url", video_obj["video_url"])
  109. item.add_video_info("out_video_id", video_obj["nid"])
  110. item.add_video_info("platform", self.platform)
  111. item.add_video_info("strategy", self.mode)
  112. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  113. item.add_video_info("user_id", our_user["uid"])
  114. item.add_video_info("user_name", our_user["nick_name"])
  115. mq_obj = item.produce_item()
  116. pipeline = PiaoQuanPipeline(
  117. platform=self.platform,
  118. mode=self.mode,
  119. rule_dict=self.rule_dict,
  120. env=self.env,
  121. item=mq_obj,
  122. trace_id=trace_id,
  123. )
  124. if pipeline.process_item():
  125. title_list = title_rule.split(",")
  126. title = video_obj["title"]
  127. contains_keyword = any(keyword in title for keyword in title_list)
  128. if contains_keyword:
  129. new_title = GPT4oMini.get_ai_mini_title(title)
  130. if new_title:
  131. item.add_video_info("video_title", new_title)
  132. current_time = datetime.now()
  133. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  134. values = [
  135. [
  136. video_obj["video_url"],
  137. video_obj["video_cover"],
  138. title,
  139. new_title,
  140. formatted_time,
  141. ]
  142. ]
  143. FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "aTSJH4", "ROWS", 1, 2)
  144. time.sleep(0.5)
  145. FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "aTSJH4", "A2:Z2", values)
  146. self.download_cnt += 1
  147. self.mq.send_msg(mq_obj)
  148. self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
  149. if self.download_cnt >= int(
  150. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  151. ):
  152. self.limit_flag = True
  153. def run(self):
  154. self.get_recommend_list()
  155. if __name__ == '__main__':
  156. J = BSZHRecommend(
  157. platform="benshanzhufu",
  158. mode="recommend",
  159. rule_dict={},
  160. user_list=[{
  161. "createTime": 1684311893899,
  162. "id": 8,
  163. "interval": 7200,
  164. "machine": "aliyun",
  165. "mode": "recommend",
  166. "operator": "王雪珂",
  167. "rule": "[{\"like_cnt\":{\"min\":0,\"max\":0}}]",
  168. "source": "benshanzhufu",
  169. "spiderName": "run_bszf_recommend",
  170. "startTime": 1730452800000,
  171. "status": 0,
  172. "taskName": "本山祝福",
  173. "updateTime": 1730452617817
  174. }],
  175. )
  176. J.get_recommend_list()
  177. # J.logic()