import json
import os
import sys
import time
import uuid

import requests

sys.path.append(os.getcwd())
from common.pipeline import PiaoQuanPipelineTest
from common.mq import MQ
from common.db import MysqlHelper


# Endpoint used both for account search and for paging an account's feed.
VIDEOSNAP_URL = "https://mp.weixin.qq.com/cgi-bin/videosnap"
# Seconds before an HTTP request is abandoned; without it a stalled
# connection would block the crawler indefinitely.
REQUEST_TIMEOUT = 30
# Upper bound on videos collected for a single account per run.
MAX_VIDEOS_PER_ACCOUNT = 30
# Back-off applied when the API reports an invalid session or rate limiting.
THROTTLE_SLEEP_SECONDS = 60 * 15

# Backslash-escape table mirroring MySQL's string-literal escaping rules.
_SQL_ESCAPES = str.maketrans(
    {
        "\\": "\\\\",
        "'": "\\'",
        '"': '\\"',
        "\0": "\\0",
        "\n": "\\n",
        "\r": "\\r",
        "\x1a": "\\Z",
    }
)


def _escape_sql_literal(value):
    """Escape *value* for interpolation inside a quoted MySQL string literal.

    The visible ``MysqlHelper`` calls only take a finished SQL string, so
    parameter binding is not available here. This applies MySQL backslash
    escaping. NOTE(review): assumes the server does not run with
    NO_BACKSLASH_ESCAPES / ANSI_QUOTES -- confirm.
    """
    return str(value).translate(_SQL_ESCAPES)


def find_target_user(name, user_list):
    """Return the first entry of *user_list* whose ``nickname`` equals *name*.

    Args:
        name: Exact nickname to look for.
        user_list: Iterable of account dicts from the videosnap search API.

    Returns:
        The matching dict, or ``False`` when nothing matches (callers test
        truthiness).
    """
    for obj in user_list:
        if obj.get("nickname") == name:
            return obj
    return False


class ShiPinHaoAccount:
    """Crawl the recent videos of one WeChat Channels account.

    ``user_dict`` must provide ``link`` (the account nickname searched for),
    ``user_id`` and ``uid``; ``rule_dict`` is forwarded to the pipeline.
    """

    def __init__(self, platform, mode, rule_dict, user_dict, env):
        self.cookie = None
        self.token = None
        self.account_name = user_dict["link"]
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_dict = user_dict
        self.env = env
        self.download_cnt = 0  # videos accepted so far in this run
        self.token_count = 0  # HTTP requests issued with the current token
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)

    def get_token_from_mysql(self):
        """Load ``token`` and ``cookie`` from the ``crawler_config`` row for this platform.

        Raises:
            RuntimeError: if no config row exists for the platform.
        """
        select_sql = (
            "SELECT config from crawler_config "
            f"where source = '{_escape_sql_literal(self.platform)}'; "
        )
        configs = MysqlHelper.get_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=select_sql,
            env=self.env,
            machine="",
        )
        if not configs:
            raise RuntimeError(
                f"no crawler_config row found for source={self.platform!r}"
            )
        token_info = json.loads(configs[0][0])
        self.token = token_info["token"]
        self.cookie = token_info["cookie"]

    def get_history_id(self):
        """Return the cached ``name_id`` of this account from ``accounts``, or ``False``."""
        select_user_sql = (
            "select name_id from accounts "
            f'where name = "{_escape_sql_literal(self.account_name)}" '
            f'and platform = "{_escape_sql_literal(self.platform)}" '
            "and useful = 1 limit 1"
        )
        rows = MysqlHelper.get_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=select_user_sql,
            env=self.env,
            machine="",
        )
        if rows:
            return rows[0][0]
        return False

    def _build_headers(self):
        """Build videosnap request headers from the currently loaded token/cookie."""
        return {
            "authority": "mp.weixin.qq.com",
            "accept": "*/*",
            "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
            "cookie": self.cookie,
            "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
                self.token
            ),
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "x-requested-with": "XMLHttpRequest",
        }

    def _request_videosnap(self, params):
        """Issue one GET against the videosnap endpoint and count it against the token."""
        response = requests.get(
            VIDEOSNAP_URL,
            headers=self._build_headers(),
            params=params,
            timeout=REQUEST_TIMEOUT,
        )
        self.token_count += 1
        return response

    def get_account_id(self):
        """Resolve the account's ``username`` id.

        The ``accounts`` table is consulted first. Otherwise the account is
        searched by nickname via the API, and a hit is persisted for next time.

        Returns:
            The id string, or ``False`` if the account cannot be found.
        """
        history_id = self.get_history_id()
        if history_id:
            return history_id

        self.get_token_from_mysql()
        params = {
            "action": "search",
            "scene": "1",
            "buffer": "",
            "query": self.account_name,
            "count": "21",
            "token": self.token,
            "lang": "zh_CN",
            "f": "json",
            "ajax": "1",
        }
        response = self._request_videosnap(params)
        # Error payloads (e.g. expired session) carry no "acct_list".
        user_list = response.json().get("acct_list") or []
        target_user = find_target_user(name=self.account_name, user_list=user_list)
        if not target_user:
            return False

        # Cache the resolved id in MySQL so later runs skip the search call.
        insert_sql = (
            "INSERT INTO accounts (name, name_id, platform) values ("
            f'"{_escape_sql_literal(self.account_name)}", '
            f'"{_escape_sql_literal(target_user["username"])}", '
            f'"{_escape_sql_literal(self.platform)}")'
        )
        MysqlHelper.update_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=insert_sql,
            env=self.env,
            machine="",
        )
        return target_user["username"]

    def get_account_videos(self):
        """Page through the account's feed, collecting at most MAX_VIDEOS_PER_ACCOUNT videos.

        Stops at the cap, when the feed is exhausted, or at the first video the
        pipeline reports as already seen. On "invalid session" / "freq control"
        it sleeps and retries with a freshly loaded token.
        """
        user_id = self.get_account_id()
        if not user_id:
            print("{}\t获取 id 失败".format(self.account_name))
            return
        print(user_id)

        buffer = ""  # pagination cursor returned by the API as "last_buff"
        while self.download_cnt < MAX_VIDEOS_PER_ACCOUNT:
            # Reload each page so a token refreshed externally is picked up.
            self.get_token_from_mysql()
            params = {
                "action": "get_feed_list",
                "username": user_id,
                "buffer": buffer,
                "count": "15",
                "scene": "1",
                "token": self.token,
                "lang": "zh_CN",
                "f": "json",
                "ajax": "1",
            }
            response = self._request_videosnap(params)
            res_json = response.json()

            # Check for session expiry / rate limiting before reading the list.
            err_msg = (res_json.get("base_resp") or {}).get("err_msg")
            if err_msg in ("invalid session", "freq control"):
                print(
                    f"status_code:{response.status_code}, get_videoList:{response.text}\n"
                )
                time.sleep(THROTTLE_SLEEP_SECONDS)
                continue

            video_list = res_json.get("list")
            if not video_list:
                print("没有更多视频了")
                return
            buffer = res_json["last_buff"]

            for obj in video_list:
                # Enforce the cap per item; a page may hold more than remains.
                if self.download_cnt >= MAX_VIDEOS_PER_ACCOUNT:
                    return
                print("扫描到一条视频", self.token_count)
                try:
                    is_new = self.process_video_obj(obj)
                except Exception as e:
                    # Best effort: one malformed item must not abort the page.
                    print(f"抓取单条视频异常:{e}\n")
                    continue
                if not is_new:
                    # A repeat means the rest of the feed was crawled before.
                    return

    def process_video_obj(self, video_obj):
        """Convert one feed item into a video record and run it through dedup.

        Returns:
            ``False`` if the pipeline reports the video as a repeat, else
            ``True`` after counting it as downloaded.
        """
        trace_id = self.platform + str(uuid.uuid1())
        media = video_obj["media"][0]
        # Sample the clock once so timestamp, string and session agree.
        now = int(time.time())
        video_dict = {
            "video_id": video_obj["nonce_id"],
            "out_video_id": video_obj["nonce_id"],
            "video_title": video_obj["desc"],
            "publish_time_stamp": now,
            "publish_time_str": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)),
            "play_cnt": 0,
            "comment_cnt": 0,
            "like_cnt": 0,
            "share_cnt": 0,
            "user_id": self.user_dict["user_id"],
            "cover_url": media["cover_url"],
            "video_url": media["url"],
            "avatar_url": video_obj["head_url"],
            "width": media["width"],
            "height": media["height"],
            "duration": media["video_play_len_s"],
            "platform": self.platform,
            "strategy": self.mode,
            "crawler_rule": self.rule_dict,
            "session": f"shipinhao-author-{now}",
        }
        # The feed exposes no publish time, so deduplication is the only filter.
        pipeline = PiaoQuanPipelineTest(
            platform=self.platform,
            mode=self.mode,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id,
        )
        if not pipeline.repeat_video():
            return False

        video_dict["out_user_id"] = video_dict["user_id"]
        video_dict["user_id"] = self.user_dict["uid"]
        video_dict["publish_time"] = video_dict["publish_time_str"]
        print(video_dict)
        print("成功发送 MQ 至 ETL")
        # NOTE(review): MQ delivery is disabled, so the message above is not
        # actually sent; re-enable the line below for production runs.
        # self.mq.send_msg(video_dict)
        self.download_cnt += 1
        return True


if __name__ == "__main__":
    SP = ShiPinHaoAccount(
        platform="shipinhao",
        mode="author",
        user_dict={"uid": "123456", "link": "树树读书1014", "user_id": "1234565"},
        rule_dict={},
        env="prod",
    )
    SP.get_account_videos()