123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- """
- @author: luojunhui
- """
- import json
- import time
- import schedule
- from tqdm import tqdm
- from datetime import datetime, timedelta
- from applications import PQMySQL, DeNetMysql, Functions
- def get_accounts_read_rate():
- """
- 获取阅读率表
- :return:
- """
- with open("config/account_read_rate_map.json", encoding="utf-8") as f:
- account_list = json.loads(f.read())
- return account_list
- class UpdateAccountInfoVersion3(object):
- """
- 更新账号信息 v3
- """
- def __init__(self):
- self.pq = PQMySQL()
- self.de = DeNetMysql()
- def get_account_fans_by_date(self, date):
- """
- 通过日期来获取所有账号的粉丝 dict
- :param date 2024-01-01
- :return: dict key: gh_id, value: fans(int)
- """
- sql = f"""
- select
- t1.fans_count, t2.gh_id
- from datastat_wx t1
- join publish_account t2 on t1.account_id = t2.id
- where t2.channel = 5 and t2.status = 1 and t1.date_str >= '{date}' order by t1.date_str;
- """
- result = self.de.select(sql)
- fans_dict = {}
- for account_info in result:
- fans_count, gh_id = account_info
- fans_dict[gh_id] = fans_count
- return fans_dict
- def update_single_day(self, dt, fans_dict):
- """
- :return:
- """
- task_list = get_accounts_read_rate()
- for item in tqdm(task_list):
- fans = fans_dict.get(item['gh_id'], 0)
- read_avg = fans * item['read_rate_avg']
- insert_sql = f"""
- INSERT INTO account_avg_info_v3
- (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
- values
- (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
- """
- try:
- self.pq.update(
- sql=insert_sql,
- params=(
- item['gh_id'],
- item['position'],
- dt,
- item['account_name'],
- fans,
- read_avg,
- item['like_avg'],
- 1,
- item['account_type'],
- item['account_mode'],
- item['account_source'],
- item['account_status'],
- item['business_type'],
- item['read_rate_avg']
- )
- )
- except Exception as e:
- updateSQL = f"""
- UPDATE account_avg_info_v3
- set fans = %s, read_avg = %s
- where gh_id = %s and position = %s and update_time = %s
- """
- try:
- self.pq.update(
- sql=updateSQL,
- params=(
- fans,
- read_avg,
- item['gh_id'],
- item['position'],
- dt
- )
- )
- print("update success")
- except Exception as e:
- print(e)
- # 修改前一天的状态为 0
- uuu_sql = f"""
- UPDATE account_avg_info_v3
- SET status = %s
- where update_time != %s and gh_id = %s and position = %s;
- """
- self.pq.update(
- sql=uuu_sql,
- params=(
- 0, dt, item['gh_id'], item['position']
- )
- )
- print("修改成功")
- def updateDaily():
- """
- main job
- :return:
- """
- Up = UpdateAccountInfoVersion3()
- dt_object = datetime.fromtimestamp(int(time.time()))
- one_day = timedelta(days=1)
- yesterday = dt_object - one_day
- yesterday_str = yesterday.strftime('%Y-%m-%d')
- fans_dict = Up.get_account_fans_by_date(yesterday_str)
- Up.update_single_day(yesterday_str, fans_dict)
- if __name__ == '__main__':
- schedule.every().day.at("10:15").do(Functions().job_with_thread, updateDaily)
- schedule.every().day.at("10:30").do(Functions().job_with_thread, updateDaily)
- schedule.every().day.at("10:50").do(Functions().job_with_thread, updateDaily)
- while True:
- schedule.run_pending()
- time.sleep(1)
|