updateAccountV3.py

  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import numpy as np
  7. from scipy import stats
  8. from tqdm import tqdm
  9. from datetime import datetime, timedelta
  10. from argparse import ArgumentParser
  11. from pymysql.cursors import DictCursor
  12. from applications.const import UpdateAccountReadAvgTaskConst
  13. from applications.db import DatabaseConnector
  14. from applications.utils import fetch_account_fans
  15. from applications.utils import fetch_publishing_account_list
  16. from config import apolloConfig
  17. from config import long_articles_config, denet_config, piaoquan_crawler_config
  18. read_rate_table = "long_articles_read_rate"
  19. read_avg_table = "account_avg_info_v3"
  20. config = apolloConfig()
  21. const = UpdateAccountReadAvgTaskConst()
  22. unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
  23. touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
  24. backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
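
# fan counts come from the aigc (denet) database first; the Apollo-configured
# unauthorized_gh_id_fans and backup_account_fans maps above serve as fallbacks
# in do_task_list when aigc has no fan count for an account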


class UpdateAccountInfoVersion3(object):
    """
    update each account's average read rate
    """

    def __init__(self):
        # init piaoquan crawler db client
        self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_crawler_db_client.connect()
        # init long articles db client
        self.long_articles_db_client = DatabaseConnector(long_articles_config)
        self.long_articles_db_client.connect()
        # init aigc db client
        self.denet_db_client = DatabaseConnector(denet_config)
        self.denet_db_client.connect()

    def fetch_read_rate_avg_for_each_account(self, dt):
        """
        fetch each account's average read rate from the long-articles database
        :return:
        """
        dt = int(dt.replace("-", ""))
        sql = f"""
            select gh_id, position, read_rate_avg
            from {read_rate_table}
            where dt_version = {dt};
        """
        fetch_response_list = self.long_articles_db_client.fetch(query=sql, cursor_type=DictCursor)
        account_read_rate_dict = {}
        for item in fetch_response_list:
            key = "{}_{}".format(item['gh_id'], item['position'])
            account_read_rate_dict[key] = item['read_rate_avg']
        return account_read_rate_dict
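    # the returned dict is keyed by "{gh_id}_{position}", e.g. (values are
    # purely illustrative): {"gh_xxx_1": 0.13, "gh_xxx_2": 0.07}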

    def cal_read_avg_ci(self, gh_id, position):
        """
        compute the confidence interval (upper bound) of the average read count
        """
        fetch_query = f"""
            select read_avg
            from {read_avg_table}
            where gh_id = %s and position = %s
            order by update_time desc limit {const.STAT_PERIOD};
        """
        fetch_response_list = self.piaoquan_crawler_db_client.fetch(
            query=fetch_query, params=(gh_id, position), cursor_type=DictCursor
        )
        read_avg_list = [i["read_avg"] for i in fetch_response_list]
        n = len(read_avg_list)
        # guard: with fewer than two samples the t-interval is undefined
        if n < 2:
            return float(read_avg_list[0]) if read_avg_list else 0.0
        mean = np.mean(read_avg_list)
        std = np.std(read_avg_list, ddof=1)
        se = std / np.sqrt(n)
        # one-sided upper bound: mean + t_{q, n-1} * s / sqrt(n)
        t = stats.t.ppf(const.DEFAULT_UPPER_QUANTILE, df=n - 1)
        upper_t = mean + t * se
        return upper_t
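    # worked example (assuming const.DEFAULT_UPPER_QUANTILE = 0.95): for
    # read_avg samples [1000, 1200, 1100], n = 3, mean = 1100, s = 100,
    # t = stats.t.ppf(0.95, df=2) ≈ 2.92, and the upper bound is
    # 1100 + 2.92 * 100 / sqrt(3) ≈ 1268.6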

    def do_task_list(self, dt):
        """
        run the full update for the given date
        """
        # get fans dict from aigc
        fans_dict = fetch_account_fans(self.denet_db_client, dt)
        # get publishing account list from aigc
        account_list = fetch_publishing_account_list(self.denet_db_client)
        # fetch each account's read rate avg for each position
        read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
        for account in tqdm(account_list, desc=dt):
            gh_id = account["gh_id"]
            business_type = const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
            fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
            # fall back to the unauthorized account's fans if not found in aigc
            if not fans:
                fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
            # fall back to the backup account's fans if still not found
            if not fans:
                fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
            if fans:
                for index in const.ARTICLE_INDEX_LIST:
                    gh_id_position = "{}_{}".format(gh_id, index)
                    if read_rate_avg_dict.get(gh_id_position):
                        # fetch read rate avg
                        read_rate_avg = read_rate_avg_dict[gh_id_position]
                        # cal read avg
                        read_avg = fans * read_rate_avg
                        # cal read avg ci upper
                        read_avg_ci_upper = self.cal_read_avg_ci(gh_id, index)
                        # insert into database
                        insert_sql = f"""
                            insert into {read_avg_table}
                            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type,
                            account_mode, account_source, account_status, business_type, read_rate_avg, read_avg_ci_upper)
                            values
                            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                        """
                        try:
                            self.piaoquan_crawler_db_client.save(
                                query=insert_sql,
                                params=(
                                    gh_id,
                                    index,
                                    dt,
                                    account['account_name'],
                                    fans,
                                    read_avg,
                                    const.DEFAULT_LIKE,
                                    const.USING_STATUS,
                                    account['account_type'],
                                    account['mode_type'],
                                    account['account_source'],
                                    account['status'],
                                    business_type,
                                    read_rate_avg,
                                    read_avg_ci_upper
                                )
                            )
                        except Exception:
                            update_sql = f"""
                                update {read_avg_table}
                                set fans = %s, read_avg = %s, read_rate_avg = %s, read_avg_ci_upper = %s
                                where gh_id = %s and position = %s and update_time = %s;
                            """
                            try:
                                self.piaoquan_crawler_db_client.save(
                                    query=update_sql,
                                    params=(
                                        fans,
                                        read_avg,
                                        read_rate_avg,
                                        read_avg_ci_upper,
                                        account['gh_id'],
                                        index,
                                        dt
                                    )
                                )
                            except Exception as e:
                                print(e)
                        # set earlier days' records for this account/position to status 0 (not using)
                        update_status_sql = f"""
                            update {read_avg_table}
                            set status = %s
                            where update_time != %s and gh_id = %s and position = %s;
                        """
                        self.piaoquan_crawler_db_client.save(
                            query=update_status_sql,
                            params=(
                                const.NOT_USING_STATUS, dt, account['gh_id'], index
                            )
                        )


def main():
    """
    main job
    :return:
    """
    parser = ArgumentParser()
    # note: literal % must be doubled in argparse help strings, since the help
    # formatter applies %-interpolation to them
    parser.add_argument(
        "--run-date",
        help="Run only once for the date given in %%Y-%%m-%%d format. "
             "If not specified, run as a daily job for yesterday."
    )
    args = parser.parse_args()

    update_account_read_avg_task = UpdateAccountInfoVersion3()
    if args.run_date:
        update_account_read_avg_task.do_task_list(dt=args.run_date)
    else:
        dt_object = datetime.fromtimestamp(int(time.time()))
        one_day = timedelta(days=1)
        yesterday = dt_object - one_day
        yesterday_str = yesterday.strftime('%Y-%m-%d')
        update_account_read_avg_task.do_task_list(dt=yesterday_str)


if __name__ == '__main__':
    main()
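
# usage (the date below is illustrative):
#   python updateAccountV3.py --run-date 2025-06-01   # backfill one date
#   python updateAccountV3.py                         # daily job for yesterday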