tiantianjufuqi.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. import os
  2. import random
  3. import sys
  4. import time
  5. import uuid
  6. import json
  7. from datetime import datetime
  8. import requests
  9. from application.common import Feishu
  10. from application.common.feishu import FsData
  11. from application.common.feishu.feishu_utils import FeishuUtils
  12. from application.common.ffmpeg.ffmpeg_utils import Ffmpeg
  13. from application.common.gpt import GPT4oMini
  14. sys.path.append(os.getcwd())
  15. from application.items import VideoItem
  16. from application.pipeline import PiaoQuanPipeline
  17. from application.common.messageQueue import MQ
  18. from application.common.proxies import tunnel_proxies
  19. from application.common.log import AliyunLogger
  20. from application.common.mysql import MysqlHelper
  21. class TTJFFQRecommend(object):
  22. """
  23. 福气矩阵-天天聚福气
  24. """
  25. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  26. self.limit_flag = False
  27. self.platform = platform
  28. self.mode = mode
  29. self.rule_dict = rule_dict
  30. self.user_list = user_list
  31. self.env = env
  32. self.download_cnt = 0
  33. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  34. self.expire_flag = False
  35. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  36. self.mysql = MysqlHelper(mode=self.mode, platform=self)
  37. def get_recommend_list(self):
  38. if self.expire_flag:
  39. self.aliyun_log.logging(
  40. code="2000",
  41. message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
  42. )
  43. return
  44. """
  45. 获取推荐页视频
  46. """
  47. headers = {
  48. 'Host': 'api.xinghetime.com',
  49. 'xweb_xhr': '1',
  50. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156',
  51. 'content-type': 'application/json',
  52. 'accept': '*/*',
  53. 'sec-fetch-site': 'cross-site',
  54. 'sec-fetch-mode': 'cors',
  55. 'sec-fetch-dest': 'empty',
  56. 'referer': 'https://servicewechat.com/wxa12a841184757478/7/page-frame.html',
  57. 'accept-language': 'zh-CN,zh;q=0.9'
  58. }
  59. data_rule = FsData()
  60. title_rule = data_rule.get_title_rule()
  61. while True:
  62. time.sleep(random.randint(1, 10))
  63. url = "https://api.xinghetime.com/luckvideo/video/getRecommendVideos"
  64. payload = json.dumps({
  65. "baseParam": {
  66. "mid": "openid_oeBeO4lCgcJCey9JEJm9ZHPnOln8",
  67. "pageSource": "video-home",
  68. "appType": 2
  69. },
  70. "bizParam": {
  71. "pageSize": 10
  72. }
  73. })
  74. response = requests.request("POST", url, headers=headers, data=payload)
  75. for index, video_obj in enumerate(response.json()['data'], 1):
  76. try:
  77. self.aliyun_log.logging(
  78. code="1001", message="扫描到一条视频", data=video_obj
  79. )
  80. self.process_video_obj(video_obj,title_rule)
  81. except Exception as e:
  82. self.aliyun_log.logging(
  83. code="3000",
  84. message="抓取单条视频失败,第{}条报错原因是{}".format(
  85. index, e
  86. ),
  87. )
  88. if self.limit_flag:
  89. return
  90. time.sleep(random.randint(5, 10))
  91. def process_video_obj(self, video_obj, title_rule):
  92. """
  93. 处理视频
  94. :param video_obj:
  95. """
  96. time.sleep(random.randint(3, 8))
  97. trace_id = self.platform + str(uuid.uuid1())
  98. our_user = random.choice(self.user_list)
  99. play_str = video_obj["playCountFormat"]
  100. play_cnt = int(play_str.replace("+", "").replace("次播放", ""))
  101. item = VideoItem()
  102. item.add_video_info("video_id", video_obj["videoId"])
  103. item.add_video_info("video_title", video_obj["title"])
  104. item.add_video_info("play_cnt", play_cnt)
  105. item.add_video_info("publish_time_stamp", int(time.time()))
  106. item.add_video_info("out_user_id", video_obj["videoId"])
  107. item.add_video_info("cover_url", video_obj["coverImagePath"])
  108. item.add_video_info("like_cnt", 0)
  109. item.add_video_info("video_url", video_obj["videoPath"])
  110. item.add_video_info("out_video_id", video_obj["videoId"])
  111. item.add_video_info("platform", self.platform)
  112. item.add_video_info("strategy", self.mode)
  113. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  114. item.add_video_info("user_id", our_user["uid"])
  115. item.add_video_info("user_name", our_user["nick_name"])
  116. # 获取当前时间
  117. current_time = datetime.now()
  118. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  119. values = [[
  120. video_obj["videoId"],
  121. formatted_time,
  122. video_obj["title"],
  123. video_obj["coverImagePath"],
  124. video_obj["videoPath"],
  125. play_cnt
  126. ]]
  127. Feishu.insert_columns(self.platform, 'fuxiaoshun', "MZrtFR", "ROWS", 1, 2)
  128. time.sleep(0.5)
  129. Feishu.update_values(self.platform, 'fuxiaoshun', "MZrtFR", "A2:Z2", values)
  130. mq_obj = item.produce_item()
  131. pipeline = PiaoQuanPipeline(
  132. platform=self.platform,
  133. mode=self.mode,
  134. rule_dict=self.rule_dict,
  135. env=self.env,
  136. item=mq_obj,
  137. trace_id=trace_id,
  138. )
  139. if pipeline.process_item():
  140. video_url = video_obj["videoPath"]
  141. ffmpeg = Ffmpeg()
  142. new_video_url = ffmpeg.merge_m3u8(video_url)
  143. if not new_video_url:
  144. return
  145. item.add_video_info("video_url", new_video_url)
  146. title_list = title_rule.split(",")
  147. title = video_obj["title"]
  148. contains_keyword = any(keyword in title for keyword in title_list)
  149. if contains_keyword:
  150. new_title = GPT4oMini.get_ai_mini_title(title)
  151. if new_title:
  152. item.add_video_info("video_title", new_title)
  153. current_time = datetime.now()
  154. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  155. values = [
  156. [
  157. video_obj["videoPath"],
  158. video_obj["coverImagePath"],
  159. title,
  160. new_title,
  161. formatted_time,
  162. ]
  163. ]
  164. FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "ftQdRy", "ROWS", 1, 2)
  165. time.sleep(0.5)
  166. FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "ftQdRy", "A2:Z2", values)
  167. self.download_cnt += 1
  168. self.mq.send_msg(mq_obj)
  169. self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
  170. if self.download_cnt >= int(
  171. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  172. ):
  173. self.limit_flag = True
  174. def run(self):
  175. self.get_recommend_list()
  176. if __name__ == '__main__':
  177. J = TTJFFQRecommend(
  178. platform="tiantianjufuqi",
  179. mode="recommend",
  180. rule_dict={},
  181. user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
  182. )
  183. J.get_recommend_list()