account_position_read_avg_task.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. """
  2. 计算各个账号的阅读均值,以及阅读均值区间估计的上限
  3. """
  4. import json
  5. import numpy as np
  6. from tqdm import tqdm
  7. from scipy import stats
  8. from pymysql.cursors import DictCursor
  9. from applications.const import UpdateAccountReadAvgTaskConst
  10. from applications.db import DatabaseConnector
  11. from applications.utils import fetch_account_fans
  12. from applications.utils import fetch_publishing_account_list
  13. from config import apolloConfig
  14. from config import long_articles_config, denet_config, piaoquan_crawler_config
  15. config = apolloConfig()
  16. const = UpdateAccountReadAvgTaskConst()
  17. unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
  18. touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
  19. backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
  20. class AccountPositionReadAvgTask(object):
  21. def __init__(self):
  22. # init piaoquan crawler db client
  23. self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
  24. self.piaoquan_crawler_db_client.connect()
  25. # init long articles db client
  26. self.long_articles_db_client = DatabaseConnector(long_articles_config)
  27. self.long_articles_db_client.connect()
  28. # init aigc db client
  29. self.denet_db_client = DatabaseConnector(denet_config)
  30. self.denet_db_client.connect()
  31. def fetch_read_rate_avg_for_each_account(self, dt):
  32. dt = int(dt.replace("-", ""))
  33. sql = f"""
  34. select gh_id, position, read_rate_avg
  35. from {const.ACCOUNT_READ_RATE_TABLE}
  36. where dt_version = {dt};
  37. """
  38. fetch_response_list = self.long_articles_db_client.fetch(
  39. query=sql, cursor_type=DictCursor
  40. )
  41. account_read_rate_dict = {}
  42. for item in fetch_response_list:
  43. key = "{}_{}".format(item["gh_id"], item["position"])
  44. account_read_rate_dict[key] = item["read_rate_avg"]
  45. return account_read_rate_dict
  46. def cal_read_avg_ci(self, gh_id, position):
  47. """
  48. 计算阅读均值的置信区间
  49. """
  50. fetch_query = f"""
  51. select read_avg, update_time
  52. from {const.ACCOUNT_READ_AVG_TABLE}
  53. where gh_id = %s and position = %s
  54. order by update_time desc limit 30;
  55. """
  56. fetch_response_list = self.piaoquan_crawler_db_client.fetch(
  57. query=fetch_query, params=(gh_id, position), cursor_type=DictCursor
  58. )
  59. read_avg_list = [i["read_avg"] for i in fetch_response_list]
  60. n = len(read_avg_list)
  61. mean = np.mean(read_avg_list)
  62. std = np.std(read_avg_list, ddof=1)
  63. se = std / np.sqrt(n)
  64. t = stats.t.ppf(0.975, df=n - 1)
  65. upper_t = mean + t * se
  66. return upper_t
  67. def process_each_record(
  68. self, account, index, fans, read_rate_avg, read_avg, read_avg_ci_upper, dt
  69. ):
  70. gh_id = account["gh_id"]
  71. business_type = (
  72. const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
  73. )
  74. # insert into database
  75. insert_sql = f"""
  76. insert into {const.ACCOUNT_READ_AVG_TABLE}
  77. (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type,
  78. account_mode, account_source, account_status, business_type, read_rate_avg, read_avg_ci_upper)
  79. values
  80. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  81. """
  82. try:
  83. self.piaoquan_crawler_db_client.save(
  84. query=insert_sql,
  85. params=(
  86. gh_id,
  87. index,
  88. dt,
  89. account["account_name"],
  90. fans,
  91. read_avg,
  92. const.DEFAULT_LIKE,
  93. const.USING_STATUS,
  94. account["account_type"],
  95. account["mode_type"],
  96. account["account_source"],
  97. account["status"],
  98. business_type,
  99. read_rate_avg,
  100. read_avg_ci_upper,
  101. ),
  102. )
  103. except Exception as e:
  104. update_sql = f"""
  105. update {const.ACCOUNT_READ_AVG_TABLE}
  106. set fans = %s, read_avg = %s, read_rate_avg = %s, read_avg_ci_upper = %s
  107. where gh_id = %s and position = %s and update_time = %s
  108. """
  109. try:
  110. self.piaoquan_crawler_db_client.save(
  111. query=update_sql,
  112. params=(
  113. fans,
  114. read_avg,
  115. read_rate_avg,
  116. read_avg_ci_upper,
  117. account["gh_id"],
  118. index,
  119. dt,
  120. ),
  121. )
  122. except Exception as e:
  123. print(e)
  124. # 修改前一天的状态为 0
  125. update_status_sql = f"""
  126. update {const.ACCOUNT_READ_AVG_TABLE}
  127. set status = %s
  128. where update_time != %s and gh_id = %s and position = %s;
  129. """
  130. self.piaoquan_crawler_db_client.save(
  131. query=update_status_sql,
  132. params=(const.NOT_USING_STATUS, dt, account["gh_id"], index),
  133. )
  134. def cal_read_avg_for_each_account(self, account, fans_dict, read_rate_avg_dict, dt):
  135. gh_id = account["gh_id"]
  136. fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
  137. # use unauthorized account's fans if not found in aigc
  138. if not fans:
  139. fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
  140. # use backup account's fans if not found in aigc
  141. if not fans:
  142. fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
  143. if fans:
  144. for index in const.ARTICLE_INDEX_LIST:
  145. gh_id_position = "{}_{}".format(gh_id, index)
  146. if read_rate_avg_dict.get(gh_id_position):
  147. # fetch read rate avg
  148. read_rate_avg = read_rate_avg_dict[gh_id_position]
  149. # cal read avg
  150. read_avg = fans * read_rate_avg
  151. # cal read avg ci upper
  152. read_avg_ci_upper = self.cal_read_avg_ci(gh_id, index)
  153. # insert into database
  154. self.process_each_record(
  155. account,
  156. index,
  157. fans,
  158. read_rate_avg,
  159. read_avg,
  160. read_avg_ci_upper,
  161. dt,
  162. )
  163. def do_task_list(self, dt):
  164. """
  165. do it
  166. """
  167. # get fans dict from aigc
  168. fans_dict = fetch_account_fans(self.denet_db_client, dt)
  169. # get publishing account list from aigc
  170. account_list = fetch_publishing_account_list(self.denet_db_client)
  171. # fetch each account's read avg for each position
  172. read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
  173. for account in tqdm(account_list, desc=dt):
  174. self.cal_read_avg_for_each_account(
  175. account, fans_dict, read_rate_avg_dict, dt
  176. )