updateAccountAvgDaily.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. """
  2. @author: luojunhui
  3. CREATE TABLE `account_avg_info_v2` (
  4. `gh_id` varchar(32) NOT NULL COMMENT 'ghid',
  5. `position` int(11) NOT NULL COMMENT '位置',
  6. `account_name` varchar(255) DEFAULT NULL COMMENT '账号名称',
  7. `fans` int(10) DEFAULT NULL COMMENT '粉丝量',
  8. `read_avg` double(8,2) DEFAULT NULL COMMENT '阅读均值',
  9. `like_avg` double(8,2) DEFAULT NULL COMMENT '点赞均值',
  10. `update_time` datetime DEFAULT NULL COMMENT '更新时间 dt',
  11. `status` int(1) DEFAULT NULL COMMENT ' 状态',
  12. PRIMARY KEY (`gh_id`,`position`)
  13. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 AVG_ROW_LENGTH=202 ROW_FORMAT=DYNAMIC
  14. """
  15. import json
  16. import time
  17. import schedule
  18. from datetime import datetime
  19. from pandas import DataFrame
  20. from tqdm import tqdm
  21. from applications import PQMySQL, DeNetMysql, Functions
  22. def filter_outlier_data(group, key='show_view_count'):
  23. """
  24. :param group:
  25. :param key:
  26. :return:
  27. """
  28. mean = group[key].mean()
  29. std = group[key].std()
  30. # 过滤二倍标准差的数据
  31. filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
  32. # 过滤均值倍数大于5的数据
  33. new_mean = filtered_group[key].mean()
  34. filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
  35. return filtered_group
  36. class UpdateAvgDaily(object):
  37. """
  38. 日常更新文章
  39. """
  40. pqClient = PQMySQL()
  41. deNetClient = DeNetMysql()
  42. @classmethod
  43. def getAccountList(cls):
  44. """
  45. 获取账号 list
  46. :return:
  47. """
  48. sql = f"""
  49. SELECT t1.`name`,t1.gh_id, t1.follower_count
  50. FROM `publish_account` t1
  51. JOIN wx_statistics_group_source_account t2
  52. ON t1.id = t2.account_id
  53. UNION
  54. SELECT t1.`name`, t1.gh_id, t1.follower_count
  55. FROM `publish_account` t1
  56. where t1.`name` in (
  57. '晚年家人',
  58. '历史长河流淌',
  59. '史趣探秘',
  60. '暖心一隅',
  61. '小阳看天下',
  62. '小惠爱厨房');
  63. """
  64. response = cls.deNetClient.select(sql)
  65. L = []
  66. for item in response:
  67. temp = {
  68. "accountName": item[0],
  69. "ghId": item[1],
  70. "fans": item[2]
  71. }
  72. L.append(temp)
  73. return L
  74. @classmethod
  75. def getAccountIdDict(cls):
  76. """
  77. 获取全部内部账号的id
  78. :return:
  79. """
  80. gh_id_dict = {}
  81. for line in cls.account_list:
  82. gh_id = line['gh_id']
  83. gh_id_dict[gh_id] = line
  84. return gh_id_dict
  85. @classmethod
  86. def insertIntoMysql(cls, data):
  87. """
  88. 将数据插入到 Mysql 中
  89. :param data:
  90. :return:
  91. """
  92. sql = f"""
  93. INSERT INTO account_avg_info_v2
  94. (gh_id, position, account_name, fans, read_avg, like_avg, update_time, status)
  95. values
  96. (%s, %s, %s, %s, %s, %s, %s, %s);
  97. """
  98. cls.pqClient.update(
  99. sql=sql,
  100. params=(
  101. data['gh_id'],
  102. data['position'],
  103. data['account_name'],
  104. data['fans'],
  105. data['avg_read'],
  106. data['avg_like'],
  107. data['update_time'],
  108. 1
  109. )
  110. )
  111. @classmethod
  112. def getAllAvgRead(cls):
  113. """
  114. :return:
  115. """
  116. L = []
  117. record_list = cls.getAccountList()
  118. dt_str = datetime.today().__str__().split(" ")[0]
  119. for item in tqdm(record_list):
  120. index_list = [i for i in range(1, 9)]
  121. for index in index_list:
  122. try:
  123. account_name = item['accountName']
  124. avg_read, avg_like = cls.getArticleByFilter(
  125. account_name=account_name,
  126. index=index,
  127. min_time=int(time.time()) - 31 * 24 * 3600,
  128. max_time=int(time.time())
  129. )
  130. obj = {
  131. "account_name": account_name,
  132. "gh_id": item['ghId'],
  133. "fans": item.get('fans', 0),
  134. "position": index,
  135. "avg_read": avg_read if str(avg_read) != "nan" else 0,
  136. "avg_like": avg_like if str(avg_like) != "nan" else 0,
  137. "update_time": dt_str
  138. }
  139. cls.insertIntoMysql(obj)
  140. L.append(obj)
  141. except Exception as e:
  142. print(e)
  143. with open("new_account_avg_v3.json", "w", encoding="utf-8") as f:
  144. f.write(json.dumps(L, ensure_ascii=False, indent=4))
  145. update_sql = f"""
  146. UPDATE account_avg_info_v2
  147. SET status = %s
  148. where update_time != '{dt_str}';
  149. """
  150. cls.pqClient.update(sql=update_sql, params=0)
  151. @classmethod
  152. def getEachAvgRead(cls, account_name, index):
  153. """
  154. :return:
  155. """
  156. keys = [
  157. "appMsgId",
  158. "title",
  159. "Type",
  160. "updateTime",
  161. "ItemIndex",
  162. "ContentUrl",
  163. "show_view_count",
  164. "show_like_count",
  165. ]
  166. sql = f"""
  167. SELECT {", ".join(keys)}
  168. FROM official_articles_v2
  169. WHERE accountName = '{account_name}' and ItemIndex = {index};"""
  170. result = cls.pqClient.select(sql=sql)
  171. return DataFrame(result, columns=keys)
  172. @classmethod
  173. def getArticleByFilter(
  174. cls,
  175. account_name,
  176. index,
  177. min_time=None,
  178. max_time=None,
  179. msg_type=None,
  180. ):
  181. """
  182. :param account_name:
  183. :param index: index ranges from 1 to 8
  184. :param min_time: earliest time
  185. :param max_time: latest time
  186. :param msg_type: msg_type
  187. :return:
  188. """
  189. if not msg_type:
  190. msg_type = "9"
  191. if not min_time:
  192. min_time = 0
  193. if not max_time:
  194. # 2099年
  195. max_time = 4088051123
  196. articleDataFrame = cls.getEachAvgRead(account_name=account_name, index=index)
  197. filterDataFrame = articleDataFrame[
  198. (articleDataFrame["Type"] == msg_type)
  199. & (min_time <= articleDataFrame["updateTime"])
  200. & (articleDataFrame["updateTime"] <= max_time)
  201. ]
  202. # 过滤异常值
  203. finalDF = filter_outlier_data(filterDataFrame)
  204. return finalDF['show_view_count'].mean(), finalDF['show_like_count'].mean()
  205. def updateAvgJob():
  206. """
  207. :return:
  208. """
  209. S = UpdateAvgDaily()
  210. S.getAllAvgRead()
  211. if __name__ == "__main__":
  212. schedule.every().day.at("22:30").do(Functions().job_with_thread, updateAvgJob)
  213. while True:
  214. schedule.run_pending()
  215. time.sleep(1)