dakaiyinghaoyun.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. import os
  2. import random
  3. import sys
  4. import time
  5. import uuid
  6. import json
  7. import requests
  8. sys.path.append(os.getcwd())
  9. from application.items import VideoItem
  10. from application.pipeline import PiaoQuanPipeline
  11. from application.common.messageQueue import MQ
  12. from application.common.log import AliyunLogger
  13. from application.common.mysql import MysqlHelper
  14. from application.common import Feishu, haiwai_tunnel_proxies
  15. class DKYHYRecommend(object):
  16. """
  17. 打开迎好运
  18. """
  19. def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
  20. self.limit_flag = False
  21. self.platform = platform
  22. self.mode = mode
  23. self.rule_dict = rule_dict
  24. self.user_list = user_list
  25. self.env = env
  26. self.download_cnt = 0
  27. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  28. self.expire_flag = False
  29. self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
  30. self.mysql = MysqlHelper(mode=self.mode, platform=self)
  31. def get_cookie(self):
  32. sql = f""" select * from crawler_config where source="{self.platform}" """
  33. configs = self.mysql.select(sql=sql)
  34. for config in configs:
  35. if "token" in config:
  36. token_element = config[3]
  37. data_json = json.loads(token_element)
  38. token = data_json.get("token")
  39. return token
  40. def logic(self):
  41. for i in range(10):
  42. app_id = 'wx2f9f796a36e11d71'
  43. js_code = self.get_js_code(app_id)
  44. token = self.get_search_params(app_id, js_code)
  45. if token:
  46. return token
  47. def get_js_code(self, app_id: str) -> str:
  48. js_code = ''
  49. try:
  50. url = 'http://61.48.133.26:30001/GetMiniAppCode'
  51. data = {
  52. "appid": app_id
  53. }
  54. response =requests.request(method='POST', url=url, json=data)
  55. body = response.content.decode()
  56. res_data = json.loads(body)
  57. js_code = res_data['GetMiniAppCode']
  58. except Exception as e:
  59. pass
  60. return js_code
  61. def get_search_params(self, app_id: str, js_code: str) -> dict:
  62. try:
  63. url = "https://api.riyingkj.com/api/user/login/v1"
  64. payload = json.dumps({
  65. "appid": app_id,
  66. "code": js_code,
  67. "exp": {}
  68. })
  69. headers = {
  70. 'Host': 'api.riyingkj.com',
  71. 'xweb_xhr': '1',
  72. 'X-Token': '',
  73. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156',
  74. 'Content-Type': 'application/json',
  75. 'Accept': '*/*',
  76. 'Referer': 'https://servicewechat.com/wx2f9f796a36e11d71/28/page-frame.html',
  77. 'Accept-Language': 'zh-CN,zh;q=0.9'
  78. }
  79. response = requests.request("POST", url, headers=headers, data=payload)
  80. response = response.json()
  81. token = response["token"]
  82. except Exception as e:
  83. return ''
  84. return token
  85. def get_recommend_list(self):
  86. """
  87. 获取推荐页视频
  88. """
  89. token = self.logic()
  90. headers = {
  91. 'Host': 'api.riyingkj.com',
  92. 'xweb_xhr': '1',
  93. 'X-Token': token,
  94. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156',
  95. 'Content-Type': 'application/json',
  96. 'Accept': '*/*',
  97. 'Referer': 'https://servicewechat.com/wx2f9f796a36e11d71/28/page-frame.html',
  98. 'Accept-Language': 'zh-CN,zh;q=0.9'
  99. }
  100. for i in range(20):
  101. time.sleep(random.randint(1, 10))
  102. url = "https://api.riyingkj.com/api/recommend/list/v1"
  103. payload = json.dumps({
  104. "limit": 5,
  105. "exp": {}
  106. })
  107. response = requests.request("POST", url, headers=headers, data=payload)
  108. for index, video_obj in enumerate(response.json(), 1):
  109. try:
  110. self.aliyun_log.logging(
  111. code="1001", message="扫描到一条视频", data=video_obj
  112. )
  113. self.process_video_obj(video_obj)
  114. except Exception as e:
  115. self.aliyun_log.logging(
  116. code="3000",
  117. message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
  118. i, index, e
  119. ),
  120. )
  121. if self.limit_flag:
  122. return
  123. time.sleep(random.randint(5, 10))
  124. def process_video_obj(self, video_obj):
  125. """
  126. 处理视频
  127. :param video_obj:
  128. """
  129. time.sleep(random.randint(3, 8))
  130. trace_id = self.platform + str(uuid.uuid1())
  131. our_user = random.choice(self.user_list)
  132. item = VideoItem()
  133. item.add_video_info("video_id", video_obj["uuid"])
  134. item.add_video_info("video_title", video_obj["title"])
  135. item.add_video_info("play_cnt", 0)
  136. item.add_video_info("publish_time_stamp", int(time.time()))
  137. item.add_video_info("out_user_id", video_obj["uuid"])
  138. item.add_video_info("cover_url", video_obj["cover_url"])
  139. item.add_video_info("like_cnt", 0)
  140. item.add_video_info("video_url", video_obj["urls"][0])
  141. item.add_video_info("out_video_id", video_obj["uuid"])
  142. item.add_video_info("platform", self.platform)
  143. item.add_video_info("strategy", self.mode)
  144. item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
  145. item.add_video_info("user_id", our_user["uid"])
  146. item.add_video_info("user_name", our_user["nick_name"])
  147. mq_obj = item.produce_item()
  148. pipeline = PiaoQuanPipeline(
  149. platform=self.platform,
  150. mode=self.mode,
  151. rule_dict=self.rule_dict,
  152. env=self.env,
  153. item=mq_obj,
  154. trace_id=trace_id,
  155. )
  156. if pipeline.process_item():
  157. self.download_cnt += 1
  158. self.mq.send_msg(mq_obj)
  159. self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
  160. if self.download_cnt >= int(
  161. self.rule_dict.get("videos_cnt", {}).get("min", 200)
  162. ):
  163. self.limit_flag = True
  164. def run(self):
  165. self.get_recommend_list()
  166. if __name__ == '__main__':
  167. J = DKYHYRecommend(
  168. platform="dakaiyinghaoyun",
  169. mode="recommend",
  170. rule_dict={},
  171. user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
  172. )
  173. J.get_recommend_list()
  174. # J.logic()