# updateAccountAvgDaily.py
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import threading
  6. from datetime import datetime
  7. from pandas import DataFrame
  8. from tqdm import tqdm
  9. from applications import PQMySQL
  10. def filter_outlier_data(group, key='show_view_count'):
  11. """
  12. :param group:
  13. :param key:
  14. :return:
  15. """
  16. mean = group[key].mean()
  17. std = group[key].std()
  18. # 过滤二倍标准差的数据
  19. filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
  20. # 过滤均值倍数大于5的数据
  21. new_mean = filtered_group[key].mean()
  22. filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
  23. return filtered_group
  24. class UpdateMsgDaily(object):
  25. """
  26. 日常更新文章
  27. """
  28. db_client = PQMySQL()
  29. with open("config/accountInfoV2.json", encoding="utf-8") as f:
  30. account_list = json.loads(f.read())
  31. # subscription_account = [i for i in account_list if i['type'] == '订阅号']
  32. # server_account = [i for i in account_list if i['type'] == '服务号']
  33. @classmethod
  34. def getAccountIdDict(cls):
  35. """
  36. 获取全部内部账号的id
  37. :return:
  38. """
  39. gh_id_dict = {}
  40. for line in cls.account_list:
  41. gh_id = line['gh_id']
  42. gh_id_dict[gh_id] = line
  43. return gh_id_dict
  44. @classmethod
  45. def getAllAvgRead(cls):
  46. """
  47. :return:
  48. """
  49. L = []
  50. record_list = cls.account_list
  51. for item in tqdm(record_list):
  52. index_list = [i for i in range(1, 9)]
  53. for index in index_list:
  54. account_name = item['name']
  55. print(account_name, index)
  56. avg_read, avg_like = cls.getArticleByFilter(
  57. account_name=account_name,
  58. index=index,
  59. min_time=1716480000,
  60. max_time=1721836800
  61. )
  62. obj = {
  63. "account_name": account_name,
  64. "gh_id": item['ghId'],
  65. "fans": item.get('follower_count', 0),
  66. "position": index,
  67. "avg_read": avg_read,
  68. "avg_like": avg_like
  69. }
  70. L.append(obj)
  71. with open("new_account_avg_v2.json", "w", encoding="utf-8") as f:
  72. f.write(json.dumps(L, ensure_ascii=False, indent=4))
  73. @classmethod
  74. def getEachAvgRead(cls, account_name, index):
  75. """
  76. :return:
  77. """
  78. keys = [
  79. "appMsgId",
  80. "title",
  81. "Type",
  82. "updateTime",
  83. "ItemIndex",
  84. "ContentUrl",
  85. "show_view_count",
  86. "show_like_count",
  87. ]
  88. sql = f"""
  89. SELECT {", ".join(keys)}
  90. FROM official_articles_v2
  91. WHERE accountName = '{account_name}' and ItemIndex = {index};"""
  92. result = cls.db_client.select(sql=sql)
  93. return DataFrame(result, columns=keys)
  94. @classmethod
  95. def getArticleByFilter(
  96. cls,
  97. account_name,
  98. index,
  99. min_time=None,
  100. max_time=None,
  101. msg_type=None,
  102. ):
  103. """
  104. :param account_name:
  105. :param index: index ranges from 1 to 8
  106. :param min_time: earliest time
  107. :param max_time: latest time
  108. :param msg_type: msg_type
  109. :return:
  110. """
  111. if not msg_type:
  112. msg_type = "9"
  113. if not min_time:
  114. min_time = 0
  115. if not max_time:
  116. # 2099年
  117. max_time = 4088051123
  118. articleDataFrame = cls.getEachAvgRead(account_name=account_name, index=index)
  119. filterDataFrame = articleDataFrame[
  120. (articleDataFrame["Type"] == msg_type)
  121. & (min_time <= articleDataFrame["updateTime"])
  122. & (articleDataFrame["updateTime"] <= max_time)
  123. ]
  124. # 过滤异常值
  125. finalDF = filter_outlier_data(filterDataFrame)
  126. return finalDF['show_view_count'].mean(), finalDF['show_like_count'].mean()
  127. def job_with_thread(job_func):
  128. """
  129. 每个任务放到单个线程中
  130. :param job_func:
  131. :return:
  132. """
  133. job_thread = threading.Thread(target=job_func)
  134. job_thread.start()
  135. if __name__ == "__main__":
  136. UMD = UpdateMsgDaily()
  137. UMD.getAllAvgRead()