# account_position_read_avg_task.py
"""
Compute each account's average read count, together with the upper bound of
the confidence interval estimated for that average.
"""
import json

import numpy as np
from tqdm import tqdm
from scipy import stats
from pymysql.cursors import DictCursor

from applications.const import UpdateAccountReadAvgTaskConst
from applications.db import DatabaseConnector
from applications.utils import fetch_account_fans
from applications.utils import fetch_publishing_account_list
from config import apolloConfig
from config import long_articles_config, denet_config, piaoquan_crawler_config

config = apolloConfig()
const = UpdateAccountReadAvgTaskConst()

# gh_id -> fan count for accounts whose fans cannot be read from AIGC
unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
# gh_ids classified under the "touliu" business type (see process_each_record)
touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
# gh_id -> fallback fan count used when the other two sources have no data
backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
  20. class AccountDataTask:
  21. def __init__(self):
  22. # init piaoquan crawler db client
  23. self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
  24. self.piaoquan_crawler_db_client.connect()
  25. # init long articles db client
  26. self.long_articles_db_client = DatabaseConnector(long_articles_config)
  27. self.long_articles_db_client.connect()
  28. # init aigc db client
  29. self.denet_db_client = DatabaseConnector(denet_config)
  30. self.denet_db_client.connect()
  31. class AccountPositionReadAvgTask(AccountDataTask):
  32. def fetch_read_rate_avg_for_each_account(self, dt):
  33. dt = int(dt.replace("-", ""))
  34. sql = f"""
  35. select gh_id, position, read_rate_avg
  36. from {const.ACCOUNT_READ_RATE_TABLE}
  37. where dt_version = {dt};
  38. """
  39. fetch_response_list = self.long_articles_db_client.fetch(
  40. query=sql, cursor_type=DictCursor
  41. )
  42. account_read_rate_dict = {}
  43. for item in fetch_response_list:
  44. key = "{}_{}".format(item["gh_id"], item["position"])
  45. account_read_rate_dict[key] = item["read_rate_avg"]
  46. return account_read_rate_dict
  47. def cal_read_avg_ci(self, gh_id, position):
  48. """
  49. 计算阅读均值的置信区间
  50. """
  51. fetch_query = f"""
  52. select read_avg, update_time
  53. from {const.ACCOUNT_READ_AVG_TABLE}
  54. where gh_id = %s and position = %s
  55. order by update_time desc limit 30;
  56. """
  57. fetch_response_list = self.piaoquan_crawler_db_client.fetch(
  58. query=fetch_query, params=(gh_id, position), cursor_type=DictCursor
  59. )
  60. read_avg_list = [i["read_avg"] for i in fetch_response_list]
  61. n = len(read_avg_list)
  62. mean = np.mean(read_avg_list)
  63. std = np.std(read_avg_list, ddof=1)
  64. se = std / np.sqrt(n)
  65. t = stats.t.ppf(0.975, df=n - 1)
  66. upper_t = mean + t * se
  67. return upper_t
  68. def process_each_record(
  69. self, account, index, fans, read_rate_avg, read_avg, read_avg_ci_upper, dt
  70. ):
  71. gh_id = account["gh_id"]
  72. business_type = (
  73. const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
  74. )
  75. # insert into database
  76. insert_sql = f"""
  77. insert into {const.ACCOUNT_READ_AVG_TABLE}
  78. (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type,
  79. account_mode, account_source, account_status, business_type, read_rate_avg, read_avg_ci_upper)
  80. values
  81. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  82. """
  83. try:
  84. self.piaoquan_crawler_db_client.save(
  85. query=insert_sql,
  86. params=(
  87. gh_id,
  88. index,
  89. dt,
  90. account["account_name"],
  91. fans,
  92. read_avg,
  93. const.DEFAULT_LIKE,
  94. const.USING_STATUS,
  95. account["account_type"],
  96. account["mode_type"],
  97. account["account_source"],
  98. account["status"],
  99. business_type,
  100. read_rate_avg,
  101. read_avg_ci_upper,
  102. ),
  103. )
  104. except Exception as e:
  105. update_sql = f"""
  106. update {const.ACCOUNT_READ_AVG_TABLE}
  107. set fans = %s, read_avg = %s, read_rate_avg = %s, read_avg_ci_upper = %s
  108. where gh_id = %s and position = %s and update_time = %s
  109. """
  110. try:
  111. self.piaoquan_crawler_db_client.save(
  112. query=update_sql,
  113. params=(
  114. fans,
  115. read_avg,
  116. read_rate_avg,
  117. read_avg_ci_upper,
  118. account["gh_id"],
  119. index,
  120. dt,
  121. ),
  122. )
  123. except Exception as e:
  124. print(e)
  125. # 修改前一天的状态为 0
  126. update_status_sql = f"""
  127. update {const.ACCOUNT_READ_AVG_TABLE}
  128. set status = %s
  129. where update_time != %s and gh_id = %s and position = %s;
  130. """
  131. self.piaoquan_crawler_db_client.save(
  132. query=update_status_sql,
  133. params=(const.NOT_USING_STATUS, dt, account["gh_id"], index),
  134. )
  135. def cal_read_avg_for_each_account(self, account, fans_dict, read_rate_avg_dict, dt):
  136. gh_id = account["gh_id"]
  137. fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
  138. # use unauthorized account's fans if not found in aigc
  139. if not fans:
  140. fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
  141. # use backup account's fans if not found in aigc
  142. if not fans:
  143. fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
  144. if fans:
  145. for index in const.ARTICLE_INDEX_LIST:
  146. gh_id_position = "{}_{}".format(gh_id, index)
  147. if read_rate_avg_dict.get(gh_id_position):
  148. # fetch read rate avg
  149. read_rate_avg = read_rate_avg_dict[gh_id_position]
  150. # cal read avg
  151. read_avg = fans * read_rate_avg
  152. # cal read avg ci upper
  153. read_avg_ci_upper = self.cal_read_avg_ci(gh_id, index)
  154. # insert into database
  155. self.process_each_record(
  156. account,
  157. index,
  158. fans,
  159. read_rate_avg,
  160. read_avg,
  161. read_avg_ci_upper,
  162. dt,
  163. )
  164. def do_task_list(self, dt):
  165. """
  166. do it
  167. """
  168. # get fans dict from aigc
  169. fans_dict = fetch_account_fans(self.denet_db_client, dt)
  170. # get publishing account list from aigc
  171. account_list = fetch_publishing_account_list(self.denet_db_client)
  172. # fetch each account's read avg for each position
  173. read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
  174. for account in tqdm(account_list, desc=dt):
  175. self.cal_read_avg_for_each_account(
  176. account, fans_dict, read_rate_avg_dict, dt
  177. )
  178. class AccountOpenRateAvgTask(AccountDataTask):
  179. """
  180. cal open rate avg for each account
  181. """
  182. def insert_record_into_database(self, gh_id, date_str, account_name, open_rate_obj):
  183. avg_open_rate = open_rate_obj["avg_open_rate"]
  184. insert_sql = f"""
  185. insert ignore into account_avg_info_v3
  186. (gh_id, position, update_time, account_name, open_rate_avg)
  187. values (%s, %s, %s, %s, %s);
  188. """
  189. params_list = [
  190. (gh_id, position, date_str, account_name, avg_open_rate)
  191. for position in const.ARTICLE_INDEX_LIST
  192. ]
  193. affected_rows = self.long_articles_db_client.save_many(
  194. insert_sql,
  195. params_list=params_list
  196. )
  197. def update_record(self, gh_id, date_str, open_rate_obj):
  198. avg_open_rate = open_rate_obj["avg_open_rate"]
  199. update_sql = f"""
  200. update account_avg_info_v3
  201. set open_rate_avg = %s
  202. where gh_id = %s and update_time = %s;
  203. """
  204. affected_rows = self.piaoquan_crawler_db_client.save(
  205. update_sql,
  206. params=(
  207. avg_open_rate, gh_id, date_str
  208. )
  209. )
  210. print(affected_rows)
  211. def get_account_open_rate(self, gh_id, date_str):
  212. date_str_ = date_str.replace("-", "")
  213. fetch_query = f"""
  214. select
  215. sum(view_count) as 'total_read',
  216. sum(first_level) as 'total_first_level',
  217. sum(first_level) / sum(view_count) as 'avg_open_rate'
  218. from datastat_sort_strategy
  219. where gh_id = '{gh_id}' and date_str between date_sub(str_to_date('{date_str_}', '%Y%m%d'), interval 30 day)
  220. and str_to_date('{date_str_}', '%Y%m%d');
  221. """
  222. res = self.long_articles_db_client.fetch(
  223. query=fetch_query, cursor_type=DictCursor
  224. )[0]
  225. # self.insert_record_into_database(gh_id=gh_id, date_str=date_str, open_rate_obj=res, account_name=account_name)
  226. # for i in const.ARTICLE_INDEX_LIST:
  227. self.update_record(gh_id, date_str, res)
  228. def deal(self, date_str):
  229. account_list = fetch_publishing_account_list(self.denet_db_client)
  230. for account in tqdm(account_list):
  231. gh_id = account["gh_id"]
  232. self.get_account_open_rate(gh_id, date_str)