import os import sys from typing import Optional from common.mysql_db_aigc import AigcMysqlHelper sys.path.append(os.getcwd()) from datetime import datetime from common import MysqlHelper class sqlCollect(): """ 视频信息写入库中 """ @classmethod def insert_task(cls, task_mark, video_id, mark, channel): current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") insert_sql = f"""INSERT INTO pj_video_data (task_name, used_video_id, mark_name, data_time, channel) values ('{task_mark}' ,'{video_id}','{mark}', '{formatted_time}', '{channel}')""" MysqlHelper.update_values( sql=insert_sql ) """ 判断该任务id是否用过 """ @classmethod def is_used(cls, task_mark, video_id, mark_name, channel): sql = """ SELECT used_video_id FROM pj_video_data WHERE used_video_id = %s AND task_name = %s AND mark_name = %s AND channel = %s ORDER BY data_time DESC LIMIT 1 """ data = MysqlHelper.get_values(sql, (str(video_id), task_mark, mark_name, channel)) if len(data) == 0 or data == (): return True return False @classmethod def get_history_id(cls, channel, url): """ 从数据库表中读取 id """ sql = f"""select name_id from accounts where name = %s and platform = %s and useful = 1 limit 1""" data = MysqlHelper.get_values(sql, (url, channel)) if data: return data[0][0] else: return False @classmethod def insert_history_id(cls, account_name, target, channel): insert_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{account_name}", "{target}", "{channel}", 1 )""" MysqlHelper.update_values( sql=insert_sql ) @classmethod def insert_machine_making_data(cls, name: str, task_mark: str, channel_id: str, url: str, v_id: str, piaoquan_id: str, new_title: str, code: str, formatted_time, old_title: str, oss_object_key: str): insert_sql = f"""INSERT INTO machine_making_data (name, task_mark, channel, user, v_id, pq_uid, title, pq_vid, data_time, old_title, oss_object_key) values ("{name}", "{task_mark}", "{channel_id}", "{url}", "{v_id}" , "{piaoquan_id}", "{new_title}", "{code}", "{formatted_time}", "{old_title}", "{oss_object_key}")""" MysqlHelper.update_values( sql=insert_sql ) """ 判断该任务id是否用过 """ @classmethod def ks_is_used(cls, photo_id): sql = """ SELECT photo_id FROM ks_category_video WHERE photo_id = %s """ data = MysqlHelper.get_values(sql, (str(photo_id))) if len(data) == 0 or data == (): return False return True @classmethod def insert_ks_data(cls, user_name: str, user_sex: str, time_data, caption: str, view_count: str, like_count: str, share_count: str, duration: str, main_mv_url: str, thumbnail_url: str, user_id: str, status: str, photo_id: str, category_name: str, age: str, oss_object: Optional[str] = None, video_uid: Optional[str] = None): current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") insert_sql = f"""INSERT INTO ks_category_video (user_name, user_sex, time_data, caption, view_count, like_count, share_count, duration, main_mv_url, thumbnail_url, user_id, status, age_proportion, video_oss_path, pq_video_id, update_time, photo_id, category_name) values ("{user_name}", "{user_sex}", "{time_data}", "{caption}", "{view_count}", "{like_count}", "{share_count}", "{duration}", "{main_mv_url}", "{thumbnail_url}", "{user_id}", "{status}", "{age}", "{oss_object}", "{video_uid}", "{formatted_time}", "{photo_id}", "{category_name}")""" res = MysqlHelper.update_values( sql=insert_sql ) @classmethod def get_shp_dd_data(cls, url): """ 获取视频号单点内容 """ sql = f"""select video_id,title,author_id,author_name,cover_url,video_url,video_duration,from_user_id,from_user_name,from_group_id,from_group_name from dandian_shipinhao where from_user_name = %s and has_used = 0 """ data = AigcMysqlHelper.get_values(sql, (url)) return data @classmethod def update_shp_dd_vid(cls, vid): """ 视频号单点修改状态为1 """ sql = f"""UPDATE dandian_shipinhao set has_used = 1 where video_id = '{vid}'""" res = AigcMysqlHelper.update_values( sql=sql ) return res