# zhongqingkandian.py

import asyncio
import json
import os
import random
import sys
import time
import uuid
from datetime import datetime, timedelta

import aiohttp

sys.path.append(os.getcwd())

from application.common.feishu import FsData
from application.common.feishu.feishu_utils import FeishuUtils
from application.common.gpt import GPT4oMini
from application.common.redis.redis_helper import SyncRedisHelper
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper


class ZhongQingKanDian:
    """
    ZhongQingKanDian (中青看点) recommendation feed crawler
    Topic: zqkd_recommend_prod

    Flow:
        /recommend (a list of 11 ids)
        -> concurrently request /related for each id (a list of N items)
        -> concurrently request /detail for each item
        -> if the content is a video, write it to Redis (key: detail_id, value: video data)
    """

    API_BASE_URL = "http://8.217.192.46:8889"
    COMMON_HEADERS = {
        "Content-Type": "application/json"
    }
    MAX_RETRIES = 3
    TIMEOUT = 30  # request timeout in seconds
    max_recommend_count = 100  # daily cap for recommend-feed crawling
    max_related_recommend_count = 200  # daily cap for related-recommendation crawling
    max_author_video = 300  # daily cap of videos crawled per account

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        self.limit_flag = False
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)
        data_rule = FsData()
        self.title_rule = data_rule.get_title_rule()

    async def send_request(self, path, data):
        full_url = f"{self.API_BASE_URL}{path}"
        async with aiohttp.ClientSession(headers=self.COMMON_HEADERS) as session:
            for retry in range(self.MAX_RETRIES):
                try:
                    async with session.post(full_url, data=data, timeout=self.TIMEOUT) as response:
                        response.raise_for_status()
                        return await response.json()
                except aiohttp.ClientError:
                    if retry < self.MAX_RETRIES - 1:
                        await asyncio.sleep(2)
                except json.JSONDecodeError:
                    if retry < self.MAX_RETRIES - 1:
                        await asyncio.sleep(2)
        return None

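    # Expected response envelope (inferred from the checks in this file), e.g.
    #     {"code": 0, "msg": "...", "data": {"data": [...]}}
    # A non-zero "code" is treated as a failed request; list/detail payloads live under data["data"].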
    def is_response_valid(self, resp):
        if not resp or resp.get('code') != 0:
            self.aliyun_log.logging(
                code="3000",
                message="Failed to crawl single video: request failed"
            )
            return None
        return resp

    async def req_recommend_list(self):
        """
        Request the recommendation feed.
        """
        print("Requesting recommendation feed")
        url = '/crawler/zhong_qing_kan_dian/recommend'
        body = json.dumps({"cursor": ""})
        resp = await self.send_request(url, body)
        return self.is_response_valid(resp)

    async def req_related_recommend_list(self, content_id):
        """
        Request related recommendations for a content id.
        """
        print("Requesting related recommendations")
        url = '/crawler/zhong_qing_kan_dian/related'
        body = json.dumps({
            "content_id": str(content_id),
            "cursor": ""
        })
        resp = await self.send_request(url, body)
        return self.is_response_valid(resp)

    async def req_detail(self, content_link, label, **kwargs):
        """
        Request the detail of a single content link.
        """
        print("Requesting detail")
        url = '/crawler/zhong_qing_kan_dian/detail'
        body = json.dumps({
            "content_link": content_link
        })
        resp = await self.send_request(url, body)
        if not self.is_response_valid(resp):
            return
        data = resp.get("data", {}).get("data", {})
        if data.get("content_type") != "video":
            self.aliyun_log.logging(
                code="3003",
                message=f"Skipping non-video content (label={label})",
                data={"content_link": content_link}
            )
            return
        print("Content is a video")
        # Merge the key/value pairs from kwargs (the originating list item) into the detail data
        data.update(kwargs)
        self.process_video_obj(data, label)
        await asyncio.sleep(10)

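    # Note: the flow comment at the top of the class mentions concurrent requests, but
    # control_request below awaits each detail request sequentially; no gather()-style
    # concurrency is used in this file.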
    async def control_request(self):
        """Core control logic: process the three endpoints in sequence."""
        print("Starting processing")
        recommend_resp = await self.req_recommend_list()
        if not self.is_response_valid(recommend_resp):
            return
        recommend_list = recommend_resp.get("data", {}).get("data", [])
        for video_obj in recommend_list:
            content_link = video_obj.get("share_url")
            content_id = video_obj.get("id")
            if not (content_link and content_id):
                continue
            # Process the detail of the recommended video
            await self.req_detail(content_link, "recommend", **video_obj)
            # # Process the related-recommendation list (after a pause)
            # await asyncio.sleep(5)
            # related_resp = await self.req_related_recommend_list(content_id)
            # if not self.is_response_valid(related_resp):
            #     continue
            #
            # related_list = related_resp.get("data", {}).get("data", [])
            # for related_obj in related_list:
            #     related_content_link = related_obj.get("share_url")
            #     if related_content_link:
            #         await self.req_detail(related_content_link, "related", **related_obj)

    def process_video_obj(self, video_obj, label):
        """
        Process a single video: de-duplicate it, upsert its author, build a VideoItem
        and send it to the ETL queue.
        :param video_obj: video data returned by the detail endpoint
        :param label: source label ("recommend" or "related")
        """
        video_id = video_obj.get("channel_content_id")
        # The original code called self.save_video_id() without a key argument (a TypeError).
        # Assumed intent: skip videos whose id has already been recorded in Redis.
        if not self.save_video_id(f"crawler:zqkd:{video_id}"):
            return
        our_user = random.choice(self.user_list)
        trace_id = self.platform + str(uuid.uuid1())
        item = VideoItem()
        try:
            account_id = video_obj["channel_account_id"]
            account_name = video_obj["channel_account_name"]
            account_avatar = video_obj["avatar"]
            # Check whether the author is already known
            is_repeat_user = self.select_id(account_id)
            if is_repeat_user:
                self.update_name_url(account_id, account_name, account_avatar)
            else:
                # Insert into the MySQL table
                self.insert_name_url(account_id, account_name, account_avatar)
                # Write to Redis
                self.write_redis_user_data(json.dumps({"uid": account_id}))
            print("User data written successfully")
        except Exception as e:
            print(f"Failed to write user data: {e}")
        url = video_obj["video_url_list"][0]['video_url']
        duration = video_obj["video_url_list"][0]['video_duration']
        item.add_video_info("video_id", video_obj['channel_content_id'])
        item.add_video_info("video_title", video_obj["title"])
        item.add_video_info("play_cnt", int(video_obj["read_num"]))
        item.add_video_info("publish_time_stamp", int(int(video_obj["publish_timestamp"]) / 1000))
        item.add_video_info("out_user_id", video_obj["channel_account_id"])
        item.add_video_info("cover_url", video_obj["image_url_list"][0]['image_url'])
        item.add_video_info("like_cnt", 0)
        item.add_video_info("collection_cnt", int(video_obj['collect_num']))
        item.add_video_info("share_cnt", int(video_obj["share_num"]))
        item.add_video_info("comment_cnt", int(video_obj["cmt_num"]))
        item.add_video_info("video_url", url)
        item.add_video_info("out_video_id", int(video_obj["channel_content_id"]))
        item.add_video_info("duration", duration)
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        mq_obj = item.produce_item()
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if pipeline.process_item():
            title_list = self.title_rule.split(",")
            title = video_obj["title"]
            contains_keyword = any(keyword in title for keyword in title_list)
            if contains_keyword:
                new_title = GPT4oMini.get_ai_mini_title(title)
                if new_title:
                    item.add_video_info("video_title", new_title)
                    current_time = datetime.now()
                    formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
                    values = [
                        [
                            video_obj["video_url_list"][0]['video_url'],
                            video_obj["image_url_list"][0]['image_url'],
                            title,
                            new_title,
                            formatted_time,
                        ]
                    ]
                    # Log the rewritten title to the Feishu sheet
                    FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "ROWS", 1, 2)
                    time.sleep(0.5)
                    FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "v8S6nL", "A2:Z2", values)
            self.download_cnt += 1
            self.mq.send_msg(mq_obj)
            self.aliyun_log.logging(code="1002", message="Successfully sent to ETL", data=mq_obj)
            if self.download_cnt >= int(
                    self.rule_dict.get("videos_cnt", {}).get("min", 200)
            ):
                self.limit_flag = True
            if label == "recommend":
                key = f"crawler:zqkd:{video_id}"
                self.save_video_id(key)

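    # zqkd_uid table columns (inferred from the INSERT below): uid, avatar_url, user_name, data_time.
    # The helpers below build their SQL by direct f-string interpolation of these values.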
  235. """
  236. 查询用户id是否存在
  237. """
  238. def select_id(self, uid):
  239. sql = f""" select uid from zqkd_uid where uid = "{uid}"; """
  240. db = MysqlHelper()
  241. repeat_user = db.select(sql=sql)
  242. if repeat_user:
  243. return True
  244. return False
    def update_name_url(self, uid, user_name, avatar_url):
        sql = f""" update zqkd_uid set avatar_url = "{avatar_url}", user_name="{user_name}" where uid = "{uid}"; """
        db = MysqlHelper()
        repeat_video = db.update(sql=sql)
        if repeat_video:
            return True
        return False

    def insert_name_url(self, uid, user_name, avatar_url):
        current_time = datetime.now()
        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
        insert_sql = f"""INSERT INTO zqkd_uid (uid, avatar_url, user_name, data_time) values ('{uid}' ,'{avatar_url}','{user_name}', '{formatted_time}')"""
        db = MysqlHelper()
        repeat_video = db.update(sql=insert_sql)
        if repeat_video:
            return True
        return False

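    # Redis keys used by the helpers below: "task:zqkd_video_id" (a list of video ids that is
    # read element by element), "task:zqkd_user_id" (a list of user ids that is appended to),
    # and "crawler:zqkd:<video_id>" (per-video de-duplication keys with a 7-day TTL).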
    def get_redis_video_data(self):
        """Fetch one id from the Redis list."""
        task = "task:zqkd_video_id"
        helper = SyncRedisHelper()
        client = helper.get_client()
        # Get the length of the list
        list_length = client.llen(task)
        # Walk the list elements
        for i in range(list_length):
            # Use lrange to fetch a single element
            element = client.lrange(task, i, i)
            if element:
                print(f"Element at index {i}: {element[0].decode('utf-8')}")
                return element

    def write_redis_user_data(self, ret):
        """Append one piece of user data (a JSON string) to the Redis list."""
        task = "task:zqkd_user_id"
        helper = SyncRedisHelper()
        client = helper.get_client()
        client.rpush(task, ret)

    async def run(self):
        while True:
            # Assumed intent: stop once the download limit set in process_video_obj is reached;
            # the original loop never consulted limit_flag.
            if self.limit_flag:
                break
            await self.control_request()

    def save_video_id(self, key):
        """Record a video id key in Redis with a 7-day TTL; return False if it already exists."""
        helper = SyncRedisHelper()
        client = helper.get_client()
        # Check whether the key already exists
        if client.exists(key):
            return False
        # Store the video id and let it expire after 7 days
        expiration_time = int(timedelta(days=7).total_seconds())
        client.setex(key, expiration_time, "1")
        return True


if __name__ == '__main__':
    asyncio.run(ZhongQingKanDian(
        platform="zhongqingkandian",
        mode="recommend",
        rule_dict={},
        user_list=[{"uid": 81522822, "link": "中青看点推荐", "nick_name": "免不了俗"}],
    ).run())