haoyunzhufuduo.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. import os
  2. import random
  3. import sys
  4. import time
  5. import uuid
  6. import json
  7. from datetime import datetime
  8. import cv2
  9. import requests
  10. from application.common.feishu import FsData
  11. from application.common.feishu.feishu_utils import FeishuUtils
  12. from application.common.ffmpeg.ffmpeg_utils import Ffmpeg
  13. from application.common.gpt import GPT4oMini
  14. from application.common.mysql.sql import Sql
  15. from application.common.redis.xng_redis import xng_in_video_data
  16. sys.path.append(os.getcwd())
  17. from application.items import VideoItem
  18. from application.pipeline import PiaoQuanPipeline
  19. from application.common.messageQueue import MQ
  20. from application.common.log import AliyunLogger
  21. from application.common.mysql import MysqlHelper
  22. class HYZFDfRecommend(object):
  23. """
  24. 好运祝福多
  25. """
  26. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  27. self.limit_flag = False
  28. self.platform = platform
  29. self.mode = mode
  30. self.rule_dict = rule_dict
  31. self.user_list = user_list
  32. self.env = env
  33. self.download_cnt = 0
  34. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  35. self.expire_flag = False
  36. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  37. self.mysql = MysqlHelper(mode=self.mode, platform=self)
  38. def get_recommend_list(self):
  39. print("好运祝福多开始")
  40. """
  41. 获取推荐页视频
  42. """
  43. headers = {
  44. 'Content-Type': 'application/json'
  45. }
  46. url = "http://8.217.192.46:8889/crawler/hao_yun_zhu_fu_duo/recommend"
  47. data_rule = FsData()
  48. title_rule = data_rule.get_title_rule()
  49. while True:
  50. payload = json.dumps({
  51. "cursor": ""
  52. })
  53. response = requests.request("POST", url, headers=headers, data=payload)
  54. response = response.json()
  55. if response['code'] != 0:
  56. self.aliyun_log.logging(
  57. code="3000",
  58. message="抓取单条视频失败,请求失败"
  59. ),
  60. return
  61. data = response['data']['data']
  62. if len(data) == 0:
  63. return
  64. for index, video_obj in enumerate(data, 1):
  65. try:
  66. self.aliyun_log.logging(
  67. code="1001", message="扫描到一条视频", data=video_obj
  68. )
  69. self.process_video_obj(video_obj, title_rule)
  70. except Exception as e:
  71. self.aliyun_log.logging(
  72. code="3000",
  73. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
  74. 1, index, e
  75. ),
  76. )
  77. if self.limit_flag:
  78. return
  79. time.sleep(random.randint(1, 5))
  80. def process_video_obj(self, video_obj, title_rule):
  81. """
  82. 处理视频
  83. :param video_obj:
  84. """
  85. time.sleep(random.randint(3, 8))
  86. trace_id = self.platform + str(uuid.uuid1())
  87. our_user = random.choice(self.user_list)
  88. item = VideoItem()
  89. item.add_video_info("video_id", video_obj["videoId"])
  90. item.add_video_info("video_title", video_obj["title"])
  91. item.add_video_info("play_cnt", 0)
  92. item.add_video_info("publish_time_stamp", int(time.time()))
  93. item.add_video_info("out_user_id", video_obj["videoId"])
  94. item.add_video_info("cover_url", video_obj["coverImagePath"])
  95. item.add_video_info("like_cnt", 0)
  96. item.add_video_info("share_cnt", 0)
  97. item.add_video_info("comment_cnt", 0)
  98. item.add_video_info("video_url", video_obj["videoPath"])
  99. item.add_video_info("out_video_id", video_obj["videoId"])
  100. item.add_video_info("platform", self.platform)
  101. item.add_video_info("strategy", self.mode)
  102. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  103. item.add_video_info("user_id", our_user["uid"])
  104. item.add_video_info("user_name", our_user["nick_name"])
  105. mq_obj = item.produce_item()
  106. pipeline = PiaoQuanPipeline(
  107. platform=self.platform,
  108. mode=self.mode,
  109. rule_dict=self.rule_dict,
  110. env=self.env,
  111. item=mq_obj,
  112. trace_id=trace_id,
  113. )
  114. if pipeline.process_item():
  115. video_url = video_obj["videoPath"]
  116. cover_url = video_obj["coverImagePath"]
  117. ffmpeg = Ffmpeg()
  118. new_video_url = ffmpeg.merge_m3u8(video_url)
  119. new_cover_url = ffmpeg.webp2_jpg(cover_url)
  120. if not new_video_url or not new_cover_url:
  121. return
  122. item.add_video_info("video_url", new_video_url)
  123. item.add_video_info("cover_url", new_cover_url)
  124. title_list = title_rule.split(",")
  125. title = video_obj["title"]
  126. contains_keyword = any(keyword in title for keyword in title_list)
  127. if contains_keyword:
  128. new_title = GPT4oMini.get_ai_mini_title(title)
  129. if new_title:
  130. item.add_video_info("video_title", new_title)
  131. current_time = datetime.now()
  132. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  133. values = [
  134. [
  135. video_obj["videoPath"],
  136. video_obj["coverImagePath"],
  137. title,
  138. new_title,
  139. formatted_time,
  140. ]
  141. ]
  142. FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "V36GHT", "ROWS", 1, 2)
  143. time.sleep(0.5)
  144. FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "V36GHT", "A2:Z2", values)
  145. self.download_cnt += 1
  146. self.mq.send_msg(mq_obj)
  147. self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
  148. if self.download_cnt >= int(
  149. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  150. ):
  151. self.limit_flag = True
  152. def run(self):
  153. self.get_recommend_list()
  154. if __name__ == '__main__':
  155. J = HYZFDfRecommend(
  156. platform="haoyunzhufuduo",
  157. mode="recommend",
  158. rule_dict={},
  159. user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
  160. )
  161. J.get_recommend_list()
  162. # J.logic()