jixiangxingfu.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import os
  2. import random
  3. import sys
  4. import time
  5. import uuid
  6. import json
  7. from datetime import datetime
  8. import requests
  9. from application.common import Feishu
  10. sys.path.append(os.getcwd())
  11. from application.items import VideoItem
  12. from application.pipeline import PiaoQuanPipeline
  13. from application.common.messageQueue import MQ
  14. from application.common.proxies import tunnel_proxies
  15. from application.common.log import AliyunLogger
  16. from application.common.mysql import MysqlHelper
  17. class JXXFRecommend(object):
  18. """
  19. 吉祥幸福-欢快吉祥早安祝福
  20. """
  21. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  22. self.limit_flag = False
  23. self.platform = platform
  24. self.mode = mode
  25. self.rule_dict = rule_dict
  26. self.user_list = user_list
  27. self.env = env
  28. self.download_cnt = 0
  29. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  30. self.expire_flag = False
  31. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  32. self.mysql = MysqlHelper(mode=self.mode, platform=self)
  33. def get_cookie(self):
  34. sql = f""" select * from crawler_config where source="{self.platform}" """
  35. configs = self.mysql.select(sql=sql)
  36. for config in configs:
  37. if "token" in config:
  38. token_element = config[3]
  39. data_json = json.loads(token_element)
  40. token = data_json.get("token")
  41. return token
  42. def logic(self):
  43. for i in range(10):
  44. app_id = 'wx6692a24ad2a88bfb'
  45. js_code = self.get_js_code(app_id)
  46. token = self.get_search_params(app_id, js_code)
  47. if token:
  48. return token
  49. def get_js_code(self, app_id: str) -> str:
  50. js_code = ''
  51. try:
  52. url = 'http://61.48.133.26:30001/GetMiniAppCode'
  53. data = {
  54. "appid": app_id
  55. }
  56. response =requests.request(method='POST', url=url, json=data)
  57. body = response.content.decode()
  58. res_data = json.loads(body)
  59. js_code = res_data['GetMiniAppCode']
  60. except Exception as e:
  61. pass
  62. return js_code
  63. def get_search_params(self, app_id: str, js_code: str) -> dict:
  64. try:
  65. url = f"https://api.huanqiwl.top/index.php?s=mobile/Login/loginToken&code={js_code}&appid={app_id}"
  66. headers = {
  67. 'Connection': 'keep-alive',
  68. 'content-type': 'application/json',
  69. 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_5_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.48(0x18003030) NetType/WIFI Language/zh_CN',
  70. 'Referer': 'https://servicewechat.com/wx6692a24ad2a88bfb/5/page-frame.html'
  71. }
  72. response =requests.request(method='GET', headers=headers, url=url, data={})
  73. body = response.content.decode()
  74. body_json = json.loads(body)
  75. data = body_json.get("data")
  76. token = data['token']
  77. except Exception as e:
  78. return ''
  79. return token
  80. def get_recommend_list(self):
  81. """
  82. 获取推荐页视频
  83. """
  84. token = self.logic()
  85. headers = {
  86. 'Host': 'api.huanqiwl.top',
  87. 'Content-Type': 'application/json',
  88. 'Accept-Language': 'zh-cn',
  89. 'token': token,
  90. 'Accept': '*/*',
  91. 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E217 MicroMessenger/6.8.0(0x16080000) NetType/WIFI Language/en Branch/Br_trunk MiniProgramEnv/Mac',
  92. 'Referer': 'https://servicewechat.com/wx6692a24ad2a88bfb/3/page-frame.html'
  93. }
  94. for i in range(14):
  95. time.sleep(random.randint(1, 10))
  96. for j in range(2):
  97. url = f"https://api.huanqiwl.top/index.php?s=mobile/Video/getList&cid={j}&page={i}&api_version=4&appid=wx6692a24ad2a88bfb&version=1.9.5&env_version=release&scene=1053"
  98. payload = {}
  99. response = requests.request("GET", url, headers=headers, data=payload)
  100. if "未登录" in response.text:
  101. self.aliyun_log.logging(
  102. code="3000",
  103. message="抓取单条视频失败, token 失效"
  104. ),
  105. break
  106. for index, video_obj in enumerate(response.json()['data']['list'], 1):
  107. try:
  108. self.aliyun_log.logging(
  109. code="1001", message="扫描到一条视频", data=video_obj
  110. )
  111. self.process_video_obj(video_obj)
  112. except Exception as e:
  113. self.aliyun_log.logging(
  114. code="3000",
  115. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
  116. i, index, e
  117. ),
  118. )
  119. if self.limit_flag:
  120. return
  121. time.sleep(random.randint(5, 10))
  122. def process_video_obj(self, video_obj):
  123. """
  124. 处理视频
  125. :param video_obj:
  126. """
  127. time.sleep(random.randint(3, 8))
  128. trace_id = self.platform + str(uuid.uuid1())
  129. our_user = random.choice(self.user_list)
  130. item = VideoItem()
  131. item.add_video_info("video_id", video_obj["id"])
  132. item.add_video_info("video_title", video_obj["title"])
  133. item.add_video_info("play_cnt", 0)
  134. item.add_video_info("publish_time_stamp", int(time.time()))
  135. item.add_video_info("out_user_id", video_obj["id"])
  136. item.add_video_info("cover_url", video_obj["images"])
  137. item.add_video_info("like_cnt", 0)
  138. item.add_video_info("video_url", video_obj["video_url"])
  139. item.add_video_info("out_video_id", video_obj["id"])
  140. item.add_video_info("platform", self.platform)
  141. item.add_video_info("strategy", self.mode)
  142. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  143. item.add_video_info("user_id", our_user["uid"])
  144. item.add_video_info("user_name", our_user["nick_name"])
  145. # 获取当前时间
  146. current_time = datetime.now()
  147. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  148. values = [[
  149. video_obj["id"],
  150. formatted_time,
  151. video_obj["title"],
  152. video_obj["images"],
  153. video_obj["video_url"]
  154. ]]
  155. Feishu.insert_columns(self.platform, 'jixiangxingfu', "L0KXHh", "ROWS", 1, 2)
  156. time.sleep(0.5)
  157. Feishu.update_values(self.platform, 'jixiangxingfu', "L0KXHh", "A2:Z2", values)
  158. mq_obj = item.produce_item()
  159. pipeline = PiaoQuanPipeline(
  160. platform=self.platform,
  161. mode=self.mode,
  162. rule_dict=self.rule_dict,
  163. env=self.env,
  164. item=mq_obj,
  165. trace_id=trace_id,
  166. )
  167. if pipeline.process_item():
  168. self.download_cnt += 1
  169. self.mq.send_msg(mq_obj)
  170. self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
  171. if self.download_cnt >= int(
  172. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  173. ):
  174. self.limit_flag = True
  175. def run(self):
  176. self.get_recommend_list()
  177. if __name__ == '__main__':
  178. J = JXXFRecommend(
  179. platform="jixiangxingfu",
  180. mode="recommend",
  181. rule_dict={},
  182. user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
  183. )
  184. J.get_recommend_list()
  185. # J.logic()