zhufuquanzituijianliu.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. import os
  2. import random
  3. import sys
  4. import time
  5. import uuid
  6. import json
  7. from datetime import datetime
  8. import cv2
  9. import requests
  10. from application.common.feishu import FsData
  11. from application.common.feishu.feishu_utils import FeishuUtils
  12. from application.common.gpt import GPT4oMini
  13. from application.common.mysql.sql import Sql
  14. from typing import Dict, Any
  15. from application.common.redis.xng_redis import xng_in_video_data
  16. from application.config.config import zhufuquanzi_view_api,zhufuquanzi_history_api,zhufuquanzi_log_upload_api
  17. sys.path.append(os.getcwd())
  18. from application.items import VideoItem
  19. from application.pipeline import PiaoQuanPipeline
  20. from application.common.messageQueue import MQ
  21. from application.common.log import AliyunLogger
  22. from application.common.mysql import MysqlHelper
  23. def video_view(content_id, account_id):
  24. headers = {
  25. "Content-Type": "application/json"
  26. }
  27. payload = {
  28. "content_id": str(content_id),
  29. "account_id": str(account_id)
  30. }
  31. try:
  32. # 发送 POST 请求
  33. response = requests.post(
  34. zhufuquanzi_view_api,
  35. headers=headers,
  36. json=payload # 自动将字典转换为 JSON
  37. )
  38. # 检查 HTTP 状态码
  39. if response.status_code == 200:
  40. # 解析 JSON 响应
  41. result = response.json()
  42. # 提取关键字段
  43. code = result.get("code")
  44. msg = result.get("msg")
  45. # 业务逻辑处理(示例)
  46. if code == 0:
  47. print("请求成功")
  48. else:
  49. print(f"{zhufuquanzi_view_api}请求失败,错误码: {code}, 消息: {msg}")
  50. else:
  51. print(f"{zhufuquanzi_view_api}HTTP 请求失败,状态码: {response.status_code}")
  52. except requests.exceptions.RequestException as e:
  53. print(f"{zhufuquanzi_view_api}请求异常: {e}")
  54. except json.JSONDecodeError:
  55. print(f"{zhufuquanzi_view_api}响应不是有效的 JSON 格式")
  56. def video_history(video_view_lists):
  57. headers = {
  58. "Content-Type": "application/json"
  59. }
  60. payload = {
  61. "content_ids": video_view_lists
  62. }
  63. try:
  64. # 发送 POST 请求
  65. response = requests.post(
  66. zhufuquanzi_history_api,
  67. headers=headers,
  68. json=payload # 自动将字典转换为 JSON
  69. )
  70. # 检查 HTTP 状态码
  71. if response.status_code == 200:
  72. # 解析 JSON 响应
  73. result = response.json()
  74. # 提取关键字段
  75. code = result.get("code")
  76. msg = result.get("msg")
  77. # 业务逻辑处理(示例)
  78. if code == 0:
  79. print("请求成功")
  80. else:
  81. print(f"{zhufuquanzi_history_api}请求失败,错误码: {code}, 消息: {msg}")
  82. else:
  83. print(f"{zhufuquanzi_history_api}HTTP 请求失败,状态码: {response.status_code}")
  84. except requests.exceptions.RequestException as e:
  85. print(f"{zhufuquanzi_history_api}请求异常: {e}")
  86. except json.JSONDecodeError:
  87. print(f"{zhufuquanzi_history_api}响应不是有效的 JSON 格式")
  88. def log_upload(video_objs):
  89. headers = {
  90. "Content-Type": "application/json"
  91. }
  92. payload = {
  93. "e": video_objs
  94. }
  95. try:
  96. # 发送 POST 请求
  97. response = requests.post(
  98. zhufuquanzi_log_upload_api,
  99. headers=headers,
  100. json=payload # 自动将字典转换为 JSON
  101. )
  102. # 检查 HTTP 状态码
  103. if response.status_code == 200:
  104. # 解析 JSON 响应
  105. result = response.json()
  106. # 提取关键字段
  107. code = result.get("code")
  108. msg = result.get("msg")
  109. # 业务逻辑处理(示例)
  110. if code == 0:
  111. print(f"{zhufuquanzi_log_upload_api}请求成功")
  112. else:
  113. print(f"{zhufuquanzi_log_upload_api}请求失败,错误码: {code}, 消息: {msg}")
  114. else:
  115. print(f"{zhufuquanzi_log_upload_api}HTTP 请求失败,状态码: {response.status_code}")
  116. except requests.exceptions.RequestException as e:
  117. print(f"{zhufuquanzi_log_upload_api}请求异常: {e}")
  118. except json.JSONDecodeError:
  119. print(f"{zhufuquanzi_log_upload_api}响应不是有效的 JSON 格式")
  120. class ZFQZTJLRecommend(object):
  121. """
  122. 祝福圈子推荐流
  123. """
  124. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  125. self.limit_flag = False
  126. self.platform = platform
  127. self.mode = mode
  128. self.rule_dict = rule_dict
  129. self.user_list = user_list
  130. self.env = env
  131. self.download_cnt = 0
  132. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  133. self.expire_flag = False
  134. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  135. self.mysql = MysqlHelper(mode=self.mode, platform=self)
  136. def get_video_duration(self, video_link: str) -> int:
  137. cap = cv2.VideoCapture(video_link)
  138. if cap.isOpened():
  139. rate = cap.get(5)
  140. frame_num = cap.get(7)
  141. duration = int(frame_num / rate)
  142. return duration
  143. return 0
  144. def get_recommend_list(self):
  145. print("祝福圈子推荐流开始")
  146. """
  147. 获取推荐页视频
  148. """
  149. headers = {
  150. 'Content-Type': 'application/json'
  151. }
  152. data_rule = FsData()
  153. title_rule = data_rule.get_title_rule()
  154. for i in range(4):
  155. # 目前90%视频会被过滤掉,改为频率和策略
  156. # for i in range(100):
  157. url = "http://8.217.192.46:8889/crawler/zhu_fu_quan_zi/recommend"
  158. payload = json.dumps({})
  159. response = requests.request("POST", url, headers=headers, data=payload)
  160. response = response.json()
  161. if response['code'] != 0:
  162. self.aliyun_log.logging(
  163. code="3000",
  164. message="抓取单条视频失败,请求失败"
  165. ),
  166. return
  167. video_view_lists = []
  168. video_log_uploads = []
  169. for index, video_obj in enumerate(response['data']['data'], 1):
  170. try:
  171. self.aliyun_log.logging(
  172. code="1001", message="扫描到一条视频", data=video_obj
  173. )
  174. vid = video_obj['id']
  175. video_view_lists.append(str(vid))
  176. video_log_uploads.append(self.build_video_log(video_obj,index))
  177. self.process_video_obj(video_obj,title_rule)
  178. except Exception as e:
  179. self.aliyun_log.logging(
  180. code="3000",
  181. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
  182. 1, index, e
  183. ),
  184. )
  185. if self.limit_flag:
  186. return
  187. time.sleep(random.randint(5, 10))
  188. # time.sleep(random.randint(5, 30))
  189. video_history(video_view_lists)
  190. log_upload(video_log_uploads)
  191. def process_video_obj(self, video_obj, title_rule):
  192. """
  193. 处理视频
  194. :param video_obj:
  195. """
  196. time.sleep(random.randint(3, 8))
  197. trace_id = self.platform + str(uuid.uuid1())
  198. our_user = random.choice(self.user_list)
  199. item = VideoItem()
  200. vid = video_obj['id']
  201. mid = int(video_obj['user']['mid'])
  202. print(f"vid={vid},mid={mid}")
  203. try:
  204. user_name = video_obj['user']['nick']
  205. avatar_url = video_obj['user']['hurl']
  206. sql = Sql()
  207. max_id = sql.select_id(mid)
  208. if max_id:
  209. sql.update_name_url(mid, avatar_url, user_name)
  210. else:
  211. time.sleep(1)
  212. link = sql.select_id_status(mid)
  213. if link:
  214. sql.insert_name_url(mid, avatar_url, user_name)
  215. print(f"开始写入{mid}")
  216. xng_in_video_data(json.dumps({"mid": mid}))
  217. except Exception as e:
  218. print(f"写入异常{e}")
  219. pass
  220. url = video_obj["v_url"]
  221. duration = self.get_video_duration(url)
  222. item.add_video_info("video_id", video_obj["id"])
  223. item.add_video_info("video_title", video_obj["title"])
  224. item.add_video_info("play_cnt", int(video_obj["play_pv"]))
  225. item.add_video_info("publish_time_stamp", int(int(video_obj["t"])/1000))
  226. item.add_video_info("out_user_id", video_obj["id"])
  227. item.add_video_info("cover_url", video_obj["url"])
  228. item.add_video_info("like_cnt", 0)
  229. item.add_video_info("share_cnt", int(video_obj["share"]))
  230. item.add_video_info("comment_cnt", int(video_obj["comment_count"]))
  231. item.add_video_info("video_url", video_obj["v_url"])
  232. item.add_video_info("out_video_id", video_obj["id"])
  233. item.add_video_info("duration", int(duration))
  234. item.add_video_info("platform", self.platform)
  235. item.add_video_info("strategy", self.mode)
  236. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  237. item.add_video_info("user_id", our_user["uid"])
  238. item.add_video_info("user_name", our_user["nick_name"])
  239. mq_obj = item.produce_item()
  240. pipeline = PiaoQuanPipeline(
  241. platform=self.platform,
  242. mode=self.mode,
  243. rule_dict=self.rule_dict,
  244. env=self.env,
  245. item=mq_obj,
  246. trace_id=trace_id,
  247. )
  248. if pipeline.process_item():
  249. title_list = title_rule.split(",")
  250. title = video_obj["title"]
  251. contains_keyword = any(keyword in title for keyword in title_list)
  252. if contains_keyword:
  253. new_title = GPT4oMini.get_ai_mini_title(title)
  254. if new_title:
  255. item.add_video_info("video_title", new_title)
  256. current_time = datetime.now()
  257. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  258. values = [
  259. [
  260. video_obj["v_url"],
  261. video_obj["url"],
  262. title,
  263. new_title,
  264. formatted_time,
  265. ]
  266. ]
  267. FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "GVottu", "ROWS", 1, 2)
  268. time.sleep(0.5)
  269. FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "GVottu", "A2:Z2", values)
  270. self.download_cnt += 1
  271. self.mq.send_msg(mq_obj)
  272. self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
  273. video_view(vid, mid)
  274. self.aliyun_log.logging(code="1010", message="触发曝光", data=mq_obj)
  275. if self.download_cnt >= int(
  276. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  277. ):
  278. self.limit_flag = True
  279. def build_video_log(self, video_obj: Dict[str, Any], index: int) -> Dict[str, Any]:
  280. """构建视频日志对象"""
  281. return {
  282. "ac": "show",
  283. "md_ver": "2.0",
  284. "data": {
  285. "page": "discoverIndexPage",
  286. "topic": "recommend",
  287. "tpl_id": str(video_obj['tpl_id']),
  288. "profile_ct": str(video_obj['p_ct']),
  289. "flv": "0",
  290. "sign": video_obj['sign'],
  291. "serial_id": video_obj['serial_id'],
  292. "type": "post",
  293. "name": "post",
  294. "src_page": "discoverIndexPage/recommend",
  295. "aid": str(video_obj['album_id']),
  296. "cid": str(video_obj['id']),
  297. "feed_idx": index - 1,
  298. "cmid": str(video_obj['user']['mid']),
  299. "user_ct": "1638346045009"
  300. },
  301. "t": int(time.time() * 1000),
  302. "ab": {}
  303. }
  304. def run(self):
  305. self.get_recommend_list()
  306. if __name__ == '__main__':
  307. J = ZFQZTJLRecommend(
  308. platform="zhufuquanzituijianliu",
  309. mode="recommend",
  310. rule_dict={},
  311. user_list=[{"uid": 75590470, "link": "zfqz推荐流_接口1", "nick_name": "做你的尾巴"}, {"uid": 75590471, "link": "zfqz推荐流_接口2", "nick_name": "能够相遇"}, {"uid": 75590472, "link": "zfqz推荐流_接口3", "nick_name": "一别两宽各生欢喜"}, {"uid": 75590473, "link": "zfqz推荐流_接口4", "nick_name": "惹火"}, {"uid": 75590475, "link": "zfqz推荐流_接口5", "nick_name": "顾九"}, {"uid": 75590476, "link": "zfqz推荐流_接口6", "nick_name": "宠一身脾气惯一身毛病"}],
  312. )
  313. J.get_recommend_list()
  314. # J.logic()