# -*- coding: utf-8 -*-
# @Time: 2023/12/22
import datetime
import os
import random
import sys
import time
from datetime import datetime
import requests
import json
import urllib3
from extract_data.douyin.douyin_author_help import DouYinHelper
sys.path.append(os.getcwd())
from common.aliyun_oss_uploading import Oss
from common.common import Common
from common.material import Material
from common.feishu import Feishu
from common.db import MysqlHelper
from requests.adapters import HTTPAdapter


class douyinAuthor():
    """Crawl Douyin author pages, upload their videos to OSS and record them.

    NOTE(review): the SQL statements in this class are built by interpolating
    values directly into the query text. ``MysqlHelper`` is not visible from
    this file, so the queries are left unchanged; switch to parameterized
    queries if the helper supports them.
    """

    # An account is abandoned once more than this many of its videos are
    # found to be already stored.
    _MAX_DUPLICATES = 5

    @classmethod
    def get_videoUserId(cls, mark):
        """Return the (user_id, channel) rows stored for *mark* on Douyin.

        Rows are read from ``agc_channel_data`` in the "prod" environment,
        newest id first; the value is whatever ``MysqlHelper.get_values``
        returns.
        """
        select_user_sql = f"""select user_id, channel from agc_channel_data where mark = '{mark}' and channel = '抖音' ORDER BY id DESC;"""
        return MysqlHelper.get_values(select_user_sql, "prod")

    @classmethod
    def insert_videoUrl(cls, video_id, account_id, oss_object_key, mark):
        """Insert one uploaded video into ``agc_video_url``.

        The row is stamped with the current local time (minute precision)
        and ``status`` 1.
        """
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M")
        insert_sql = f"""INSERT INTO agc_video_url (video_id, account_id, oss_object_key, time , status, mark) values ("{video_id}", "{account_id}", "{oss_object_key}", "{formatted_time}", 1, "{mark}")"""
        MysqlHelper.update_values(
            sql=insert_sql,
            env="prod",
            machine="",
        )

    @classmethod
    def select_videoUrl_id(cls, video_id, mark):
        """Return True when *video_id* is already recorded for *mark*."""
        select_user_sql = f"""select video_id from agc_video_url where video_id='{video_id}' and mark = '{mark}';"""
        return bool(MysqlHelper.get_values(select_user_sql, "prod"))

    @classmethod
    def get_videoList(cls, data):
        """Fetch the videos of every configured account and store them on OSS.

        *data* is a mapping that must provide ``mark``, ``mark_name``,
        ``token``, ``feishu_id`` and ``channel`` (a sequence whose first two
        items are the channel id and the channel name).

        Returns None. Any exception that escapes the per-video handling is
        logged and ends the whole run (best-effort crawler).
        """
        try:
            mark = data['mark']
            mark_name = data['mark_name']
            token = data['token']
            feishu_id = data['feishu_id']
            channel_id = data['channel'][0]
            channel = data['channel'][1]
            user_list = Material.insert_user(feishu_id, channel_id, mark, channel)
            cookie = Material.get_cookie(feishu_id, token, channel)
            if not user_list:
                return
            urllib3.disable_warnings()
            for account_id in user_list:
                cls._crawl_account(account_id, cookie, mark, mark_name)
        except Exception as e:
            Common.logger("douyin").warning(f"抓取异常:{e}\n")
            return

    @classmethod
    def _request_page(cls, account_id, cookie, cursor):
        """GET one page of an account's posts and return the response."""
        url = 'https://www.douyin.com/aweme/v1/web/aweme/post/'
        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Cache-Control': 'no-cache',
            'Cookie': cookie,
            'Pragma': 'no-cache',
            'Referer': f'https://www.douyin.com/user/{account_id}',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/118.0.0.0 Safari/537.36',
        }
        query = DouYinHelper.get_full_query(ua=headers['User-Agent'], extra_data={
            'sec_user_id': account_id,
            'max_cursor': cursor,
            'locate_query': 'false',
            'show_live_replay_strategy': '1',
            'need_time_list': '1',
            'time_list_query': '0',
            'whale_cut_token': '',
            'cut_version': '1',
            'count': '18',
            'publish_video_strategy_type': '2',
        })
        # Send through the session so the mounted retry adapters are actually
        # used; the original built the session but then bypassed it. The body
        # is fully read before the session is closed (no streaming).
        with requests.session() as s:
            s.mount('http://', HTTPAdapter(max_retries=3))
            s.mount('https://', HTTPAdapter(max_retries=3))
            return s.request(method='GET', url=url, headers=headers, params=query)

    @classmethod
    def _crawl_account(cls, account_id, cookie, mark, mark_name):
        """Page through one account's posts and store every new video."""
        Common.logger("douyin").info(f"用户主页ID:{account_id}")
        next_cursor = 0
        duplicates = 0
        # Stop when the API reports no further page or too many videos of
        # this account turned out to be stored already.
        while next_cursor is not None and duplicates <= cls._MAX_DUPLICATES:
            time.sleep(random.randint(5, 10))
            response = cls._request_page(account_id, cookie, next_cursor)
            try:
                status_code = response.status_code
                raw = response.content
            finally:
                response.close()
            if len(raw) == 0:
                # NOTE(review): an empty body is retried without any upper
                # bound, exactly as before — confirm this is intended.
                time.sleep(60)
                continue
            try:
                obj = json.loads(raw.decode())
            except ValueError:
                # Non-JSON body (e.g. an HTML error page): treat it as an
                # empty result instead of aborting the whole crawl.
                obj = {}
            if not isinstance(obj, dict):
                obj = {}
            has_more = obj.get('has_more', 0) == 1
            next_cursor = str(obj.get('max_cursor')) if has_more else None
            # The key may be present with a null value.
            items = obj.get('aweme_list') or []
            if status_code != 200:
                Common.logger("douyin").info(
                    f"接口请求失败,请更换cookie,{status_code}")
                # Only the alert uses the shortened mark; `mark` itself must
                # stay intact for the dedupe lookup and the OSS path below.
                Feishu.bot('recommend', '抖音', '抖音cookie失效,请及时更换~', mark.split("-")[0], mark_name)
                # Back off for a random 600-1200 seconds after a failure.
                time.sleep(random.randint(600, 1200))
                continue
            if not items:
                Common.logger("douyin").info(
                    f"接口请求失败,请更换cookie")
                continue
            for item in items:
                try:
                    entity_type = item.get('search_impr').get('entity_type')
                    Common.logger("douyin").info(
                        f"非视频:{entity_type}")
                    if entity_type != 'GENERAL':
                        continue
                    video_id = item.get('aweme_id')  # post id
                    if cls.select_videoUrl_id(video_id, mark):
                        duplicates += 1
                        if duplicates > cls._MAX_DUPLICATES:
                            Common.logger("douyin").info(
                                f"重复视频不在抓取该用户,用户主页id:{account_id}")
                            break
                        continue
                    video_url = item.get('video').get('play_addr').get('url_list')[0]  # video link
                    channel_name = mark + '/douyin'
                    # Upload to OSS.
                    upload = Oss.video_sync_upload_oss(video_url, video_id, account_id, channel_name)
                    status = upload.get("status")
                    oss_object_key = upload.get("oss_object_key")
                    Common.logger("douyin").info(f"抖音视频链接oss发送成功,oss地址:{oss_object_key}")
                    if status == 200:
                        # Upload succeeded: record it in the database.
                        cls.insert_videoUrl(video_id, account_id, oss_object_key, mark)
                        Common.logger("douyin").info(
                            f"视频地址插入数据库成功,视频id:{video_id},用户主页id:{account_id},视频储存地址:{oss_object_key}")
                except Exception as e:
                    # One bad item must not stop the rest of the page.
                    Common.logger("douyin").warning(f"抓取单条视频异常:{e}\n")
                    continue


if __name__ == '__main__':
    # NOTE(review): get_videoList requires a `data` argument, so this call
    # raises TypeError as written; pass the intended task payload here.
    douyinAuthor.get_videoList()