123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- import json
- import os
- import sys
- import time
- import uuid
- import requests
- sys.path.append(os.getcwd())
- from common.pipeline import PiaoQuanPipelineTest
- from common.mq import MQ
- from common.db import MysqlHelper
- def find_target_user(name, user_list):
- for obj in user_list:
- if obj["nickname"] == name:
- return obj
- else:
- continue
- return False
- class ShiPinHaoAccount:
- def __init__(self, platform, mode, rule_dict, user_dict, env):
- self.cookie = None
- self.token = None
- self.account_name = user_dict["link"]
- self.platform = platform
- self.mode = mode
- self.rule_dict = rule_dict
- self.user_dict = user_dict
- self.env = env
- self.download_cnt = 0
- self.token_count = 0
- self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
- def get_token_from_mysql(self):
- select_sql = f"""SELECT config from crawler_config where source = '{ self.platform }'; """
- # print(select_sql)
- configs = MysqlHelper.get_values(
- log_type=self.mode,
- crawler=self.platform,
- sql=select_sql,
- env=self.env,
- machine="",
- )
- token_config = configs[0][0]
- token_info = json.loads(token_config)
- self.token = token_info["token"]
- self.cookie = token_info["cookie"]
- def get_history_id(self):
- """
- 从数据库表中读取 id
- """
- select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
- name_id = MysqlHelper.get_values(
- log_type=self.mode,
- crawler=self.platform,
- sql=select_user_sql,
- env=self.env,
- machine="",
- )
- if name_id:
- # return False
- return name_id[0][0]
- else:
- return False
- def get_account_id(self):
- # 读历史数据,如果存在 id,则直接返回 id
- history_id = self.get_history_id()
- if history_id:
- return history_id
- else:
- self.get_token_from_mysql()
- print(self.token)
- print(self.cookie)
- url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
- params = {
- "action": "search",
- "scene": "1",
- "buffer": "",
- "query": self.account_name,
- "count": "21",
- "token": self.token,
- "lang": "zh_CN",
- "f": "json",
- "ajax": "1",
- }
- headers = {
- "authority": "mp.weixin.qq.com",
- "accept": "*/*",
- "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
- "cookie": self.cookie,
- "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
- self.token
- ),
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
- "x-requested-with": "XMLHttpRequest",
- }
- response = requests.request("GET", url, headers=headers, params=params)
- self.token_count += 1
- user_list = response.json()
- # print(user_list)
- user_list = user_list["acct_list"]
- target_user = find_target_user(name=self.account_name, user_list=user_list)
- # 写入 MySql 数据库
- if target_user:
- update_sql = f"""INSERT INTO accounts (name, name_id, platform) values ("{self.account_name}", "{target_user['username']}", "{self.platform}")"""
- # print(update_sql)
- MysqlHelper.update_values(
- log_type=self.mode,
- crawler=self.platform,
- sql=update_sql,
- env=self.env,
- machine="",
- )
- return target_user["username"]
- else:
- return False
- def get_account_videos(self):
- # 一个账号最多抓 30 条数据
- user_id = self.get_account_id()
- if user_id:
- print(user_id)
- url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
- buffer = "" # 翻页指示器
- while True:
- if self.download_cnt >= 30:
- return
- self.get_token_from_mysql()
- params = {
- "action": "get_feed_list",
- "username": user_id,
- "buffer": buffer,
- "count": "15",
- "scene": "1",
- "token": self.token,
- "lang": "zh_CN",
- "f": "json",
- "ajax": "1",
- }
- headers = {
- "authority": "mp.weixin.qq.com",
- "accept": "*/*",
- "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
- "cookie": self.cookie,
- "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
- self.token
- ),
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
- "x-requested-with": "XMLHttpRequest",
- }
- response = requests.request("GET", url, headers=headers, params=params)
- self.token_count += 1
- res_json = response.json()
- # 开始判断视频是否有信息,是否频控
- if res_json["base_resp"]["err_msg"] == "invalid session":
- print(
- f"status_code:{response.status_code}, get_videoList:{response.text}\n"
- )
- time.sleep(60 * 15)
- continue
- if res_json["base_resp"]["err_msg"] == "freq control":
- print(
- f"status_code:{response.status_code}, get_videoList:{response.text}\n"
- )
- time.sleep(60 * 15)
- continue
- if not res_json.get("list"):
- print("没有更多视频了")
- return
- else:
- buffer = res_json["last_buff"]
- for obj in res_json["list"]:
- print("扫描到一条视频", self.token_count)
- try:
- print("扫描到一条视频")
- repeat_flag = self.process_video_obj(obj)
- if not repeat_flag:
- return
- except Exception as e:
- print(f"抓取单条视频异常:{e}\n")
- else:
- print("{}\t获取 id 失败".format(self.account_name))
- def process_video_obj(self, video_obj):
- trace_id = self.platform + str(uuid.uuid1())
- # print(json.dumps(video_obj, ensure_ascii=False, indent=4))
- video_dict = {
- "video_id": video_obj["nonce_id"],
- "out_video_id": video_obj["nonce_id"],
- "video_title": video_obj["desc"],
- "publish_time_stamp": int(time.time()),
- "publish_time_str": time.strftime(
- "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
- ),
- "play_cnt": 0,
- "comment_cnt": 0,
- "like_cnt": 0,
- "share_cnt": 0,
- "user_id": self.user_dict["user_id"],
- "cover_url": video_obj["media"][0]["cover_url"],
- "video_url": video_obj["media"][0]["url"],
- "avatar_url": video_obj["head_url"],
- "width": video_obj["media"][0]["width"],
- "height": video_obj["media"][0]["height"],
- "duration": video_obj["media"][0]["video_play_len_s"],
- "platform": self.platform,
- "strategy": self.mode,
- "crawler_rule": self.rule_dict,
- "session": f"shipinhao-author-{int(time.time())}",
- }
- # 无更新时间,去重即可
- pipeline = PiaoQuanPipelineTest(
- platform=self.platform,
- mode=self.mode,
- item=video_dict,
- rule_dict=self.rule_dict,
- env=self.env,
- trace_id=trace_id,
- )
- if not pipeline.repeat_video():
- return False
- else:
- video_dict["out_user_id"] = video_dict["user_id"]
- video_dict["user_id"] = self.user_dict["uid"]
- video_dict["publish_time"] = video_dict["publish_time_str"]
- print(video_dict)
- print("成功发送 MQ 至 ETL")
- # self.mq.send_msg(video_dict)
- self.download_cnt += 1
- return True
- if __name__ == "__main__":
- SP = ShiPinHaoAccount(
- platform="shipinhao",
- mode="author",
- user_dict={"uid": "123456", "link": "树树读书1014", "user_id": "1234565"},
- rule_dict={},
- env="prod",
- )
- SP.get_account_videos()
|