import html
import json
import random
import re
import time
from datetime import datetime
from urllib.parse import urlparse, parse_qs

import requests
from loguru import logger

from common.aliyun_log import AliyunLogger
from common.feishu_utils import Feishu
from common.redis import get_top_data, in_job_video_data
from common.sql_help import sqlCollect
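
# Resolves 抖音/快手 share links from "top" tasks to the author's off-platform
# account ID (channel_account_id) via the crawler detail service, skips IDs that
# are already recorded in the DB, pushes the enriched task to the matching Redis
# queue, and appends a tracking row to the Feishu sheet.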

class Top:
    def get_text_dy_video(self, url):
        """Resolve a 抖音 share link / video id to the author's channel_account_id."""
        max_retries = 3
        retry_count = 0
        while retry_count < max_retries:
            try:
                # Extract the video id from the raw id, a query parameter, or the redirect target.
                if "http" not in url:
                    video_id = url
                elif "&vid=" in url:
                    parsed_url = urlparse(url)
                    params = parse_qs(parsed_url.query)
                    video_id = params.get('vid', [None])[0]
                elif "?modal_id=" in url:
                    parsed_url = urlparse(url)
                    params = parse_qs(parsed_url.query)
                    video_id = params.get('modal_id', [None])[0]
                else:
                    headers = {
                        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;'
                                  'q=0.8,application/signed-exchange;v=b3;q=0.7',
                        'Accept-Language': 'zh-CN,zh;q=0.9',
                        'Cache-Control': 'no-cache',
                        'Pragma': 'no-cache',
                        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                                      'Chrome/127.0.0.0 Safari/537.36',
                    }
                    response = requests.get(url, headers=headers, allow_redirects=False, timeout=30)
                    location = response.headers.get('Location', None)
                    video_id = re.search(r'/video/(\d+)/?', location.split('?')[0] if location else url).group(1)
                if not video_id or not video_id.strip():
                    return None
                # Look up the video detail to obtain the author's channel_account_id.
                detail_url = "http://8.217.192.46:8889/crawler/dou_yin/detail"
                payload = json.dumps({"content_id": str(video_id)})
                headers = {'Content-Type': 'application/json'}
                time.sleep(random.uniform(5, 10))
                response = requests.post(detail_url, headers=headers, data=payload, timeout=60)
                response = response.json()
                code = response["code"]
                if code == 0:
                    data = response["data"]["data"]
                    channel_account_id = data["channel_account_id"]
                    return channel_account_id
                if code == 22002 and '抖音内容已被删除或无法访问' in response['msg']:
                    # The video has been deleted or is no longer accessible.
                    return "作品不存在"
                # Unhandled response code: count it as a failed attempt and retry.
                retry_count += 1
            except Exception as e:
                retry_count += 1
                logger.error(f"[+] 抖音{url}获取视频链接失败,失败信息{e}")
                time.sleep(1)
        return None
    def get_text_ks_video(self, url):
        """Resolve a 快手 share link / photo id to the author's channel_account_id."""
        try:
            # Extract the photo id from the raw id or from the redirect target of the share link.
            if "http" not in url:
                video_id = url
            else:
                headers = {
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;'
                              'q=0.8,application/signed-exchange;v=b3;q=0.7',
                    'Accept-Language': 'zh-CN,zh;q=0.9',
                    'Cache-Control': 'no-cache',
                    'Pragma': 'no-cache',
                    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                                  'Chrome/127.0.0.0 Safari/537.36',
                }
                response = requests.get(url, headers=headers, allow_redirects=False, timeout=30)
                location = response.headers.get('Location', None)
                video_id = re.search(r'/(f|photo|short-video|long-video)/(.*)/?',
                                     location.split('?')[0] if location else url).group(2)
            if not video_id or not video_id.strip():
                return None
            # Look up the video detail to obtain the author's channel_account_id.
            detail_url = "http://8.217.192.46:8889/crawler/kuai_shou/detail"
            payload = json.dumps({"content_id": str(video_id)})
            headers = {'Content-Type': 'application/json'}
            time.sleep(random.uniform(5, 10))
            response = requests.post(detail_url, headers=headers, data=payload, timeout=30)
            response = response.json()
            code = response["code"]
            if code == 0:
                data = response["data"]["data"]
                channel_account_id = data['channel_account_id']
                return channel_account_id
            elif code == 27006:
                if "作品不存在" in response['msg'] or "内容不存在" in response['msg'] or "私密作品" in response['msg']:
                    # Deleted, missing, or private video.
                    return "作品不存在"
            time.sleep(3)
        except Exception as e:
            logger.error(f"[+] 快手{url}获取视频链接失败,失败信息{e}")
        return None
    def main(self, data):
        """Process one task: resolve the author's off-platform ID, de-duplicate, queue it, and log it to Feishu."""
        channel_account_id = None
        tag_transport_channel = None
        data = json.loads(data)
        AliyunLogger.logging(data['channel'], data, "开始获取", "fail")
        channel_id = data['channel']
        url_id, data_channel = sqlCollect.get_channle_id(data['videoid'])
        if not url_id:
            logger.info(f"[+] 任务{data},没有该视频信息")
            AliyunLogger.logging(data['channel'], data, "没有该视频信息", "fail")
            return
        # Normalize the URL: keep vid/modal_id links as-is, otherwise extract the first URL from the text.
        if "&vid=" in url_id or "?modal_id=" in url_id:
            host = urlparse(url_id).netloc
        else:
            msg = html.unescape(url_id).split('?')[0]
            pattern = re.search(r'https?://[^\s<>"\'\u4e00-\u9fff]+', msg)
            if not pattern:
                # No usable URL found; push the task back for a later retry.
                in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4))
                return
            url_id = pattern.group()
            host = urlparse(url_id).netloc
        # Dispatch by host, or by the channel recorded with the video.
        if host in ['v.douyin.com', 'www.douyin.com', 'www.iesdouyin.com'] or data_channel == "抖音":
            tag_transport_channel = "抖音"
            logger.info(f"[+] {url_id}开始获取抖音视频链接")
            channel_account_id = self.get_text_dy_video(url=url_id)
        elif host in ['v.kuaishou.com', 'www.kuaishou.com', 'v.m.chenzhongtech.com', 'creater.eozatvmq.com'] or data_channel == "快手":
            tag_transport_channel = "快手"
            logger.info(f"[+] {url_id}开始获取快手视频链接")
            channel_account_id = self.get_text_ks_video(url=url_id)
        if not channel_account_id or channel_account_id == "作品不存在":
            # Could not resolve the author ID; requeue the task for another attempt.
            in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4))
            AliyunLogger.logging(data['channel'], data, f"没有获取到视频用户ID,等待重新获取/{channel_account_id}", "fail")
            return
        status = sqlCollect.select_pj_video_data(channel_account_id)
        if status:
            logger.info(f"[+] 任务{data},该用户站外ID已添加过")
            AliyunLogger.logging(data['channel'], data, "该用户站外ID已添加过", channel_account_id)
            return
        data["channel_account_id"] = channel_account_id
        data["tag_transport_channel"] = tag_transport_channel
        # Keyword-crawl tasks and follow tasks go to different Redis queues.
        if channel_id in ["抖音关键词抓取", "快手关键词抓取"]:
            redis_data = f"task:top_data_{'dy' if channel_id == '抖音关键词抓取' else 'ks'}_gjc"
        else:
            redis_data = f"task:top_data_{'ks' if tag_transport_channel == '快手' else 'dy'}_gz"
        AliyunLogger.logging(data['channel'], data, "获取成功等待写入改造任务", channel_account_id)
        in_job_video_data(redis_data, json.dumps(data, ensure_ascii=False, indent=4))
        sqlCollect.insert_pj_video_data(channel_account_id, channel_id)
        logger.info("[+] 开始写入飞书表格")
        current_time = datetime.now()
        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
        values = [
            [
                data['uid'],
                data['videoid'],
                data['return_uv'],
                data['type'],
                data['type_owner'],
                data['channel'],
                data['channel_owner'],
                data['title'],
                data['dt'],
                channel_account_id,
                tag_transport_channel,
                formatted_time
            ]
        ]
        # Insert a new row at the top of the sheet, then write the values into it.
        Feishu.insert_columns("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "ROWS", 1, 2)
        time.sleep(0.5)
        Feishu.update_values("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "A2:Z2", values)
        logger.info("[+] 成功写入飞书表格")
        return

if __name__ == '__main__':
    url, channel = sqlCollect.get_channle_id(45843781)
    print(url, channel)
    # top = Top()
    # top.main("task:top_all_data")
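    # Hypothetical example of the JSON task string main() expects (illustrative
    # values only; the field names are simply the keys read in main() above):
    # task = json.dumps({
    #     "uid": 123456,
    #     "videoid": 45843781,
    #     "return_uv": 100,
    #     "type": "示例类型",
    #     "type_owner": "示例负责人",
    #     "channel": "抖音关键词抓取",
    #     "channel_owner": "示例渠道负责人",
    #     "title": "示例标题",
    #     "dt": "20240101",
    # }, ensure_ascii=False)
    # Top().main(task)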