# -*- coding: utf-8 -*- # @Time: 2023/12/26 import os import random import sys import datetime sys.path.append(os.getcwd()) from datetime import datetime, timedelta from common.db import MysqlHelper from common.feishu import Feishu from common import Common class Material(): # 获取所有用户名 @classmethod def feishu_name(cls): summary = Feishu.get_values_batch("summary", "n9xlLF") list = [] for row in summary[2:]: mark = row[0] mark_name = row[1] list.append({"mark": mark, "mark_name": mark_name}) return list # 获取汇总表所有 @classmethod def feishu_list(cls): summary = Feishu.get_values_batch("summary", "n9xlLF") list = [] for row in summary[2:]: mark = row[0] mark_name = row[1] feishu_id = row[3] video_call = row[4] pq_id = row[7] number = {"mark": mark, "feishu_id": feishu_id, "video_call": video_call, "pq_id": pq_id, "mark_name": mark_name} list.append(number) return list # 获取汇总表所有 @classmethod def feishu_gs_list(cls): try: summary = Feishu.get_values_batch("summary", "gGiXDp") list = [] for row in summary[1:]: mark = row[0] mark_name = row[1] feishu_id = row[3] video_call = row[4] pq_id = row[7] sum_count = row[8] sheet = row[5] zd_count = row[8] platform_list = [] if sheet: parts = sheet.split(',') for part in parts: sub_parts = part.split('--') platform_list.append(sub_parts) number = {"mark": mark, "feishu_id": feishu_id, "video_call": video_call, "pq_id": pq_id, "mark_name": mark_name, "sum_count": sum_count, "platform_list": platform_list, "zd_count": zd_count} list.append(number) except Exception as e: Common.logger("gensui").warning(f"抓取异常:{e}\n") return '' return list # 获取爆款汇总表所有 @classmethod def feishu_bk_list(cls): try: summary = Feishu.get_values_batch("summary", "xJxEUH") list = [] for row in summary[1:]: mark = row[0] mark_name = row[1] feishu_id = row[3] video_call = row[4] pq_id = row[5] if pq_id: number = {"mark": mark, "feishu_id": feishu_id, "video_call": video_call, "pq_id": pq_id, "mark_name": mark_name} list.append(number) except Exception as e: Common.logger("gensui").warning(f"抓取异常:{e}\n") return '' return list # 获取管理后台cookie @classmethod def get_houtai_cookie(cls): douyin_token = Feishu.get_values_batch("MW7VsTb1vhctEot34g7ckrjdnIe", "pLWwBm") for item in douyin_token: if item[0] == '管理后台': return item[1] # 获取汇总表所有待抓取 用户 @classmethod def get_all_user(cls, type): summary = Feishu.get_values_batch("summary", "n9xlLF") list = [] try: for row in summary[2:]: mark = row[0] mark_name = row[1] feishu_id = row[3] sheet = row[5] token = row[6] parts = sheet.split(',') result = [] for part in parts: sub_parts = part.split('--') result.append(sub_parts) douyin = result[0] kuaishou = result[1] if type == "douyin": number = {"mark": mark, "feishu_id": feishu_id, "channel": douyin, "token": token, "mark_name": mark_name} list.append(number) elif type == "kuaishou": number = {"mark": mark, "feishu_id": feishu_id, "channel": kuaishou, "token": token, "mark_name": mark_name} list.append(number) except Exception as e: Common.logger("zhuaqu").warning(f"抓取异常:{e}\n") return '' return list # 获取跟随任务汇总表所有待抓取 用户 @classmethod def get_all_gs_user(cls, type): summary = Feishu.get_values_batch("summary", "gGiXDp") list = [] for row in summary[1:]: mark = row[0] mark_name = row[1] feishu_id = row[3] sheet = row[5] token = row[6] zn_id = row[4] parts = zn_id.split(',') zn_result = [] for part in parts: sub_parts = part.split('--') zn_result.append(sub_parts) zn_link = zn_result[0][0] # 脚本链接 if sheet: parts = sheet.split(',') result = [] for part in parts: sub_parts = part.split('--') result.append(sub_parts) douyin = result[0] kuaishou = result[1] if type == "douyin": number = {"mark": mark, "feishu_id": feishu_id, "channel": douyin, "token": token, "mark_name": mark_name, "sheet": "1"} list.append(number) elif type == "kuaishou": number = {"mark": mark, "feishu_id": feishu_id, "channel": kuaishou, "token": token, "mark_name": mark_name, "sheet": "1"} list.append(number) elif type == "zhannei": number = {"mark": mark, "feishu_id": feishu_id, "channel": zn_link, "token": token, "mark_name": mark_name, "sheet": None} list.append(number) return list # 获取抖音 cookie @classmethod def get_cookie(cls, feishu_id, token, channel): token = Feishu.get_values_batch(feishu_id, token) for item in token: if item[0] == channel: return item[1] # 获取抖音视频链接 存入数据库 @classmethod def insert_user(cls, feishu_id, channel_id, mark, channel): user_list = [] # 获取抖音视频链接 douyin = Feishu.get_values_batch(feishu_id, channel_id) # 提取账号昵称和账号主页链接 for row in douyin[1:]: if channel == "站内": uid = row[0] else: uid = row[1] if uid: insert_sql = f"""INSERT INTO agc_channel_data (user_id, channel, mark) values ('{uid}', '{channel}', '{mark}')""" MysqlHelper.update_values( sql=insert_sql, env="prod", machine="", ) user_list.append(uid) return user_list @classmethod def get_uid(cls, uid, mark): current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d") uid_list = f"""select account_id FROM agc_video_deposit where time = '{formatted_time}' AND audio = '{uid}' and mark = '{mark}' GROUP BY account_id """ id_list = MysqlHelper.get_values(uid_list, "prod") return id_list # 获取音频类型+字幕+标题 @classmethod def get_all_data(cls, feishu_id, link, mark): list = [] video_list = [] # 获取音频类型+字幕 all_data = Feishu.get_values_batch(feishu_id, link) for row in all_data[1:]: uid = row[1] text = row[2] video = row[0] number = {"uid": uid, "text": text} if uid: list.append(number) if video: video_list.append(video) while True: list1 = random.choice(list) uid1 = list1['uid'] srt = list1['text'] id_list = cls.get_uid(uid1, mark) if len(id_list) < 2: return uid1, srt, video_list # 获取音频类型+字幕+标题 @classmethod def get_allbk_data(cls, feishu_id, link, mark): list_data = [] # 获取音频类型+字幕 all_data = Feishu.get_values_batch(feishu_id, link) for row in all_data[1:]: # excel_base_date = datetime(1899, 12, 30) # excel_date_number = row[0] # date_from_excel = excel_base_date + timedelta(days=excel_date_number) # # 获取当前时间 # current_date = datetime.now().date() # date_from_excel_str = date_from_excel.strftime("%Y-%m-%d") # current_date_str = current_date.strftime("%Y-%m-%d") # if date_from_excel_str == current_date_str: uid = row[0] text = row[1] video = row[2] number = {"uid": uid, "text": text, "video": video} if uid: list_data.append(number) else: return list_data # else: # return list_data return list_data # 获取站内视频id @classmethod def get_zn_user(cls, feishu_id, link): video_list = [] # 获取音频类型+字幕 all_data = Feishu.get_values_batch(feishu_id, link) for row in all_data[1:]: video = row[0] if video: video_list.append(video) return video_list # 获取用户名 @classmethod def get_user_id(cls, feishu_id, channel_id): user_list = [] # 获取抖音视频链接 douyin = Feishu.get_values_batch(feishu_id, channel_id) # 提取账号昵称和账号主页链接 for row in douyin[1:]: uid = row[1] user_list.append(uid) return user_list