|
@@ -7,152 +7,103 @@ import time
|
|
|
from tqdm import tqdm
|
|
|
from datetime import datetime, timedelta
|
|
|
from argparse import ArgumentParser
|
|
|
+from pymysql.cursors import DictCursor
|
|
|
|
|
|
-from applications import PQMySQL, DeNetMysql, longArticlesMySQL
|
|
|
from applications.const import updateAccountReadAvgTaskConst
|
|
|
+from applications.db import DatabaseConnector
|
|
|
+from applications.utils import fetch_account_fans
|
|
|
+from applications.utils import fetch_publishing_account_list
|
|
|
from config import apolloConfig
|
|
|
+from config import long_articles_config, denet_config, piaoquan_crawler_config
|
|
|
|
|
|
+read_rate_table = "long_articles_read_rate"
|
|
|
+read_avg_table = "account_avg_info_v3"
|
|
|
config = apolloConfig()
|
|
|
+const = updateAccountReadAvgTaskConst()
|
|
|
unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
|
|
|
touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
|
|
|
-
|
|
|
-
|
|
|
-def get_account_fans_by_dt(db_client) -> dict:
|
|
|
- """
|
|
|
- 获取每个账号发粉丝,通过日期来区分
|
|
|
- :return:
|
|
|
- """
|
|
|
- sql = f"""
|
|
|
- SELECT
|
|
|
- t1.date_str,
|
|
|
- t1.fans_count,
|
|
|
- t2.gh_id
|
|
|
- FROM datastat_wx t1
|
|
|
- JOIN publish_account t2 ON t1.account_id = t2.id
|
|
|
- WHERE
|
|
|
- t2.channel = 5
|
|
|
-# AND t2.status = 1
|
|
|
- AND t1.fans_count > 0
|
|
|
- AND t1.date_str >= '2024-09-01'
|
|
|
- ORDER BY t1.date_str;
|
|
|
- """
|
|
|
- result = db_client.select(sql)
|
|
|
- D = {}
|
|
|
- for line in result:
|
|
|
- dt = line[0]
|
|
|
- fans = line[1]
|
|
|
- gh_id = line[2]
|
|
|
- if D.get(gh_id):
|
|
|
- D[gh_id][dt] = fans
|
|
|
- else:
|
|
|
- D[gh_id] = {dt: fans}
|
|
|
- return D
|
|
|
-
|
|
|
+backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
|
|
|
|
|
|
class UpdateAccountInfoVersion3(object):
|
|
|
"""
|
|
|
- 更新账号信息 v3
|
|
|
+ 更新账号的评价阅读率
|
|
|
"""
|
|
|
|
|
|
def __init__(self):
|
|
|
- self.const = updateAccountReadAvgTaskConst()
|
|
|
- self.pq = PQMySQL()
|
|
|
- self.de = DeNetMysql()
|
|
|
- self.lam = longArticlesMySQL()
|
|
|
+ # init piaoquan crawler db client
|
|
|
+ self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
|
|
|
+ self.piaoquan_crawler_db_client.connect()
|
|
|
+
|
|
|
+ # init long articles db client
|
|
|
+ self.long_articles_db_client = DatabaseConnector(long_articles_config)
|
|
|
+ self.long_articles_db_client.connect()
|
|
|
+
|
|
|
+ # init aigc db client
|
|
|
+ self.denet_db_client = DatabaseConnector(denet_config)
|
|
|
+ self.denet_db_client.connect()
|
|
|
|
|
|
- def get_account_position_read_rate(self, dt):
|
|
|
+ def fetch_read_rate_avg_for_each_account(self, dt):
|
|
|
"""
|
|
|
从长文数据库获取账号阅读均值
|
|
|
:return:
|
|
|
"""
|
|
|
dt = int(dt.replace("-", ""))
|
|
|
sql = f"""
|
|
|
- SELECT
|
|
|
- gh_id, position, read_rate_avg
|
|
|
- FROM
|
|
|
- long_articles_read_rate_dev
|
|
|
- WHERE dt_version = {dt};
|
|
|
+ select gh_id, position, read_rate_avg
|
|
|
+ from {read_rate_table}
|
|
|
+ where dt_version = {dt};
|
|
|
"""
|
|
|
-
|
|
|
- result = self.lam.select(sql)
|
|
|
+ fetch_response_list = self.long_articles_db_client.fetch(query=sql, cursor_type=DictCursor)
|
|
|
account_read_rate_dict = {}
|
|
|
- for item in result:
|
|
|
- gh_id = item[0]
|
|
|
- position = item[1]
|
|
|
- rate = item[2]
|
|
|
- key = "{}_{}".format(gh_id, position)
|
|
|
- account_read_rate_dict[key] = rate
|
|
|
+ for item in fetch_response_list:
|
|
|
+ key = "{}_{}".format(item['gh_id'], item['position'])
|
|
|
+ account_read_rate_dict[key] = item['read_rate_avg']
|
|
|
return account_read_rate_dict
|
|
|
|
|
|
- def get_publishing_accounts(self):
|
|
|
- """
|
|
|
- 获取每日正在发布的账号
|
|
|
- :return:
|
|
|
- """
|
|
|
- sql = f"""
|
|
|
- SELECT DISTINCT
|
|
|
- t3.`name`,
|
|
|
- t3.gh_id,
|
|
|
- t3.follower_count,
|
|
|
- t6.account_source_name,
|
|
|
- t6.mode_type,
|
|
|
- t6.account_type,
|
|
|
- t6.`status`
|
|
|
- FROM
|
|
|
- publish_plan t1
|
|
|
- JOIN publish_plan_account t2 ON t1.id = t2.plan_id
|
|
|
- JOIN publish_account t3 ON t2.account_id = t3.id
|
|
|
- LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
|
|
|
- LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
|
|
|
- LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
|
|
|
- WHERE
|
|
|
- t1.plan_status = 1
|
|
|
- AND t3.channel = 5
|
|
|
- GROUP BY t3.id;
|
|
|
- """
|
|
|
- account_list = self.de.select(sql)
|
|
|
- result_list = [
|
|
|
- {
|
|
|
- "account_name": i[0],
|
|
|
- "gh_id": i[1],
|
|
|
- "fans": i[2],
|
|
|
- "account_source_name": i[3],
|
|
|
- "mode_type": i[4],
|
|
|
- "account_type": i[5],
|
|
|
- "status": i[6]
|
|
|
- } for i in account_list
|
|
|
- ]
|
|
|
- return result_list
|
|
|
-
|
|
|
def do_task_list(self, dt):
|
|
|
"""
|
|
|
do it
|
|
|
"""
|
|
|
- fans_dict = get_account_fans_by_dt(db_client=self.de)
|
|
|
- account_list = self.get_publishing_accounts()
|
|
|
- rate_dict = self.get_account_position_read_rate(dt)
|
|
|
+ # get fans dict from aigc
|
|
|
+ fans_dict = fetch_account_fans(self.piaoquan_crawler_db_client, dt)
|
|
|
+
|
|
|
+ # get publishing account list from aigc
|
|
|
+ account_list = fetch_publishing_account_list(self.denet_db_client)
|
|
|
+
|
|
|
+ # fetch each account's read avg for each position
|
|
|
+ read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
|
|
|
+
|
|
|
for account in tqdm(account_list, desc=dt):
|
|
|
gh_id = account["gh_id"]
|
|
|
- business_type = self.const.TOULIU if gh_id in touliu_accounts else self.const.ARTICLES_DAILY
|
|
|
+ business_type = const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
|
|
|
fans = fans_dict.get(gh_id, {}).get(dt, 0)
|
|
|
+
|
|
|
+ # use unauthorized account's fans if not found in aigc
|
|
|
if not fans:
|
|
|
fans = int(unauthorized_account.get(gh_id, 0))
|
|
|
+
|
|
|
+ # use backup account's fans if not found in aigc
|
|
|
+ if not fans:
|
|
|
+ fans = int(backup_account_fans.get(gh_id, 0))
|
|
|
+
|
|
|
if fans:
|
|
|
for index in range(1, 9):
|
|
|
gh_id_position = "{}_{}".format(gh_id, index)
|
|
|
- if rate_dict.get(gh_id_position):
|
|
|
- rate = rate_dict[gh_id_position]
|
|
|
- read_avg = fans * rate
|
|
|
- print(rate, read_avg)
|
|
|
+ if read_rate_avg_dict.get(gh_id_position):
|
|
|
+ # fetch read rate avg
|
|
|
+ read_rate_avg = read_rate_avg_dict[gh_id_position]
|
|
|
+ # cal read avg
|
|
|
+ read_avg = fans * read_rate_avg
|
|
|
+ print(read_rate_avg, read_avg)
|
|
|
insert_sql = f"""
|
|
|
- INSERT INTO account_avg_info_v3_dev
|
|
|
- (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
|
|
|
- values
|
|
|
- (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
|
|
|
+ insert into {read_avg_table}
|
|
|
+ (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
|
|
|
+ values
|
|
|
+ (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
|
|
|
"""
|
|
|
try:
|
|
|
- self.pq.update(
|
|
|
- sql=insert_sql,
|
|
|
+ self.piaoquan_crawler_db_client.save(
|
|
|
+ query=insert_sql,
|
|
|
params=(
|
|
|
gh_id,
|
|
|
index,
|
|
@@ -164,25 +115,25 @@ class UpdateAccountInfoVersion3(object):
|
|
|
1,
|
|
|
account['account_type'],
|
|
|
account['mode_type'],
|
|
|
- account['account_source_name'],
|
|
|
+ account['account_source'],
|
|
|
account['status'],
|
|
|
business_type,
|
|
|
- rate
|
|
|
+ read_rate_avg
|
|
|
)
|
|
|
)
|
|
|
except Exception as e:
|
|
|
- updateSQL = f"""
|
|
|
- UPDATE account_avg_info_v3_dev
|
|
|
- set fans = %s, read_avg = %s, read_rate_avg = %s
|
|
|
- where gh_id = %s and position = %s and update_time = %s
|
|
|
+ update_sql = f"""
|
|
|
+ update {read_avg_table}
|
|
|
+ set fans = %s, read_avg = %s, read_rate_avg = %s
|
|
|
+ where gh_id = %s and position = %s and update_time = %s
|
|
|
"""
|
|
|
try:
|
|
|
- affected_rows = self.pq.update(
|
|
|
- sql=updateSQL,
|
|
|
+ self.piaoquan_crawler_db_client.save(
|
|
|
+ query=update_sql,
|
|
|
params=(
|
|
|
fans,
|
|
|
read_avg,
|
|
|
- rate,
|
|
|
+ read_rate_avg,
|
|
|
account['gh_id'],
|
|
|
index,
|
|
|
dt
|
|
@@ -193,17 +144,16 @@ class UpdateAccountInfoVersion3(object):
|
|
|
|
|
|
# 修改前一天的状态为 0
|
|
|
update_status_sql = f"""
|
|
|
- UPDATE account_avg_info_v3_dev
|
|
|
- SET status = %s
|
|
|
- where update_time != %s and gh_id = %s and position = %s;
|
|
|
+ update {read_avg_table}
|
|
|
+ set status = %s
|
|
|
+ where update_time != %s and gh_id = %s and position = %s;
|
|
|
"""
|
|
|
- rows_affected = self.pq.update(
|
|
|
- sql=update_status_sql,
|
|
|
+ self.piaoquan_crawler_db_client.save(
|
|
|
+ query=update_status_sql,
|
|
|
params=(
|
|
|
0, dt, account['gh_id'], index
|
|
|
)
|
|
|
)
|
|
|
- print("修改成功")
|
|
|
|
|
|
|
|
|
def main():
|