"""
@author: luojunhui

Daily job that refreshes the ``account_avg_info_v3`` table with each
account's fan count and the average read count derived from it.
"""
import json
import time
from datetime import datetime, timedelta

import schedule
from tqdm import tqdm

from applications import PQMySQL, DeNetMysql, Functions


def get_accounts_read_rate():
    """
    Load the account read-rate table.

    :return: the parsed content of ``config/account_read_rate_map.json``.
        ``UpdateAccountInfoVersion3.update_single_day`` iterates over it and
        reads keys such as ``gh_id``, ``position`` and ``read_rate_avg`` from
        every entry, so it is expected to be a list of dicts.
    """
    with open("config/account_read_rate_map.json", encoding="utf-8") as f:
        return json.load(f)


class UpdateAccountInfoVersion3(object):
    """
    Update account information, v3.
    """

    def __init__(self):
        self.pq = PQMySQL()
        self.de = DeNetMysql()

    def get_account_fans_by_date(self, date):
        """
        Fetch the fan count of every account, starting from a given date.

        :param date: lower bound for ``date_str``, e.g. ``2024-01-01``
        :return: dict mapping ``gh_id`` to ``fans_count``
        """
        # NOTE(review): ``date`` is interpolated straight into the SQL text.
        # The only caller visible here passes a locally generated
        # ``%Y-%m-%d`` string; switch to a parameterized query if
        # ``DeNetMysql.select`` supports one -- TODO confirm.
        sql = f"""
            select t1.fans_count, t2.gh_id
            from datastat_wx t1
            join publish_account t2 on t1.account_id = t2.id
            where t2.channel = 5
                and t2.status = 1
                and t1.date_str >= '{date}'
            order by t1.date_str;
        """
        result = self.de.select(sql)
        # Rows come back ordered by date_str, so when an account has several
        # rows the last one (the greatest date_str) is the value that is kept.
        return {gh_id: fans_count for fans_count, gh_id in result}

    def update_single_day(self, dt, fans_dict):
        """
        Write one day's statistics for every configured account.

        For each entry of the read-rate table this inserts a row into
        ``account_avg_info_v3``; if the insert raises, it falls back to
        updating ``fans`` and ``read_avg`` of the row identified by
        (gh_id, position, update_time). Afterwards every row of the same
        (gh_id, position) whose ``update_time`` differs from ``dt`` gets
        ``status = 0``.

        :param dt: value stored in / matched against ``update_time``
        :param fans_dict: dict mapping ``gh_id`` to fan count; accounts that
            are missing from it are treated as having 0 fans
        :return: None
        """
        # The statements contain no per-item text, so build them once.
        insert_sql = """
            INSERT INTO account_avg_info_v3
            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        update_sql = """
            UPDATE account_avg_info_v3
            set fans = %s, read_avg = %s
            where gh_id = %s and position = %s and update_time = %s
        """
        reset_status_sql = """
            UPDATE account_avg_info_v3
            SET status = %s
            where update_time != %s and gh_id = %s and position = %s;
        """

        task_list = get_accounts_read_rate()
        for item in tqdm(task_list):
            fans = fans_dict.get(item['gh_id'], 0)
            read_avg = fans * item['read_rate_avg']
            try:
                self.pq.update(
                    sql=insert_sql,
                    params=(
                        item['gh_id'],
                        item['position'],
                        dt,
                        item['account_name'],
                        fans,
                        read_avg,
                        item['like_avg'],
                        1,
                        item['account_type'],
                        item['account_mode'],
                        item['account_source'],
                        item['account_status'],
                        item['business_type'],
                        item['read_rate_avg'],
                    ),
                )
            except Exception as insert_error:
                # Any insert failure falls back to an UPDATE of the existing
                # row. Report the reason instead of discarding it, so that
                # unexpected errors are not hidden behind "update success".
                print(f"insert failed, falling back to update: {insert_error}")
                try:
                    self.pq.update(
                        sql=update_sql,
                        params=(
                            fans,
                            read_avg,
                            item['gh_id'],
                            item['position'],
                            dt,
                        ),
                    )
                    print("update success")
                except Exception as update_error:
                    print(update_error)

            # Set the status of the rows from other days to 0.
            self.pq.update(
                sql=reset_status_sql,
                params=(
                    0,
                    dt,
                    item['gh_id'],
                    item['position'],
                ),
            )
            print("修改成功")


def updateDaily():
    """
    Main job: refresh the statistics for yesterday.

    :return: None
    """
    updater = UpdateAccountInfoVersion3()
    yesterday = datetime.now() - timedelta(days=1)
    yesterday_str = yesterday.strftime('%Y-%m-%d')
    fans_dict = updater.get_account_fans_by_date(yesterday_str)
    updater.update_single_day(yesterday_str, fans_dict)


if __name__ == '__main__':
    # The same job is registered at three times every day, each with its own
    # Functions() instance (presumably so later runs can retry or refresh the
    # earlier one -- TODO confirm).
    for run_at in ("10:15", "10:30", "10:50"):
        schedule.every().day.at(run_at).do(Functions().job_with_thread, updateDaily)

    while True:
        schedule.run_pending()
        time.sleep(1)