cal_account_read_rate_avg_daily.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. """
  2. @author: luojunhui
  3. cal each account && position reading rate
  4. """
  5. import json
  6. from tqdm import tqdm
  7. from pandas import DataFrame
  8. from argparse import ArgumentParser
  9. from datetime import datetime, timezone, timedelta
  10. from applications import DeNetMysql, PQMySQL, longArticlesMySQL, bot
  11. STATISTICS_PERIOD = 31 * 24 * 60 * 60
  12. def float_to_percentage(value, decimals=3) -> str:
  13. """
  14. 把小数转化为百分数
  15. :param value:
  16. :param decimals:
  17. :return:
  18. """
  19. percentage_value = round(value * 100, decimals)
  20. return "{}%".format(percentage_value)
  21. def filter_outlier_data(group, key='show_view_count'):
  22. """
  23. :param group:
  24. :param key:
  25. :return:
  26. """
  27. mean = group[key].mean()
  28. std = group[key].std()
  29. # 过滤二倍标准差的数据
  30. filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
  31. # 过滤均值倍数大于5的数据
  32. new_mean = filtered_group[key].mean()
  33. # print("阅读均值", new_mean)
  34. filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
  35. return filtered_group
  36. def timestamp_to_str(timestamp) -> str:
  37. """
  38. :param timestamp:
  39. """
  40. dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
  41. date_string = dt_object.strftime('%Y-%m-%d')
  42. return date_string
  43. def str_to_timestamp(date_string) -> int:
  44. """
  45. :param date_string:
  46. :return:
  47. """
  48. date_obj = datetime.strptime(date_string, '%Y-%m-%d')
  49. # 使用timestamp()方法将datetime对象转换为时间戳
  50. timestamp = date_obj.timestamp()
  51. return int(timestamp)
  52. def get_account_fans_by_dt(db_client) -> dict:
  53. """
  54. 获取每个账号发粉丝,通过日期来区分
  55. :return:
  56. """
  57. sql = f"""
  58. SELECT
  59. t1.date_str,
  60. t1.fans_count,
  61. t2.gh_id
  62. FROM datastat_wx t1
  63. JOIN publish_account t2 ON t1.account_id = t2.id
  64. WHERE
  65. t2.channel = 5
  66. AND t2.status = 1
  67. AND t1.date_str >= '2024-07-01'
  68. ORDER BY t1.date_str;
  69. """
  70. result = db_client.select(sql)
  71. D = {}
  72. for line in result:
  73. dt = line[0]
  74. fans = line[1]
  75. gh_id = line[2]
  76. if D.get(gh_id):
  77. D[gh_id][dt] = fans
  78. else:
  79. D[gh_id] = {dt: fans}
  80. return D
  81. def get_publishing_accounts(db_client) -> list[dict]:
  82. """
  83. 获取每日正在发布的账号
  84. :return:
  85. """
  86. sql = f"""
  87. SELECT DISTINCT
  88. t3.`name`,
  89. t3.gh_id,
  90. t3.follower_count,
  91. t6.account_source_name,
  92. t6.mode_type,
  93. t6.account_type,
  94. t6.`status`
  95. FROM
  96. publish_plan t1
  97. JOIN publish_plan_account t2 ON t1.id = t2.plan_id
  98. JOIN publish_account t3 ON t2.account_id = t3.id
  99. LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
  100. LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
  101. LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
  102. WHERE
  103. t1.plan_status = 1
  104. AND t3.channel = 5
  105. AND t3.follower_count > 0
  106. GROUP BY t3.id;
  107. """
  108. account_list = db_client.select(sql)
  109. result_list = [
  110. {
  111. "account_name": i[0],
  112. "gh_id": i[1]
  113. } for i in account_list
  114. ]
  115. return result_list
  116. def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
  117. """
  118. get articles details
  119. :return:
  120. """
  121. sql = f"""
  122. SELECT
  123. ghId, accountName, updateTime, ItemIndex, show_view_count
  124. FROM
  125. official_articles_v2
  126. WHERE
  127. ghId IN {gh_id_tuple} and Type = '9';
  128. """
  129. result = db_client.select(sql)
  130. response_list = [
  131. {
  132. "ghId": i[0],
  133. "accountName": i[1],
  134. "updateTime": i[2],
  135. "ItemIndex": i[3],
  136. "show_view_count": i[4]
  137. }
  138. for i in result
  139. ]
  140. return response_list
  141. def cal_account_read_rate(gh_id_tuple) -> DataFrame:
  142. """
  143. 计算账号位置的阅读率
  144. :return:
  145. """
  146. pq_db = PQMySQL()
  147. de_db = DeNetMysql()
  148. response = []
  149. fans_dict_each_day = get_account_fans_by_dt(db_client=de_db)
  150. account_article_detail = get_account_articles_detail(
  151. db_client=pq_db,
  152. gh_id_tuple=gh_id_tuple
  153. )
  154. for line in account_article_detail:
  155. gh_id = line['ghId']
  156. dt = timestamp_to_str(line['updateTime'])
  157. fans = fans_dict_each_day.get(gh_id, {}).get(dt, 0)
  158. line['fans'] = fans
  159. if fans:
  160. line['readRate'] = line['show_view_count'] / fans if fans else 0
  161. response.append(line)
  162. return DataFrame(response,
  163. columns=['ghId', 'accountName', 'updateTime', 'ItemIndex', 'show_view_count', 'readRate'])
  164. def cal_avg_account_read_rate(df, gh_id, index, dt) -> tuple:
  165. """
  166. 计算账号的阅读率均值
  167. :return:
  168. """
  169. max_time = str_to_timestamp(dt)
  170. min_time = max_time - STATISTICS_PERIOD
  171. filterDataFrame = df[
  172. (df["ghId"] == gh_id)
  173. & (min_time <= df["updateTime"])
  174. & (df["updateTime"] <= max_time)
  175. & (df['ItemIndex'] == index)
  176. ]
  177. # print("位置", index)
  178. finalDF = filter_outlier_data(filterDataFrame)
  179. # finalDF = finalDF.sort_values(by=['updateTime'], ascending=False)
  180. # if index == 1:
  181. # for i in finalDF.values.tolist():
  182. # print(datetime.fromtimestamp(i[2]).strftime('%Y-%m-%d'), i)
  183. return (
  184. finalDF['readRate'].mean(),
  185. finalDF['updateTime'].max(),
  186. finalDF['updateTime'].min(),
  187. len(finalDF)
  188. )
  189. def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
  190. """
  191. 检验某个具体账号的具体文章的阅读率均值和前段日子的比较
  192. :param avg_rate: 当天计算出的阅读率均值
  193. :param db_client: 数据库连接
  194. :param gh_id: 账号 id
  195. :param index: 账号 index
  196. :param dt:
  197. :return:
  198. """
  199. dt = int(dt.replace("-", ""))
  200. select_sql = f"""
  201. SELECT account_name, read_rate_avg
  202. FROM long_articles_read_rate
  203. WHERE gh_id = '{gh_id}' and position = {index} and dt_version < {dt}
  204. ORDER BY dt_version DESC limit 1;
  205. """
  206. result = db_client.select(select_sql)
  207. if result:
  208. account_name = result[0][0]
  209. previous_read_rate_avg = result[0][1]
  210. relative_value = (avg_rate - previous_read_rate_avg) / previous_read_rate_avg
  211. if -0.05 <= relative_value <= 0.05:
  212. return {}
  213. else:
  214. response = {
  215. "账号名称": account_name,
  216. "位置": index,
  217. "当天阅读率均值": float_to_percentage(avg_rate),
  218. "前一天阅读率均值": float_to_percentage(previous_read_rate_avg),
  219. "相对变化率": float_to_percentage(relative_value)
  220. }
  221. return response
  222. def update_single_day(dt, account_list, article_df, lam):
  223. """
  224. 更新单天数据
  225. :param article_df:
  226. :param lam:
  227. :param account_list:
  228. :param dt:
  229. :return:
  230. """
  231. index_list = [1, 2, 3, 4, 5, 6, 7, 8]
  232. error_list = []
  233. insert_error_list = []
  234. for account in tqdm(account_list):
  235. for index in index_list:
  236. avg_rate, max_time, min_time, articles_count = cal_avg_account_read_rate(article_df, account['gh_id'], index, dt)
  237. if articles_count > 0:
  238. if index in {1, 2}:
  239. error_obj = check_each_position(
  240. db_client=lam,
  241. gh_id=account['gh_id'],
  242. index=index,
  243. dt=dt,
  244. avg_rate=avg_rate
  245. )
  246. if error_obj:
  247. error_list.append(error_obj)
  248. # continue
  249. try:
  250. if avg_rate == 0:
  251. continue
  252. insert_sql = f"""
  253. INSERT INTO long_articles_read_rate
  254. (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
  255. values
  256. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  257. """
  258. lam.update(
  259. sql=insert_sql,
  260. params=(
  261. account['account_name'],
  262. account['gh_id'],
  263. index,
  264. avg_rate,
  265. "从 {} 开始往前计算 31 天".format(dt),
  266. articles_count,
  267. timestamp_to_str(min_time),
  268. timestamp_to_str(max_time),
  269. dt.replace("-", ""),
  270. 0
  271. )
  272. )
  273. except Exception as e:
  274. insert_error_list.append(e)
  275. if insert_error_list:
  276. bot(
  277. title="更新阅读率均值,存在sql 插入失败",
  278. detail=insert_error_list
  279. )
  280. if error_list:
  281. bot(
  282. title="更新阅读率均值,头次出现异常值通知",
  283. detail={
  284. "时间": dt,
  285. "异常列表": error_list
  286. }
  287. )
  288. if not error_list and not insert_error_list:
  289. bot(
  290. title="阅读率均值表,更新成功",
  291. detail={
  292. "日期": dt
  293. }
  294. )
  295. def main() -> None:
  296. """
  297. main function
  298. :return:
  299. """
  300. parser = ArgumentParser()
  301. parser.add_argument("--run-date",
  302. help="Run only once for date in format of %Y-%m-%d. \
  303. If no specified, run as daily jobs.")
  304. args = parser.parse_args()
  305. if args.run_date:
  306. dt = args.run_date
  307. else:
  308. dt = datetime.today().strftime('%Y-%m-%d')
  309. lam = longArticlesMySQL()
  310. de = DeNetMysql()
  311. account_list = get_publishing_accounts(db_client=de)
  312. df = cal_account_read_rate(tuple([i['gh_id'] for i in account_list]))
  313. update_single_day(dt, account_list, df, lam)
  314. # start_dt = start_date = datetime(2024, 8, 1)
  315. # end_date = datetime(2024, 10, 22)
  316. # # 计算日期差
  317. # delta = end_date - start_date
  318. # # 生成日期字符串列表
  319. # date_strings = []
  320. # for i in range(delta.days + 1):
  321. # date_strings.append((start_date + timedelta(days=i)).strftime('%Y-%m-%d'))
  322. #
  323. # # 打印结果
  324. # date_str = '2024-09-11'
  325. # date_strings = [date_str,]
  326. # for date_str in tqdm(date_strings):
  327. # update_single_day(date_str, account_list, df, lam)
  328. if __name__ == '__main__':
  329. main()