# xiaoniangao_author_test.py (8.1 KB) — author-mode crawler test script for xiaoniangao.
  1. import json
  2. import os
  3. import random
  4. import sys
  5. import time
  6. import uuid
  7. import requests
  8. from common.mq import MQ
  9. sys.path.append(os.getcwd())
  10. from common.pipeline import PiaoQuanPipelineTest
  11. from common.public import get_config_from_mysql, clean_title
  12. from common.scheduling_db import MysqlHelper
  13. def tunnel_proxies():
  14. # 隧道域名:端口号
  15. tunnel = "q796.kdltps.com:15818"
  16. # 用户名密码方式
  17. username = "t17772369458618"
  18. password = "5zqcjkmy"
  19. tunnel_proxies = {
  20. "http": "http://%(user)s:%(pwd)s@%(proxy)s/"
  21. % {"user": username, "pwd": password, "proxy": tunnel},
  22. "https": "http://%(user)s:%(pwd)s@%(proxy)s/"
  23. % {"user": username, "pwd": password, "proxy": tunnel},
  24. }
  25. return tunnel_proxies
  26. class XiaoNianGaoAuthor:
  27. def __init__(self, platform, mode, rule_dict, env, user_list):
  28. self.platform = platform
  29. self.mode = mode
  30. self.rule_dict = rule_dict
  31. self.env = env
  32. self.user_list = user_list
  33. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  34. self.download_count = 0
  35. def get_author_list(self):
  36. # 每轮只抓取定量的数据,到达数量后自己退出
  37. max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 300))
  38. for user_dict in self.user_list:
  39. print(user_dict)
  40. account_level = user_dict['account_level']
  41. if account_level and account_level != "P3":
  42. if self.download_count <= max_count:
  43. self.get_video_list(user_dict)
  44. time.sleep(random.randint(1, 15))
  45. else:
  46. message = "本轮已经抓取足够数量的视频,已经自动退出"
  47. print(message)
  48. return
  49. def get_video_list(self, user_dict):
  50. next_t = -1
  51. # 只抓取更新的视频,如果刷到已经更新的立即退出
  52. url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
  53. headers = {
  54. "Host": "kapi-xng-app.xiaoniangao.cn",
  55. "content-type": "application/json; charset=utf-8",
  56. "accept": "*/*",
  57. "verb": "POST",
  58. "accept-language": "zh-cn",
  59. "date": "Wed, 01 Nov 2023 11:53:22 GMT",
  60. "x-token-id": "",
  61. "x-signaturemethod": "hmac-sha1",
  62. }
  63. while True:
  64. payload = {
  65. "token": "",
  66. "limit": 20,
  67. "start_t": next_t,
  68. "visited_mid": int(user_dict["link"]),
  69. "share_width": 300,
  70. "share_height": 240,
  71. }
  72. response = requests.request(
  73. "POST",
  74. url,
  75. headers=headers,
  76. data=json.dumps(payload),
  77. proxies=tunnel_proxies(),
  78. )
  79. if "data" not in response.text or response.status_code != 200:
  80. message = f"get_videoList:{response.text}"
  81. print(message)
  82. return
  83. elif "list" not in response.json()["data"]:
  84. message = f"get_videoList:{response.json()}"
  85. print(message)
  86. return
  87. elif len(response.json()["data"]["list"]) == 0:
  88. message = f"没有更多数据啦~"
  89. print(message)
  90. return
  91. else:
  92. next_t = response.json()["data"]["next_t"]
  93. feeds = response.json()["data"]["list"]
  94. for video_obj in feeds:
  95. try:
  96. message = f"扫描到一条视频"
  97. print(message)
  98. flag = self.process_video_obj(video_obj, user_dict)
  99. if not flag:
  100. return
  101. except Exception as e:
  102. message = "抓取单条视频异常, 报错原因是: {}".format(e)
  103. print(message)
  104. def process_video_obj(self, video_obj, user_dict):
  105. trace_id = self.platform + str(uuid.uuid1())
  106. # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号
  107. xiaoniangao_title = clean_title(video_obj.get("title", ""))
  108. # 随机取一个表情/符号
  109. emoji = random.choice(
  110. get_config_from_mysql(self.mode, self.platform, self.env, "emoji")
  111. )
  112. # 生成最终标题,标题list[表情+title, title+表情]随机取一个
  113. video_title = random.choice(
  114. [f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]
  115. )
  116. # 发布时间
  117. publish_time_stamp = int(int(video_obj.get("t", 0)) / 1000)
  118. publish_time_str = time.strftime(
  119. "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
  120. )
  121. # 用户名 / 头像
  122. user_name = (
  123. video_obj.get("user", {})
  124. .get("nick", "")
  125. .strip()
  126. .replace("\n", "")
  127. .replace("/", "")
  128. .replace(" ", "")
  129. .replace(" ", "")
  130. .replace("&NBSP", "")
  131. .replace("\r", "")
  132. )
  133. video_dict = {
  134. "video_title": video_title,
  135. "video_id": video_obj.get("vid", ""),
  136. "duration": int(video_obj.get("du", 0) / 1000),
  137. "play_cnt": video_obj.get("play_pv", 0),
  138. "like_cnt": video_obj.get("favor", {}).get("total", 0),
  139. "comment_cnt": video_obj.get("comment_count", 0),
  140. "share_cnt": video_obj.get("share", 0),
  141. "user_name": user_name,
  142. "publish_time_stamp": publish_time_stamp,
  143. "publish_time_str": publish_time_str,
  144. "update_time_stamp": int(time.time()),
  145. "video_width": int(video_obj.get("w", 0)),
  146. "video_height": int(video_obj.get("h", 0)),
  147. "avatar_url": video_obj.get("user", {}).get("hurl", ""),
  148. "profile_id": video_obj["id"],
  149. "profile_mid": video_obj.get("user", {}).get("mid", ""),
  150. "cover_url": video_obj.get("url", ""),
  151. "video_url": video_obj.get("v_url", ""),
  152. "session": f"xiaoniangao-author-{int(time.time())}",
  153. "out_user_id": video_obj["id"],
  154. "platform": self.platform,
  155. "strategy": self.mode,
  156. "out_video_id": video_obj.get("vid", ""),
  157. }
  158. if (
  159. int(time.time()) - publish_time_stamp
  160. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  161. ):
  162. message = "发布时间超过{}天".format(
  163. int(self.rule_dict.get("period", {}).get("max", 1000))
  164. )
  165. print(message)
  166. return False
  167. pipeline = PiaoQuanPipelineTest(
  168. platform=self.platform,
  169. mode=self.mode,
  170. rule_dict=self.rule_dict,
  171. env=self.env,
  172. item=video_dict,
  173. trace_id=trace_id,
  174. )
  175. account_level = user_dict['account_level']
  176. if account_level == "P0" or account_level == "P1":
  177. flag = True
  178. else:
  179. flag = pipeline.process_item()
  180. # flag = pipeline.process_item()
  181. if flag:
  182. video_dict["width"] = video_dict["video_width"]
  183. video_dict["height"] = video_dict["video_height"]
  184. video_dict["crawler_rule"] = json.dumps(self.rule_dict)
  185. video_dict["user_id"] = user_dict["uid"]
  186. video_dict["publish_time"] = video_dict["publish_time_str"]
  187. print(video_dict)
  188. # self.mq.send_msg(video_dict)
  189. self.download_count += 1
  190. message = "成功发送 MQ 至 ETL"
  191. print(message)
  192. return True
  193. if __name__ == "__main__":
  194. select_user_sql = (
  195. f"""select * from crawler_user_v3 where task_id=21"""
  196. )
  197. user_list = MysqlHelper.get_values(
  198. "author", "xiaoniangao", select_user_sql, "prod", ""
  199. )
  200. XNGA = XiaoNianGaoAuthor(
  201. platform="xiaoniangao",
  202. mode="author",
  203. rule_dict={},
  204. env="prod",
  205. user_list=user_list
  206. )
  207. XNGA.get_author_list()