123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- """
- @author: luojunhui
- """
- import json
- import time
- import schedule
- from tqdm import tqdm
- from datetime import datetime, timedelta
- from applications import longArticlesMySQL, PQMySQL, DeNetMysql, Functions
class UpdateAccountInfoVersion3(object):
    """
    Update account info, v3.

    Reads per-account fan counts and read rates, then writes one row per
    (account, position) into ``account_avg_info_v3`` for a given day.
    All methods are classmethods and share the three DB clients below.
    """

    # DB clients are created once, when the class body is executed.
    lam = longArticlesMySQL()
    pq = PQMySQL()
    de = DeNetMysql()

    @classmethod
    def getAccountFans(cls):
        """
        Load fan counts per account and date.

        :return: ``{account_name: {date_str: fans_count}}`` built from
            ``datastat_wx`` joined with ``publish_account``
            (channel 5, status 1, dates from 2024-07-01 onwards).
        """
        sql = """
            select t1.date_str, t1.fans_count, t2.gh_id, t2.name
            from datastat_wx t1
            join publish_account t2 on t1.account_id = t2.id
            where t2.channel = 5 and t2.status = 1 and t1.date_str >= '2024-07-01' order by t1.date_str;"""
        fans_by_account = {}
        for row in cls.de.select(sql):
            # Selected column order: date_str, fans_count, gh_id, name.
            date_str, fans_count, account_name = row[0], row[1], row[3]
            fans_by_account.setdefault(account_name, {})[date_str] = fans_count
        return fans_by_account

    @classmethod
    def getAccountRate(cls):
        """
        Load the average read rate per account and position.

        :return: ``{account_name: {position: read_rate_avg}}`` built from
            ``long_articles_read_rate``.
        """
        sql = "select account_name, position, read_rate_avg from long_articles_read_rate;"
        rate_by_account = {}
        for account_name, position, read_rate_avg in (
            (row[0], row[1], row[2]) for row in cls.lam.select(sql)
        ):
            rate_by_account.setdefault(account_name, {})[position] = read_rate_avg
        return rate_by_account

    @classmethod
    def reverseSingleDay(cls, dt, fans_dict, rate_dict):
        """
        Write the ``account_avg_info_v3`` rows for day ``dt``.

        The rows stored for ``update_time = '2024-09-09'`` are used as a
        template: each one is copied with ``dt`` as its update time, the
        fan count and read rate looked up from the given dicts (0 when
        missing), ``read_avg = fans * read_rate`` and ``status = 1``.
        The copy is inserted; if the insert raises, the existing row for
        the same (account_name, position, update_time) is updated instead.
        Finally every other day's row for that account/position gets
        ``status = 0``.

        :param dt: day to write, used as ``update_time`` and as the key
            into ``fans_dict`` (presumably a ``'YYYY-MM-DD'`` string --
            must match the key type produced by ``getAccountFans``).
        :param fans_dict: ``{account_name: {date: fans}}``.
        :param rate_dict: ``{account_name: {position: read_rate_avg}}``.
        :return: None
        """
        # NOTE(review): the template date is hard-coded; confirm that the
        # 2024-09-09 snapshot is still the intended source of rows.
        template_sql = """select * from account_avg_info_v3 where update_time = '2024-09-09';"""

        # The statements below do not depend on the row, so build them once.
        insert_sql = """
            INSERT INTO account_avg_info_v3
            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        update_sql = """
            UPDATE account_avg_info_v3
            set fans = %s, read_avg = %s, read_rate_avg = %s
            where account_name = %s and position = %s and update_time = %s
        """
        deactivate_sql = """
            UPDATE account_avg_info_v3
            SET status = %s
            where update_time != %s and account_name = %s and position = %s;
        """

        result = cls.pq.select(template_sql)
        for line in tqdm(result):
            # NOTE(review): relies on ``select *`` returning columns in the
            # same order as the INSERT column list above -- confirm schema.
            row = list(line)
            position = row[1]
            account_name = row[3]

            # Look each value up once; a missing account/date/position gives 0.
            fans = fans_dict.get(account_name, {}).get(dt, 0)
            read_rate = rate_dict.get(account_name, {}).get(position, 0)

            row[2] = dt                 # update_time
            row[4] = fans               # fans
            row[-1] = read_rate         # read_rate_avg (last column)
            row[5] = fans * read_rate   # read_avg
            row[7] = 1                  # status: mark this day's row as active

            try:
                cls.pq.update(sql=insert_sql, params=tuple(row))
            except Exception:
                # Insert failed (presumably the row already exists for this
                # day): fall back to updating the existing row in place.
                try:
                    cls.pq.update(
                        sql=update_sql,
                        params=(fans, row[5], read_rate, account_name, position, dt),
                    )
                except Exception as update_error:
                    print(update_error)

            # Set the status of this account/position's other days to 0.
            cls.pq.update(
                sql=deactivate_sql,
                params=(0, dt, account_name, position),
            )
def updateDaily():
    """
    Main job: write yesterday's account info rows.

    Loads the fan-count and read-rate lookups, formats yesterday's local
    date as ``YYYY-MM-DD`` and hands all three to ``reverseSingleDay``.

    :return: None
    """
    updater = UpdateAccountInfoVersion3()
    fans_lookup = updater.getAccountFans()
    rate_lookup = updater.getAccountRate()

    # Yesterday, in local time, as the date string used for update_time.
    target_day = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')

    updater.reverseSingleDay(target_day, fans_lookup, rate_lookup)
if __name__ == '__main__':
    # Register the daily job at three times of day; each registration gets
    # its own Functions() instance, whose job_with_thread receives updateDaily.
    for run_at in ("10:15", "10:30", "10:50"):
        schedule.every().day.at(run_at).do(Functions().job_with_thread, updateDaily)

    # Poll the scheduler once per second, forever.
    while True:
        schedule.run_pending()
        time.sleep(1)
|