# account_position_read_avg_task.py
"""
Compute each account's average read count per position, together with the
upper bound of the confidence interval for that average.
"""
  4. import json
  5. import traceback
  6. import numpy as np
  7. from tqdm import tqdm
  8. from scipy import stats
  9. from pymysql.cursors import DictCursor
  10. from applications import log
  11. from applications.const import UpdateAccountReadAvgTaskConst
  12. from applications.db import DatabaseConnector
  13. from applications.utils import fetch_account_fans
  14. from applications.utils import fetch_publishing_account_list
  15. from config import apolloConfig
  16. from config import long_articles_config, denet_config, piaoquan_crawler_config
  17. config = apolloConfig()
  18. const = UpdateAccountReadAvgTaskConst()
  19. unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
  20. touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
  21. backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
  22. class AccountDataTask:
  23. def __init__(self):
  24. # init piaoquan crawler db client
  25. self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
  26. self.piaoquan_crawler_db_client.connect()
  27. # init long articles db client
  28. self.long_articles_db_client = DatabaseConnector(long_articles_config)
  29. self.long_articles_db_client.connect()
  30. # init aigc db client
  31. self.denet_db_client = DatabaseConnector(denet_config)
  32. self.denet_db_client.connect()
  33. class AccountPositionReadAvgTask(AccountDataTask):
  34. def fetch_read_rate_avg_for_each_account(self, dt):
  35. dt = int(dt.replace("-", ""))
  36. sql = f"""
  37. select gh_id, position, read_rate_avg
  38. from {const.ACCOUNT_READ_RATE_TABLE}
  39. where dt_version = {dt};
  40. """
  41. fetch_response_list = self.long_articles_db_client.fetch(
  42. query=sql, cursor_type=DictCursor
  43. )
  44. account_read_rate_dict = {}
  45. for item in fetch_response_list:
  46. key = "{}_{}".format(item["gh_id"], item["position"])
  47. account_read_rate_dict[key] = item["read_rate_avg"]
  48. return account_read_rate_dict
  49. def cal_read_avg_ci(self, gh_id, position):
  50. """
  51. 计算阅读均值的置信区间
  52. """
  53. fetch_query = f"""
  54. select read_avg, update_time
  55. from {const.ACCOUNT_READ_AVG_TABLE}
  56. where gh_id = %s and position = %s
  57. order by update_time desc limit 30;
  58. """
  59. fetch_response_list = self.piaoquan_crawler_db_client.fetch(
  60. query=fetch_query, params=(gh_id, position), cursor_type=DictCursor
  61. )
  62. read_avg_list = [i["read_avg"] for i in fetch_response_list]
  63. n = len(read_avg_list)
  64. mean = np.mean(read_avg_list)
  65. std = np.std(read_avg_list, ddof=1)
  66. se = std / np.sqrt(n)
  67. t = stats.t.ppf(0.975, df=n - 1)
  68. upper_t = mean + t * se
  69. return upper_t
  70. def process_each_record(
  71. self, account, index, fans, read_rate_avg, read_avg, read_avg_ci_upper, dt
  72. ):
  73. gh_id = account["gh_id"]
  74. business_type = (
  75. const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
  76. )
  77. # insert into database
  78. insert_sql = f"""
  79. insert into {const.ACCOUNT_READ_AVG_TABLE}
  80. (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type,
  81. account_mode, account_source, account_status, business_type, read_rate_avg, read_avg_ci_upper)
  82. values
  83. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  84. """
  85. try:
  86. self.piaoquan_crawler_db_client.save(
  87. query=insert_sql,
  88. params=(
  89. gh_id,
  90. index,
  91. dt,
  92. account["account_name"],
  93. fans,
  94. read_avg,
  95. const.DEFAULT_LIKE,
  96. const.USING_STATUS,
  97. account["account_type"],
  98. account["mode_type"],
  99. account["account_source"],
  100. account["status"],
  101. business_type,
  102. read_rate_avg,
  103. read_avg_ci_upper,
  104. ),
  105. )
  106. except Exception as e:
  107. update_sql = f"""
  108. update {const.ACCOUNT_READ_AVG_TABLE}
  109. set fans = %s, read_avg = %s, read_rate_avg = %s, read_avg_ci_upper = %s
  110. where gh_id = %s and position = %s and update_time = %s
  111. """
  112. try:
  113. self.piaoquan_crawler_db_client.save(
  114. query=update_sql,
  115. params=(
  116. fans,
  117. read_avg,
  118. read_rate_avg,
  119. read_avg_ci_upper,
  120. account["gh_id"],
  121. index,
  122. dt,
  123. ),
  124. )
  125. except Exception as e:
  126. print(e)
  127. # 修改前一天的状态为 0
  128. update_status_sql = f"""
  129. update {const.ACCOUNT_READ_AVG_TABLE}
  130. set status = %s
  131. where update_time != %s and gh_id = %s and position = %s;
  132. """
  133. self.piaoquan_crawler_db_client.save(
  134. query=update_status_sql,
  135. params=(const.NOT_USING_STATUS, dt, account["gh_id"], index),
  136. )
  137. def cal_read_avg_for_each_account(self, account, fans_dict, read_rate_avg_dict, dt):
  138. gh_id = account["gh_id"]
  139. fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
  140. # use unauthorized account's fans if not found in aigc
  141. if not fans:
  142. fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
  143. # use backup account's fans if not found in aigc
  144. if not fans:
  145. fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
  146. if fans:
  147. for index in const.ARTICLE_INDEX_LIST:
  148. gh_id_position = "{}_{}".format(gh_id, index)
  149. if read_rate_avg_dict.get(gh_id_position):
  150. # fetch read rate avg
  151. read_rate_avg = read_rate_avg_dict[gh_id_position]
  152. # cal read avg
  153. read_avg = fans * read_rate_avg
  154. # cal read avg ci upper
  155. read_avg_ci_upper = self.cal_read_avg_ci(gh_id, index)
  156. # insert into database
  157. self.process_each_record(
  158. account,
  159. index,
  160. fans,
  161. read_rate_avg,
  162. read_avg,
  163. read_avg_ci_upper,
  164. dt,
  165. )
  166. def do_task_list(self, dt):
  167. """
  168. do it
  169. """
  170. # get fans dict from aigc
  171. fans_dict = fetch_account_fans(self.denet_db_client, dt)
  172. # get publishing account list from aigc
  173. account_list = fetch_publishing_account_list(self.denet_db_client)
  174. # fetch each account's read avg for each position
  175. read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
  176. for account in tqdm(account_list, desc=dt):
  177. try:
  178. self.cal_read_avg_for_each_account(
  179. account, fans_dict, read_rate_avg_dict, dt
  180. )
  181. except Exception as e:
  182. log(
  183. task="account_read_avg_producer",
  184. function="do_task_list",
  185. status="fail",
  186. message=str(e),
  187. data={
  188. "gh_id": account["gh_id"],
  189. "date": dt.replace("-", ""),
  190. "traceback": traceback.format_exc(),
  191. },
  192. )
  193. class AccountOpenRateAvgTask(AccountDataTask):
  194. """
  195. cal open rate avg for each account
  196. """
  197. def set_avg_open_rate_for_each_account(
  198. self, gh_id: str, date_string: str, avg_read_rate: float
  199. ) -> int:
  200. update_sql = f"""
  201. update account_avg_info_v3
  202. set open_rate_avg = %s
  203. where gh_id = %s and update_time = %s;
  204. """
  205. return self.piaoquan_crawler_db_client.save(
  206. update_sql, params=(avg_read_rate, gh_id, date_string)
  207. )
  208. def get_account_open_rate(self, gh_id: str, date_string: str) -> float:
  209. """
  210. get open rate for each account
  211. """
  212. fetch_query = f"""
  213. select
  214. sum(view_count) as 'total_read',
  215. sum(first_level) as 'total_first_level',
  216. sum(first_level) / sum(view_count) as 'avg_open_rate'
  217. from datastat_sort_strategy
  218. where gh_id = '{gh_id}' and date_str between date_sub(str_to_date('{date_string}', '%Y%m%d'), interval {const.STAT_PERIOD} day)
  219. and str_to_date('{date_string}', '%Y%m%d');
  220. """
  221. res = self.long_articles_db_client.fetch(
  222. query=fetch_query, cursor_type=DictCursor
  223. )[0]
  224. return float(res["avg_open_rate"])
  225. def do_task_list(self, date_string: str):
  226. """
  227. INPUT date_string: '%Y-%m-%d'
  228. """
  229. account_list = fetch_publishing_account_list(self.denet_db_client)
  230. for account in tqdm(account_list):
  231. gh_id = account["gh_id"]
  232. try:
  233. avg_read_rate = self.get_account_open_rate(
  234. gh_id=gh_id, date_string=date_string.replace("-", "")
  235. )
  236. self.set_avg_open_rate_for_each_account(
  237. gh_id, date_string, avg_read_rate
  238. )
  239. except Exception as e:
  240. log(
  241. task="account_open_rate_producer",
  242. function="deal",
  243. status="fail",
  244. message=str(e),
  245. data={
  246. "gh_id": gh_id,
  247. "date": date_string.replace("-", ""),
  248. "traceback": traceback.format_exc(),
  249. },
  250. )