yuannifuqichangzai.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import sys
  6. import json
  7. import time
  8. import uuid
  9. import random
  10. import asyncio
  11. import aiohttp
  12. import datetime
  13. sys.path.append(os.getcwd())
  14. from application.items import VideoItem
  15. from application.pipeline import PiaoQuanPipeline
  16. from application.common.messageQueue import MQ
  17. from application.common.proxies import tunnel_proxies
  18. from application.common.log import AliyunLogger
  19. class YuanNiFuQiChangZai(object):
  20. """
  21. 愿你福气常在——推荐爬虫
  22. """
  23. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  24. self.platform = platform
  25. self.mode = mode
  26. self.rule_dict = rule_dict
  27. self.user_list = user_list
  28. self.env = env
  29. self.download_cnt = 0
  30. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  31. self.expire_flag = False
  32. self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
  33. async def process_video_obj(self, video_obj):
  34. """
  35. 处理每一个视频内容
  36. :return: None
  37. """
  38. trace_id = self.platform + str(uuid.uuid1())
  39. our_user = random.choice(self.user_list)
  40. publish_time_stamp = int(video_obj['update_time'])
  41. publish_time_str = datetime.datetime.fromtimestamp(publish_time_stamp).strftime('%Y-%m-%d %H:%M:%S')
  42. item = VideoItem()
  43. item.add_video_info("user_id", our_user["uid"])
  44. item.add_video_info("user_name", our_user["nick_name"])
  45. item.add_video_info("video_id", video_obj["nid"])
  46. item.add_video_info("video_title", video_obj["title"])
  47. item.add_video_info("publish_time_str", publish_time_str)
  48. item.add_video_info("publish_time_stamp", int(publish_time_stamp))
  49. item.add_video_info("video_url", video_obj["video_url"])
  50. item.add_video_info("cover_url", video_obj["video_cover"])
  51. item.add_video_info("out_video_id", video_obj["nid"])
  52. item.add_video_info("platform", self.platform)
  53. item.add_video_info("strategy", self.mode)
  54. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  55. mq_obj = item.produce_item()
  56. pipeline = PiaoQuanPipeline(
  57. platform=self.platform,
  58. mode=self.mode,
  59. rule_dict=self.rule_dict,
  60. env=self.env,
  61. item=mq_obj,
  62. trace_id=trace_id,
  63. )
  64. if pipeline.process_item():
  65. self.download_cnt += 1
  66. self.mq.send_msg(mq_obj)
  67. # print(json.dumps(mq_obj, ensure_ascii=False, indent=4))
  68. self.aliyun_log.logging(
  69. code="1002",
  70. message="成功发送至 ETL",
  71. data=mq_obj,
  72. )
  73. if self.download_cnt >= int(
  74. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  75. ):
  76. self.expire_flag = True
  77. async def get_recommend_list(self, session, page_index):
  78. """
  79. 获取推荐页面的video_list
  80. :param session: aiohttp 的session
  81. :param page_index: 页码
  82. :return: None
  83. """
  84. if self.expire_flag:
  85. self.aliyun_log.logging(
  86. code="2000",
  87. message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
  88. )
  89. return
  90. headers = {
  91. 'Host': 'ynfqcz.jiabeijian.cn',
  92. 'content-time': str(int(time.time() * 1000)),
  93. 'cache-time': str(int(time.time() * 1000)),
  94. 'chatkey': 'wxa1431c6e7acdd32d',
  95. 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156',
  96. 'content-type': 'application/x-www-form-urlencoded',
  97. 'visitorkey': '17096322301026589978',
  98. 'xweb_xhr': '1',
  99. 'vision': '1.1.0',
  100. 'token': '',
  101. 'accept': '*/*',
  102. 'sec-fetch-site': 'cross-site',
  103. 'sec-fetch-mode': 'cors',
  104. 'sec-fetch-dest': 'empty',
  105. 'referer': 'https://servicewechat.com/wxa1431c6e7acdd32d/2/page-frame.html',
  106. 'accept-language': 'en-US,en;q=0.9'
  107. }
  108. po = {
  109. "cid": "",
  110. "page": page_index,
  111. "is_ads": 1,
  112. "model": random.choice(["Windows", "Mac", "HuaWei", "Xiaomi", "Xiaomi2", "Yandex", "Google", "iphone", "oppo"]),
  113. "mini_version": "3.8.6",
  114. "ini_id": "17096322301026589978"
  115. }
  116. params = {
  117. "parameter": json.dumps(po)
  118. }
  119. url = "https://ynfqcz.jiabeijian.cn/index.php/v111/index/index"
  120. await asyncio.sleep(5)
  121. async with session.get(
  122. url, headers=headers, params=params, proxy=tunnel_proxies()['https']
  123. ) as response:
  124. data = await response.json()
  125. for index, video_obj in enumerate(data["data"]["list"], 1):
  126. try:
  127. self.aliyun_log.logging(
  128. code="1001",
  129. message="扫描到一条视频",
  130. data=video_obj,
  131. )
  132. await self.process_video_obj(video_obj)
  133. except Exception as e:
  134. self.aliyun_log.logging(
  135. code="3000",
  136. message="抓取第{}条的时候出现问题, 报错信息是{}".format(index, e),
  137. )
  138. async def run(self):
  139. """
  140. 执行代码
  141. :return: None
  142. """
  143. async with aiohttp.ClientSession() as session:
  144. # for i in range(0, 100, 20):
  145. # if self.expire_flag:
  146. # return
  147. # tasks = [self.get_recommend_list(session, index) for index in range(i, min(i + 20, 200))]
  148. # await asyncio.gather(*tasks)
  149. # done, pending = await asyncio.wait(
  150. # tasks, return_when=asyncio.FIRST_COMPLETED
  151. # )
  152. # # 取消所有剩余的任务
  153. # for task in pending:
  154. # task.cancel()
  155. # tasks = [self.get_recommend_list(session, index) for index in range(1, 100)]
  156. # await asyncio.gather(*tasks)
  157. # done, pending = await asyncio.wait(
  158. # tasks, return_when=asyncio.FIRST_COMPLETED
  159. # )
  160. # # 取消所有剩余的任务
  161. # for task in pending:
  162. # task.cancel()
  163. for page in range(1, 30):
  164. if self.expire_flag:
  165. self.aliyun_log.logging(
  166. code="2000",
  167. message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
  168. )
  169. # message = "本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt)
  170. # print(message)
  171. return
  172. else:
  173. try:
  174. await self.get_recommend_list(session, page_index=page)
  175. except Exception as e:
  176. self.aliyun_log.logging(
  177. code="3000",
  178. message="抓取第{}页时候出现错误, 报错信息是{}".format(page, e),
  179. )
  180. # message = "抓取第{}页时候出现错误, 报错信息是{}".format(page, e)
  181. # print(message)