123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- """
- @author: luojunhui
- """
- import json
- import time
- from tqdm import tqdm
- from datetime import datetime, timedelta
- from argparse import ArgumentParser
- from applications import PQMySQL, DeNetMysql, longArticlesMySQL
- from applications.const import updateAccountReadAvgTaskConst
- from config import apolloConfig
config = apolloConfig()

# gh_id -> fallback fan count; do_task_list uses it (via int()) whenever the
# datastat fan count for the run date is missing or 0.
unauthorized_account = json.loads(
    config.getConfigValue("unauthorized_gh_id_fans")
)

# gh_ids whose rows are tagged with the TOULIU business type instead of
# ARTICLES_DAILY; kept as a set for O(1) membership tests.
touliu_accounts = set(
    json.loads(config.getConfigValue("touliu_gh_id_list"))
)
def get_account_fans_by_dt(db_client, start_date: str = "2024-09-01") -> dict:
    """
    Fetch every account's fan count, keyed by date.

    Reads ``datastat_wx`` joined with ``publish_account`` (restricted to
    ``channel = 5`` and ``status = 1``) for every ``date_str`` on or after
    ``start_date``, ordered by ``date_str``.

    :param db_client: client exposing ``select(sql)`` that returns rows of
        ``(date_str, fans_count, gh_id)``.
    :param start_date: earliest ``date_str`` to include, formatted
        ``%Y-%m-%d``. Defaults to the previously hard-coded cut-off.
    :return: ``{gh_id: {date_str: fans_count}}``. If the same
        ``(gh_id, date_str)`` appears twice, the later row wins.
    :raises ValueError: if ``start_date`` is not a valid ``%Y-%m-%d`` date.
    """
    # select() is only ever called with a bare SQL string in this file, so the
    # date is interpolated into the SQL text. Parsing and re-formatting it
    # guarantees nothing but a normalised YYYY-MM-DD value reaches the query.
    start_date = datetime.strptime(start_date, "%Y-%m-%d").strftime("%Y-%m-%d")
    sql = f"""
        SELECT
            t1.date_str,
            t1.fans_count,
            t2.gh_id
        FROM datastat_wx t1
        JOIN publish_account t2 ON t1.account_id = t2.id
        WHERE
            t2.channel = 5
            AND t2.status = 1
            AND t1.date_str >= '{start_date}'
        ORDER BY t1.date_str;
    """
    fans_by_account = {}
    # `or ()` tolerates a client that returns None instead of an empty result.
    for date_str, fans_count, gh_id in db_client.select(sql) or ():
        fans_by_account.setdefault(gh_id, {})[date_str] = fans_count
    return fans_by_account
class UpdateAccountInfoVersion3(object):
    """
    Update account info, version 3.

    For every account attached to an active publish plan, computes an
    estimated average read count per article position
    (``fans * read_rate_avg``) for one day and writes it into
    ``account_avg_info_v3``.
    """

    # Article positions processed for every account (1 through 8 inclusive).
    _POSITIONS = range(1, 9)

    # New row for (gh_id, position, update_time); inserted with status 1.
    _INSERT_SQL = """
        INSERT INTO account_avg_info_v3
            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
        values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """

    # Fallback when the insert fails: refresh the numbers of the same-day row.
    _UPDATE_SQL = """
        UPDATE account_avg_info_v3
        set fans = %s, read_avg = %s, read_rate_avg = %s
        where gh_id = %s and position = %s and update_time = %s
    """

    # Set status on every row of this account/position from any other day.
    _RESET_STATUS_SQL = """
        UPDATE account_avg_info_v3
        SET status = %s
        where update_time != %s and gh_id = %s and position = %s;
    """

    def __init__(self):
        self.const = updateAccountReadAvgTaskConst()
        self.pq = PQMySQL()
        self.de = DeNetMysql()
        self.lam = longArticlesMySQL()

    def get_account_position_read_rate(self, dt):
        """
        Load per-position average read rates from the long-articles database.

        :param dt: date string like ``2024-09-01``; dashes are stripped and the
            result is matched against ``long_articles_read_rate.dt_version``
            as an integer (e.g. ``20240901``).
        :return: ``{"<gh_id>_<position>": read_rate_avg}``
        """
        # int() guarantees only a plain integer literal reaches the SQL text.
        dt_version = int(dt.replace("-", ""))
        sql = f"""
            SELECT
                gh_id, position, read_rate_avg
            FROM
                long_articles_read_rate
            WHERE dt_version = {dt_version};
        """
        return {
            "{}_{}".format(gh_id, position): rate
            for gh_id, position, rate in self.lam.select(sql)
        }

    def get_publishing_accounts(self):
        """
        List the accounts attached to an active publish plan.

        Selects distinct channel-5 accounts of every plan with
        ``plan_status = 1``, enriched (via LEFT JOINs, so possibly None) with
        their statistics source-group metadata.

        :return: list of dicts with keys ``account_name``, ``gh_id``, ``fans``,
            ``account_source_name``, ``mode_type``, ``account_type``, ``status``.
        """
        sql = """
            SELECT DISTINCT
                t3.`name`,
                t3.gh_id,
                t3.follower_count,
                t6.account_source_name,
                t6.mode_type,
                t6.account_type,
                t6.`status`
            FROM
                publish_plan t1
                JOIN publish_plan_account t2 ON t1.id = t2.plan_id
                JOIN publish_account t3 ON t2.account_id = t3.id
                LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
                LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
                LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
            WHERE
                t1.plan_status = 1
                AND t3.channel = 5
                GROUP BY t3.id;
        """
        account_list = self.de.select(sql)
        return [
            {
                "account_name": i[0],
                "gh_id": i[1],
                "fans": i[2],
                "account_source_name": i[3],
                "mode_type": i[4],
                "account_type": i[5],
                "status": i[6],
            }
            for i in account_list
        ]

    def do_task_list(self, dt):
        """
        Compute and store per-position read averages for every publishing account.

        The fan count is the datastat value for ``dt``. If that is missing or 0,
        the ``unauthorized_account`` config value is used instead. Accounts
        still at 0 fans are skipped, as are positions with no (or a falsy)
        read rate.

        :param dt: date string ``%Y-%m-%d``; stored as ``update_time``.
        """
        fans_dict = get_account_fans_by_dt(db_client=self.de)
        account_list = self.get_publishing_accounts()
        rate_dict = self.get_account_position_read_rate(dt)

        for account in tqdm(account_list, desc=dt):
            gh_id = account["gh_id"]
            if gh_id in touliu_accounts:
                business_type = self.const.TOULIU
            else:
                business_type = self.const.ARTICLES_DAILY

            fans = fans_dict.get(gh_id, {}).get(dt, 0)
            if not fans:
                fans = int(unauthorized_account.get(gh_id, 0))
            if not fans:
                continue

            for index in self._POSITIONS:
                rate = rate_dict.get("{}_{}".format(gh_id, index))
                if not rate:
                    continue
                # Only demote other days' rows once today's row is in place;
                # otherwise a failed write would leave no status-1 row at all.
                if self._upsert_position_row(account, index, dt, fans, rate, business_type):
                    self._reset_stale_status(gh_id, index, dt)

        # Completion message, printed once after all accounts are processed.
        print("修改成功")

    def _upsert_position_row(self, account, index, dt, fans, rate, business_type):
        """
        Insert today's row for one account/position, updating it in place if the insert fails.

        :return: True if the insert or the fallback update succeeded, else False.
        """
        gh_id = account["gh_id"]
        read_avg = fans * rate
        try:
            self.pq.update(
                sql=self._INSERT_SQL,
                params=(
                    gh_id,
                    index,
                    dt,
                    account["account_name"],
                    fans,
                    read_avg,
                    0,  # like_avg
                    1,  # status
                    account["account_type"],
                    account["mode_type"],
                    account["account_source_name"],
                    account["status"],
                    business_type,
                    rate,
                ),
            )
        except Exception:
            # NOTE(review): presumably the insert fails because a row for this
            # (gh_id, position, update_time) already exists — confirm against
            # the table's unique key. Any insert failure falls back to UPDATE.
            try:
                self.pq.update(
                    sql=self._UPDATE_SQL,
                    params=(fans, read_avg, rate, gh_id, index, dt),
                )
            except Exception as e:
                print(e)
                return False
        return True

    def _reset_stale_status(self, gh_id, index, dt):
        """Set status 0 on this account/position's rows whose update_time is not ``dt``."""
        self.pq.update(
            sql=self._RESET_STATUS_SQL,
            params=(0, dt, gh_id, index),
        )
def main():
    """
    Command-line entry point.

    With ``--run-date YYYY-MM-DD``, processes that single date. Without it,
    processes yesterday's date in local time (the daily-job mode).
    """
    parser = ArgumentParser()
    # argparse %-formats help strings, so literal percent signs must be
    # doubled; the unescaped "%Y" made ``--help`` crash with ValueError.
    parser.add_argument(
        "--run-date",
        help="Run only once for date in format of %%Y-%%m-%%d. "
             "If not specified, run as daily jobs.",
    )
    args = parser.parse_args()

    task = UpdateAccountInfoVersion3()
    if args.run_date:
        run_date = args.run_date
    else:
        yesterday = datetime.now() - timedelta(days=1)
        run_date = yesterday.strftime('%Y-%m-%d')
    task.do_task_list(dt=run_date)
# Run the job only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|