cal_account_read_rate_avg_daily.py

  1. """
  2. @author: luojunhui
  3. cal each account && position reading rate
  4. """
  5. from tqdm import tqdm
  6. from datetime import datetime, timezone, timedelta
  7. from pandas import DataFrame
  8. from applications import DeNetMysql, PQMySQL, longArticlesMySQL
  9. STATISTICS_PERIOD = 31 * 24 * 60 * 60


def filter_outlier_data(group, key='show_view_count'):
    """
    Drop outlier rows from one account/position group.
    :param group: DataFrame of article records for a single account and position
    :param key: column used for outlier detection
    :return: filtered DataFrame
    """
    mean = group[key].mean()
    std = group[key].std()
    # First pass: keep rows within two standard deviations of the mean
    filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
    # Second pass: drop rows larger than five times the recomputed mean
    new_mean = filtered_group[key].mean()
    filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
    return filtered_group
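

# Illustrative sketch (documentation only; this helper and its sample numbers are
# assumptions and are not called anywhere in the pipeline): how the two-stage
# filter behaves on a toy view-count column. The extreme value is removed by the
# two-standard-deviation pass.
def _demo_filter_outlier_data():
    sample = DataFrame({'show_view_count': [950, 980, 1000, 1020, 990, 1010, 960, 1030, 1000, 980, 120000]})
    filtered = filter_outlier_data(sample)
    # Expected: the 120000 row is dropped, the ten ordinary rows remain
    print(filtered['show_view_count'].tolist())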


def timestamp_to_str(timestamp) -> str:
    """
    Convert a Unix timestamp to a local 'YYYY-MM-DD' date string.
    :param timestamp: Unix timestamp in seconds
    :return: date string
    """
    # Interpret the timestamp as UTC, then convert to the local timezone
    dt_object = datetime.fromtimestamp(timestamp, tz=timezone.utc).astimezone()
    date_string = dt_object.strftime('%Y-%m-%d')
    return date_string


def str_to_timestamp(date_string) -> int:
    """
    Convert a 'YYYY-MM-DD' date string to a Unix timestamp (midnight local time).
    :param date_string: date string
    :return: Unix timestamp in seconds
    """
    date_obj = datetime.strptime(date_string, '%Y-%m-%d')
    # Convert the naive (local-time) datetime to a Unix timestamp
    timestamp = date_obj.timestamp()
    return int(timestamp)
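

# Illustrative sketch (not used by the pipeline): the two date helpers round-trip
# through local midnight, which is how the 31-day look-back window is anchored.
# Note the result can shift by an hour across DST changes in the local timezone.
def _demo_date_helpers():
    ts = str_to_timestamp('2024-09-11')               # midnight local time as a Unix timestamp
    print(timestamp_to_str(ts))                       # -> '2024-09-11'
    print(timestamp_to_str(ts - STATISTICS_PERIOD))   # 31 days earlier -> '2024-08-11'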


def get_account_fans_by_dt(db_client) -> dict:
    """
    Get each account's fan count, keyed by gh_id and then by date.
    :param db_client: DeNet MySQL client
    :return: {gh_id: {date_str: fans_count}}
    """
    sql = """
        SELECT
            t1.date_str,
            t1.fans_count,
            t2.gh_id
        FROM datastat_wx t1
        JOIN publish_account t2 ON t1.account_id = t2.id
        WHERE
            t2.channel = 5
            AND t2.status = 1
            AND t1.date_str >= '2024-07-01'
        ORDER BY t1.date_str;
    """
    result = db_client.select(sql)
    fans_dict = {}
    for line in result:
        dt = line[0]
        fans = line[1]
        gh_id = line[2]
        if fans_dict.get(gh_id):
            fans_dict[gh_id][dt] = fans
        else:
            fans_dict[gh_id] = {dt: fans}
    return fans_dict
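

# Illustrative sketch (no database required): the lookup built above maps
# gh_id -> {date string -> fan count}; cal_account_read_rate() reads it via
# .get(gh_id, {}).get(dt, 0). The rows below are made-up sample data.
def _demo_fans_lookup():
    rows = [('2024-09-10', 12000, 'gh_demo'), ('2024-09-11', 12100, 'gh_demo')]
    fans_dict = {}
    for dt, fans, gh_id in rows:
        fans_dict.setdefault(gh_id, {})[dt] = fans
    print(fans_dict.get('gh_demo', {}).get('2024-09-11', 0))  # -> 12100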


def get_publishing_accounts(db_client) -> list[dict]:
    """
    Get the accounts that are currently publishing.
    :param db_client: DeNet MySQL client
    :return: list of {"account_name": ..., "gh_id": ...}
    """
    sql = """
        SELECT DISTINCT
            t3.`name`,
            t3.gh_id,
            t3.follower_count,
            t6.account_source_name,
            t6.mode_type,
            t6.account_type,
            t6.`status`
        FROM
            publish_plan t1
            JOIN publish_plan_account t2 ON t1.id = t2.plan_id
            JOIN publish_account t3 ON t2.account_id = t3.id
            LEFT JOIN publish_account_wx_type t4 ON t3.id = t4.account_id
            LEFT JOIN wx_statistics_group_source_account t5 ON t3.id = t5.account_id
            LEFT JOIN wx_statistics_group_source t6 ON t5.group_source_name = t6.account_source_name
        WHERE
            t1.plan_status = 1
            AND t3.channel = 5
            AND t3.follower_count > 0
        GROUP BY t3.id;
    """
    account_list = db_client.select(sql)
    result_list = [
        {
            "account_name": i[0],
            "gh_id": i[1]
        } for i in account_list
    ]
    return result_list


def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
    """
    Get article details (publish time and view count) for the given accounts.
    :param db_client: PQ MySQL client
    :param gh_id_tuple: tuple of gh_id values to query
    :return: list of article detail dicts
    """
    sql = f"""
        SELECT
            ghId, accountName, updateTime, ItemIndex, show_view_count
        FROM
            official_articles_v2
        WHERE
            ghId IN {gh_id_tuple} AND Type = '9';
    """
    result = db_client.select(sql)
    response_list = [
        {
            "ghId": i[0],
            "accountName": i[1],
            "updateTime": i[2],
            "ItemIndex": i[3],
            "show_view_count": i[4]
        }
        for i in result
    ]
    return response_list


def cal_account_read_rate(gh_id_tuple) -> DataFrame:
    """
    Compute the read rate (views / fans on the publish day) for each account position.
    :param gh_id_tuple: tuple of gh_id values
    :return: DataFrame of article records with a readRate column
    """
    pq_db = PQMySQL()
    de_db = DeNetMysql()
    response = []
    fans_dict_each_day = get_account_fans_by_dt(db_client=de_db)
    account_article_detail = get_account_articles_detail(
        db_client=pq_db,
        gh_id_tuple=gh_id_tuple
    )
    for line in account_article_detail:
        gh_id = line['ghId']
        dt = timestamp_to_str(line['updateTime'])
        fans = fans_dict_each_day.get(gh_id, {}).get(dt, 0)
        line['fans'] = fans
        # Skip articles with no fan count for that day to avoid dividing by zero
        if fans:
            line['readRate'] = line['show_view_count'] / fans
            response.append(line)
    return DataFrame(
        response,
        columns=['ghId', 'accountName', 'updateTime', 'ItemIndex', 'show_view_count', 'readRate']
    )


def cal_avg_account_read_rate(df, gh_id, index, dt) -> tuple:
    """
    Compute the average read rate for one account and position over the 31 days ending at dt.
    :param df: DataFrame produced by cal_account_read_rate
    :param gh_id: account gh_id
    :param index: article position (ItemIndex)
    :param dt: end date, 'YYYY-MM-DD'
    :return: (mean read rate, latest publish time, earliest publish time, article count)
    """
    max_time = str_to_timestamp(dt)
    min_time = max_time - STATISTICS_PERIOD
    filtered_df = df[
        (df["ghId"] == gh_id)
        & (min_time <= df["updateTime"])
        & (df["updateTime"] <= max_time)
        & (df['ItemIndex'] == index)
    ]
    final_df = filter_outlier_data(filtered_df)
    return (
        final_df['readRate'].mean(),
        final_df['updateTime'].max(),
        final_df['updateTime'].min(),
        len(final_df)
    )
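

# Illustrative sketch (synthetic data, not part of the pipeline): computing the
# 31-day windowed average for one account/position without touching the database.
# 'gh_demo' and the view counts below are made-up values.
def _demo_cal_avg_account_read_rate():
    end_ts = str_to_timestamp('2024-09-11')
    sample = DataFrame([
        {
            'ghId': 'gh_demo',
            'accountName': 'demo',
            'ItemIndex': 1,
            'updateTime': end_ts - day * 24 * 60 * 60,   # one article per day
            'show_view_count': 1000 + day,
            'readRate': 0.05 + day * 0.001,
        }
        for day in range(1, 11)
    ])
    avg_rate, latest, earliest, count = cal_avg_account_read_rate(sample, 'gh_demo', 1, '2024-09-11')
    print(round(avg_rate, 4), timestamp_to_str(earliest), timestamp_to_str(latest), count)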


def update_single_day(dt, account_list, article_df, lam):
    """
    Compute and store the per-position average read rate for one day.
    :param dt: date string, 'YYYY-MM-DD'
    :param account_list: accounts returned by get_publishing_accounts
    :param article_df: DataFrame produced by cal_account_read_rate
    :param lam: long_articles MySQL client
    :return:
    """
    index_list = [1, 2, 3, 4, 5, 6, 7, 8]
    for account in tqdm(account_list):
        for index in index_list:
            avg_rate, max_time, min_time, a_count = cal_avg_account_read_rate(
                article_df, account['gh_id'], index, dt
            )
            try:
                # Skip positions with no usable data (an empty group yields NaN and is caught below)
                if avg_rate == 0:
                    continue
                insert_sql = """
                    INSERT INTO long_articles_read_rate_dev
                    (account_name, gh_id, position, read_rate_avg, remark, articles_count,
                     earliest_publish_time, latest_publish_time, dt_version, is_delete)
                    VALUES
                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                """
                lam.update(
                    sql=insert_sql,
                    params=(
                        account['account_name'],
                        account['gh_id'],
                        index,
                        avg_rate,
                        "从 {} 开始往前计算 31 天".format(dt),  # remark: "31 days counted back from {dt}"
                        a_count,
                        timestamp_to_str(min_time),
                        timestamp_to_str(max_time),
                        dt.replace("-", ""),
                        0
                    )
                )
            except Exception as e:
                print(e)


def main() -> None:
    """
    main function
    :return:
    """
    lam = longArticlesMySQL()
    de = DeNetMysql()
    account_list = get_publishing_accounts(db_client=de)
    # Pull every article for these accounts and compute per-article read rates
    df = cal_account_read_rate(tuple([i['gh_id'] for i in account_list]))
    start_date = datetime(2024, 8, 1)
    end_date = datetime(2024, 10, 22)
    # Build the list of date strings between start_date and end_date (inclusive)
    delta = end_date - start_date
    date_strings = []
    for i in range(delta.days + 1):
        date_strings.append((start_date + timedelta(days=i)).strftime('%Y-%m-%d'))
    # Override: backfill a single day only
    date_str = '2024-09-11'
    date_strings = [date_str]
    for date_str in tqdm(date_strings):
        update_single_day(date_str, account_list, df, lam)


if __name__ == '__main__':
    main()