"""
boqingzhufu.py

BoQing Technology - "Blessings for the Middle-Aged and Elderly" video crawler.

@author luojunhui
@date 2024-02-06
"""
  6. import os
  7. import sys
  8. import json
  9. import time
  10. import uuid
  11. import random
  12. import asyncio
  13. import aiohttp
  14. sys.path.append(os.getcwd())
  15. from application.items import VideoItem
  16. from application.pipeline import PiaoQuanPipeline
  17. from application.common.messageQueue import MQ
  18. from application.common.proxies import tunnel_proxies
  19. from application.common.log import AliyunLogger
  20. class BoQingZhuFu(object):
  21. """
  22. 祝福咱们中老年之视频, 推荐爬虫
  23. """
  24. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  25. self.platform = platform
  26. self.mode = mode
  27. self.rule_dict = rule_dict
  28. self.user_list = user_list
  29. self.env = env
  30. self.download_cnt = 0
  31. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  32. self.expire_flag = False
  33. self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
  34. async def process_video_obj(self, video_obj):
  35. """
  36. 处理每一个视频内容
  37. :return: None
  38. """
  39. trace_id = self.platform + str(uuid.uuid1())
  40. our_user = random.choice(self.user_list)
  41. publish_time_stamp = int(time.time())
  42. item = VideoItem()
  43. item.add_video_info("user_id", our_user["uid"])
  44. item.add_video_info("user_name", our_user["nick_name"])
  45. item.add_video_info("video_id", video_obj["id"])
  46. item.add_video_info("video_title", video_obj["title"])
  47. # item.add_video_info("publish_time_str", video_obj["create"])
  48. item.add_video_info("publish_time_stamp", int(publish_time_stamp))
  49. item.add_video_info("video_url", video_obj["video_url"])
  50. item.add_video_info(
  51. "cover_url", video_obj["video_cover"]
  52. )
  53. item.add_video_info("out_video_id", video_obj["id"])
  54. item.add_video_info("platform", self.platform)
  55. item.add_video_info("strategy", self.mode)
  56. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  57. mq_obj = item.produce_item()
  58. pipeline = PiaoQuanPipeline(
  59. platform=self.platform,
  60. mode=self.mode,
  61. rule_dict=self.rule_dict,
  62. env=self.env,
  63. item=mq_obj,
  64. trace_id=trace_id,
  65. )
  66. if pipeline.process_item():
  67. self.download_cnt += 1
  68. self.mq.send_msg(mq_obj)
  69. # print(json.dumps(mq_obj, ensure_ascii=False, indent=4))
  70. self.aliyun_log.logging(
  71. code="1002",
  72. message="成功发送至 ETL",
  73. data=mq_obj,
  74. )
  75. if self.download_cnt >= int(
  76. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  77. ):
  78. self.expire_flag = True
  79. async def get_recommend_list(self, session, page_index):
  80. """
  81. 获取推荐页面的video_list
  82. :param session: aiohttp 的session
  83. :param page_index: 页码
  84. :return: None
  85. """
  86. if self.expire_flag:
  87. self.aliyun_log.logging(
  88. code="2000",
  89. message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
  90. )
  91. return
  92. headers = {
  93. 'Host': 'api.newboqing.top',
  94. 'Content-Type': 'application/json',
  95. 'Accept-Language': 'zh-cn',
  96. 'Accept': '*/*',
  97. 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E217 MicroMessenger/6.8.0(0x16080000) NetType/WIFI Language/en Branch/Br_trunk MiniProgramEnv/Mac',
  98. 'Referer': 'https://servicewechat.com/wxacce796175899acd/9/page-frame.html',
  99. 'token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE3MDI0MzcyMDgsIm5iZiI6MTcwMjQzNzIwOCwiZXhwIjoxNzAyNDQ0NDA4LCJkYXRhIjp7InVzZXJfaWQiOjU3OTUyNzM0fX0.WhnR4quSvRGkTCr5HIY0b7_mUcbzNXLY5Y-iTdp2QuM',
  100. 'ik': 'b326b5062b2f0e69046810717534cb09'
  101. }
  102. url = "https://api.newboqing.top/index.php"
  103. # ?s=mobile/Video/getList&cid=1&page={}&api_version=4&appid=wxacce796175899acd&version=1.9.0&env_version=release&scene=1008&from_uid=57769568&share_time=1701661552497&referer_vid=231783".format(
  104. # i)
  105. params = {
  106. "s": "mobile/Video/getList",
  107. 'cid': 1,
  108. 'page': page_index,
  109. 'api_version': 4,
  110. 'appid': "wxacce796175899acd",
  111. 'version': "1.9.0",
  112. 'env_version': "release",
  113. 'scene': "1008"
  114. }
  115. await asyncio.sleep(5)
  116. async with session.get(
  117. url, headers=headers, params=params
  118. ) as response:
  119. response_text = await response.text()
  120. response_json = json.loads(response_text)
  121. # print(json.dumps(response_json, ensure_ascii=False, indent=4))
  122. for index, video_obj in enumerate(response_json["data"]["list"], 1):
  123. try:
  124. self.aliyun_log.logging(
  125. code="1001",
  126. message="扫描到一条视频",
  127. data=video_obj,
  128. )
  129. await self.process_video_obj(video_obj)
  130. except Exception as e:
  131. self.aliyun_log.logging(
  132. code="3000",
  133. message="抓取第{}条的时候出现问题, 报错信息是{}".format(index, e),
  134. )
  135. async def run(self):
  136. """
  137. 执行代码
  138. :return: None
  139. """
  140. async with aiohttp.ClientSession() as session:
  141. for page in range(1, 20):
  142. if self.expire_flag:
  143. self.aliyun_log.logging(
  144. code="2000",
  145. message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
  146. )
  147. message = "本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt)
  148. print(message)
  149. return
  150. else:
  151. try:
  152. await self.get_recommend_list(session, page_index=page)
  153. except Exception as e:
  154. # print(e)
  155. self.aliyun_log.logging(
  156. code="3000",
  157. message="抓取第{}页时候出现错误, 报错信息是{}".format(page, e),
  158. )
# Example usage:
# if __name__ == '__main__':
#     BQ = BoQingZhuFu(
#         platform="BoQingZhuFu",
#         mode="recommend",
#         rule_dict={},
#         user_list=[{"uid": "12345", "nick_name": "luojunhui"}],
#     )
#     loop = asyncio.get_event_loop()
#     loop.run_until_complete(BQ.run())