fuxiaoshun.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. """
  2. 福小顺推荐爬虫代码
  3. 2024-01-22
  4. """
  5. import os
  6. import sys
  7. import json
  8. import time
  9. import uuid
  10. import random
  11. import asyncio
  12. import aiohttp
  13. import datetime
  14. from base64 import b64decode
  15. from datetime import datetime
  16. import requests
  17. from Crypto.Cipher import AES
  18. from Crypto.Util.Padding import unpad
  19. sys.path.append(os.getcwd())
  20. from application.common import Feishu
  21. from application.items import VideoItem
  22. from application.pipeline import PiaoQuanPipeline
  23. from application.common.messageQueue import MQ
  24. from application.common.log import AliyunLogger
  25. def fxs_decrypt(ciphertext):
  26. """
  27. 福小顺逆向解密代码
  28. :param ciphertext: 秘文
  29. :return: 原文
  30. """
  31. password = "xlc2ze7qnqg8xi1d".encode()
  32. iv = password
  33. try:
  34. ct = b64decode(ciphertext.encode("utf-8"))
  35. cipher = AES.new(password, AES.MODE_CBC, iv)
  36. pt = unpad(cipher.decrypt(ct), AES.block_size)
  37. return pt.decode()
  38. except Exception as e:
  39. print("Incorrect decryption {}".format(e))
  40. return None
  41. class FuXiaoShunRecommend(object):
  42. """
  43. 福小顺推荐爬虫
  44. 需要逆序, 逆向结果: AES加密,password=iv='xlc2ze7qnqg8xi1d'.encode()
  45. """
  46. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  47. self.platform = platform
  48. self.mode = mode
  49. self.rule_dict = rule_dict
  50. self.user_list = user_list
  51. self.env = env
  52. self.download_cnt = 0
  53. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  54. self.expire_flag = False
  55. self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
  56. def process_video_obj(self, video_obj):
  57. """
  58. 处理每一个视频内容
  59. :return: None
  60. """
  61. trace_id = self.platform + str(uuid.uuid1())
  62. our_user = random.choice(self.user_list)
  63. publish_time_stamp = datetime.strptime(
  64. video_obj["create_at"], "%Y-%m-%d %H:%M:%S"
  65. ).timestamp()
  66. item = VideoItem()
  67. item.add_video_info("user_id", our_user["uid"])
  68. item.add_video_info("user_name", our_user["nick_name"])
  69. item.add_video_info("video_id", video_obj["id"])
  70. item.add_video_info("video_title", video_obj["name"])
  71. item.add_video_info("publish_time_str", video_obj["create_at"])
  72. item.add_video_info("publish_time_stamp", int(publish_time_stamp))
  73. item.add_video_info("video_url", video_obj["cover"])
  74. item.add_video_info(
  75. "cover_url", video_obj["cover"] + "&vframe/png/offset/1/w/200"
  76. )
  77. item.add_video_info("like_cnt", video_obj["num_like"])
  78. item.add_video_info("play_cnt", video_obj["num_read"])
  79. item.add_video_info("comment_cnt", video_obj["num_comment"])
  80. item.add_video_info("out_video_id", video_obj["id"])
  81. item.add_video_info("platform", self.platform)
  82. item.add_video_info("strategy", self.mode)
  83. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  84. mq_obj = item.produce_item()
  85. pipeline = PiaoQuanPipeline(
  86. platform=self.platform,
  87. mode=self.mode,
  88. rule_dict=self.rule_dict,
  89. env=self.env,
  90. item=mq_obj,
  91. trace_id=trace_id,
  92. )
  93. if pipeline.process_item():
  94. self.download_cnt += 1
  95. # 获取当前时间
  96. current_time = datetime.now()
  97. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  98. values = [[
  99. video_obj["id"],
  100. formatted_time,
  101. video_obj["name"],
  102. video_obj["cover"] + "&vframe/png/offset/1/w/200",
  103. video_obj["cover"],
  104. video_obj["num_like"],
  105. video_obj["num_read"]
  106. ]]
  107. Feishu.insert_columns(self.platform, 'fuxiaoshun', "0e1e47", "ROWS", 1, 2)
  108. time.sleep(0.5)
  109. Feishu.update_values(self.platform, 'fuxiaoshun', "0e1e47", "A2:Z2", values)
  110. self.mq.send_msg(mq_obj)
  111. self.aliyun_log.logging(
  112. code="1002",
  113. message="成功发送至 ETL",
  114. data=mq_obj,
  115. )
  116. if self.download_cnt >= int(
  117. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  118. ):
  119. self.expire_flag = True
  120. def get_recommend_list(self, page_index):
  121. """
  122. 获取推荐页面的video_list
  123. :param page_index: 页码
  124. :return: None
  125. """
  126. if self.expire_flag:
  127. self.aliyun_log.logging(
  128. code="2000",
  129. message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
  130. )
  131. return
  132. host_referer_mapping = {
  133. 'quan.nnjuxing.cn': 'https://servicewechat.com/wxbb18ecb64efe217a/2/page-frame.html', # 福小全
  134. 'nian.nnjuxing.cn': 'https://servicewechat.com/wx3e31d735ebb23d29/3/page-frame.html', # 福小年
  135. 'shun.nnjuxing.cn': 'https://servicewechat.com/wx5b89401c90c9f367/3/page-frame.html' # 福小顺
  136. }
  137. # 随机选择一个 host
  138. random_host = random.choice(list(host_referer_mapping.keys()))
  139. # 根据选择的 host 获取对应的 referer
  140. random_referer = host_referer_mapping[random_host]
  141. headers = {
  142. "Host": random_host,
  143. "xweb_xhr": "1",
  144. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156",
  145. "content-type": "application/json",
  146. "accept": "*/*",
  147. "sec-fetch-site": "cross-site",
  148. "sec-fetch-mode": "cors",
  149. "sec-fetch-dest": "empty",
  150. "referer": random_referer,
  151. "accept-language": "en-US,en;q=0.9",
  152. }
  153. url = f"https://{random_host}/videos/api.videos/getItem?page={page_index}"
  154. payload = {}
  155. time.sleep(10)
  156. response = requests.request("GET", url, headers=headers, data=payload)
  157. cryp_data = response.json()
  158. data = json.loads(fxs_decrypt(cryp_data["data"]))
  159. for index, video_obj in enumerate(data["list"], 1):
  160. try:
  161. self.aliyun_log.logging(
  162. code="1001",
  163. message="扫描到一条视频",
  164. data=video_obj,
  165. )
  166. self.process_video_obj(video_obj)
  167. except Exception as e:
  168. self.aliyun_log.logging(
  169. code="3000",
  170. message="抓取第{}条的时候出现问题, 报错信息是{}".format(index, e),
  171. )
  172. def run(self):
  173. """
  174. 执行代码
  175. :return: None
  176. """
  177. for page in range(1, 200):
  178. if self.expire_flag:
  179. self.aliyun_log.logging(
  180. code="2000",
  181. message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
  182. )
  183. message = "本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt)
  184. print(message)
  185. return
  186. else:
  187. try:
  188. self.get_recommend_list(page_index=page)
  189. except Exception as e:
  190. self.aliyun_log.logging(
  191. code="3000",
  192. message="抓取第{}页时候出现错误, 报错信息是{}".format(page, e),
  193. )
  194. if __name__ == '__main__':
  195. J = FuXiaoShunRecommend(
  196. platform="fuxiaoshun",
  197. mode="recommend",
  198. rule_dict={},
  199. user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
  200. )
  201. J.run()