import json
import os
import sys
import time
import uuid

import requests

sys.path.append(os.getcwd())
from common.pipeline import PiaoQuanPipelineTest
from common.mq import MQ
from common.db import MysqlHelper

# Endpoint used both for the account search and for the feed listing.
VIDEOSNAP_URL = "https://mp.weixin.qq.com/cgi-bin/videosnap"

# Seconds to wait on the remote endpoint; without a timeout `requests` can
# block indefinitely on a stalled connection.
REQUEST_TIMEOUT = 30

# Pause applied before retrying when the endpoint reports one of the
# error messages below.
RETRY_SLEEP_SECONDS = 60 * 15
RETRYABLE_ERRORS = ("invalid session", "freq control")


def _escape_sql_string(value):
    """Escape *value* for use inside a double-quoted MySQL string literal.

    NOTE(review): this relies on backslash escapes being enabled (the MySQL
    default). If ``MysqlHelper`` supports bound parameters, prefer those.
    """
    return str(value).replace("\\", "\\\\").replace('"', '\\"')


def find_target_user(name, user_list):
    """Return the first entry of *user_list* whose "nickname" equals *name*.

    Entries without a "nickname" key are skipped. Returns ``False`` when
    nothing matches or *user_list* is empty/``None``.
    """
    for candidate in user_list or ():
        if "nickname" in candidate and candidate["nickname"] == name:
            return candidate
    return False


class ShiPinHaoAccount:
    """Crawl the videos of a single account and forward them to the ETL queue.

    The account name is taken from ``user_dict["link"]``; ``user_dict`` must
    also provide ``"user_id"`` and ``"uid"``, which are copied into every
    message sent to MQ.
    """

    # Upper bound on videos forwarded per account in one run.
    MAX_DOWNLOADS = 30

    def __init__(self, platform, mode, rule_dict, user_dict, env):
        self.cookie = None
        self.token = None
        self.account_name = user_dict["link"]
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_dict = user_dict
        self.env = env
        self.download_cnt = 0  # videos successfully sent to MQ
        self.token_count = 0  # HTTP requests issued with the current token
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)

    def get_token_from_mysql(self):
        """Populate ``self.token`` and ``self.cookie``.

        NOTE(security): the credentials are currently hard-coded. The MySQL
        lookup this method is named after (the ``config`` column of
        ``crawler_config`` for this platform, a JSON document with ``token``
        and ``cookie`` keys) is disabled and should be restored so that no
        session credentials live in source control.

        The values are deliberately not printed: they are live session
        credentials and must not end up in logs.
        """
        self.token = "766484754"
        self.cookie = (
            "ua_id=bw4VuFJr6fAuSkwdAAAAAClaW0m9Aua-6IfHaXU_zpo=; "
            "wxuin=95302180931488; "
            "mm_lang=zh_CN; "
            "RK=kreEMgtMMJ; "
            "ptcz=8fd1b267c98a1185bbe6455a081f1264048ee388363ca305d9ef4812892c7900; "
            "qq_domain_video_guid_verify=2ba78a5010233582; "
            "poc_sid=HOinP2Wj322Ex737kV651Zqy6y8fSprOUUvaegBg; "
            "_qimei_q36=; "
            "_qimei_h38=9eea33ea92afe8a922333fce03000001317916; "
            "pgv_pvid=9056371236; "
            "_clck=3524986952|1|fgp|0; "
            "uuid=a76c16bf749aaf6418aa610ad5c6e66c; "
            "rand_info=CAESIDhWIfyhucI9xQkQm/2xYzaHtaGjRUbHeNKgSt4b382C; "
            "slave_bizuin=3930572231; "
            "data_bizuin=3930572231; "
            "bizuin=3930572231; "
            "data_ticket=k3o3TmbxDq450TMRpBL2zW+f1onbHFg7G4/9iLi/jlp1zyWQtmpjxFouT+/kRE1e; "
            "slave_sid=TndTREg5TW9MaFUxRllkaVFacXh6bVhFSEhpSEVRNUc2RWtBbnJRZmdxZzNxaUpOc29oRGJ1RjhFZm9jNXZ3Q1JzUzN3elFDYlVjZTEyN1YyWm9nOGhsUW9sNTFEUEtDRmo1Z0hzZjA1ZjhibXg0YzVrOE91N3ZOZWVqT3UxT0FSN3lsNG9SNTNNdEE2VWNC; "
            "slave_user=gh_deef7ad59a83; "
            "xid=9bd5b038d83164cbfa24bcf224bc9172; "
            "_clsk=bqf6jh|1699929305392|6|1|mp.weixin.qq.com/weheat-agent/payload/record"
        )

    def _build_headers(self):
        """Return the request headers shared by the search and feed calls."""
        return {
            "authority": "mp.weixin.qq.com",
            "accept": "*/*",
            "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
            "cookie": self.cookie,
            "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
                self.token
            ),
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "x-requested-with": "XMLHttpRequest",
        }

    def get_history_id(self):
        """Look up a previously stored id for this account in ``accounts``.

        Returns the first row produced by ``MysqlHelper.get_values`` or
        ``False`` when there is none.

        NOTE(review): the first *row* is returned as-is, not its first
        column; confirm the row shape against ``MysqlHelper.get_values``.
        """
        select_user_sql = f"""select name_id from accounts where name = "{_escape_sql_string(self.account_name)}" and platform = "{_escape_sql_string(self.platform)}" and useful = 1 limit 1"""
        name_id = MysqlHelper.get_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=select_user_sql,
            env=self.env,
            machine="",
        )
        print(name_id)
        if name_id:
            return name_id[0]
        return False

    def get_account_id(self):
        """Resolve the account id, preferring the stored value.

        When no stored id exists the account name is searched remotely; a
        match on ``nickname`` is written to the ``accounts`` table and its
        ``username`` returned. Returns ``False`` when the account cannot be
        resolved, including when the response carries no ``acct_list``.
        """
        # Reuse the stored id when one exists.
        history_id = self.get_history_id()
        if history_id:
            return history_id

        params = {
            "action": "search",
            "scene": "1",
            "buffer": "",
            "query": self.account_name,
            "count": "21",
            "token": self.token,
            "lang": "zh_CN",
            "f": "json",
            "ajax": "1",
        }
        response = requests.request(
            "GET",
            VIDEOSNAP_URL,
            headers=self._build_headers(),
            params=params,
            timeout=REQUEST_TIMEOUT,
        )
        self.token_count += 1
        search_result = response.json()
        print(search_result)
        # Error payloads carry no "acct_list"; treat that as "no match"
        # instead of raising KeyError.
        target_user = find_target_user(
            name=self.account_name,
            user_list=search_result.get("acct_list") or [],
        )
        if not target_user:
            return False

        # Persist the mapping so later runs skip the remote search.
        # "username" comes from a remote response, so it is escaped.
        update_sql = f"""INSERT INTO accounts (name, name_id, platform) values ("{_escape_sql_string(self.account_name)}", "{_escape_sql_string(target_user['username'])}", "{_escape_sql_string(self.platform)}")"""
        MysqlHelper.update_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=update_sql,
            env=self.env,
            machine="",
        )
        return target_user["username"]

    def get_account_videos(self):
        """Page through the account feed and process each video.

        Stops when ``MAX_DOWNLOADS`` videos have been sent, when the feed is
        exhausted, or when ``process_video_obj`` returns a falsy value for
        an item. On a retryable error message the call sleeps for
        ``RETRY_SLEEP_SECONDS`` and repeats the same page.
        """
        self.get_token_from_mysql()
        user_id = self.get_account_id()
        print("ljh", user_id)
        print(type(user_id))
        if not user_id:
            print("{}\t获取 id 失败".format(self.account_name))
            return

        headers = self._build_headers()
        buffer = ""  # pagination cursor returned by the previous page
        while self.download_cnt < self.MAX_DOWNLOADS:
            params = {
                "action": "get_feed_list",
                "username": user_id,
                "buffer": buffer,
                "count": "15",
                "scene": "1",
                "token": self.token,
                "lang": "zh_CN",
                "f": "json",
                "ajax": "1",
            }
            response = requests.request(
                "GET",
                VIDEOSNAP_URL,
                headers=headers,
                params=params,
                timeout=REQUEST_TIMEOUT,
            )
            self.token_count += 1
            res_json = response.json()

            # Check for session / frequency-control errors before reading
            # the video list.
            err_msg = (res_json.get("base_resp") or {}).get("err_msg")
            if err_msg in RETRYABLE_ERRORS:
                print(
                    f"status_code:{response.status_code}, get_videoList:{response.text}\n"
                )
                time.sleep(RETRY_SLEEP_SECONDS)
                continue

            if not res_json.get("list"):
                print("没有更多视频了")
                return

            buffer = res_json["last_buff"]
            for obj in res_json["list"]:
                print("扫描到一条视频", self.token_count)
                try:
                    print("扫描到一条视频")
                    repeat_flag = self.process_video_obj(obj)
                    if not repeat_flag:
                        return
                except Exception as e:
                    # One malformed item must not abort the whole account.
                    print(f"抓取单条视频异常:{e}\n")
                # Enforce the cap per item, not only once per page.
                if self.download_cnt >= self.MAX_DOWNLOADS:
                    return

    def process_video_obj(self, video_obj):
        """Convert one feed item into a message and send it to MQ.

        Returns ``False`` without sending when ``pipeline.repeat_video()``
        is falsy, ``True`` after the message has been sent. Raises
        ``KeyError``/``IndexError`` when the item lacks an expected field.
        """
        trace_id = self.platform + str(uuid.uuid1())
        # Sample the clock once so the timestamp, its formatted form and the
        # session id always agree.
        now = int(time.time())
        media = video_obj["media"][0]
        video_dict = {
            "video_id": video_obj["nonce_id"],
            "out_video_id": video_obj["nonce_id"],
            "video_title": video_obj["desc"],
            "publish_time_stamp": now,
            "publish_time_str": time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(now)
            ),
            "play_cnt": 0,
            "comment_cnt": 0,
            "like_cnt": 0,
            "share_cnt": 0,
            "user_id": self.user_dict["user_id"],
            "cover_url": media["cover_url"],
            "video_url": media["url"],
            "avatar_url": video_obj["head_url"],
            "width": media["width"],
            "height": media["height"],
            "duration": media["video_play_len_s"],
            "platform": self.platform,
            "strategy": self.mode,
            "crawler_rule": self.rule_dict,
            "session": f"shipinhao-author-{now}",
        }
        # No update time is available for the item, so only the duplicate
        # check is applied.
        pipeline = PiaoQuanPipelineTest(
            platform=self.platform,
            mode=self.mode,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id,
        )
        if not pipeline.repeat_video():
            return False

        video_dict["out_user_id"] = video_dict["user_id"]
        video_dict["user_id"] = self.user_dict["uid"]
        video_dict["publish_time"] = video_dict["publish_time_str"]
        print(video_dict)
        print("成功发送 MQ 至 ETL")
        self.mq.send_msg(video_dict)
        self.download_cnt += 1
        return True


if __name__ == "__main__":
    SP = ShiPinHaoAccount(
        platform="shipinhao",
        mode="author",
        user_dict={"uid": "123456", "link": "树树读书1014", "user_id": "1234565"},
        rule_dict={},
        env="dev",
    )
    SP.get_account_videos()