import os
import sys
from typing import Optional

from common.mysql_db_aigc import AigcMysqlHelper

sys.path.append(os.getcwd())
from datetime import datetime
from common.mysql_db import MysqlHelper

# Characters that must be backslash-escaped inside a quoted MySQL string
# literal (default sql_mode, i.e. backslash escapes enabled).
_SQL_ESCAPE_TABLE = str.maketrans({
    "\0": "\\0",
    "\n": "\\n",
    "\r": "\\r",
    "\x1a": "\\Z",
    '"': '\\"',
    "'": "\\'",
    "\\": "\\\\",
})


def _sql_escape(value):
    """Return ``str(value)`` made safe for embedding in a quoted SQL literal.

    The statements sent through ``update_values`` are built as text (that
    helper is only ever called with ``sql=`` here), so values containing
    quotes or backslashes would otherwise break the statement or inject SQL.
    ``None`` is rendered as the text ``None``, as plain f-string
    interpolation would do.
    """
    return str(value).translate(_SQL_ESCAPE_TABLE)


def _sql_literals(*values):
    """Render ``values`` as comma-separated, double-quoted, escaped literals."""
    return ", ".join(f'"{_sql_escape(value)}"' for value in values)


class sqlCollect:
    """Class-method helpers that read and write the project's MySQL tables.

    SELECT statements use ``%s`` placeholders with a parameter tuple.
    INSERT/UPDATE statements are built as text with every quoted value
    escaped by ``_sql_escape``.
    """

    @classmethod
    def insert_task(cls, task_mark, video_id, mark, channel):
        """Insert a used-video record into ``pj_video_data`` stamped with now."""
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        values = _sql_literals(task_mark, video_id, mark, formatted_time, channel)
        insert_sql = (
            "INSERT INTO pj_video_data "
            "(task_name, used_video_id, mark_name, data_time, channel) "
            f"values ({values})"
        )
        MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def is_used(cls, video_id, mark_name, channel):
        """Return True if ``pj_video_data`` has a row for this video/mark/channel."""
        sql = """
            SELECT used_video_id
            FROM pj_video_data
            WHERE used_video_id = %s AND mark_name = %s AND channel = %s
            ORDER BY data_time DESC
            LIMIT 1
        """
        data = MysqlHelper.get_values(sql, (str(video_id), mark_name, channel))
        return bool(data)

    @classmethod
    def is_used_days(cls, video_id, mark_name, channel, day_count):
        """Return True if a matching row exists within the last ``day_count`` days."""
        # The time filter must be part of WHERE; the original appended it
        # after LIMIT, which is not valid SQL.
        # NOTE(review): the filter uses create_time while ordering (and the
        # sibling ks_is_used_xcx) use data_time - confirm the column exists.
        sql = """
            SELECT used_video_id
            FROM pj_video_data
            WHERE used_video_id = %s AND mark_name = %s AND channel = %s
              AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
            ORDER BY data_time DESC
            LIMIT 1
        """
        params = (str(video_id), mark_name, channel, int(day_count))
        data = MysqlHelper.get_values(sql, params)
        return bool(data)

    @classmethod
    def ks_is_used_xcx(cls, video_id, channel):
        """Return True if this video/channel has a row from the last 7 days."""
        sql = """SELECT used_video_id FROM pj_video_data
            WHERE used_video_id = %s AND channel = %s
            AND data_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
            ORDER BY data_time DESC LIMIT 1;"""
        data = MysqlHelper.get_values(sql, (str(video_id), channel))
        return bool(data)

    @classmethod
    def get_history_id(cls, channel, url):
        """Return ``name_id`` of the useful account matching name/platform.

        Returns False when no row matches.
        """
        sql = """select name_id from accounts
            where name = %s and platform = %s and useful = 1 limit 1"""
        data = MysqlHelper.get_values(sql, (url, channel))
        if data:
            return data[0][0]
        return False

    @classmethod
    def insert_history_id(cls, account_name, target, channel):
        """Insert an ``accounts`` row with ``useful = 1``."""
        values = _sql_literals(account_name, target, channel)
        insert_sql = (
            "INSERT INTO accounts (name, name_id, platform, useful) "
            f"values ({values}, 1)"
        )
        MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def insert_machine_making_data(cls, name: str, task_mark: str, channel_id: str, url: str, v_id: str,
                                   piaoquan_id: str, new_title: str, code: str, formatted_time,
                                   old_title: str, oss_object_key: str):
        """Insert one row into ``machine_making_data``."""
        values = _sql_literals(
            name, task_mark, channel_id, url, v_id, piaoquan_id,
            new_title, code, formatted_time, old_title, oss_object_key,
        )
        insert_sql = (
            "INSERT INTO machine_making_data "
            "(name, task_mark, channel, user, v_id, pq_uid, title, pq_vid, "
            "data_time, old_title, oss_object_key) "
            f"values ({values})"
        )
        MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def get_feed_count(cls, channel):
        """Query today's ``machine_making_data`` count for a channel, up to the current hour.

        Returns whatever ``MysqlHelper.get_values`` returns for the query.
        """
        sql = """SELECT count(0) FROM machine_making_data
            where channel = %s and DATE(data_time) = CURRENT_DATE
            AND HOUR(data_time) <= HOUR(NOW());"""
        return MysqlHelper.get_values(sql, (str(channel),))

    @classmethod
    def get_crawler_count(cls, platform):
        """Query today's ``crawler_video`` count for a platform, before the current hour."""
        sql = """SELECT count(0) FROM crawler_video
            where platform = %s and DATE(create_time) = CURRENT_DATE
            AND HOUR(create_time) < HOUR(NOW());"""
        return MysqlHelper.get_values(sql, (str(platform),))

    @classmethod
    def get_ytd_crawler_count(cls, platform):
        """Query yesterday's ``crawler_video`` count for a platform, up to this hour of day."""
        sql = """SELECT count(0) FROM crawler_video
            where platform = %s
            and create_time >= CURRENT_DATE - INTERVAL 1 DAY
            AND create_time < CURRENT_DATE - INTERVAL 1 DAY + INTERVAL HOUR(NOW()) HOUR;"""
        return MysqlHelper.get_values(sql, (str(platform),))

    @classmethod
    def get_bygj_count(cls, name):
        """Query today's count for ``name`` with the fixed 搬运工具 task mark."""
        sql = """SELECT count(0) FROM machine_making_data
            where name = %s and task_mark = "搬运工具"
            and DATE(data_time) = CURRENT_DATE
            AND HOUR(data_time) <= HOUR(NOW());"""
        return MysqlHelper.get_values(sql, (str(name),))

    @classmethod
    def get_mark_count(cls, mark):
        """Query today's ``machine_making_data`` count for a task mark."""
        sql = """SELECT count(0) FROM machine_making_data
            where task_mark = %s and DATE(data_time) = CURRENT_DATE
            AND HOUR(data_time) <= HOUR(NOW());"""
        return MysqlHelper.get_values(sql, (str(mark),))

    @classmethod
    def get_bygj_all_count(cls):
        """Query today's total count for the fixed 搬运工具 task mark."""
        sql = """SELECT count(0) FROM machine_making_data
            where task_mark = %s and DATE(data_time) = CURRENT_DATE
            AND HOUR(data_time) <= HOUR(NOW());"""
        return MysqlHelper.get_values(sql, ("搬运工具",))

    @classmethod
    def get_ytd_bygj_count(cls, name):
        """Query yesterday's 搬运工具 count for ``name``, up to this hour of day."""
        sql = """SELECT count(0) FROM machine_making_data
            where name = %s and task_mark = "搬运工具"
            and data_time >= CURRENT_DATE - INTERVAL 1 DAY
            AND data_time <= CURRENT_DATE - INTERVAL 1 DAY + INTERVAL HOUR(NOW()) HOUR;"""
        return MysqlHelper.get_values(sql, (str(name),))

    @classmethod
    def get_channel_count(cls, channel, name):
        """Query today's count for a channel/name pair, up to the current hour."""
        sql = """SELECT count(0) FROM machine_making_data
            where channel = %s and name = %s
            and DATE(data_time) = CURRENT_DATE
            AND HOUR(data_time) <= HOUR(NOW());"""
        return MysqlHelper.get_values(sql, (str(channel), name))

    @classmethod
    def get_ytd_channel_count(cls, channel, name):
        """Query yesterday's count for a channel/name pair, up to this hour of day."""
        sql = """SELECT count(0) FROM machine_making_data
            where channel = %s and name = %s
            and data_time >= CURRENT_DATE - INTERVAL 1 DAY
            AND data_time <= CURRENT_DATE - INTERVAL 1 DAY + INTERVAL HOUR(NOW()) HOUR;"""
        return MysqlHelper.get_values(sql, (str(channel), name))

    @classmethod
    def get_name_count(cls, name):
        """Query today's ``machine_making_data`` count for ``name``."""
        sql = """SELECT count(0) FROM machine_making_data
            where name = %s and DATE(data_time) = CURRENT_DATE
            AND HOUR(data_time) <= HOUR(NOW());"""
        return MysqlHelper.get_values(sql, (str(name),))

    @classmethod
    def get_ytd_name_count(cls, name):
        """Query yesterday's count for ``name``, up to this hour of day."""
        sql = """SELECT count(0) FROM machine_making_data
            where name = %s
            and data_time >= CURRENT_DATE - INTERVAL 1 DAY
            AND data_time <= CURRENT_DATE - INTERVAL 1 DAY + INTERVAL HOUR(NOW()) HOUR;"""
        return MysqlHelper.get_values(sql, (str(name),))

    @classmethod
    def ks_is_used(cls, photo_id):
        """Return True if ``ks_category_video`` already has this ``photo_id``."""
        sql = """
            SELECT photo_id
            FROM ks_category_video
            WHERE photo_id = %s
        """
        data = MysqlHelper.get_values(sql, (str(photo_id),))
        return bool(data)

    @classmethod
    def sph_channel_user(cls, user_id):
        """Return True when ``user_id`` is NOT yet in ``sph_channel_user``.

        Note the polarity: a missing row yields True, an existing row False.
        """
        sql = """
            SELECT user_id
            FROM sph_channel_user
            WHERE user_id = %s
        """
        data = MysqlHelper.get_values(sql, (str(user_id),))
        return not data

    @classmethod
    def sph_data_info(cls, channel: str, objectId: str, video_url: str, cover: str, video_title: str,
                      share_cnt: str, like_cnt: str, oss_video_key: str, oss_cover_key: str,
                      nick_name: str, user_name: str, comment_count: str, fav_count: str,
                      create_time: str, duration: str):
        """Insert one crawled video row into ``sph_data_info``.

        Returns the result of ``MysqlHelper.update_values``.
        """
        values = _sql_literals(
            channel, objectId, video_url, cover, video_title, share_cnt,
            like_cnt, oss_video_key, oss_cover_key, nick_name, user_name,
            comment_count, fav_count, create_time, duration,
        )
        insert_sql = (
            "INSERT INTO sph_data_info "
            "(channel, video_id, video_url, video_cover, video_title, share_cnt, "
            "like_cnt, oss_url, oss_cover, nick_name, user_name, comment_count, "
            "fav_count, create_time, duration) "
            f"values ({values})"
        )
        return MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def sph_data_info_v_id(cls, video_id, channel):
        """Return True if ``sph_data_info`` already has this video for the channel."""
        sql = """
            SELECT video_id
            FROM sph_data_info
            WHERE video_id = %s and channel = %s
        """
        data = MysqlHelper.get_values(sql, (str(video_id), channel))
        return bool(data)

    @classmethod
    def sph_data_info_count(cls, user, channel):
        """Return the first result row of the count query for a nick name/channel.

        The value returned is ``data[0]`` (the row), not the scalar count.
        """
        sql = """SELECT count(*) FROM sph_data_info
            WHERE nick_name = %s and channel = %s"""
        data = MysqlHelper.get_values(sql, (user, channel))
        return data[0]

    @classmethod
    def sph_data_info_list(cls, user):
        """Return ``sph_data_info`` rows for a nick name ordered by share count, or None."""
        sql = """SELECT video_id, video_title, share_cnt, like_cnt, oss_url, oss_cover, duration
            FROM sph_data_info
            WHERE nick_name = %s
            ORDER BY share_cnt DESC"""
        data = MysqlHelper.get_values(sql, (user,))
        return data if data else None

    @classmethod
    def sph_channel_user_list(cls):
        """Return ``sph_channel_user`` rows with ``status = 0``, or None if none."""
        sql = """
            SELECT user_id
            FROM sph_channel_user
            WHERE status = %s
        """
        # One-element tuple: a bare 0 is falsy and is not a parameter sequence.
        data = MysqlHelper.get_values(sql, (0,))
        return data if data else None

    @classmethod
    def update_sph_channel_user_status(cls, user):
        """Set ``status = 1`` (already crawled) for a ``sph_channel_user`` row."""
        sql = f"""UPDATE sph_channel_user set status = 1 where user_id = '{_sql_escape(user)}'"""
        return MysqlHelper.update_values(sql=sql)

    @classmethod
    def insert_sph_channel_user(cls, channel, user_id):
        """Insert a ``sph_channel_user`` row with ``status = 0``."""
        values = _sql_literals(channel, user_id)
        insert_sql = (
            "INSERT INTO sph_channel_user (channel, user_id, status) "
            f"values ({values}, 0)"
        )
        MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def insert_ks_data(cls, user_name: str, user_sex: str, time_data, caption: str, view_count: str,
                       like_count: str, share_count: str, duration: str, main_mv_url: str,
                       thumbnail_url: str, user_id: str, status: str, photo_id: str,
                       category_name: str, age: str, oss_object: Optional[str] = None,
                       video_uid: Optional[str] = None):
        """Insert one row into ``ks_category_video`` with ``update_time`` set to now."""
        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        values = _sql_literals(
            user_name, user_sex, time_data, caption, view_count, like_count,
            share_count, duration, main_mv_url, thumbnail_url, user_id, status,
            age, oss_object, video_uid, formatted_time, photo_id, category_name,
        )
        insert_sql = (
            "INSERT INTO ks_category_video "
            "(user_name, user_sex, time_data, caption, view_count, like_count, "
            "share_count, duration, main_mv_url, thumbnail_url, user_id, status, "
            "age_proportion, video_oss_path, pq_video_id, update_time, photo_id, "
            "category_name) "
            f"values ({values})"
        )
        MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def get_shp_dd_data(cls, url):
        """Return up to two unused ``dandian_content`` rows for ``from_user_name``, newest first."""
        sql = """select video_id,title,author_id,author_name,cover_url,video_url,video_duration,from_user_id,from_user_name,from_group_id,from_group_name,source,wx_msg, is_encrypted, decode_key
            from dandian_content
            where from_user_name = %s and has_used = 0
            ORDER BY create_timestamp DESC limit 2"""
        return AigcMysqlHelper.get_values(sql, (url,))

    @classmethod
    def get_feed_data(cls, channel):
        """Return the newest unused ``automator_feed_video`` row with ``if_50_like = 1``."""
        sql = """select video_id,channel,video_url,cover_url,title
            from automator_feed_video
            where channel = %s and has_used = 0 and if_50_like = 1
            ORDER BY create_time DESC limit 1"""
        return AigcMysqlHelper.get_values(sql, (channel,))

    @classmethod
    def update_feed_vid(cls, vid):
        """Set ``has_used = 1`` on the ``automator_feed_video`` row for ``vid``."""
        sql = f"""UPDATE automator_feed_video set has_used = 1 where video_id = '{_sql_escape(vid)}'"""
        return AigcMysqlHelper.update_values(sql=sql)

    @classmethod
    def update_feed_vid_2(cls, vid):
        """Set ``has_used = 2`` on the ``automator_feed_video`` row for ``vid``."""
        sql = f"""UPDATE automator_feed_video set has_used = 2 where video_id = '{_sql_escape(vid)}'"""
        return AigcMysqlHelper.update_values(sql=sql)

    @classmethod
    def update_shp_dd_vid(cls, vid):
        """Set ``has_used = 1`` on the ``dandian_content`` row for ``vid``."""
        sql = f"""UPDATE dandian_content set has_used = 1 where video_id = '{_sql_escape(vid)}'"""
        return AigcMysqlHelper.update_values(sql=sql)

    @classmethod
    def update_shp_dd_vid_4(cls, vid):
        """Set ``has_used = 4`` on the ``dandian_content`` row for ``vid``."""
        sql = f"""UPDATE dandian_content set has_used = 4 where video_id = '{_sql_escape(vid)}'"""
        return AigcMysqlHelper.update_values(sql=sql)

    @classmethod
    def select_ks_star_data(cls, profile_id):
        """Return ``ks_star_info`` rows for ``profile_id``, or None if none."""
        sql = """SELECT * FROM ks_star_info where profile_id = %s"""
        data = MysqlHelper.get_values(sql, (profile_id,))
        return data if data else None

    @classmethod
    def insert_ks_star_data(cls, user_id: str, star_id: str, name, kwai_id: str, gender: str,
                            fans_number: str, profile_id: str, star_tag_str: str,
                            industry_tag_str: str, photo_expect_play: str, photo_expect_cpm: str,
                            photo_interaction_rate: str, photo_complete_play_rate: str,
                            fans_increase_num: str, fans_increase_rate: str,):
        """Insert one row into ``ks_star_info``; returns the ``update_values`` result."""
        values = _sql_literals(
            user_id, star_id, name, kwai_id, gender, fans_number, profile_id,
            star_tag_str, industry_tag_str, photo_expect_play, photo_expect_cpm,
            photo_interaction_rate, photo_complete_play_rate, fans_increase_num,
            fans_increase_rate,
        )
        insert_sql = (
            "INSERT INTO ks_star_info "
            "(user_id, star_id, name, kwai_id, gender, fans_number, profile_id, "
            "star_tag_str, industry_tag_str, photo_expect_play, photo_expect_cpm, "
            "photo_interaction_rate, photo_complete_play_rate, fans_increase_num, "
            "fans_increase_rate) "
            f"values ({values})"
        )
        return MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def insert_ks_star_works_info(cls, star_id: str, profile_id: str, pplay_median_data_90: str,
                                  pplay_median_data_30: str, pphoto_cnt_data_90: str,
                                  pphoto_cnt_data_30: str, pavg_video_duration_data_90: str,
                                  pavg_video_duration_data_30: str, star_work_report_90: str,
                                  star_work_report_30: str, pavg_share_cnt_90: str,
                                  pavg_share_cnt_30: str):
        """Insert one row into ``ks_star_works_info``; returns the ``update_values`` result."""
        values = _sql_literals(
            star_id, profile_id, pplay_median_data_90, pplay_median_data_30,
            pphoto_cnt_data_90, pphoto_cnt_data_30, pavg_video_duration_data_90,
            pavg_video_duration_data_30, star_work_report_90, star_work_report_30,
            pavg_share_cnt_90, pavg_share_cnt_30,
        )
        insert_sql = (
            "INSERT INTO ks_star_works_info "
            "(star_id, profile_id, pplay_median_data_90, pplay_median_data_30, "
            "pphoto_cnt_data_90, pphoto_cnt_data_30, pavg_video_duration_data_90, "
            "pavg_video_duration_data_30, star_work_report_90, star_work_report_30, "
            "pavg_share_cnt_90, pavg_share_cnt_30) "
            f"values ({values})"
        )
        return MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def insert_ks_list_portrait_info(cls, star_id: str, profile_id: str, type: str, name: str,
                                     proportion: str):
        """Insert one row into ``ks_list_portrait_info``; returns the ``update_values`` result."""
        values = _sql_literals(star_id, profile_id, type, name, proportion)
        insert_sql = (
            "INSERT INTO ks_list_portrait_info "
            "(star_id, profile_id, type, name, proportion) "
            f"values ({values})"
        )
        return MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def insert_spider_supply_targetcnt(cls, channel_id: str, name: str, url: str, number: str,
                                       new_count: str, s_cnt: str):
        """Record category video counts in ``spider_supply_targetcnt``.

        ``number`` is stored as ``o_cnt`` and ``new_count`` as ``n_cnt``.
        """
        values = _sql_literals(channel_id, name, url, number, new_count, s_cnt)
        insert_sql = (
            "INSERT INTO spider_supply_targetcnt "
            "(channel_id, name, url, o_cnt, n_cnt, s_cnt) "
            f"values ({values})"
        )
        return MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def get_machine_making_reflux(cls, channel, channel1, channel2, channel3, channel4):
        """Return ``machine_making_reflux`` users with ``status = 0`` on any of five channels.

        Returns None when no row matches.
        """
        sql = """SELECT user FROM machine_making_reflux
            WHERE (channel = %s or channel = %s or channel = %s or channel = %s or channel = %s)
            and status = 0"""
        params = (channel, channel1, channel2, channel3, channel4)
        data = MysqlHelper.get_values(sql, params)
        return data if data else None

    @classmethod
    def update_machine_making_reflux(cls, user_id):
        """Set ``status = 1`` on the ``machine_making_reflux`` row(s) for ``user_id``."""
        sql = f"""UPDATE machine_making_reflux set status = 1 where user = '{_sql_escape(user_id)}'"""
        return MysqlHelper.update_values(sql=sql)

    @classmethod
    def insert_user_data(cls, uid, name, account_id, channel, status):
        """Insert a row into ``machine_making_traceability``."""
        values = _sql_literals(uid, name, account_id, channel)
        # NOTE(review): status is interpolated unquoted, as in the original;
        # it is presumably numeric - confirm callers never pass free text.
        insert_sql = (
            "INSERT INTO machine_making_traceability "
            "(old_account_id, name, new_account_id, channel, status) "
            f"values ({values}, {status})"
        )
        return MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def insert_xssy_sph_info(cls, account_user: str, traceable_user: str, channel: str,
                             traceable_user_v2: str, has_used: str, appid: Optional[str] = None,
                             pq_id: Optional[str] = None):
        """Insert a row into ``xssy_sph``.

        ``appid`` / ``pq_id`` left as None are stored as the text ``None``.
        """
        leading = _sql_literals(account_user, traceable_user, channel, traceable_user_v2, pq_id)
        # NOTE(review): has_used is interpolated unquoted, as in the original;
        # it is presumably numeric - confirm callers never pass free text.
        insert_sql = (
            "INSERT INTO xssy_sph "
            "(account_user, traceable_user, channel, traceable_user_v2, pq_id, has_used, appid) "
            f"values ({leading}, {has_used}, {_sql_literals(appid)})"
        )
        return MysqlHelper.update_values(sql=insert_sql)

    @classmethod
    def select_crawler_uesr_v3(cls, link: str):
        """Return ``crawler_user_v3`` uid rows for ``link`` with the fixed source, or None."""
        sql = """SELECT uid FROM crawler_user_v3
            WHERE link = %s and source = 'jiqizidonggaizao'"""
        data = MysqlHelper.get_values(sql, (link,))
        return data if data else None