gongzhonghao_author.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. """
  2. @author: Curry Luo
  3. @file: gongzhonghao_author.py
  4. @time: 2024/01/05
  5. """
  6. import os
  7. import re
  8. import sys
  9. import html
  10. import json
  11. import time
  12. import uuid
  13. import random
  14. import requests
  15. import datetime
  16. sys.path.append(os.getcwd())
  17. from application.items import VideoItem
  18. from application.pipeline import PiaoQuanPipeline
  19. from application.common.messageQueue import MQ
  20. from application.common.proxies import tunnel_proxies
  21. from application.common.log import AliyunLogger
  22. from application.common.mysql import MysqlHelper
  23. from application.common.feishu import Feishu
  24. from application.functions.read_mysql_config import get_config_from_mysql
  25. def get_video_url(article_url):
  26. """
  27. :param article_url:
  28. :return:
  29. """
  30. # 替换为目标网页的 URL
  31. response = requests.get(article_url)
  32. html_text = response.text
  33. # 正则表达式提取
  34. w = re.search(
  35. r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
  36. ).group(1)
  37. url = html.unescape(
  38. re.sub(
  39. r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w
  40. )
  41. )
  42. return url
  43. class OfficialAccountAuthor(object):
  44. """
  45. 公众号账号爬虫,
  46. """
  47. def __init__(self, platform, mode, user_list, rule_dict, env="prod"):
  48. self.platform = platform
  49. self.mode = mode
  50. self.user_list = user_list
  51. self.rule_dict = rule_dict
  52. self.env = env
  53. self.mysql = MysqlHelper(mode=self.mode, platform=self)
  54. self.aliyun_log = AliyunLogger(self.platform, self.mode)
  55. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  56. def get_video_list(self, account_name):
  57. """
  58. 获取视频列表
  59. :return:
  60. todo: 修改一下获取 token 的逻辑,增加 token 的可用性
  61. """
  62. # 获取 token and cookie
  63. fake_id = self.fake_id_manage(account_name)
  64. begin = 0
  65. while True:
  66. token_dict = self.get_token(1)
  67. url = "https://mp.weixin.qq.com/cgi-bin/appmsg"
  68. headers = {
  69. "accept": "*/*",
  70. "accept-encoding": "gzip, deflate, br",
  71. "accept-language": "zh-CN,zh;q=0.9",
  72. "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?"
  73. "t=media/appmsg_edit_v2&action=edit&isNew=1"
  74. "&type=77&createType=5&token="
  75. + str(token_dict["token"])
  76. + "&lang=zh_CN",
  77. "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
  78. "sec-ch-ua-mobile": "?0",
  79. "sec-ch-ua-platform": '"Windows"',
  80. "sec-fetch-dest": "empty",
  81. "sec-fetch-mode": "cors",
  82. "sec-fetch-site": "same-origin",
  83. "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
  84. " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36",
  85. "x-requested-with": "XMLHttpRequest",
  86. "cookie": token_dict["cookie"],
  87. }
  88. params = {
  89. "action": "list_ex",
  90. "begin": str(begin),
  91. "count": "5",
  92. "fakeid": fake_id,
  93. "type": "9",
  94. "query": "",
  95. "token": str(token_dict["token"]),
  96. "lang": "zh_CN",
  97. "f": "json",
  98. "ajax": "1",
  99. }
  100. response = requests.get(url=url, params=params, headers=headers)
  101. if response.status_code == 200:
  102. result = response.json()
  103. if result["base_resp"]["err_msg"] in [
  104. "invalid session",
  105. "freq control",
  106. ]:
  107. self.aliyun_log.logging(
  108. code="2000",
  109. message=f"status_code:{response.status_code}, get_fakeid:{response.text}\n",
  110. )
  111. if 20 >= datetime.datetime.now().hour >= 10:
  112. Feishu.bot(
  113. self.mode,
  114. self.platform,
  115. f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n过期啦,请扫码更换token\nhttps://mp.weixin.qq.com/",
  116. )
  117. time.sleep(60 * 15)
  118. continue
  119. if result["base_resp"]["err_msg"] == "ok" and len(result["list"]) == 0:
  120. print("No more data")
  121. if len(result["app_msg_list"]) == 0:
  122. self.aliyun_log.logging(
  123. code="2000",
  124. message="没有更多视频了\n",
  125. )
  126. return
  127. else:
  128. begin += 5
  129. app_msg_list = result["app_msg_list"]
  130. for article in app_msg_list:
  131. try:
  132. self.process_video(article, account_name, fake_id)
  133. except Exception as e:
  134. self.aliyun_log.logging(
  135. code="3000",
  136. message="代码报错, 报错信息是{}".format(e),
  137. data=article,
  138. account=account_name,
  139. )
  140. def process_video(self, article, account_name, fake_id):
  141. """
  142. 处理视频信息
  143. :param fake_id: 公众号唯一 id
  144. :param account_name: 公众号的名称
  145. :param article: 微信公众号的链接
  146. :return: None
  147. """
  148. trace_id = self.platform + str(uuid.uuid1())
  149. create_time = article.get("create_time", 0)
  150. update_time = article.get("update_time", 0)
  151. publish_time_stamp = int(create_time)
  152. update_time_stamp = int(update_time)
  153. publish_time_str = time.strftime(
  154. "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
  155. )
  156. article_url = article.get("link", "")
  157. video_dict = {
  158. "video_id": article.get("aid", ""),
  159. "video_title": article.get("title", "")
  160. .replace(" ", "")
  161. .replace('"', "")
  162. .replace("'", ""),
  163. "publish_time_stamp": publish_time_stamp,
  164. "publish_time_str": publish_time_str,
  165. "user_name": account_name,
  166. "play_cnt": 0,
  167. "comment_cnt": 0,
  168. "like_cnt": 0,
  169. "share_cnt": 0,
  170. "user_id": fake_id,
  171. "avatar_url": "",
  172. "cover_url": article.get("cover", ""),
  173. "article_url": article.get("link", ""),
  174. "session": f"gongzhonghao-author1-{int(time.time())}",
  175. }
  176. self.aliyun_log.logging(
  177. code="1001", message="扫描到一条视频", data=article, account=account_name
  178. )
  179. if (
  180. int(time.time()) - publish_time_stamp
  181. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  182. ) and (
  183. int(time.time()) - update_time_stamp
  184. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  185. ):
  186. self.aliyun_log.logging(
  187. code="2004",
  188. trace_id=trace_id,
  189. data=video_dict,
  190. message="发布时间超过{}天".format(
  191. int(self.rule_dict.get("period", {}).get("max", 1000))
  192. ),
  193. account=account_name,
  194. )
  195. return
  196. # 标题敏感词过滤
  197. elif (
  198. any(
  199. str(word) if str(word) in video_dict["video_title"] else False
  200. for word in get_config_from_mysql(
  201. log_type=self.mode,
  202. source=self.platform,
  203. env=self.env,
  204. text="filter",
  205. )
  206. )
  207. is True
  208. ):
  209. self.aliyun_log.logging(
  210. code="2003",
  211. trace_id=trace_id,
  212. data=video_dict,
  213. account=account_name,
  214. message="标题已中过滤词\n",
  215. )
  216. # 已下载判断
  217. elif (
  218. self.repeat_video(
  219. video_dict["video_id"],
  220. )
  221. != 0
  222. ):
  223. self.aliyun_log.logging(
  224. code="2002",
  225. trace_id=trace_id,
  226. data=video_dict,
  227. account=account_name,
  228. message="视频已下载",
  229. )
  230. else:
  231. video_dict["out_user_id"] = video_dict["user_id"]
  232. video_dict["platform"] = self.platform
  233. video_dict["strategy"] = self.mode
  234. video_dict["out_video_id"] = video_dict["video_id"]
  235. video_dict["width"] = 0
  236. video_dict["height"] = 0
  237. video_dict["crawler_rule"] = json.dumps(self.rule_dict)
  238. video_dict["user_id"] = fake_id # 站内 UID?爬虫获取不到了(随机发布到原 5 个账号中)
  239. video_dict["publish_time"] = video_dict["publish_time_str"]
  240. video_dict["video_url"] = get_video_url(article_url)
  241. self.mq.send_msg(video_dict)
  242. self.aliyun_log.logging(
  243. code="1002",
  244. trace_id=trace_id,
  245. data=video_dict,
  246. account=account_name,
  247. message="成功发送 MQ 至 ETL",
  248. )
  249. time.sleep(random.randint(1, 8))
  250. def repeat_video(self, video_id):
  251. """
  252. :param video_id: video_id
  253. :return:
  254. """
  255. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{video_id}" ; """
  256. repeat_video = self.mysql.select(sql)
  257. return len(repeat_video)
  258. def fake_id_manage(self, account_name):
  259. """
  260. 根据公众号的名字去查询 fake_id, 若 fake_id 存在,则返回,若不存在则插入
  261. account_name: 公众号的名字,user_dict['link']
  262. 获取fake_id
  263. :return:
  264. """
  265. select_sql = f"""select name, name_id from accounts where name = "{account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
  266. account_info = self.mysql.select(sql=select_sql)
  267. if account_info:
  268. name, name_id = account_info[0]
  269. return name_id
  270. else:
  271. user_info = self.get_user_fake_id(account_name)
  272. if user_info:
  273. fake_id = user_info["user_id"]
  274. insert_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{account_name}", "{fake_id}", "{self.platform}", 1 )"""
  275. self.mysql.update(sql=insert_sql)
  276. return fake_id
  277. def get_token(self, token_index):
  278. """
  279. 获取 公众号的 token
  280. :param token_index:
  281. :return:
  282. """
  283. select_sql = f""" select * from crawler_config where source="{self.platform}" and title LIKE "%公众号_{token_index}%";"""
  284. configs = self.mysql.select(select_sql)
  285. if len(configs) == 0:
  286. Feishu.bot(self.mode, self.platform, f"公众号_{token_index}:未配置token")
  287. time.sleep(60)
  288. return None
  289. token_dict = {
  290. "token_id": configs[0]["id"],
  291. "title": configs[0]["title"].strip(),
  292. "token": dict(eval(configs[0]["config"]))["token"].strip(),
  293. "cookie": dict(eval(configs[0]["config"]))["cookie"].strip(),
  294. "update_time": time.strftime(
  295. "%Y-%m-%d %H:%M:%S",
  296. time.localtime(int(configs[0]["update_time"] / 1000)),
  297. ),
  298. "operator": configs[0]["operator"].strip(),
  299. }
  300. return token_dict