123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- import os
- import json
- import random
- import sys
- import time
- import uuid
- import datetime
- import requests
- import cv2
- sys.path.append(os.getcwd())
- from datetime import datetime
- from common.feishu import Feishu
- from common import PiaoQuanPipeline, AliyunLogger
- from common.db import MysqlHelper
- from common.mq import MQ
- from common.public import clean_title
def find_target_user(name, user_list):
    """Return the search-result entry whose contact nickname equals *name*.

    Args:
        name: Nickname to look for (exact, case-sensitive comparison).
        user_list: Iterable of account entries returned by the search
            endpoint. Each usable entry is a dict holding a ``contact`` dict
            with a ``nickname`` key. ``None`` or an empty list is treated as
            "no candidates".

    Returns:
        The first entry whose ``contact["nickname"]`` equals *name*, or
        ``False`` when no entry matches.
    """
    for entry in user_list or []:
        # Skip malformed entries instead of letting a KeyError/TypeError
        # abort the whole lookup.
        contact = entry.get("contact") if isinstance(entry, dict) else None
        if not isinstance(contact, dict) or "nickname" not in contact:
            continue
        if contact["nickname"] == name:
            return entry
    return False
class ShiPinHaoAuthor(object):
    """Shipinhao (视频号) account crawler.

    For one account (``user_dict["link"]``) the crawler resolves the account
    id, pages through the account's videos, filters each video, writes one
    audit row per scanned video to a Feishu sheet and sends accepted videos
    to the ETL message queue.
    """

    # Endpoints of the upstream scraping service.
    _SEARCH_URL = "http://61.48.133.26:30001/Find_Video_Content"
    _PAGE_URL = "http://61.48.133.26:30001/FinderGetUpMasterNextPage"
    _DOWNLOAD_URL = "http://61.48.133.26:30001/GetFinderDownloadAddress"

    # Feishu sheet that receives the per-video audit rows.
    _FEISHU_CRAWLER = 'shipinhao'
    _FEISHU_SHEET = "Vq7NeH"

    def __init__(self, platform, mode, rule_dict, user_dict, env):
        """Store the crawl configuration and open the ETL message queue.

        Args:
            platform: Platform name used for logging, SQL filters and Feishu.
            mode: Crawl strategy name, passed to logging and the pipeline.
            rule_dict: Crawl rules; ``videos_cnt.min`` (default 30) caps the
                number of videos sent to ETL in one run.
            user_dict: Account description; ``link`` holds the account name
                and ``uid`` the user id attached to every video.
            env: Environment name, also the suffix of the MQ topic.
        """
        self.account_name = user_dict["link"]
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_dict = user_dict
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)

    # ------------------------------------------------------------------
    # small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _post(url, body, content_type='application/json'):
        """POST *body* as a JSON string to *url* and return the response."""
        # NOTE(review): no timeout is set (same as before), so a stalled
        # upstream blocks the crawler indefinitely - consider adding one.
        return requests.request(
            "POST",
            url,
            headers={'Content-Type': content_type},
            data=json.dumps(body),
        )

    def _log(self, code, message, **extra):
        """Emit one AliyunLogger record tagged with platform/mode/env."""
        AliyunLogger.logging(
            code=code,
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            message=message,
            **extra
        )

    def _write_audit_row(self, row):
        """Insert a fresh row 2 into the Feishu sheet and fill it with *row*."""
        Feishu.insert_columns(
            self.platform, self._FEISHU_CRAWLER, self._FEISHU_SHEET, "ROWS", 1, 2
        )
        time.sleep(0.5)
        Feishu.update_values(
            self.platform, self._FEISHU_CRAWLER, self._FEISHU_SHEET, "A2:Z2", [row]
        )

    @staticmethod
    def _share_like_ratio(share_cnt, like_cnt):
        """Return the truncated share/like ratio, or 0 when either is 0.

        The zero-likes guard avoids the ZeroDivisionError that used to be
        raised for videos with shares but no likes.
        """
        if share_cnt == 0 or like_cnt == 0:
            return 0
        return int(share_cnt / like_cnt)

    @staticmethod
    def _reject_reason(duration, share_cnt, divisor_cnt):
        """Return the rejection text for a video, or ``None`` if it passes.

        Rules, checked in order: duration under 30 seconds; fewer than 1000
        shares; at most 99999 shares with a share/like ratio below 2.
        """
        if duration < 30:
            return '时长小于30秒'
        if share_cnt < 1000:
            return '分享小于1000'
        if share_cnt <= 99999 and divisor_cnt < 2:
            return f'分享小于100000,分享/点赞:{divisor_cnt}'
        return None

    # ------------------------------------------------------------------
    # account id resolution
    # ------------------------------------------------------------------
    def get_history_id(self):
        """Read the stored account id from the ``accounts`` table.

        Returns:
            The ``name_id`` of the first useful row matching this account
            name and platform, or ``False`` when no row exists.
        """
        # NOTE(review): the statement is built by string interpolation; an
        # account name containing a double quote breaks or alters the query.
        # Switch to bound parameters if MysqlHelper supports them.
        select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
        rows = MysqlHelper.get_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=select_user_sql,
            env=self.env,
            machine="",
        )
        if rows:
            return rows[0][0]
        return False

    def get_account_id(self):
        """Resolve the account id, preferring the stored one.

        When the database has no id, the account name is searched upstream;
        a result whose nickname matches exactly is stored in ``accounts`` and
        its ``username`` returned.

        Returns:
            The account id, or ``False`` when it cannot be resolved.
        """
        history_id = self.get_history_id()
        if history_id:
            return history_id

        response = self._post(
            self._SEARCH_URL, {"content": self.account_name, "type": "19"}
        )
        # A response without ``info_list`` is treated like an empty result.
        info_list = response.json().get('info_list') or []
        if not info_list:
            return False

        target_user = find_target_user(name=self.account_name, user_list=info_list)
        if not target_user:
            return False

        username = target_user['contact']["username"]
        # Persist the id so later runs can skip the search.
        # NOTE(review): string-built SQL with a value taken from an external
        # response - see the note in get_history_id.
        update_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{self.account_name}", "{username}", "{self.platform}", 1 )"""
        MysqlHelper.update_values(
            log_type=self.mode,
            crawler=self.platform,
            sql=update_sql,
            env=self.env,
            machine="",
        )
        return username

    # ------------------------------------------------------------------
    # crawling
    # ------------------------------------------------------------------
    def get_account_videos(self):
        """Page through the account's videos and process each one.

        At most 10 pages are requested. Crawling stops early when the
        download cap from ``rule_dict`` is reached, when a page carries no
        videos, or when ``process_video_obj`` returns a falsy value.
        """
        account_id = self.get_account_id()
        if not account_id:
            self._log("3000", "{}\t获取 id 失败".format(self.account_name))
            return

        last_buffer = ""
        for _ in range(10):
            download_cap = int(self.rule_dict.get("videos_cnt", {}).get("min", 30))
            if self.download_cnt >= download_cap:
                return

            response = self._post(
                self._PAGE_URL, {"username": account_id, "last_buffer": last_buffer}
            )
            time.sleep(random.randint(1, 5))
            if "objectId" not in response.text or response.status_code != 200:
                self._log("2000", "没有更多视频了")
                return

            res_json = response.json()
            # One check covers a missing key, ``None`` and an empty list.
            videos = res_json.get("UpMasterHomePage")
            if not videos:
                self._log("2000", "没有更多视频了")
                return

            last_buffer = res_json.get('last_buffer')
            # Counts the videos of this page that were processed without an
            # exception; it is not advanced when processing raises.
            count = 0
            for obj in videos:
                try:
                    self._log("1001", "扫描到一条视频", data=obj)
                    keep_going = self.process_video_obj(obj, count)
                    count += 1
                    if not keep_going:
                        return
                except Exception as e:
                    # Best effort: one broken video must not stop the page.
                    self._log("3000", f"抓取单条视频异常:{e}\n")

    def process_video_obj(self, obj, count):
        """Filter one video, record it in Feishu and forward it if accepted.

        Args:
            obj: Video entry from the account page (``objectId``,
                ``objectNonceId``, ``createtime``, counters, ``nickname``).
            count: Number of videos of the current page processed so far.

        Returns:
            ``False`` only when the video is a duplicate and ``count`` is
            above 3, which tells the caller to stop crawling; ``True``
            otherwise.
        """
        object_id = obj['objectId']
        object_nonce_id = obj['objectNonceId']
        trace_id = self.platform + "new" + str(uuid.uuid1())

        response = self._post(
            self._DOWNLOAD_URL,
            {"objectId": object_id, "objectNonceId": object_nonce_id},
            content_type='text/plain',
        )
        time.sleep(random.randint(0, 1))
        video_obj = response.json()

        publish_time_str = obj['createtime']
        publish_time_stamp = int(
            datetime.strptime(publish_time_str, '%Y-%m-%d %H:%M:%S').timestamp()
        )
        video_url = video_obj.get('DownloadAddress')
        duration = int(self.video_duration(video_url))
        share_cnt = int(obj['forward_count'])
        like_cnt = int(obj['like_count'])
        fav_cnt = int(obj['fav_count'])
        comment_cnt = int(obj['comment_count'])
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Title is the first line of the text, cut at the first hashtag.
        raw_title = video_obj.get('title').split("\n")[0].split("#")[0]

        video_dict = {
            "video_id": object_id,
            "video_title": clean_title(raw_title),
            "out_video_id": trace_id,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "play_cnt": 0,
            "fav_count": fav_cnt,
            "comment_cnt": comment_cnt,
            "like_cnt": like_cnt,
            "share_cnt": share_cnt,
            "user_id": self.user_dict["uid"],
            "cover_url": video_obj.get('thumb_url'),
            "video_url": video_url,
            "avatar_url": video_obj.get('thumb_url'),
            "width": video_obj.get('width'),
            "height": video_obj.get('height'),
            "duration": duration,
            "platform": self.platform,
            "strategy": self.mode,
            "crawler_rule": self.rule_dict,
            "session": f"shipinhao-author-{int(time.time())}",
        }

        divisor_cnt = self._share_like_ratio(share_cnt, like_cnt)

        # Columns shared by every audit row; the accepted flag, the reason
        # and the download address are appended per outcome.
        row_head = [
            obj['nickname'],
            publish_time_str,
            formatted_time,
            fav_cnt,
            comment_cnt,
            like_cnt,
            share_cnt,
            divisor_cnt,
            raw_title,
            duration,
        ]

        reason = self._reject_reason(duration, share_cnt, divisor_cnt)
        if reason is not None:
            self._write_audit_row(row_head + ['否', reason, video_url])
            return True

        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            item=video_dict,
            rule_dict=self.rule_dict,
            env=self.env,
            trace_id=trace_id,
        )
        if not pipeline.repeat_video():
            # A falsy result is recorded as a duplicate video.
            self._write_audit_row(row_head + ['否', '重复视频', video_url])
            return count <= 3

        self._write_audit_row(row_head + ['是', '', video_url])
        video_dict["publish_time"] = video_dict["publish_time_str"]
        self.mq.send_msg(video_dict)
        self.download_cnt += 1
        self._log("1002", "成功发送 MQ 至 ETL", data=video_dict, trace_id=trace_id)
        time.sleep(5)
        return True

    def video_duration(self, filename):
        """Return the duration in seconds of the video at *filename*.

        Args:
            filename: Path or URL handed to ``cv2.VideoCapture``.

        Returns:
            Frame count divided by frame rate, or 0 when the source cannot
            be opened or reports no frame rate.
        """
        cap = cv2.VideoCapture(filename)
        try:
            if not cap.isOpened():
                return 0
            rate = cap.get(cv2.CAP_PROP_FPS)
            frame_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            # Guard against the division by zero a missing frame rate causes.
            if not rate or rate <= 0:
                return 0
            return frame_num / rate
        finally:
            # Always free the decoder / network handle.
            cap.release()
if __name__ == "__main__":
    # Manual run: crawl one hard-coded account with empty rules in "prod".
    demo_user = {"uid": "123456", "link": "老碗哥说文解惑", "user_id": "1234565"}
    crawler = ShiPinHaoAuthor(
        platform="shipinhao",
        mode="author",
        rule_dict={},
        user_dict=demo_user,
        env="prod",
    )
    crawler.get_account_videos()
|