# zhongqingkandian_author.py

import os
import sys
import asyncio
import json
import random
import uuid
import time
import traceback
from datetime import datetime

import aiohttp

sys.path.append(os.getcwd())

from application.common.feishu import FsData
from application.common.feishu.feishu_utils import FeishuUtils
from application.common.gpt import GPT4oMini
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger, Local
from application.functions.zqkd_db_redis import DatabaseOperations, RedisOperations
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline


class ZhongQingKanDianAuthor:
    API_BASE_URL = "http://8.217.192.46:8889"
    COMMON_HEADERS = {
        "Content-Type": "application/json"
    }
    # Maximum number of retry attempts per request
    MAX_RETRIES = 3
    # Maximum wait time per request (timeout, in seconds)
    TIMEOUT = 30

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        """
        Initialize the crawler.
        :param platform: platform name (zhongqingkandian)
        :param mode: run mode (author)
        :param rule_dict: rule dict with video-count and duration limits, e.g.
            {"videos_cnt": {"min": 100, "max": 0}, "duration": {"min": 30, "max": 1200}}
        :param user_list: list of publishing users on our side
        :param env: runtime environment, defaults to "prod"
        """
        self.limit_flag = True
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        self.db_ops = DatabaseOperations(mode=mode, platform=platform)
        self.redis_ops = RedisOperations(mode=mode, platform=platform)
        data_rule = FsData()
        self.title_rule = data_rule.get_title_rule()
        self.LocalLog = Local.logger(self.platform, self.mode)
        self.curses = 1
        result = self.redis_ops.get_last_scanned_id()
        self.last_scanned_id = 0 if result is None else int(result)
        self.zqkd_user_list = self.db_ops.select_user(self.last_scanned_id)
        self.LocalLog.info(f"获取到的用户列表:{self.zqkd_user_list} \n 昨天最后扫描的用户ID{self.last_scanned_id}")
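
    # self.curses is the page cursor sent to the blogger endpoint (the "cursor" field in
    # req_user_list); self.last_scanned_id restores yesterday's position in the user table
    # from Redis so scanning can continue from where the previous run stopped.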

    async def send_request(self, path, data):
        """
        Asynchronously send a POST request to the given path, with retries.
        :param path: API path to request
        :param data: request payload (JSON string)
        :return: parsed JSON response, or None if every attempt fails
        """
        full_url = f"{self.API_BASE_URL}{path}"
        async with aiohttp.ClientSession(headers=self.COMMON_HEADERS) as session:
            for retry in range(self.MAX_RETRIES):
                try:
                    async with session.post(full_url, data=data, timeout=self.TIMEOUT) as response:
                        response.raise_for_status()
                        # Parse the body once and reuse it for both logging and the return value
                        resp_json = await response.json()
                        self.LocalLog.info(f"{path}响应数据:{resp_json}")
                        return resp_json
                except (aiohttp.ClientError, json.JSONDecodeError) as e:
                    tb_info = traceback.format_exc()
                    self.LocalLog.info(f"{path}请求失败:{e} \n{tb_info}")
                    self.aliyun_log.logging(
                        code="3000",
                        message=f"请求 {path} 失败,错误信息: {str(e)}",
                        data={"path": path}
                    )
                    await asyncio.sleep(5)
        return None

    def is_response_valid(self, resp, url):
        """
        Check whether a response is valid (code == 0 means valid).
        :param resp: response data
        :param url: requested URL
        :return: the response if it is valid, otherwise None
        """
        try:
            if resp and resp.get('code') != 0:
                self.aliyun_log.logging(
                    code="3000",
                    message=f"抓取{url}失败,请求失败,响应:{resp}"
                )
                self.LocalLog.info(f"{url}请求失败,响应:{resp}")
                return None
            return resp
        except Exception as e:
            tb_info = traceback.format_exc()
            self.aliyun_log.logging(
                code="3000",
                message=f"检查响应有效性时出错,错误信息: {str(e)}",
                data={"url": url, "resp": resp}
            )
            self.LocalLog.info(f"检查 {url} 响应有效性时出错:{e} \n{tb_info}")
            return None
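
    # Request flow: every endpoint call goes through send_request (POST with retries) and
    # then is_response_valid, so the req_* helpers below return either a payload whose
    # code is 0 or None on any failure.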

    async def req_user_list(self, account_id):
        """
        Asynchronously request the video list of the given author account.
        :param account_id: author account id
        :return: the validated response data, or None if the request fails
        """
        try:
            url = '/crawler/zhong_qing_kan_dian/blogger'
            body = json.dumps({
                "account_id": f"{account_id}",
                "content_type": "全部",
                "cursor": f"{self.curses}"
            })
            self.LocalLog.info(f"开始请求用户视频列表{body}")
            resp = await self.send_request(url, body)
            return self.is_response_valid(resp, url)
        except Exception as e:
            tb_info = traceback.format_exc()
            self.aliyun_log.logging(
                code="1004",
                message=f"请求用户视频列表时发生异常,错误信息: {str(e)}",
                data={"url": url}
            )
            self.LocalLog.info(f"请求用户视频列表 {url} 时发生异常:{e} \n{tb_info}")
            return None

    async def req_detail(self, content_link, **kwargs):
        """
        Asynchronously request the detail of a piece of content and, if it is a video,
        hand it to process_video_obj.
        :param content_link: link to the content
        :param kwargs: extra fields from the author list merged into the detail data
        :return: None
        """
        try:
            self.LocalLog.info(f"开始请求视频详情,链接: {content_link}")
            url = '/crawler/zhong_qing_kan_dian/detail'
            body = json.dumps({
                "content_link": content_link
            })
            resp = await self.send_request(url, body)
            if not self.is_response_valid(resp, url):
                return
            data = resp.get("data", {}).get("data", {})
            if data.get("content_type") != "video":
                self.aliyun_log.logging(
                    code="3003",
                    message="跳过非视频内容",
                    data={"content_link": content_link}
                )
                self.LocalLog.info(f"跳过非视频内容,链接: {content_link}")
                return
            self.LocalLog.info(f"{content_link} 是视频")
            data.update(kwargs)
            await self.process_video_obj(data)
            await asyncio.sleep(10)
        except Exception as e:
            tb_info = traceback.format_exc()
            self.aliyun_log.logging(
                code="1005",
                message=f"请求视频详情时发生异常,错误信息: {str(e)}",
                data={"content_link": content_link}
            )
            self.LocalLog.error(f"请求视频详情,链接 {content_link} 时发生异常:{e} \n{tb_info}")

    async def control_request_author(self):
        """
        Drive the request/processing loop over the author video lists until the download
        limit is reached.
        :return: None
        """
        while self.limit_flag:
            try:
                self.LocalLog.info(f"开始用户视频列表的请求和处理流程,今日已爬 {self.download_cnt} 个视频")
                if not self.zqkd_user_list:
                    self.LocalLog.info("没有用户数据")
                    await asyncio.sleep(10)
                    continue
                for user_info in self.zqkd_user_list:
                    current_id, user_id = user_info
                    author_resp = await self.req_user_list(user_id)
                    if current_id > self.last_scanned_id:
                        self.last_scanned_id = current_id
                    if not author_resp:
                        continue
                    author_data = author_resp.get("data", {}).get("data", [])
                    # Skip authors whose list comes back empty
                    if not author_data:
                        continue
                    for author_obj in author_data:
                        author_content_link = author_obj.get("share_url")
                        if author_content_link:
                            await self.req_detail(author_content_link, **author_obj)
            except Exception as e:
                tb_info = traceback.format_exc()
                self.aliyun_log.logging(
                    code="3009",
                    message=f"控制用户视频请求和处理时发生异常,错误信息: {str(e)}",
                    data={}
                )
                self.LocalLog.info(f"控制用户视频请求和处理时发生异常:\n{tb_info}")
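
    # Scan-position note: the loop above advances self.last_scanned_id while walking the
    # user table; process_video_obj persists it to Redis once the videos_cnt quota is hit,
    # so the next run can pick up from that user.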

    async def process_video_obj(self, video_obj):
        """
        Process a video object: check duplicates and duration, build the VideoItem, run it
        through the pipeline and send it to MQ.
        :param video_obj: dict holding the video's detail info
        :return: None
        """
        try:
            video_duration = video_obj["video_url_list"][0]['video_duration']
            video_id = video_obj['channel_content_id']
            # Check whether the video id has already been crawled
            if self.redis_ops.check_video_id_exists(video_id):
                self.aliyun_log.logging(
                    code="3004",
                    message=f"重复视频ID:{video_id}"
                )
                self.LocalLog.info(f"重复视频ID: {video_id}")
                return
            our_user = random.choice(self.user_list)
            trace_id = self.platform + str(uuid.uuid1())
            item = VideoItem()
            account_id = video_obj["channel_account_id"]
            account_name = video_obj["channel_account_name"]
            account_avatar = video_obj["avatar"]
            # # Check whether the user id already exists
            # """
            # Needs to be switched to a Redis check
            # """
            # is_repeat_user = self.db_ops.check_user_id(account_id)
            # if is_repeat_user:
            #     # Update the user info, using the async method and awaiting the result
            #     self.LocalLog.info(f"用户{account_id}已经存在数据库中")
            #     self.db_ops.update_user(account_id, account_name, account_avatar)
            # else:
            #     self.LocalLog.info(f"用户{account_id}没在数据库中")
            #     # Insert the user info, using the async method and awaiting the result
            #     self.db_ops.insert_user(account_id, account_name, account_avatar)
            #     self.redis_ops.add_user_data("task:zqkd_user_id", json.dumps({"uid": account_id}))
            #     self.aliyun_log.logging(code="1007", message=f"用户数据写入成功,用户ID:{account_id}")
            #     self.LocalLog.info(f"用户数据写入成功,用户ID: {account_id}")
            # Duration rule: default window is [30s, 1200s]
            max_duration = self.rule_dict.get("duration", {}).get("max", 1200)
            min_duration = self.rule_dict.get("duration", {}).get("min", 30)
            if video_duration > max_duration or video_duration < min_duration:
                self.aliyun_log.logging(
                    code="3005",
                    message=f"视频时长不满足条件[>=30s&<=1200s]视频ID:{video_obj['channel_content_id']},视频时长:{video_duration}"
                )
                self.LocalLog.info(
                    f"视频时长不满足条件,视频ID: {video_obj['channel_content_id']}, 视频时长: {video_duration}")
                return
            item.add_video_info("video_id", video_obj['channel_content_id'])
            item.add_video_info("video_title", video_obj["title"])
            item.add_video_info("play_cnt", video_obj["read_num"])
            item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"]) / 1000))
            item.add_video_info("out_user_id", video_obj["channel_account_id"])
            item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
            item.add_video_info("like_cnt", 0)
            item.add_video_info("collection_cnt", 0)
            item.add_video_info("share_cnt", 0)
            item.add_video_info("comment_cnt", 0)
            item.add_video_info("video_url", video_obj["video_url_list"][0]['video_url'])
            item.add_video_info("out_video_id", int(video_obj["channel_content_id"]))
            item.add_video_info("duration", video_obj["video_url_list"][0]['video_duration'])
            item.add_video_info("platform", self.platform)
            item.add_video_info("strategy", self.mode)
            item.add_video_info("session", f"{self.platform}-{int(time.time())}")
            item.add_video_info("user_id", our_user["uid"])
            item.add_video_info("user_name", our_user["nick_name"])
            mq_obj = item.produce_item()
            pipeline = PiaoQuanPipeline(
                platform=self.platform,
                mode=self.mode,
                rule_dict=self.rule_dict,
                env=self.env,
                item=mq_obj,
                trace_id=trace_id
            )
            if pipeline.process_item():
                title_list = self.title_rule.split(",")
                title = video_obj["title"]
                contains_keyword = any(keyword in title for keyword in title_list)
                if contains_keyword:
                    new_title = GPT4oMini.get_ai_mini_title(title)
                    if new_title:
                        item.add_video_info("video_title", new_title)
                        current_time = datetime.now()
                        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
                        values = [
                            [
                                video_obj["video_url_list"][0]['video_url'],
                                video_obj["image_url_list"][0]['image_url'],
                                title,
                                new_title,
                                formatted_time,
                            ]
                        ]
                        FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "ROWS", 1, 2)
                        time.sleep(0.5)
                        FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "A2:Z2", values)
                self.download_cnt += 1
                self.mq.send_msg(mq_obj)
                # Save the video id
                self.redis_ops.save_video_id(video_obj['channel_content_id'])
                if self.download_cnt >= self.rule_dict.get("videos_cnt", {}).get("min", 300):
                    # Record the user id the scan has reached
                    self.redis_ops.set_last_scanned_id(self.last_scanned_id)
                    self.limit_flag = False
                else:
                    self.redis_ops.set_last_scanned_id(0)
                    self.curses += 1
        except Exception as e:
            tb_info = traceback.format_exc()
            self.aliyun_log.logging(
                code="1005",
                message=f"处理视频对象时发生异常,错误信息: {str(e)}",
                data={"video_obj": video_obj}
            )
            self.LocalLog.error(f"处理视频对象时发生异常: {e}\n{tb_info}")

    async def run(self):
        """
        Main entry point: asynchronously run the author video crawl until the download
        limit is reached.
        :return: None
        """
        self.LocalLog.info("开始执行中青看点用户视频抓取...")
        await asyncio.gather(
            self.control_request_author()
        )


if __name__ == '__main__':
    asyncio.run(ZhongQingKanDianAuthor(
        platform="zhongqingkandian",
        mode="author",
        rule_dict={"videos_cnt": {"min": 2, "max": 0}},
        user_list=[{"uid": 81525568, "link": "中青看点推荐", "nick_name": "芸芸众生"}]
    ).run())
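
# Example invocation with an explicit duration rule (a sketch; the values below simply
# mirror the ranges documented in __init__, not a vetted production config):
#
#     asyncio.run(ZhongQingKanDianAuthor(
#         platform="zhongqingkandian",
#         mode="author",
#         rule_dict={"videos_cnt": {"min": 100, "max": 0}, "duration": {"min": 30, "max": 1200}},
#         user_list=[{"uid": 81525568, "link": "中青看点推荐", "nick_name": "芸芸众生"}],
#     ).run())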