123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 |
- import os
- import json
- import random
- import sys
- import datetime
- import time
- import uuid
- import requests
- sys.path.append(os.getcwd())
- from common import PiaoQuanPipeline, AliyunLogger
- from common.feishu import Feishu
- from common.db import MysqlHelper
- from common.mq import MQ
- from common.public import clean_title
def find_target_user(name, user_list):
    """Find the target account in a list of searched accounts.

    Args:
        name: Nickname to look for; compared with ``==`` (exact match).
        user_list: Iterable of account records (mappings expected to carry a
            ``"nickname"`` key). May be empty or ``None``.

    Returns:
        The first record whose ``"nickname"`` equals ``name``, or ``False``
        when there is no match. ``False`` (not ``None``) is kept because
        existing callers test the result for truthiness.
    """
    for account in user_list or ():
        try:
            nickname = account["nickname"]
        except (KeyError, TypeError, IndexError):
            # A malformed entry must not abort the search for the others.
            continue
        if nickname == name:
            return account
    return False
class ShiPinHaoAccount(object):
    """Crawler for the videos published by one Shipinhao (视频号) account.

    The crawler resolves the account's id (from the ``accounts`` table, or by
    searching the ``videosnap`` endpoint), pages through the account's feed
    and forwards every accepted video to the ETL message queue.

    Args:
        platform: Platform identifier; used in SQL filters, logging and the
            produced video records.
        mode: Crawl mode/strategy name; used for logging and stored as
            ``"strategy"`` in the produced records.
        rule_dict: Crawl rules. ``rule_dict["videos_cnt"]["min"]`` (default
            30) is used as the maximum number of videos sent per run.
        user_dict: Account description. ``user_dict["link"]`` is used as the
            account name and ``user_dict["uid"]`` as the record's user id.
        env: Environment name; appended to the MQ topic name and passed to
            the DB/logging helpers.
    """

    # Endpoint used both for the account search and for the feed listing.
    VIDEOSNAP_URL = "https://mp.weixin.qq.com/cgi-bin/videosnap"
    # Seconds before an HTTP request is abandoned; without a timeout a
    # stalled connection would block the crawler forever.
    REQUEST_TIMEOUT = 30
    # Fallback for rule_dict["videos_cnt"]["min"].
    DEFAULT_VIDEO_LIMIT = 30
    # Pause before retrying after an "invalid session"/"freq control" reply.
    BLOCKED_RETRY_SECONDS = 60 * 15

    def __init__(self, platform, mode, rule_dict, user_dict, env):
        # token/cookie are loaded lazily by get_token_from_mysql().
        self.cookie = None
        self.token = None
        self.account_name = user_dict["link"]
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_dict = user_dict
        self.env = env
        # Number of videos sent to the MQ by this instance.
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)

    def _request_headers(self):
        """Build the HTTP headers shared by every ``videosnap`` request.

        Reads ``self.token`` and ``self.cookie``, so
        ``get_token_from_mysql()`` must have been called first.
        """
        return {
            "authority": "mp.weixin.qq.com",
            "accept": "*/*",
            "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
            "cookie": self.cookie,
            "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
                self.token
            ),
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "x-requested-with": "XMLHttpRequest",
        }

    def _download_limit(self):
        """Return the maximum number of videos to send in one run."""
        return int(
            self.rule_dict.get("videos_cnt", {}).get("min", self.DEFAULT_VIDEO_LIMIT)
        )

    @staticmethod
    def _in_alert_window(hour):
        """Return True when ``hour`` (0-23) is within 10..20 inclusive."""
        return 10 <= hour <= 20

    def _handle_blocked_response(self, response, alert_text):
        """Log a rejected feed request, optionally alert, then wait.

        Args:
            response: The HTTP response that carried the error.
            alert_text: Text sent to the Feishu bot when the current hour is
                inside the alert window.
        """
        AliyunLogger.logging(
            code="2000",
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            message=f"status_code:{response.status_code}, get_videoList:{response.text}\n",
        )
        if self._in_alert_window(datetime.datetime.now().hour):
            Feishu.bot(
                log_type=self.mode,
                crawler=self.platform,
                text=alert_text,
            )
        time.sleep(self.BLOCKED_RETRY_SECONDS)

    def get_token_from_mysql(self):
        """Load the token and cookie from MySQL into the instance.

        Reads the ``config`` column of ``crawler_config`` for this platform,
        parses it as JSON and stores its ``"token"`` and ``"cookie"`` values
        on ``self.token`` / ``self.cookie``.

        Raises whatever indexing an empty result or parsing/reading the
        JSON raises; no row for the platform is not handled here.
        """
        # NOTE(review): SQL is built by interpolation; MysqlHelper's API is
        # not visible here, so it is unclear whether it accepts parameters.
        select_sql = (
            f"""SELECT config from crawler_config where source = '{self.platform}'; """
        )
        configs = MysqlHelper.get_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=select_sql,
            env=self.env,
            machine="",
        )
        token_info = json.loads(configs[0][0])
        self.token = token_info["token"]
        self.cookie = token_info["cookie"]

    def get_history_id(self):
        """Read the stored account id from the ``accounts`` table.

        Returns:
            The ``name_id`` of the first usable row matching this account
            name and platform, or ``False`` when the query returns nothing.
        """
        # NOTE(review): an account name containing a double quote would break
        # this interpolated statement — confirm how MysqlHelper should be
        # given parameters and switch to a parameterized query.
        select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
        name_id = MysqlHelper.get_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=select_user_sql,
            env=self.env,
            machine="",
        )
        if name_id:
            return name_id[0][0]
        return False

    def get_account_id(self):
        """Resolve the account id, preferring the stored one.

        If the ``accounts`` table already has an id it is returned directly.
        Otherwise the account name is searched through the ``videosnap``
        endpoint; a result whose nickname equals the account name is stored
        in ``accounts`` and its ``"username"`` is returned.

        Returns:
            The account id, or ``False`` when no matching account is found.
        """
        history_id = self.get_history_id()
        if history_id:
            return history_id

        self.get_token_from_mysql()
        params = {
            "action": "search",
            "scene": "1",
            "buffer": "",
            "query": self.account_name,
            "count": "21",
            "token": self.token,
            "lang": "zh_CN",
            "f": "json",
            "ajax": "1",
        }
        response = requests.request(
            "GET",
            self.VIDEOSNAP_URL,
            headers=self._request_headers(),
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        # A response without "acct_list" (e.g. an error reply) is treated as
        # "no account found" instead of raising KeyError.
        user_list = response.json().get("acct_list") or []
        target_user = find_target_user(name=self.account_name, user_list=user_list)
        if not target_user:
            return False

        # Persist the resolved id so later runs can skip the search.
        # NOTE(review): interpolated SQL, see get_history_id().
        update_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{self.account_name}", "{target_user['username']}", "{self.platform}", 1 )"""
        MysqlHelper.update_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=update_sql,
            env=self.env,
            machine="",
        )
        return target_user["username"]

    def get_account_videos(self):
        """Page through the account's feed and process each video.

        Stops when the download limit (30 by default) is reached, when the
        feed returns no more items, or when ``process_video_obj`` returns a
        falsy value. On an "invalid session" or "freq control" reply the
        request is logged, an alert may be sent, and the same page is
        retried after a 15 minute pause.
        """
        user_id = self.get_account_id()
        if not user_id:
            AliyunLogger.logging(
                code="3000",
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="{}\t获取 id 失败".format(self.account_name),
            )
            return

        download_limit = self._download_limit()
        buffer = ""  # paging cursor returned by the previous page
        while self.download_cnt < download_limit:
            # Reload on every page so a token replaced in the DB is picked
            # up, including after a blocked-response pause.
            self.get_token_from_mysql()
            params = {
                "action": "get_feed_list",
                "username": user_id,
                "buffer": buffer,
                "count": "15",
                "scene": "1",
                "token": self.token,
                "lang": "zh_CN",
                "f": "json",
                "ajax": "1",
            }
            response = requests.request(
                "GET",
                self.VIDEOSNAP_URL,
                headers=self._request_headers(),
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            # Randomized pause between page requests.
            time.sleep(random.randint(10, 30))
            res_json = response.json()

            # Check for an expired session or frequency control first.
            err_msg = (res_json.get("base_resp") or {}).get("err_msg")
            if err_msg == "invalid session":
                self._handle_blocked_response(response, "视频号Token 过期啦")
                continue
            if err_msg == "freq control":
                # Previously this branch reused the "token expired" text.
                self._handle_blocked_response(response, "视频号Token 频控啦")
                continue

            video_list = res_json.get("list")
            if not video_list:
                AliyunLogger.logging(
                    code="2000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    message="没有更多视频了",
                )
                return

            buffer = res_json["last_buff"]
            for obj in video_list:
                # Enforce the cap inside the page as well, otherwise a page
                # of 15 items could overshoot it.
                if self.download_cnt >= download_limit:
                    return
                try:
                    AliyunLogger.logging(
                        code="1001",
                        platform=self.platform,
                        mode=self.mode,
                        message="扫描到一条视频",
                        env=self.env,
                        data=obj,
                    )
                    if not self.process_video_obj(obj):
                        return
                except Exception as e:
                    # Best effort: one bad item must not stop the crawl.
                    AliyunLogger.logging(
                        code="3000",
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        message=f"抓取单条视频异常:{e}\n",
                    )

    def process_video_obj(self, video_obj):
        """Convert one feed item into a video record and send it to the MQ.

        Args:
            video_obj: One entry of the feed ``"list"``; must provide
                ``nonce_id``, ``desc``, ``head_url`` and a non-empty
                ``media`` list.

        Returns:
            ``False`` when ``PiaoQuanPipeline.repeat_video()`` returns a
            falsy value (presumably the video is a duplicate — confirm
            against the pipeline); nothing is sent in that case. ``True``
            after the record was sent to the MQ.
        """
        trace_id = self.platform + str(uuid.uuid1())
        # Sample the clock once so every time-derived field agrees.
        now = int(time.time())
        media = video_obj["media"][0]
        video_dict = {
            "video_id": video_obj["nonce_id"],
            "video_title": clean_title(video_obj["desc"].split("\n")[0].split("#")[0]),
            "out_video_id": video_obj["nonce_id"],
            # The feed item's own publish time is not read; crawl time is used.
            "publish_time_stamp": now,
            "publish_time_str": time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(now)
            ),
            "play_cnt": 0,
            "comment_cnt": 0,
            "like_cnt": 0,
            "share_cnt": 0,
            "user_id": self.user_dict["uid"],
            # Fall back to the thumbnail when the cover is missing or empty.
            "cover_url": media.get("cover_url") or media["thumb_url"],
            "video_url": media["url"],
            "avatar_url": video_obj["head_url"],
            "width": media["width"],
            "height": media["height"],
            "duration": media["video_play_len_s"],
            "platform": self.platform,
            "strategy": self.mode,
            "crawler_rule": self.rule_dict,
            "session": f"shipinhao-author-{now}",
        }
        # There is no update time to compare, so only de-duplication applies.
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id,
        )
        if not pipeline.repeat_video():
            return False

        video_dict["publish_time"] = video_dict["publish_time_str"]
        self.mq.send_msg(video_dict)
        self.download_cnt += 1
        AliyunLogger.logging(
            code="1002",
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            data=video_dict,
            trace_id=trace_id,
            message="成功发送 MQ 至 ETL",
        )
        return True
- # if __name__ == "__main__":
- # temp_token = "2080949641"
- # temp_cookie = "ua_id=bw4VuFJr6fAuSkwdAAAAAClaW0m9Aua-6IfHaXU_zpo=; wxuin=95302180931488; mm_lang=zh_CN; RK=kreEMgtMMJ; ptcz=8fd1b267c98a1185bbe6455a081f1264048ee388363ca305d9ef4812892c7900; qq_domain_video_guid_verify=2ba78a5010233582; poc_sid=HOinP2Wj322Ex737kV651Zqy6y8fSprOUUvaegBg; _qimei_q36=; _qimei_h38=9eea33ea92afe8a922333fce03000001317916; pgv_pvid=9056371236; _clck=3930572231|1|fgk|0; uuid=6562bbd8859230ce4120dfa063c76997; rand_info=CAESIGAatjSIjvxVJVDxRDN7F/CNFWMifvAVqje98rd++8UY; slave_bizuin=3236647229; data_bizuin=3236647229; bizuin=3236647229; data_ticket=qm3i6jRhObs1yKHttGh0gVI02Mz7FTPfatn0RMLdaWyD7Ukcokm5Dc3mmYLQUZPg; slave_sid=UWxjZnhBREZRRTNKZ3dYZTlYRE9Db2lxQUhOM3lZUlRoMkV0MG1wdVVudGpQTWxnVkxzYW5pV2c3NjB3bnAyQ2lPaXBBVVRPazEybWtKSVEzTnUyazZ6WEJsdnFaWWVDaUFrM3pTTXRkeUNJS3RNVTc2NFRBWkZiVGQzYllacEFRalBBZ2tXZlltblJYS2VS; slave_user=gh_d284c09295eb; xid=cb96e6ba4b4960d74a22869b1bb21406; _clsk=z77guf|1699532621466|4|1|mp.weixin.qq.com/weheat-agent/payload/record"
- # SP = ShiPinHaoAccount(
- # token=temp_token,
- # cookie=temp_cookie,
- # account_name="心煤",
- # platform="shipinhao",
- # mode="author",
- # rule_dict={},
- # env="prod",
- # )
- # SP.get_account_videos()
|