shipinhao_scheduling.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. import os
  2. import json
  3. import random
  4. import sys
  5. import datetime
  6. import time
  7. import uuid
  8. import requests
  9. sys.path.append(os.getcwd())
  10. from common import PiaoQuanPipeline, AliyunLogger
  11. from common.feishu import Feishu
  12. from common.db import MysqlHelper
  13. from common.mq import MQ
  14. from common.public import clean_title
  15. def find_target_user(name, user_list):
  16. for obj in user_list:
  17. if obj["nickname"] == name:
  18. return obj
  19. else:
  20. continue
  21. return False
  22. class ShiPinHaoAccount:
  23. def __init__(self, platform, mode, rule_dict, user_dict, env):
  24. # self.token = token
  25. # self.cookie = cookie
  26. self.account_name = user_dict["link"]
  27. self.platform = platform
  28. self.mode = mode
  29. self.rule_dict = rule_dict
  30. self.user_dict = user_dict
  31. self.env = env
  32. self.download_cnt = 0
  33. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  34. def get_token_from_mysql(self):
  35. select_sql = (
  36. f"""SELECT config from crawler_config where source = '{self.platform}'; """
  37. )
  38. # print(select_sql)
  39. configs = MysqlHelper.get_values(
  40. log_type=self.mode,
  41. crawler=self.platform,
  42. sql=select_sql,
  43. env=self.env,
  44. machine="",
  45. )
  46. token_config = configs[0][0]
  47. token_info = json.loads(token_config)
  48. self.token = token_info["token"]
  49. self.cookie = token_info["cookie"]
  50. def get_history_id(self):
  51. """
  52. 从数据库表中读取 id
  53. """
  54. select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
  55. name_id = MysqlHelper.get_values(
  56. log_type=self.mode,
  57. crawler=self.platform,
  58. sql=select_user_sql,
  59. env=self.env,
  60. machine="",
  61. )
  62. if name_id:
  63. return name_id[0][0]
  64. else:
  65. return False
  66. def get_account_id(self):
  67. # 读历史数据,如果存在 id,则直接返回 id
  68. history_id = self.get_history_id()
  69. if history_id:
  70. return history_id
  71. else:
  72. url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
  73. params = {
  74. "action": "search",
  75. "scene": "1",
  76. "buffer": "",
  77. "query": self.account_name,
  78. "count": "21",
  79. "token": self.token,
  80. "lang": "zh_CN",
  81. "f": "json",
  82. "ajax": "1",
  83. }
  84. headers = {
  85. "authority": "mp.weixin.qq.com",
  86. "accept": "*/*",
  87. "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
  88. "cookie": self.cookie,
  89. "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
  90. self.token
  91. ),
  92. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
  93. "x-requested-with": "XMLHttpRequest",
  94. }
  95. response = requests.request("GET", url, headers=headers, params=params)
  96. # try:
  97. # user_list = response.json()["acct_list"]
  98. # except:
  99. # if 20 >= datetime.datetime.now().hour >= 10:
  100. # Feishu.bot(
  101. # log_type=self.mode,
  102. # crawler=self.platform,
  103. # text="视频号Token 过期啦"
  104. # # text=f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/"
  105. # )
  106. # # time.sleep(60 * 15)
  107. # # continue
  108. # return
  109. user_list = response.json()["acct_list"]
  110. target_user = find_target_user(name=self.account_name, user_list=user_list)
  111. # 写入 MySql 数据库
  112. if target_user:
  113. update_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{self.account_name}", "{target_user['username']}", "{self.platform}", 1 )"""
  114. # print(update_sql)
  115. MysqlHelper.update_values(
  116. log_type=self.mode,
  117. crawler=self.platform,
  118. sql=update_sql,
  119. env=self.env,
  120. machine="",
  121. )
  122. return target_user["username"]
  123. else:
  124. return False
  125. def get_account_videos(self):
  126. # 一个账号最多抓取 30 条数据
  127. self.get_token_from_mysql()
  128. user_id = self.get_account_id()
  129. if user_id:
  130. url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
  131. headers = {
  132. "authority": "mp.weixin.qq.com",
  133. "accept": "*/*",
  134. "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
  135. "cookie": self.cookie,
  136. "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
  137. self.token
  138. ),
  139. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
  140. "x-requested-with": "XMLHttpRequest",
  141. }
  142. buffer = "" # 翻页指示器
  143. while True:
  144. if self.download_cnt >= int(
  145. self.rule_dict.get("videos_cnt", {}).get("min", 30)
  146. ):
  147. return
  148. params = {
  149. "action": "get_feed_list",
  150. "username": user_id,
  151. "buffer": buffer,
  152. "count": "15",
  153. "scene": "1",
  154. "token": self.token,
  155. "lang": "zh_CN",
  156. "f": "json",
  157. "ajax": "1",
  158. }
  159. response = requests.request("GET", url, headers=headers, params=params)
  160. time.sleep(random.randint(10, 30))
  161. res_json = response.json()
  162. # 开始判断视频是否有信息,是否频控
  163. if res_json["base_resp"]["err_msg"] == "invalid session":
  164. AliyunLogger.logging(
  165. code="2000",
  166. platform=self.platform,
  167. mode=self.mode,
  168. env=self.env,
  169. message=f"status_code:{response.status_code}, get_videoList:{response.text}\n",
  170. )
  171. if 20 >= datetime.datetime.now().hour >= 10:
  172. Feishu.bot(
  173. log_type=self.mode,
  174. crawler=self.platform,
  175. text="视频号Token 过期啦"
  176. # text=f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/"
  177. )
  178. time.sleep(60 * 15)
  179. continue
  180. if res_json["base_resp"]["err_msg"] == "freq control":
  181. AliyunLogger.logging(
  182. code="2000",
  183. platform=self.platform,
  184. mode=self.mode,
  185. env=self.env,
  186. message=f"status_code:{response.status_code}, get_videoList:{response.text}\n",
  187. )
  188. if 20 >= datetime.datetime.now().hour >= 10:
  189. Feishu.bot(
  190. log_type=self.mode,
  191. crawler=self.platform,
  192. text="视频号Token 过期啦"
  193. # text=f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/"
  194. )
  195. time.sleep(60 * 15)
  196. continue
  197. if not res_json.get("list"):
  198. AliyunLogger.logging(
  199. code="2000",
  200. platform=self.platform,
  201. mode=self.mode,
  202. env=self.env,
  203. message="没有更多视频了",
  204. )
  205. return
  206. else:
  207. buffer = res_json["last_buff"]
  208. for obj in res_json["list"]:
  209. try:
  210. AliyunLogger.logging(
  211. code="1001",
  212. platform=self.platform,
  213. mode=self.mode,
  214. message="扫描到一条视频",
  215. env=self.env,
  216. data=obj,
  217. )
  218. repeat_flag = self.process_video_obj(obj)
  219. if not repeat_flag:
  220. return
  221. except Exception as e:
  222. AliyunLogger.logging(
  223. code="3000",
  224. platform=self.platform,
  225. mode=self.mode,
  226. env=self.env,
  227. message=f"抓取单条视频异常:{e}\n",
  228. )
  229. else:
  230. AliyunLogger.logging(
  231. code="3000",
  232. platform=self.platform,
  233. mode=self.mode,
  234. env=self.env,
  235. message="{}\t获取 id 失败".format(self.account_name),
  236. )
  237. def process_video_obj(self, video_obj):
  238. trace_id = self.platform + str(uuid.uuid1())
  239. video_dict = {
  240. "video_id": video_obj["nonce_id"],
  241. "video_title": clean_title(video_obj["desc"].split("\n")[0].split("#")[0]),
  242. "out_video_id": video_obj["nonce_id"],
  243. "publish_time_stamp": int(time.time()),
  244. "publish_time_str": time.strftime(
  245. "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
  246. ),
  247. "play_cnt": 0,
  248. "comment_cnt": 0,
  249. "like_cnt": 0,
  250. "share_cnt": 0,
  251. "user_id": self.user_dict["uid"],
  252. "cover_url": video_obj["media"][0]["cover_url"] if video_obj['media'][0]['cover_url'] else video_obj['media'][0]['thumb_url'],
  253. "video_url": video_obj["media"][0]["url"],
  254. "avatar_url": video_obj["head_url"],
  255. "width": video_obj["media"][0]["width"],
  256. "height": video_obj["media"][0]["height"],
  257. "duration": video_obj["media"][0]["video_play_len_s"],
  258. "platform": self.platform,
  259. "strategy": self.mode,
  260. "crawler_rule": self.rule_dict,
  261. "session": f"shipinhao-author-{int(time.time())}",
  262. }
  263. # video_dict["out_user_id"] = video_dict["user_id"]
  264. # 无更新时间,去重即可
  265. pipeline = PiaoQuanPipeline(
  266. platform=self.platform,
  267. mode=self.mode,
  268. item=video_dict,
  269. rule_dict=self.rule_dict,
  270. env=self.env,
  271. trace_id=trace_id,
  272. )
  273. if not pipeline.repeat_video():
  274. return False
  275. else:
  276. video_dict["publish_time"] = video_dict["publish_time_str"]
  277. self.mq.send_msg(video_dict)
  278. self.download_cnt += 1
  279. AliyunLogger.logging(
  280. code="1002",
  281. platform=self.platform,
  282. mode=self.mode,
  283. env=self.env,
  284. data=video_dict,
  285. trace_id=trace_id,
  286. message="成功发送 MQ 至 ETL",
  287. )
  288. return True
  289. # if __name__ == "__main__":
  290. # temp_token = "2080949641"
  291. # temp_cookie = "ua_id=bw4VuFJr6fAuSkwdAAAAAClaW0m9Aua-6IfHaXU_zpo=; wxuin=95302180931488; mm_lang=zh_CN; RK=kreEMgtMMJ; ptcz=8fd1b267c98a1185bbe6455a081f1264048ee388363ca305d9ef4812892c7900; qq_domain_video_guid_verify=2ba78a5010233582; poc_sid=HOinP2Wj322Ex737kV651Zqy6y8fSprOUUvaegBg; _qimei_q36=; _qimei_h38=9eea33ea92afe8a922333fce03000001317916; pgv_pvid=9056371236; _clck=3930572231|1|fgk|0; uuid=6562bbd8859230ce4120dfa063c76997; rand_info=CAESIGAatjSIjvxVJVDxRDN7F/CNFWMifvAVqje98rd++8UY; slave_bizuin=3236647229; data_bizuin=3236647229; bizuin=3236647229; data_ticket=qm3i6jRhObs1yKHttGh0gVI02Mz7FTPfatn0RMLdaWyD7Ukcokm5Dc3mmYLQUZPg; slave_sid=UWxjZnhBREZRRTNKZ3dYZTlYRE9Db2lxQUhOM3lZUlRoMkV0MG1wdVVudGpQTWxnVkxzYW5pV2c3NjB3bnAyQ2lPaXBBVVRPazEybWtKSVEzTnUyazZ6WEJsdnFaWWVDaUFrM3pTTXRkeUNJS3RNVTc2NFRBWkZiVGQzYllacEFRalBBZ2tXZlltblJYS2VS; slave_user=gh_d284c09295eb; xid=cb96e6ba4b4960d74a22869b1bb21406; _clsk=z77guf|1699532621466|4|1|mp.weixin.qq.com/weheat-agent/payload/record"
  292. # SP = ShiPinHaoAccount(
  293. # token=temp_token,
  294. # cookie=temp_cookie,
  295. # account_name="心煤",
  296. # platform="shipinhao",
  297. # mode="author",
  298. # rule_dict={},
  299. # env="prod",
  300. # )
  301. # SP.get_account_videos()