# xiaoniangao_author_test.py (7.1 KB)

import json
import os
import random
import sys
import time
import uuid
from urllib.parse import quote

import requests

# Make modules under the current working directory (e.g. the `common`
# package) importable BEFORE any of them are imported below.
sys.path.append(os.getcwd())

from common.mq import MQ
from common.pipeline import PiaoQuanPipelineTest
from common.public import get_config_from_mysql, clean_title
  12. def tunnel_proxies():
  13. # 隧道域名:端口号
  14. tunnel = "q796.kdltps.com:15818"
  15. # 用户名密码方式
  16. username = "t17772369458618"
  17. password = "5zqcjkmy"
  18. tunnel_proxies = {
  19. "http": "http://%(user)s:%(pwd)s@%(proxy)s/"
  20. % {"user": username, "pwd": password, "proxy": tunnel},
  21. "https": "http://%(user)s:%(pwd)s@%(proxy)s/"
  22. % {"user": username, "pwd": password, "proxy": tunnel},
  23. }
  24. return tunnel_proxies
  25. class XiaoNianGaoAuthor:
  26. def __init__(self, platform, mode, rule_dict, env, user_list):
  27. self.platform = platform
  28. self.mode = mode
  29. self.rule_dict = rule_dict
  30. self.env = env
  31. self.user_list = user_list
  32. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  33. self.download_count = 0
  34. def get_author_list(self):
  35. # 每轮只抓取定量的数据,到达数量后自己退出
  36. max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 300))
  37. for user_dict in self.user_list:
  38. if self.download_count <= max_count:
  39. self.get_video_list(user_dict)
  40. time.sleep(random.randint(1, 15))
  41. else:
  42. message = "本轮已经抓取足够数量的视频,已经自动退出"
  43. print(message)
  44. return
  45. def get_video_list(self, user_dict):
  46. next_t = -1
  47. # 只抓取更新的视频,如果刷到已经更新的立即退出
  48. url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
  49. headers = {
  50. "Host": "kapi-xng-app.xiaoniangao.cn",
  51. "content-type": "application/json; charset=utf-8",
  52. "accept": "*/*",
  53. "verb": "POST",
  54. "accept-language": "zh-cn",
  55. "date": "Wed, 01 Nov 2023 11:53:22 GMT",
  56. "x-token-id": "",
  57. "x-signaturemethod": "hmac-sha1",
  58. }
  59. while True:
  60. payload = {
  61. "token": "",
  62. "limit": 20,
  63. "start_t": next_t,
  64. "visited_mid": int(user_dict["link"]),
  65. "share_width": 300,
  66. "share_height": 240,
  67. }
  68. response = requests.request(
  69. "POST",
  70. url,
  71. headers=headers,
  72. data=json.dumps(payload),
  73. proxies=tunnel_proxies(),
  74. )
  75. if "data" not in response.text or response.status_code != 200:
  76. message = f"get_videoList:{response.text}"
  77. print(message)
  78. return
  79. elif "list" not in response.json()["data"]:
  80. message = f"get_videoList:{response.json()}"
  81. print(message)
  82. return
  83. elif len(response.json()["data"]["list"]) == 0:
  84. message = f"没有更多数据啦~"
  85. print(message)
  86. return
  87. else:
  88. next_t = response.json()["data"]["next_t"]
  89. feeds = response.json()["data"]["list"]
  90. for video_obj in feeds:
  91. try:
  92. message = f"扫描到一条视频"
  93. print(message)
  94. self.process_video_obj(video_obj, user_dict)
  95. except Exception as e:
  96. message = "抓取单条视频异常, 报错原因是: {}".format(e)
  97. print(message)
  98. def process_video_obj(self, video_obj, user_dict):
  99. trace_id = self.platform + str(uuid.uuid1())
  100. # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号
  101. xiaoniangao_title = clean_title(video_obj.get("title", ""))
  102. # 随机取一个表情/符号
  103. emoji = random.choice(
  104. get_config_from_mysql(self.mode, self.platform, self.env, "emoji")
  105. )
  106. # 生成最终标题,标题list[表情+title, title+表情]随机取一个
  107. video_title = random.choice(
  108. [f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]
  109. )
  110. # 发布时间
  111. publish_time_stamp = int(int(video_obj.get("t", 0)) / 1000)
  112. publish_time_str = time.strftime(
  113. "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
  114. )
  115. # 用户名 / 头像
  116. user_name = (
  117. video_obj.get("user", {})
  118. .get("nick", "")
  119. .strip()
  120. .replace("\n", "")
  121. .replace("/", "")
  122. .replace(" ", "")
  123. .replace(" ", "")
  124. .replace("&NBSP", "")
  125. .replace("\r", "")
  126. )
  127. video_dict = {
  128. "video_title": video_title,
  129. "video_id": video_obj.get("vid", ""),
  130. "duration": int(video_obj.get("du", 0) / 1000),
  131. "play_cnt": video_obj.get("play_pv", 0),
  132. "like_cnt": video_obj.get("favor", {}).get("total", 0),
  133. "comment_cnt": video_obj.get("comment_count", 0),
  134. "share_cnt": video_obj.get("share", 0),
  135. "user_name": user_name,
  136. "publish_time_stamp": publish_time_stamp,
  137. "publish_time_str": publish_time_str,
  138. "update_time_stamp": int(time.time()),
  139. "video_width": int(video_obj.get("w", 0)),
  140. "video_height": int(video_obj.get("h", 0)),
  141. "avatar_url": video_obj.get("user", {}).get("hurl", ""),
  142. "profile_id": video_obj["id"],
  143. "profile_mid": video_obj.get("user", {}).get("mid", ""),
  144. "cover_url": video_obj.get("url", ""),
  145. "video_url": video_obj.get("v_url", ""),
  146. "session": f"xiaoniangao-author-{int(time.time())}",
  147. "out_user_id": video_obj["id"],
  148. "platform": self.platform,
  149. "strategy": self.mode,
  150. "out_video_id": video_obj.get("vid", ""),
  151. }
  152. print(video_dict)
  153. pipeline = PiaoQuanPipelineTest(
  154. platform=self.platform,
  155. mode=self.mode,
  156. rule_dict=self.rule_dict,
  157. env=self.env,
  158. item=video_dict,
  159. trace_id=trace_id,
  160. )
  161. flag = pipeline.process_item()
  162. if flag:
  163. video_dict["width"] = video_dict["video_width"]
  164. video_dict["height"] = video_dict["video_height"]
  165. video_dict["crawler_rule"] = json.dumps(self.rule_dict)
  166. video_dict["user_id"] = user_dict["uid"]
  167. video_dict["publish_time"] = video_dict["publish_time_str"]
  168. # print(video_dict)
  169. self.mq.send_msg(video_dict)
  170. self.download_count += 1
  171. message = "成功发送 MQ 至 ETL"
  172. print(message)
  173. if __name__ == "__main__":
  174. XNGA = XiaoNianGaoAuthor(
  175. platform="xiaoniangao",
  176. mode="author",
  177. rule_dict={},
  178. env="prod",
  179. user_list=[{"link": 295640510, "uid": "12334"}],
  180. )
  181. XNGA.get_author_list()