updateAccountAvgDaily.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. """
  2. @author: luojunhui
  3. CREATE TABLE `account_avg_info_v2` (
  4. `gh_id` varchar(32) NOT NULL COMMENT 'ghid',
  5. `position` int(11) NOT NULL COMMENT '位置',
  6. `account_name` varchar(255) DEFAULT NULL COMMENT '账号名称',
  7. `fans` int(10) DEFAULT NULL COMMENT '粉丝量',
  8. `read_avg` double(8,2) DEFAULT NULL COMMENT '阅读均值',
  9. `like_avg` double(8,2) DEFAULT NULL COMMENT '点赞均值',
  10. `update_time` datetime DEFAULT NULL COMMENT '更新时间 dt',
  11. `status` int(1) DEFAULT NULL COMMENT ' 状态',
  12. PRIMARY KEY (`gh_id`,`position`)
  13. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 AVG_ROW_LENGTH=202 ROW_FORMAT=DYNAMIC
  14. """
  15. import json
  16. import time
  17. import schedule
  18. from datetime import datetime
  19. from pandas import DataFrame
  20. from tqdm import tqdm
  21. from applications import PQMySQL, DeNetMysql, Functions
  22. def filter_outlier_data(group, key='show_view_count'):
  23. """
  24. :param group:
  25. :param key:
  26. :return:
  27. """
  28. mean = group[key].mean()
  29. std = group[key].std()
  30. # 过滤二倍标准差的数据
  31. filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
  32. # 过滤均值倍数大于5的数据
  33. new_mean = filtered_group[key].mean()
  34. filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
  35. return filtered_group
  36. class UpdateAvgDaily(object):
  37. """
  38. 日常更新文章
  39. """
  40. pqClient = PQMySQL()
  41. deNetClient = DeNetMysql()
  42. @classmethod
  43. def getAccountList(cls):
  44. """
  45. 获取账号 list
  46. :return:
  47. """
  48. sql = f"""
  49. SELECT t1.`name`, t1.gh_id, t1.follower_count, t3.account_type
  50. FROM `publish_account` t1
  51. JOIN wx_statistics_group_source_account t2
  52. ON t1.id = t2.account_id
  53. JOIN wx_statistics_group_source t3
  54. ON t2.group_source_name = t3.account_source_name
  55. """
  56. response = cls.deNetClient.select(sql)
  57. L = []
  58. for item in response:
  59. temp = {
  60. "accountName": item[0],
  61. "ghId": item[1],
  62. "fans": item[2],
  63. "accountType": item[3]
  64. }
  65. if temp["accountName"] in ['口琴', '二胡']:
  66. continue
  67. elif temp["accountType"] == '服务号':
  68. continue
  69. else:
  70. L.append(temp)
  71. return L
  72. @classmethod
  73. def getAccountIdDict(cls):
  74. """
  75. 获取全部内部账号的id
  76. :return:
  77. """
  78. gh_id_dict = {}
  79. for line in cls.account_list:
  80. gh_id = line['gh_id']
  81. gh_id_dict[gh_id] = line
  82. return gh_id_dict
  83. @classmethod
  84. def insertIntoMysql(cls, data):
  85. """
  86. 将数据插入到 Mysql 中
  87. :param data:
  88. :return:
  89. """
  90. sql = f"""
  91. INSERT INTO account_avg_info_v2
  92. (gh_id, position, account_name, fans, read_avg, like_avg, update_time, status)
  93. values
  94. (%s, %s, %s, %s, %s, %s, %s, %s);
  95. """
  96. cls.pqClient.update(
  97. sql=sql,
  98. params=(
  99. data['gh_id'],
  100. data['position'],
  101. data['account_name'],
  102. data['fans'],
  103. data['avg_read'],
  104. data['avg_like'],
  105. data['update_time'],
  106. 1
  107. )
  108. )
  109. @classmethod
  110. def getAllAvgRead(cls):
  111. """
  112. :return:
  113. """
  114. L = []
  115. record_list = cls.getAccountList()
  116. dt_str = datetime.today().__str__().split(" ")[0]
  117. for item in tqdm(record_list):
  118. index_list = [i for i in range(1, 9)]
  119. for index in index_list:
  120. try:
  121. account_name = item['accountName']
  122. avg_read, avg_like = cls.getArticleByFilter(
  123. account_name=account_name,
  124. index=index,
  125. min_time=int(time.time()) - 31 * 24 * 3600,
  126. max_time=int(time.time())
  127. )
  128. obj = {
  129. "account_name": account_name,
  130. "gh_id": item['ghId'],
  131. "fans": item.get('fans', 0),
  132. "position": index,
  133. "avg_read": avg_read if str(avg_read) != "nan" else 0,
  134. "avg_like": avg_like if str(avg_like) != "nan" else 0,
  135. "update_time": dt_str
  136. }
  137. cls.insertIntoMysql(obj)
  138. L.append(obj)
  139. except Exception as e:
  140. print(e)
  141. with open("new_account_avg_v3.json", "w", encoding="utf-8") as f:
  142. f.write(json.dumps(L, ensure_ascii=False, indent=4))
  143. update_sql = f"""
  144. UPDATE account_avg_info_v2
  145. SET status = %s
  146. where update_time != '{dt_str}';
  147. """
  148. cls.pqClient.update(sql=update_sql, params=0)
  149. @classmethod
  150. def getEachAvgRead(cls, account_name, index):
  151. """
  152. :return:
  153. """
  154. keys = [
  155. "appMsgId",
  156. "title",
  157. "Type",
  158. "updateTime",
  159. "ItemIndex",
  160. "ContentUrl",
  161. "show_view_count",
  162. "show_like_count",
  163. ]
  164. sql = f"""
  165. SELECT {", ".join(keys)}
  166. FROM official_articles_v2
  167. WHERE accountName = '{account_name}' and ItemIndex = {index};"""
  168. result = cls.pqClient.select(sql=sql)
  169. return DataFrame(result, columns=keys)
  170. @classmethod
  171. def getArticleByFilter(
  172. cls,
  173. account_name,
  174. index,
  175. min_time=None,
  176. max_time=None,
  177. msg_type=None,
  178. ):
  179. """
  180. :param account_name:
  181. :param index: index ranges from 1 to 8
  182. :param min_time: earliest time
  183. :param max_time: latest time
  184. :param msg_type: msg_type
  185. :return:
  186. """
  187. if not msg_type:
  188. msg_type = "9"
  189. if not min_time:
  190. min_time = 0
  191. if not max_time:
  192. # 2099年
  193. max_time = 4088051123
  194. articleDataFrame = cls.getEachAvgRead(account_name=account_name, index=index)
  195. filterDataFrame = articleDataFrame[
  196. (articleDataFrame["Type"] == msg_type)
  197. & (min_time <= articleDataFrame["updateTime"])
  198. & (articleDataFrame["updateTime"] <= max_time)
  199. ]
  200. # 过滤异常值
  201. finalDF = filter_outlier_data(filterDataFrame)
  202. return finalDF['show_view_count'].mean(), finalDF['show_like_count'].mean()
  203. def updateAvgJob():
  204. """
  205. :return:
  206. """
  207. S = UpdateAvgDaily()
  208. S.getAllAvgRead()
  209. if __name__ == "__main__":
  210. # updateAvgJob()
  211. schedule.every().day.at("22:30").do(Functions().job_with_thread, updateAvgJob)
  212. while True:
  213. schedule.run_pending()
  214. time.sleep(1)