import datetime
import json
import os
import random
import sys
import time
import uuid

import requests

# Make the working directory importable before pulling in the local `common` package.
sys.path.append(os.getcwd())

from common import PiaoQuanPipeline, AliyunLogger
from common.feishu import Feishu
from common.db import MysqlHelper
from common.mq import MQ
from common.public import clean_title

# Endpoint used both for the account search and for paging through an account's feed.
VIDEOSNAP_URL = "https://mp.weixin.qq.com/cgi-bin/videosnap"
# Seconds to wait for the API before giving up, so a stalled connection cannot hang the crawler.
REQUEST_TIMEOUT = 30


def _escape_sql(value):
    """Backslash-escape ``value`` for embedding inside a quoted MySQL string literal.

    The MysqlHelper calls in this module take a fully built SQL string, so values
    are escaped by hand. Assumes the server uses MySQL's default backslash-escape
    handling (NO_BACKSLASH_ESCAPES off) — confirm if the SQL mode is customised.
    """
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace('"', '\\"')
    )


def find_target_user(name, user_list):
    """Find the target account in a list of searched accounts.

    :param name: exact nickname to look for.
    :param user_list: account dicts from the videosnap search response; each is
        expected to carry a ``"nickname"`` key.
    :return: the first account dict whose nickname equals ``name``, or ``False``
        when none matches.
    """
    return next((user for user in user_list if user["nickname"] == name), False)


class ShiPinHaoAccount(object):
    """Crawler for a single WeChat Channels (Shipinhao) account.

    Resolves the account's internal username (cached in the ``accounts`` table),
    pages through its feed via the mp.weixin.qq.com videosnap API, and sends each
    new video to the ETL message queue.
    """

    def __init__(self, platform, mode, rule_dict, user_dict, env):
        self.cookie = None
        self.token = None
        # The account's nickname is read from the "link" key of user_dict.
        self.account_name = user_dict["link"]
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_dict = user_dict
        self.env = env
        # Number of videos successfully sent to MQ during this run.
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)

    def get_token_from_mysql(self):
        """Load the current token and cookie for this platform from ``crawler_config``.

        The ``config`` column of the first matching row is parsed as JSON and must
        contain ``token`` and ``cookie`` keys; they are stored on ``self``.
        """
        select_sql = (
            f"""SELECT config from crawler_config where source = '{_escape_sql(self.platform)}'; """
        )
        configs = MysqlHelper.get_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=select_sql,
            env=self.env,
            machine="",
        )
        token_info = json.loads(configs[0][0])
        self.token = token_info["token"]
        self.cookie = token_info["cookie"]

    def get_history_id(self):
        """Read the cached account id (``name_id``) from the ``accounts`` table.

        :return: the cached id, or ``False`` when no usable row exists.
        """
        select_user_sql = (
            f"""select name_id from accounts where name = "{_escape_sql(self.account_name)}" """
            f"""and platform = "{_escape_sql(self.platform)}" and useful = 1 limit 1"""
        )
        name_id = MysqlHelper.get_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=select_user_sql,
            env=self.env,
            machine="",
        )
        if name_id:
            return name_id[0][0]
        return False

    def _build_headers(self):
        """Return the videosnap request headers built from the current token and cookie."""
        return {
            "authority": "mp.weixin.qq.com",
            "accept": "*/*",
            "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
            "cookie": self.cookie,
            "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
                self.token
            ),
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "x-requested-with": "XMLHttpRequest",
        }

    def get_account_id(self):
        """Return the account's internal username, searching for it if not cached.

        A cached id from the ``accounts`` table is returned directly. Otherwise the
        videosnap ``search`` action is queried with the account nickname. An exact
        nickname match is written back to ``accounts`` and its ``username`` returned.

        :return: the username string, or ``False`` when it cannot be resolved.
        """
        history_id = self.get_history_id()
        if history_id:
            return history_id

        self.get_token_from_mysql()
        params = {
            "action": "search",
            "scene": "1",
            "buffer": "",
            "query": self.account_name,
            "count": "21",
            "token": self.token,
            "lang": "zh_CN",
            "f": "json",
            "ajax": "1",
        }
        response = requests.request(
            "GET",
            VIDEOSNAP_URL,
            headers=self._build_headers(),
            params=params,
            timeout=REQUEST_TIMEOUT,
        )
        # An error payload (e.g. expired token) has no "acct_list"; treat it as "no match".
        user_list = response.json().get("acct_list") or []
        target_user = find_target_user(name=self.account_name, user_list=user_list)
        if not target_user:
            return False

        # Cache the resolved id in MySQL so later runs can skip the search request.
        insert_sql = (
            f"""INSERT INTO accounts (name, name_id, platform, useful) values """
            f"""("{_escape_sql(self.account_name)}", "{_escape_sql(target_user['username'])}", """
            f""""{_escape_sql(self.platform)}", 1 )"""
        )
        MysqlHelper.update_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=insert_sql,
            env=self.env,
            machine="",
        )
        return target_user["username"]

    def _handle_blocked_session(self, response):
        """Log a rejected feed response, alert via Feishu between 10:00 and 20:59, then wait 15 minutes."""
        AliyunLogger.logging(
            code="2000",
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            message=f"status_code:{response.status_code}, get_videoList:{response.text}\n",
        )
        if 20 >= datetime.datetime.now().hour >= 10:
            Feishu.bot(
                log_type=self.mode,
                crawler=self.platform,
                text="视频号Token 过期啦",
            )
        time.sleep(60 * 15)

    def get_account_videos(self):
        """Crawl the account's feed and send new videos to the ETL queue.

        At most ``rule_dict["videos_cnt"]["min"]`` videos (default 30) are sent per
        account. Crawling also stops when the feed has no more videos, or at the
        first video the pipeline rejects as a repeat. On an "invalid session" or
        "freq control" response, the same page is retried after an alert and a
        15-minute wait.
        """
        user_id = self.get_account_id()
        if not user_id:
            AliyunLogger.logging(
                code="3000",
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="{}\t获取 id 失败".format(self.account_name),
            )
            return

        max_videos = int(self.rule_dict.get("videos_cnt", {}).get("min", 30))
        buffer = ""  # pagination cursor returned by the previous page
        while True:
            if self.download_cnt >= max_videos:
                return
            # Re-read the token on every page so a token replaced after an alert is picked up.
            self.get_token_from_mysql()
            params = {
                "action": "get_feed_list",
                "username": user_id,
                "buffer": buffer,
                "count": "15",
                "scene": "1",
                "token": self.token,
                "lang": "zh_CN",
                "f": "json",
                "ajax": "1",
            }
            response = requests.request(
                "GET",
                VIDEOSNAP_URL,
                headers=self._build_headers(),
                params=params,
                timeout=REQUEST_TIMEOUT,
            )
            # Random pause between pages to reduce the chance of rate limiting.
            time.sleep(random.randint(10, 30))
            res_json = response.json()

            # Expired session or rate limiting: alert, wait, and retry the same page.
            err_msg = (res_json.get("base_resp") or {}).get("err_msg")
            if err_msg in ("invalid session", "freq control"):
                self._handle_blocked_session(response)
                continue

            videos = res_json.get("list")
            if not videos:
                AliyunLogger.logging(
                    code="2000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    message="没有更多视频了",
                )
                return

            buffer = res_json["last_buff"]
            for obj in videos:
                # Enforce the cap per video, not just per page, so a page cannot overshoot it.
                if self.download_cnt >= max_videos:
                    return
                try:
                    AliyunLogger.logging(
                        code="1001",
                        platform=self.platform,
                        mode=self.mode,
                        message="扫描到一条视频",
                        env=self.env,
                        data=obj,
                    )
                    # A repeat means the rest of the feed has presumably been crawled already.
                    if not self.process_video_obj(obj):
                        return
                except Exception as e:
                    AliyunLogger.logging(
                        code="3000",
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        message=f"抓取单条视频异常:{e}\n",
                    )

    def process_video_obj(self, video_obj):
        """Convert one feed item into a video dict and send it to the ETL queue.

        :param video_obj: one element of the feed ``list``; must provide
            ``nonce_id``, ``desc``, ``head_url`` and a non-empty ``media`` list.
        :return: ``False`` when the pipeline treats the video as a repeat (nothing
            is sent); otherwise ``True`` after the video has been sent to MQ.
        """
        trace_id = self.platform + str(uuid.uuid1())
        media = video_obj["media"][0]
        # No publish time is read from the item, so one crawl timestamp is shared by
        # every time-derived field; deduplication is left to the pipeline.
        now = int(time.time())
        video_dict = {
            "video_id": video_obj["nonce_id"],
            "video_title": clean_title(video_obj["desc"].split("\n")[0].split("#")[0]),
            "out_video_id": video_obj["nonce_id"],
            "publish_time_stamp": now,
            "publish_time_str": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)),
            "play_cnt": 0,
            "comment_cnt": 0,
            "like_cnt": 0,
            "share_cnt": 0,
            "user_id": self.user_dict["uid"],
            # Fall back to the thumbnail when no cover image is present.
            "cover_url": media["cover_url"] or media["thumb_url"],
            "video_url": media["url"],
            "avatar_url": video_obj["head_url"],
            "width": media["width"],
            "height": media["height"],
            "duration": media["video_play_len_s"],
            "platform": self.platform,
            "strategy": self.mode,
            "crawler_rule": self.rule_dict,
            "session": f"shipinhao-author-{now}",
        }
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id,
        )
        # A falsy repeat_video() result is treated as "already crawled / rejected".
        if not pipeline.repeat_video():
            return False

        video_dict["publish_time"] = video_dict["publish_time_str"]
        self.mq.send_msg(video_dict)
        self.download_cnt += 1
        AliyunLogger.logging(
            code="1002",
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            data=video_dict,
            trace_id=trace_id,
            message="成功发送 MQ 至 ETL",
        )
        return True