  1. """
  2. @author: luojunhui
  3. cal each account && position reading rate
  4. """
import json
from argparse import ArgumentParser
from datetime import datetime

from tqdm import tqdm
from pandas import DataFrame
from pymysql.cursors import DictCursor

from applications import bot, Functions, log
from applications import create_feishu_columns_sheet
from applications.db import DatabaseConnector
from applications.const import UpdateAccountReadRateTaskConst
from applications.utils import fetch_publishing_account_list
from applications.utils import fetch_account_fans
from config import apolloConfig, long_articles_config, piaoquan_crawler_config, denet_config

const = UpdateAccountReadRateTaskConst()
config = apolloConfig()

# fan-count fallback tables maintained in Apollo config
unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))

functions = Functions()
read_rate_table = "long_articles_read_rate"
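

# filter_outlier_data trims view-count outliers in two passes: first it drops
# rows outside mean +/- 2 * std, then it drops rows larger than five times the
# mean recomputed on the surviving rows.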
def filter_outlier_data(group, key='show_view_count'):
    """
    Drop outlier rows from a DataFrame group.
    :param group: DataFrame of articles for one account/position
    :param key: column used for outlier detection
    :return: filtered DataFrame
    """
    mean = group[key].mean()
    std = group[key].std()
    # drop rows outside two standard deviations of the mean
    filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
    # drop rows larger than five times the recomputed mean
    new_mean = filtered_group[key].mean()
    filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
    return filtered_group
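

# get_account_articles_detail returns one row per (account, position, publish
# day): the day's average show_view_count plus the raw publish_timestamp used
# downstream for windowing.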
def get_account_articles_detail(db_client, gh_id_tuple, min_publish_timestamp) -> list[dict]:
    """
    Get article details for the given accounts.
    :return: list of article detail rows
    """
    sql = f"""
        SELECT
            ghId, accountName, ItemIndex,
            AVG(show_view_count) AS show_view_count,
            FROM_UNIXTIME(publish_timestamp, '%Y-%m-%d') AS pub_dt,
            publish_timestamp
        FROM
            official_articles_v2
        WHERE
            ghId IN {gh_id_tuple} AND Type = '{const.BULK_PUBLISH_TYPE}' AND publish_timestamp >= {min_publish_timestamp}
        GROUP BY ghId, accountName, ItemIndex, pub_dt;
        """
    response_list = db_client.fetch(query=sql, cursor_type=DictCursor)
    return response_list
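

# Group-send accounts have no stable fan count, so fans are estimated from the
# day's delivery volume; sent_count is presumably summed over eight send
# batches, hence the division by 8.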
def get_fans_from_group_send_accounts(db_client, gh_id, dt):
    """
    Get the fan count of a group-send account for articles sent on the given date.
    """
    query = """
        SELECT CAST(SUM(sent_count) / 8 AS signed) AS fans
        FROM long_articles_group_send_result
        WHERE publish_date = %s AND gh_id = %s;
        """
    response = db_client.fetch(query=query, cursor_type=DictCursor, params=(dt, gh_id))
    if response:
        return response[0]['fans']
    return 0
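

# cal_account_read_rate resolves each article's fan count through a fallback
# chain -- fans_dict, then the group-send estimate, then the unauthorized- and
# backup-account tables -- and computes readRate = show_view_count / fans
# (e.g. 3,000 views on 50,000 fans gives a read rate of 0.06).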
def cal_account_read_rate(db_client, article_list, fans_dict) -> DataFrame:
    """
    Calculate the read rate for each account position.
    :return: DataFrame of per-article read rates
    """
    response = []
    for line in article_list:
        gh_id = line['ghId']
        dt = functions.timestamp_to_str(timestamp=line['publish_timestamp'], string_format='%Y-%m-%d')
        fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
        if gh_id in const.GROUP_ACCOUNT_SET:
            fans = get_fans_from_group_send_accounts(db_client, gh_id, dt)
        if not fans:
            fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
        if not fans:
            fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
            log(
                task='cal_read_rate_avg_task',
                function='cal_account_read_rate',
                message='fan count not found, falling back to backup fan table',
                data=line
            )
        line['fans'] = fans
        if fans > const.MIN_FANS:
            line['readRate'] = line['show_view_count'] / fans if fans else 0
            response.append(line)
    return DataFrame(response, columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])
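

# cal_avg_account_read_rate averages readRate for one account/position over
# the trailing statistics window [dt - STATISTICS_PERIOD, dt] (31 days, per
# the remark written to the table), after applying the outlier filter above.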
def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
    """
    Calculate the average read rate of an account position.
    :return: dict with the average, publish-time bounds and record count
    """
    max_time = functions.str_to_timestamp(date_string=dt)
    min_time = max_time - const.STATISTICS_PERIOD
    # keep rows for this account/position inside the statistics window
    filter_dataframe = df[
        (df["ghId"] == gh_id)
        & (min_time <= df["publish_timestamp"])
        & (df["publish_timestamp"] <= max_time)
        & (df['ItemIndex'] == index)
    ]
    # drop outliers beyond two standard deviations
    final_dataframe = filter_outlier_data(filter_dataframe)
    return {
        "read_rate_avg": final_dataframe['readRate'].mean(),
        "max_publish_time": final_dataframe['publish_timestamp'].max(),
        "min_publish_time": final_dataframe['publish_timestamp'].min(),
        "records": len(final_dataframe)
    }
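

# check_each_position compares today's average against the most recent value
# stored in long_articles_read_rate and returns a Feishu alert row when the
# relative change exceeds RELATIVE_VALUE_THRESHOLD; an empty dict means the
# position is within tolerance.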
def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
    """
    Compare one account position's average read rate against its previous value.
    :param avg_rate: average read rate computed for the current day
    :param db_client: database connection
    :param gh_id: account id
    :param index: article position
    :param dt: date string
    :return: alert row dict, or an empty dict if the change is within threshold
    """
    dt = int(dt.replace("-", ""))
    select_sql = f"""
        SELECT account_name, read_rate_avg
        FROM {read_rate_table}
        WHERE gh_id = '{gh_id}' AND position = {index} AND dt_version < {dt}
        ORDER BY dt_version DESC LIMIT 1;
        """
    result = db_client.fetch(select_sql)
    if result:
        account_name = result[0][0]
        previous_read_rate_avg = result[0][1]
        relative_value = (avg_rate - previous_read_rate_avg) / previous_read_rate_avg
        if -const.RELATIVE_VALUE_THRESHOLD <= relative_value <= const.RELATIVE_VALUE_THRESHOLD:
            return {}
        else:
            response = {
                "account_name": account_name,
                "position": index,
                "read_rate_avg_yesterday": functions.float_to_percentage(avg_rate),
                "read_rate_avg_the_day_before_yesterday": functions.float_to_percentage(previous_read_rate_avg),
                "relative_change_rate": [
                    {
                        "text": functions.float_to_percentage(relative_value),
                        "color": "red" if relative_value < 0 else "green"
                    }
                ]
            }
            return response
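

# update_single_day walks every account and position for one date, writes the
# averages into long_articles_read_rate, and reports insert failures and
# read-rate outliers to Feishu via bot().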
def update_single_day(dt, account_list, article_df, lam):
    """
    Update one day's data.
    :param article_df: DataFrame of per-article read rates
    :param lam: long_articles database connection
    :param account_list: list of publishing accounts
    :param dt: date string
    :return: None
    """
    error_list = []
    insert_error_list = []
    update_timestamp = functions.str_to_timestamp(date_string=dt)
    # averages are computed on the following day, so shift the date back one day
    avg_date = functions.timestamp_to_str(
        timestamp=update_timestamp - const.ONE_DAY_IN_SECONDS,
        string_format='%Y-%m-%d'
    )
    # accounts that produced at least one record
    processed_account_set = set()
    for account in tqdm(account_list, desc=dt):
        for index in const.ARTICLE_INDEX_LIST:
            read_rate_detail = cal_avg_account_read_rate(
                df=article_df,
                gh_id=account['gh_id'],
                index=index,
                dt=dt
            )
            read_rate_avg = read_rate_detail['read_rate_avg']
            max_publish_time = read_rate_detail['max_publish_time']
            min_publish_time = read_rate_detail['min_publish_time']
            articles_count = read_rate_detail['records']
            if articles_count:
                processed_account_set.add(account['gh_id'])
                # check read rate in positions 1 and 2
                if index in [1, 2]:
                    error_obj = check_each_position(
                        db_client=lam,
                        gh_id=account['gh_id'],
                        index=index,
                        dt=dt,
                        avg_rate=read_rate_avg
                    )
                    if error_obj:
                        error_list.append(error_obj)
                # insert into database
                try:
                    if not read_rate_avg:
                        continue
                    insert_sql = f"""
                        INSERT INTO {read_rate_table}
                        (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
                        VALUES
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                        """
                    lam.save(
                        query=insert_sql,
                        params=(
                            account['account_name'],
                            account['gh_id'],
                            index,
                            read_rate_avg,
                            "从 {} 开始往前计算 31 天".format(dt),
                            articles_count,
                            functions.timestamp_to_str(timestamp=min_publish_time, string_format='%Y-%m-%d'),
                            functions.timestamp_to_str(timestamp=max_publish_time, string_format='%Y-%m-%d'),
                            avg_date.replace("-", ""),
                            0
                        )
                    )
                except Exception as e:
                    print(e)
                    insert_error_list.append(str(e))
    # report SQL insert failures
    if insert_error_list:
        bot(
            title="更新阅读率均值,存在sql 插入失败",
            detail=insert_error_list
        )
    # report read-rate outliers
    if error_list:
        columns = [
            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="account_name", display_name="账号名称"),
            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="position", display_name="文章位置"),
            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_yesterday",
                                        display_name="昨日阅读率均值"),
            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_the_day_before_yesterday",
                                        display_name="前天阅读率均值"),
            create_feishu_columns_sheet(sheet_type="options", sheet_name="relative_change_rate",
                                        display_name="相对变化率")
        ]
        bot(
            title="阅读率均值表异常信息, 总共处理{}个账号".format(len(processed_account_set)),
            detail={
                "columns": columns,
                "rows": error_list
            },
            table=True,
            mention=False
        )
    # if there were no errors, send a success notification
    if not error_list and not insert_error_list:
        bot(
            title="阅读率均值表更新成功, 总共处理{}个账号".format(len(processed_account_set)),
            detail={
                "日期": dt
            },
            mention=False
        )
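

# main wires the pipeline together: parse the run date (defaulting to today),
# open the three database connections, pull accounts, fans and articles, then
# compute and persist the day's averages.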
def main() -> None:
    """
    main function
    :return: None
    """
    parser = ArgumentParser()
    parser.add_argument("--run-date",
                        help="Run only once for date in format of %%Y-%%m-%%d. "
                             "If not specified, run as daily job.")
    args = parser.parse_args()
    if args.run_date:
        dt = args.run_date
    else:
        dt = datetime.today().strftime('%Y-%m-%d')
    # init statistics period
    max_time = functions.str_to_timestamp(date_string=dt)
    min_time = max_time - const.STATISTICS_PERIOD
    min_stat_date = functions.timestamp_to_str(timestamp=min_time, string_format='%Y-%m-%d')
    # init database connectors
    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
    long_articles_db_client.connect()
    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
    piaoquan_crawler_db_client.connect()
    denet_db_client = DatabaseConnector(db_config=denet_config)
    denet_db_client.connect()
    # get account list
    account_list = fetch_publishing_account_list(db_client=denet_db_client)
    # get fans dict
    fans_dict = fetch_account_fans(db_client=denet_db_client, start_date=min_stat_date)
    # get article rows from official_articles_v2
    gh_id_tuple = tuple([i['gh_id'] for i in account_list])
    article_list = get_account_articles_detail(db_client=piaoquan_crawler_db_client, gh_id_tuple=gh_id_tuple, min_publish_timestamp=min_time)
    # compute per-article read rates as a DataFrame
    read_rate_dataframe = cal_account_read_rate(db_client=long_articles_db_client, article_list=article_list, fans_dict=fans_dict)
    # update the day's data
    update_single_day(dt, account_list, read_rate_dataframe, long_articles_db_client)


if __name__ == '__main__':
    main()