zhongqingkandian_author.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. import os
  2. import sys
  3. import json
  4. import random
  5. import uuid
  6. import time
  7. import traceback
  8. from datetime import datetime, timedelta
  9. import requests
  10. sys.path.append(os.getcwd())
  11. from application.common.feishu import FsData
  12. from application.common.feishu.feishu_utils import FeishuUtils
  13. from application.common.gpt import GPT4oMini
  14. from application.common.messageQueue import MQ
  15. from application.common.log import AliyunLogger
  16. from application.functions.zqkd_db_redis import DatabaseOperations, RedisOperations
  17. from application.items import VideoItem
  18. from application.pipeline import PiaoQuanPipeline
  19. from application.common.log import Local
  20. class ZhongQingKanDianAuthor:
  21. API_BASE_URL = "http://8.217.192.46:8889"
  22. COMMON_HEADERS = {
  23. "Content-Type": "application/json"
  24. }
  25. # 最大重试次数
  26. MAX_RETRIES = 3
  27. # 最大等待时长
  28. TIMEOUT = 30
  29. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  30. """
  31. 初始化
  32. :param platform: 平台名称 zhongqingkandian
  33. :param mode: 运行模式 recommend
  34. :param rule_dict: 规则字典,包含视频数量限制、时长限制等规则 [{"videos_cnt":{"min":100,"max":0}},{"duration":{"min":30,"max":1200}}]
  35. :param user_list: 用户列表
  36. :param env: 运行环境,默认为 "prod"
  37. """
  38. self.limit_flag = True
  39. self.platform = platform
  40. self.mode = mode
  41. self.rule_dict = rule_dict
  42. self.user_list = user_list
  43. self.env = env
  44. self.download_cnt = 0
  45. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  46. self.expire_flag = False
  47. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  48. self.db_ops = DatabaseOperations(mode=mode, platform=platform)
  49. self.redis_ops = RedisOperations(mode=mode, platform=platform)
  50. data_rule = FsData()
  51. self.title_rule = data_rule.get_title_rule()
  52. self.LocalLog = Local.logger(self.platform, self.mode)
  53. self.curses = 1
  54. result = self.redis_ops.get_last_scanned_id()
  55. # self.last_scanned_id = 0 if result is None else int(result)
  56. # self.zqkd_user_list = self.db_ops.select_user(self.last_scanned_id)
  57. # self.LocalLog.info(f"获取到的用户列表:{self.zqkd_user_list} \n 昨天最后扫描的用户ID{self.last_scanned_id}")
  58. self.session = requests.session()
  59. def __del__(self):
  60. if self.session:
  61. self.LocalLog.info("session 被正确关闭")
  62. self.session.close()
  63. def send_request(self, path, data):
  64. """发送带重试机制的API请求"""
  65. for attempt in range(self.MAX_RETRIES):
  66. try:
  67. response = self.session.post(
  68. f"{self.API_BASE_URL}{path}",
  69. data=data,
  70. timeout=self.TIMEOUT,
  71. headers=self.COMMON_HEADERS
  72. )
  73. resp_data = response.json()
  74. # 检查响应格式
  75. if 'code' not in resp_data:
  76. self.LocalLog.warning(f"{path}响应缺少code字段,尝试重试")
  77. raise ValueError("Missing 'code' in response")
  78. code = resp_data['code']
  79. # 成功情况 (code=0)
  80. if code == 0:
  81. self.LocalLog.info(f"{path}请求成功:{resp_data}")
  82. return resp_data
  83. # 特定错误码不重试
  84. if code == 29036:
  85. self.LocalLog.warning(f"{path}返回code:29036,消息:{resp_data}")
  86. return None
  87. # 其他错误码重试
  88. self.LocalLog.warning(f"{path}返回错误码{code},尝试重试,响应内容:{resp_data}")
  89. except Exception as e:
  90. tb_info = traceback.format_exc()
  91. self.LocalLog.error(f"{path}请求异常: {str(e)} \n {tb_info}")
  92. if attempt < self.MAX_RETRIES - 1:
  93. time.sleep(random.randint(5, 10))
  94. # 所有重试失败,记录错误并返回None
  95. self.LocalLog.error(f"{path}达到最大重试次数")
  96. self.aliyun_log.logging(
  97. code="3000",
  98. message=f"请求 {path} 失败,达到最大重试次数",
  99. data=data
  100. )
  101. return None
  102. def req_user_list(self, account_id):
  103. """
  104. 请求与指定内容 ID 相关的推荐列表。
  105. :param
  106. :return: 相关推荐视频列表的有效响应数据,如果请求失败则返回 None
  107. """
  108. try:
  109. url = '/crawler/zhong_qing_kan_dian/blogger'
  110. body = json.dumps({
  111. "account_id": f"{account_id}",
  112. "content_type": "全部",
  113. "cursor": f"{self.curses}"
  114. })
  115. self.LocalLog.info(f"开始请求用户视频列表{body}")
  116. resp = self.send_request(url, body)
  117. return resp
  118. except Exception as e:
  119. tb_info = traceback.format_exc()
  120. self.aliyun_log.logging(
  121. code="1004",
  122. message=f"请求相关推荐视频列表时发生异常,错误信息: {str(e)}",
  123. data={"url": url}
  124. )
  125. self.LocalLog.info(f"请求相关推荐视频列表 {url} 时发生异常:{e} \n{tb_info}")
  126. return None
  127. def req_detail(self, content_link, **kwargs):
  128. """
  129. 请求视频详情。
  130. :param content_link: 视频内容链接
  131. :param kwargs: 额外的视频信息
  132. :return: 无返回值,处理视频详情信息
  133. """
  134. try:
  135. self.LocalLog.info(f"开始请求视频详情,链接: {content_link}")
  136. url = '/crawler/zhong_qing_kan_dian/detail'
  137. body = json.dumps({
  138. "content_link": content_link
  139. })
  140. resp = self.send_request(url, body)
  141. if not resp:
  142. return
  143. data = resp.get("data", {}).get("data", {})
  144. if data.get("content_type") != "video":
  145. self.aliyun_log.logging(
  146. code="3003",
  147. message=f"跳过非视频内容",
  148. data={"content_link": content_link}
  149. )
  150. self.LocalLog.info(f"跳过非视频内容,链接: {content_link}")
  151. return
  152. self.LocalLog.info(f"{content_link} 是视频")
  153. data.update(kwargs)
  154. self.process_video_obj(data)
  155. except Exception as e:
  156. tb_info = traceback.format_exc()
  157. self.aliyun_log.logging(
  158. code="1005",
  159. message=f"请求视频详情时发生异常,错误信息: {str(e)}",
  160. data={"content_link": content_link}
  161. )
  162. self.LocalLog.error(f"请求视频详情,链接 {content_link} 时发生异常:{e} \n{tb_info}")
  163. def control_request_author(self):
  164. """
  165. 控制相关推荐视频列表的请求和处理流程。
  166. :return: 无返回值,根据下载数量限制控制流程
  167. """
  168. while self.limit_flag:
  169. try:
  170. self.download_cnt = self.db_ops.get_today_videos()
  171. if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 100):
  172. self.aliyun_log.logging(
  173. code="2010",
  174. message=f"今日已经达到最大量",
  175. data=self.download_cnt
  176. )
  177. self.LocalLog.info(f"当日视频已达到最大爬取量{self.download_cnt}")
  178. return
  179. self.LocalLog.info(f"开始用户视频列表的请求和处理流程,今日已爬 {self.download_cnt} 个视频")
  180. if not self.db_ops.select_user(0):
  181. self.LocalLog.info("没有用户数据")
  182. time.sleep(10)
  183. continue
  184. for user_info in self.db_ops.select_user(0):
  185. if not self.limit_flag:
  186. self.aliyun_log.logging(
  187. code="2010",
  188. message=f"今日已经达到最大量",
  189. data=self.download_cnt
  190. )
  191. self.LocalLog.info("视频数量已达到预期")
  192. return
  193. if is_less_than_30_minutes():
  194. self.LocalLog.info("时间已不足,停止执行")
  195. return
  196. current_id, user_id = user_info
  197. author_resp = self.req_user_list(user_id)
  198. self.redis_ops.set_last_scanned_id(current_id)
  199. if not author_resp:
  200. continue
  201. author_data = author_resp.get("data", {})
  202. # 判断是否有下一页
  203. if not author_data["next_cursor"]:
  204. continue
  205. video_data = author_data.get("data", [])
  206. self.LocalLog.info(f"用户{user_id}第{self.curses}页数据长度{len(video_data)}")
  207. for video_obj in video_data:
  208. # if not self.limit_flag:
  209. # return
  210. video_content_link = video_obj.get("share_url")
  211. if video_content_link:
  212. self.req_detail(video_content_link, **video_obj)
  213. time.sleep(random.randint(5,10))
  214. # self.redis_ops.set_last_scanned_id(0)
  215. # self.zqkd_user_list = self.db_ops.select_user(0)
  216. self.curses += 1
  217. except Exception as e:
  218. tb_info = traceback.format_exc()
  219. self.aliyun_log.logging(
  220. code="3009",
  221. message=f"控制相关推荐视频请求和处理时发生异常,错误信息: {str(e)}",
  222. data={}
  223. )
  224. self.LocalLog.info(f"控制相关推荐视频请求和处理时发生异常:\n{tb_info}")
  225. def process_video_obj(self, video_obj):
  226. """
  227. 处理视频对象,包括检查视频时长、用户信息、保存数据等操作。
  228. :param video_obj: 视频对象,包含视频的各种信息
  229. :return: 无返回值,完成视频对象的处理
  230. """
  231. try:
  232. video_duration = video_obj["video_url_list"][0]['video_duration']
  233. video_id = video_obj['channel_content_id']
  234. # 检查视频ID是否存在
  235. if self.redis_ops.check_video_id_exists(video_id):
  236. self.aliyun_log.logging(
  237. code="3004",
  238. message=f"重复视频ID:{video_id}"
  239. )
  240. self.LocalLog.info(f"重复视频ID: {video_id}")
  241. return
  242. our_user = random.choice(self.user_list)
  243. trace_id = self.platform + str(uuid.uuid1())
  244. item = VideoItem()
  245. # account_id = video_obj["channel_account_id"]
  246. # account_name = video_obj["channel_account_name"]
  247. # account_avatar = video_obj["avatar"]
  248. # # 检查用户ID是否存在
  249. # """
  250. # 需要改为判断redis
  251. # """
  252. # is_repeat_user = self.db_ops.check_user_id(account_id)
  253. # if is_repeat_user:
  254. # # 更新用户信息,使用异步方法并等待结果
  255. # self.LocalLog.info(f"用户{account_id}已经存在数据库中")
  256. # self.db_ops.update_user(account_id, account_name, account_avatar)
  257. # else:
  258. # self.LocalLog.info(f"用户{account_id}没在数据库中")
  259. # # 插入用户信息,使用异步方法并等待结果
  260. # self.db_ops.insert_user(account_id, account_name, account_avatar)
  261. # self.redis_ops.add_user_data("task:zqkd_user_id", json.dumps({"uid": account_id}))
  262. # self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
  263. # self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
  264. if video_duration > self.rule_dict.get("duration", {}).get("max",
  265. 1200) or video_duration < self.rule_dict.get(
  266. "duration", {}).get("min", 30):
  267. self.aliyun_log.logging(
  268. code="3005",
  269. message=f"视频时长不满足条件[>=30s&<=1200s]视频ID:{video_obj['channel_content_id']},视频时长:{video_duration}"
  270. )
  271. self.LocalLog.info(
  272. f"视频时长不满足条件,视频ID: {video_obj['channel_content_id']}, 视频时长: {video_duration}")
  273. return
  274. item.add_video_info("video_id", video_obj['channel_content_id'])
  275. item.add_video_info("video_title", video_obj["title"])
  276. item.add_video_info("play_cnt", self.convert_number(video_obj["read_num"]))
  277. item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"]) / 1000))
  278. item.add_video_info("out_user_id", video_obj["channel_account_id"])
  279. item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
  280. item.add_video_info("like_cnt", 0)
  281. item.add_video_info("collection_cnt", 0)
  282. item.add_video_info("share_cnt", 0)
  283. item.add_video_info("comment_cnt", 0)
  284. item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
  285. item.add_video_info("out_video_id", int(video_obj["channel_content_id"]))
  286. item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
  287. item.add_video_info("platform", self.platform)
  288. item.add_video_info("strategy", self.mode)
  289. item.add_video_info("session", f"{self.platform}-{int(time.time())}")
  290. item.add_video_info("user_id", our_user["uid"])
  291. item.add_video_info("user_name", our_user["nick_name"])
  292. mq_obj = item.produce_item()
  293. pipeline = PiaoQuanPipeline(
  294. platform=self.platform,
  295. mode=self.mode,
  296. rule_dict=self.rule_dict,
  297. env=self.env,
  298. item=mq_obj,
  299. trace_id=trace_id
  300. )
  301. if pipeline.process_item():
  302. title_list = self.title_rule.split(",")
  303. title = video_obj["title"]
  304. contains_keyword = any(keyword in title for keyword in title_list)
  305. if contains_keyword:
  306. new_title = GPT4oMini.get_ai_mini_title(title)
  307. if new_title:
  308. item.add_video_info("video_title", new_title)
  309. current_time = datetime.now()
  310. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  311. values = [
  312. [
  313. video_obj["video_url_list"][0]['video_url'],
  314. video_obj["image_url_list"][0]['image_url'],
  315. title,
  316. new_title,
  317. formatted_time,
  318. ]
  319. ]
  320. FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "ROWS", 1, 2)
  321. time.sleep(0.5)
  322. FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "A2:Z2", values)
  323. self.download_cnt += 1
  324. self.mq.send_msg(mq_obj)
  325. self.aliyun_log.logging(
  326. code="2009",
  327. message=f"成功发送视频到etl",
  328. data={"video_obj": video_obj}
  329. )
  330. # 保存视频ID
  331. self.redis_ops.save_video_id(video_obj['channel_content_id'])
  332. if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 300):
  333. self.aliyun_log.logging(
  334. code="2010",
  335. message=f"今日已经达到最大量",
  336. data=self.download_cnt
  337. )
  338. self.LocalLog.info("视频数量已达到预期")
  339. # 判断视频数量达到预期且用户列表没有轮训完
  340. # self.redis_ops.set_last_scanned_id(self.last_scanned_id)
  341. self.limit_flag = False
  342. except Exception as e:
  343. tb_info = traceback.format_exc()
  344. self.aliyun_log.logging(
  345. code="1005",
  346. message=f"处理视频对象时发生异常,错误信息: {str(e)}",
  347. data={"video_obj": video_obj}
  348. )
  349. self.LocalLog.error(f"处理视频对象时发生异常: {e}\n{tb_info}")
  350. def convert_number(self,s):
  351. """解析数字字符串,处理包含'万'的情况"""
  352. if isinstance(s, int): # 如果已经是int,直接返回
  353. return s
  354. elif isinstance(s, str): # 如果是字符串,处理'万'
  355. if '万' in s:
  356. try:
  357. num = float(s.strip('万')) * 10000
  358. return int(num) if num.is_integer() else num # 整数返回int,小数返回float
  359. except ValueError:
  360. print(f"无法将 '{s}' 转换为有效的数字。")
  361. return 0 # 默认返回0或其他默认值
  362. else:
  363. try:
  364. return int(s) # 尝试转换为int
  365. except ValueError:
  366. try:
  367. return float(s) # 尝试转换为float
  368. except ValueError:
  369. print(f"'{s}' 不是有效的数字格式。")
  370. return 0 # 默认返回0或其他默认值
  371. else:
  372. print(f"不支持的类型: {type(s).__name__}")
  373. return 0 # 非int/str类型返回默认值
  374. def run(self):
  375. """
  376. 运行主流程,执行推荐视频和相关推荐视频的请求,直到达到下载数量限制。
  377. :return: 无返回值,程序运行的主逻辑
  378. """
  379. self.LocalLog.info("开始执行中青看点用户视频抓取...")
  380. self.control_request_author()
  381. def is_less_than_30_minutes():
  382. now = datetime.now()
  383. tomorrow = now.date() + timedelta(days=1)
  384. midnight = datetime.combine(tomorrow, datetime.min.time())
  385. time_left = midnight - now
  386. return time_left.total_seconds() < 30 * 60
  387. if __name__ == '__main__':
  388. ZhongQingKanDianAuthor(
  389. platform="zhongqingkandian",
  390. mode="author",
  391. rule_dict={'videos_cnt': {'min': 500, 'max': 0}, 'duration': {'min': 30, 'max': 1200}},
  392. user_list=[{"uid": 81525568, "link": "中青看点推荐", "nick_name": "芸芸众生"}]
  393. ).run()