# laonianquan.py
import json
import os
import random
import sys
import time
import uuid
from datetime import datetime

import requests

# Put the current working directory on sys.path BEFORE any ``application``
# import, so those imports also resolve when this file is run directly.
sys.path.append(os.getcwd())

from application.common.feishu import FsData
from application.common.feishu.feishu_utils import FeishuUtils
from application.common.gpt import GPT4oMini
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper
  18. class LNQRecommend(object):
  19. """
  20. 老年圈
  21. """
  22. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  23. self.limit_flag = False
  24. self.platform = platform
  25. self.mode = mode
  26. self.rule_dict = rule_dict
  27. self.user_list = user_list
  28. self.env = env
  29. self.download_cnt = 0
  30. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  31. self.expire_flag = False
  32. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  33. self.mysql = MysqlHelper(mode=self.mode, platform=self)
  34. def get_recommend_list(self):
  35. print("老年圈")
  36. """
  37. 获取推荐页视频
  38. """
  39. headers = {
  40. 'Content-Type': 'application/json'
  41. }
  42. url = "http://8.217.192.46:8889/crawler/lao_nian_quan_shi_pin/recommend"
  43. data_rule = FsData()
  44. title_rule = data_rule.get_title_rule()
  45. next_cursor = ""
  46. while True:
  47. payload = json.dumps({
  48. "cursor": next_cursor
  49. })
  50. response = requests.request("POST", url, headers=headers, data=payload)
  51. response = response.json()
  52. if response['code'] != 0:
  53. self.aliyun_log.logging(
  54. code="3000",
  55. message="抓取单条视频失败,请求失败"
  56. ),
  57. return
  58. next_cursor = response['data']['next_cursor']
  59. data = response['data']['data']
  60. if len(data) == 0:
  61. return
  62. for index, video_obj in enumerate(data, 1):
  63. try:
  64. self.aliyun_log.logging(
  65. code="1001", message="扫描到一条视频", data=video_obj
  66. )
  67. self.process_video_obj(video_obj,title_rule)
  68. except Exception as e:
  69. self.aliyun_log.logging(
  70. code="3000",
  71. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
  72. 1, index, e
  73. ),
  74. )
  75. if self.limit_flag:
  76. return
  77. time.sleep(random.randint(1, 5))
  78. def process_video_obj(self, video_obj,title_rule):
  79. """
  80. 处理视频
  81. :param video_obj:
  82. """
  83. video_url = self.get_video_url(video_obj["vid"])
  84. if not video_url:
  85. return
  86. time.sleep(random.randint(3, 8))
  87. trace_id = self.platform + str(uuid.uuid1())
  88. our_user = random.choice(self.user_list)
  89. item = VideoItem()
  90. item.add_video_info("video_id", video_obj["vid"])
  91. item.add_video_info("video_title", video_obj["vtitle"])
  92. item.add_video_info("play_cnt", 0)
  93. item.add_video_info("publish_time_stamp", int(time.time()))
  94. item.add_video_info("out_user_id", video_obj["vid"])
  95. item.add_video_info("cover_url", "https://qiniu.818ao.com/"+video_obj["poster"])
  96. item.add_video_info("like_cnt", 0)
  97. item.add_video_info("share_cnt", 0)
  98. item.add_video_info("comment_cnt", 0)
  99. item.add_video_info("video_url", video_url)
  100. item.add_video_info("out_video_id", video_obj["vid"])
  101. item.add_video_info("platform", self.platform)
  102. item.add_video_info("strategy", self.mode)
  103. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  104. item.add_video_info("user_id", our_user["uid"])
  105. item.add_video_info("user_name", our_user["nick_name"])
  106. mq_obj = item.produce_item()
  107. pipeline = PiaoQuanPipeline(
  108. platform=self.platform,
  109. mode=self.mode,
  110. rule_dict=self.rule_dict,
  111. env=self.env,
  112. item=mq_obj,
  113. trace_id=trace_id,
  114. )
  115. if pipeline.process_item():
  116. title_list = title_rule.split(",")
  117. title = video_obj["vtitle"]
  118. contains_keyword = any(keyword in title for keyword in title_list)
  119. if contains_keyword:
  120. new_title = GPT4oMini.get_ai_mini_title(title)
  121. if new_title:
  122. item.add_video_info("video_title", new_title)
  123. current_time = datetime.now()
  124. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  125. values = [
  126. [
  127. video_url,
  128. "https://qiniu.818ao.com/"+video_obj["poster"],
  129. title,
  130. new_title,
  131. formatted_time,
  132. ]
  133. ]
  134. FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "8c7191", "ROWS", 1, 2)
  135. time.sleep(0.5)
  136. FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "8c7191", "A2:Z2", values)
  137. self.download_cnt += 1
  138. self.mq.send_msg(mq_obj)
  139. self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
  140. if self.download_cnt >= int(
  141. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  142. ):
  143. self.limit_flag = True
  144. """获取视频链接"""
  145. def get_video_url(self, vid):
  146. url = "http://8.217.192.46:8889/crawler/lao_nian_quan_shi_pin/detail"
  147. payload = json.dumps({
  148. "content_id": f"{vid}"
  149. })
  150. headers = {
  151. 'Content-Type': 'application/json'
  152. }
  153. try:
  154. response = requests.request("POST", url, headers=headers, data=payload, timeout=10)
  155. response = response.json()
  156. if response['code'] != 0:
  157. self.aliyun_log.logging(
  158. code="3000",
  159. message="获取视频链接失败"
  160. ),
  161. return None
  162. video_url = response['data']['data']['video_url_list'][0]['video_url']
  163. return video_url
  164. except Exception as e:
  165. return None
  166. def run(self):
  167. self.get_recommend_list()
  168. if __name__ == '__main__':
  169. J = LNQRecommend(
  170. platform="laonianquan",
  171. mode="recommend",
  172. rule_dict={},
  173. user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
  174. )
  175. J.get_recommend_list()
  176. # J.logic()