# xiaoniangaotuijianliu.py - Xiaoniangao recommendation-feed crawler.
  1. import os
  2. import random
  3. import sys
  4. import time
  5. import uuid
  6. import json
  7. from datetime import datetime
  8. import cv2
  9. import requests
  10. from application.common.feishu import FsData
  11. from application.common.feishu.feishu_utils import FeishuUtils
  12. from application.common.gpt import GPT4oMini
  13. from application.common.mysql.sql import Sql
  14. from application.common.redis.xng_redis import xng_in_video_data
  15. from application.config.config import xiaoniangao_view_api,xiaoniangao_history_api
  16. sys.path.append(os.getcwd())
  17. from application.items import VideoItem
  18. from application.pipeline import PiaoQuanPipeline
  19. from application.common.messageQueue import MQ
  20. from application.common.log import AliyunLogger
  21. from application.common.mysql import MysqlHelper
  22. video_view_count = 0
  23. video_view_lists = []
  24. def video_view(content_id, account_id):
  25. global video_view_count
  26. headers = {
  27. "Content-Type": "application/json"
  28. }
  29. payload = {
  30. "content_id": str(content_id),
  31. "account_id": str(account_id)
  32. }
  33. try:
  34. # 发送 POST 请求
  35. response = requests.post(
  36. xiaoniangao_view_api,
  37. headers=headers,
  38. json=payload # 自动将字典转换为 JSON
  39. )
  40. # 检查 HTTP 状态码
  41. if response.status_code == 200:
  42. # 解析 JSON 响应
  43. result = response.json()
  44. # 提取关键字段
  45. code = result.get("code")
  46. msg = result.get("msg")
  47. # 业务逻辑处理(示例)
  48. if code == 0:
  49. print("请求成功")
  50. video_view_count += 1
  51. video_history(content_id)
  52. else:
  53. print(f"请求失败,错误码: {code}, 消息: {msg}")
  54. else:
  55. print(f"HTTP 请求失败,状态码: {response.status_code}")
  56. except requests.exceptions.RequestException as e:
  57. print(f"请求异常: {e}")
  58. except json.JSONDecodeError:
  59. print("响应不是有效的 JSON 格式")
  60. def video_history():
  61. headers = {
  62. "Content-Type": "application/json"
  63. }
  64. payload = {
  65. "content_id": video_view_lists
  66. }
  67. try:
  68. # 发送 POST 请求
  69. response = requests.post(
  70. xiaoniangao_history_api,
  71. headers=headers,
  72. json=payload # 自动将字典转换为 JSON
  73. )
  74. # 检查 HTTP 状态码
  75. if response.status_code == 200:
  76. # 解析 JSON 响应
  77. result = response.json()
  78. # 提取关键字段
  79. code = result.get("code")
  80. msg = result.get("msg")
  81. # 业务逻辑处理(示例)
  82. if code == 0:
  83. video_view_lists.clear()
  84. print("请求成功")
  85. else:
  86. print(f"请求失败,错误码: {code}, 消息: {msg}")
  87. else:
  88. print(f"HTTP 请求失败,状态码: {response.status_code}")
  89. except requests.exceptions.RequestException as e:
  90. print(f"请求异常: {e}")
  91. except json.JSONDecodeError:
  92. print("响应不是有效的 JSON 格式")
  93. class XNGTJLRecommend(object):
  94. """
  95. 小年糕推荐流
  96. """
  97. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  98. self.limit_flag = False
  99. self.platform = platform
  100. self.mode = mode
  101. self.rule_dict = rule_dict
  102. self.user_list = user_list
  103. self.env = env
  104. self.download_cnt = 0
  105. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  106. self.expire_flag = False
  107. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  108. self.mysql = MysqlHelper(mode=self.mode, platform=self)
  109. def get_video_duration(self, video_link: str) -> int:
  110. cap = cv2.VideoCapture(video_link)
  111. if cap.isOpened():
  112. rate = cap.get(5)
  113. frame_num = cap.get(7)
  114. duration = int(frame_num / rate)
  115. return duration
  116. return 0
  117. def get_recommend_list(self):
  118. print("小年糕推荐流开始")
  119. """
  120. 获取推荐页视频
  121. """
  122. headers = {
  123. 'Content-Type': 'application/json'
  124. }
  125. data_rule = FsData()
  126. title_rule = data_rule.get_title_rule()
  127. # for i in range(3):
  128. for i in range(6):
  129. url = "http://8.217.192.46:8889/crawler/xiao_nian_gao_plus/recommend"
  130. payload = json.dumps({})
  131. response = requests.request("POST", url, headers=headers, data=payload)
  132. response = response.json()
  133. if response['code'] != 0:
  134. self.aliyun_log.logging(
  135. code="3000",
  136. message="抓取单条视频失败,请求失败"
  137. ),
  138. return
  139. for index, video_obj in enumerate(response['data']['data'], 1):
  140. try:
  141. self.aliyun_log.logging(
  142. code="1001", message="扫描到一条视频", data=video_obj
  143. )
  144. self.process_video_obj(video_obj, title_rule)
  145. except Exception as e:
  146. self.aliyun_log.logging(
  147. code="3000",
  148. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
  149. 1, index, e
  150. ),
  151. )
  152. if self.limit_flag:
  153. return
  154. time.sleep(random.randint(5, 10))
  155. def process_video_obj(self, video_obj, title_rule):
  156. """
  157. 处理视频
  158. :param video_obj:
  159. """
  160. time.sleep(random.randint(3, 8))
  161. trace_id = self.platform + str(uuid.uuid1())
  162. our_user = random.choice(self.user_list)
  163. item = VideoItem()
  164. vid = video_obj['id']
  165. mid = int(video_obj['user']['mid'])
  166. print(f"vid={vid},mid={mid}")
  167. try:
  168. mid = int(video_obj['user']['mid'])
  169. print(f"id:{mid}")
  170. user_name = video_obj['user']['nick']
  171. avatar_url = video_obj['user']['hurl']
  172. sql = Sql()
  173. max_id = sql.select_id(mid)
  174. if max_id:
  175. sql.update_name_url(mid, avatar_url, user_name)
  176. else:
  177. time.sleep(1)
  178. link = sql.select_id_status(mid)
  179. if link:
  180. sql.insert_name_url(mid, avatar_url, user_name)
  181. print(f"开始写入{mid}")
  182. xng_in_video_data(json.dumps({"mid": mid}))
  183. except Exception as e:
  184. print(f"写入异常{e}")
  185. pass
  186. url = video_obj["v_url"]
  187. duration = self.get_video_duration(url)
  188. item.add_video_info("video_id", video_obj["id"])
  189. item.add_video_info("video_title", video_obj["title"])
  190. item.add_video_info("play_cnt", int(video_obj["play_pv"]))
  191. item.add_video_info("publish_time_stamp", int(int(video_obj["t"]) / 1000))
  192. item.add_video_info("out_user_id", video_obj["id"])
  193. item.add_video_info("cover_url", video_obj["url"])
  194. item.add_video_info("like_cnt", 0)
  195. item.add_video_info("share_cnt", int(video_obj["share"]))
  196. item.add_video_info("comment_cnt", int(video_obj["comment_count"]))
  197. item.add_video_info("video_url", video_obj["v_url"])
  198. item.add_video_info("out_video_id", video_obj["id"])
  199. item.add_video_info("duration", int(duration))
  200. item.add_video_info("platform", self.platform)
  201. item.add_video_info("strategy", self.mode)
  202. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  203. item.add_video_info("user_id", our_user["uid"])
  204. item.add_video_info("user_name", our_user["nick_name"])
  205. mq_obj = item.produce_item()
  206. pipeline = PiaoQuanPipeline(
  207. platform=self.platform,
  208. mode=self.mode,
  209. rule_dict=self.rule_dict,
  210. env=self.env,
  211. item=mq_obj,
  212. trace_id=trace_id,
  213. )
  214. if pipeline.process_item():
  215. title_list = title_rule.split(",")
  216. title = video_obj["title"]
  217. contains_keyword = any(keyword in title for keyword in title_list)
  218. if contains_keyword:
  219. new_title = GPT4oMini.get_ai_mini_title(title)
  220. if new_title:
  221. item.add_video_info("video_title", new_title)
  222. current_time = datetime.now()
  223. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  224. values = [
  225. [
  226. video_obj["v_url"],
  227. video_obj["url"],
  228. title,
  229. new_title,
  230. formatted_time,
  231. ]
  232. ]
  233. FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "D1nVxQ", "ROWS", 1, 2)
  234. time.sleep(0.5)
  235. FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "D1nVxQ", "A2:Z2", values)
  236. self.download_cnt += 1
  237. self.mq.send_msg(mq_obj)
  238. video_view(vid, mid)
  239. video_view_count += 1
  240. video_view_lists.append(str(vid))
  241. if video_view_count % 4 == 0:
  242. video_history()
  243. self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
  244. if self.download_cnt >= int(
  245. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  246. ):
  247. video_history()
  248. self.limit_flag = True
  249. """
  250. 查询用户id是否存在
  251. """
  252. def select_id(self, uid):
  253. sql = f""" select uid from xng_uid where uid = "{uid}"; """
  254. db = MysqlHelper()
  255. repeat_video = db.select(sql=sql)
  256. if repeat_video:
  257. return True
  258. return False
  259. """
  260. 查询用户id是否之前已添加过
  261. """
  262. def select_id_status(self, uid):
  263. sql = f""" select uid from crawler_user_v3 where link = "{uid}"; """
  264. db = MysqlHelper()
  265. repeat_video = db.select(sql=sql)
  266. if repeat_video:
  267. return False
  268. return True
  269. def run(self):
  270. self.get_recommend_list()
  271. if __name__ == '__main__':
  272. J = XNGTJLRecommend(
  273. platform="xiaonianggaotuijianliu",
  274. mode="recommend",
  275. rule_dict={},
  276. user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
  277. )
  278. J.get_recommend_list()
  279. # J.logic()