""" @author: luojunhui """ import json import time from tqdm import tqdm from datetime import datetime, timedelta from argparse import ArgumentParser from applications import PQMySQL, DeNetMysql, longArticlesMySQL TOULIU_ACCOUNTS = { 'gh_93e00e187787', 'gh_ac43e43b253b', 'gh_68e7fdc09fe4', 'gh_77f36c109fb1', 'gh_b181786a6c8c', 'gh_1ee2e1b39ccf' } ARTICLES_DAILY = 1 TOULIU = 2 def get_account_fans_by_dt(db_client) -> dict: """ 获取每个账号发粉丝,通过日期来区分 :return: """ sql = f""" SELECT t1.date_str, t1.fans_count, t2.gh_id FROM datastat_wx t1 JOIN publish_account t2 ON t1.account_id = t2.id WHERE t2.channel = 5 AND t2.status = 1 AND t1.date_str >= '2024-09-01' ORDER BY t1.date_str; """ result = db_client.select(sql) D = {} for line in result: dt = line[0] fans = line[1] gh_id = line[2] if D.get(gh_id): D[gh_id][dt] = fans else: D[gh_id] = {dt: fans} return D class UpdateAccountInfoVersion3(object): """ 更新账号信息 v3 """ def __init__(self): self.pq = PQMySQL() self.de = DeNetMysql() self.lam = longArticlesMySQL() def get_account_position_read_rate(self, dt): """ 从长文数据库获取账号阅读均值 :return: """ dt = int(dt.replace("-", "")) sql = f""" SELECT gh_id, position, read_rate_avg FROM long_articles_read_rate WHERE dt_version = {dt}; """ result = self.lam.select(sql) account_read_rate_dict = {} for item in result: gh_id = item[0] position = item[1] rate = item[2] key = "{}_{}".format(gh_id, position) account_read_rate_dict[key] = rate return account_read_rate_dict def get_publishing_accounts(self): """ 获取每日正在发布的账号 :return: """ sql = f""" SELECT DISTINCT t3.`name`, t3.gh_id, t3.follower_count, t6.account_source_name, t6.mode_type, t6.account_type, t6.`status` FROM publish_plan t1 JOIN publish_plan_account t2 ON t1.id = t2.plan_id JOIN publish_account t3 ON t2.account_id = t3.id LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name WHERE t1.plan_status = 1 AND t3.channel = 5 GROUP BY t3.id; """ account_list = self.de.select(sql) result_list = [ { "account_name": i[0], "gh_id": i[1], "fans": i[2], "account_source_name": i[3], "mode_type": i[4], "account_type": i[5], "status": i[6] } for i in account_list ] return result_list def do_task_list(self, dt): """ do it """ fans_dict = get_account_fans_by_dt(db_client=self.de) account_list = self.get_publishing_accounts() rate_dict = self.get_account_position_read_rate(dt) for account in tqdm(account_list): business_type = TOULIU if account['gh_id'] in TOULIU_ACCOUNTS else ARTICLES_DAILY fans = fans_dict.get(account['gh_id'], {}).get(dt, 0) if fans: for index in range(1, 9): gh_id_position = "{}_{}".format(account['gh_id'], index) if rate_dict.get(gh_id_position): rate = rate_dict[gh_id_position] read_avg = fans * rate print(rate, read_avg) insert_sql = f""" INSERT INTO account_avg_info_v3 (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ try: self.pq.update( sql=insert_sql, params=( account['gh_id'], index, dt, account['account_name'], fans, read_avg, 0, 1, account['account_type'], account['mode_type'], account['account_source_name'], account['status'], business_type, rate ) ) except Exception as e: updateSQL = f""" UPDATE account_avg_info_v3 set fans = %s, read_avg = %s, read_rate_avg = %s where gh_id = %s and position = %s and update_time = %s """ try: affected_rows = self.pq.update( sql=updateSQL, params=( fans, read_avg, rate, account['gh_id'], index, dt ) ) except Exception as e: print(e) # 修改前一天的状态为 0 update_status_sql = f""" UPDATE account_avg_info_v3 SET status = %s where update_time != %s and gh_id = %s and position = %s; """ rows_affected = self.pq.update( sql=update_status_sql, params=( 0, dt, account['gh_id'], index ) ) print("修改成功") def main(): """ main job :return: """ parser = ArgumentParser() parser.add_argument("--run-date", help="Run only once for date in format of %Y-%m-%d. \ If no specified, run as daily jobs.") args = parser.parse_args() Up = UpdateAccountInfoVersion3() if args.run_date: Up.do_task_list(dt=args.run_date) else: dt_object = datetime.fromtimestamp(int(time.time())) one_day = timedelta(days=1) yesterday = dt_object - one_day yesterday_str = yesterday.strftime('%Y-%m-%d') Up.do_task_list(dt=yesterday_str) if __name__ == '__main__': main()