updateAccountAvgDaily.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
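"""
@author: luojunhui

Recompute, once a day, each official account's per-position (1-8) average
read and like counts over the last 31 days, and store them in
`account_avg_info_v2`.

Reference DDL for the target table:

CREATE TABLE `account_avg_info_v2` (
  `gh_id` varchar(32) NOT NULL COMMENT 'ghid',
  `position` int(11) NOT NULL COMMENT '位置',
  `account_name` varchar(255) DEFAULT NULL COMMENT '账号名称',
  `fans` int(10) DEFAULT NULL COMMENT '粉丝量',
  `read_avg` double(8,2) DEFAULT NULL COMMENT '阅读均值',
  `like_avg` double(8,2) DEFAULT NULL COMMENT '点赞均值',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间 dt',
  `status` int(1) DEFAULT NULL COMMENT ' 状态',
  PRIMARY KEY (`gh_id`,`position`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 AVG_ROW_LENGTH=202 ROW_FORMAT=DYNAMIC

NOTE(review): the INSERT in this file also writes account_type, account_mode,
account_source and account_status. The DDL above does not declare those
columns, so the live schema has presumably diverged from it — confirm.

With PRIMARY KEY (`gh_id`, `position`) as shown, inserting the same
account/position again on a later day would raise a duplicate-key error.
The live key presumably differs as well.
"""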
  15. import json
  16. import time
  17. import schedule
  18. from datetime import datetime
  19. from pandas import DataFrame
  20. from tqdm import tqdm
  21. from applications import PQMySQL, DeNetMysql, Functions, log, bot
  22. def filter_outlier_data(group, key='show_view_count'):
  23. """
  24. :param group:
  25. :param key:
  26. :return:
  27. """
  28. mean = group[key].mean()
  29. std = group[key].std()
  30. # 过滤二倍标准差的数据
  31. filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
  32. # 过滤均值倍数大于5的数据
  33. new_mean = filtered_group[key].mean()
  34. filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
  35. return filtered_group
  36. class UpdateAvgDaily(object):
  37. """
  38. 日常更新文章
  39. """
  40. pqClient = PQMySQL()
  41. deNetClient = DeNetMysql()
  42. @classmethod
  43. def getAccountList(cls):
  44. """
  45. get official accounts and its ghid, fans, and account_type
  46. :return:
  47. """
  48. sql = f"""
  49. SELECT t1.`name`,
  50. t1.gh_id,
  51. t1.follower_count,
  52. t3.account_type,
  53. t3.account_source_name,
  54. t3.mode_type,
  55. t3.status
  56. FROM `publish_account` t1
  57. JOIN wx_statistics_group_source_account t2
  58. on t1.id = t2.account_id
  59. JOIN wx_statistics_group_source t3
  60. on t2.group_source_name = t3.account_source_name;
  61. """
  62. response = cls.deNetClient.select(sql)
  63. log(
  64. task="updateAccountAvgDaily",
  65. function="getAccountList",
  66. message="获取账号成功,一共获取: {} 个账号".format(len(response))
  67. )
  68. L = []
  69. for item in response:
  70. temp = {
  71. "accountName": item[0],
  72. "ghId": item[1],
  73. "fans": item[2],
  74. "accountType": item[3],
  75. "accountSource": item[4],
  76. "accountMode": item[5],
  77. "accountStatus": item[6]
  78. }
  79. if temp["accountName"] in ['口琴', '二胡']:
  80. continue
  81. elif temp["accountType"] == '服务号':
  82. continue
  83. else:
  84. L.append(temp)
  85. log(
  86. task="updateAccountAvgDaily",
  87. function="getAccountList",
  88. message="过滤账号成功,过滤后一共获取: {} 个账号".format(len(L))
  89. )
  90. return L
  91. @classmethod
  92. def insertIntoMysql(cls, data):
  93. """
  94. 将数据插入到 Mysql 中
  95. :param data:
  96. :return:
  97. """
  98. sql = f"""
  99. INSERT INTO account_avg_info_v2
  100. (gh_id, position, account_name, fans, read_avg, like_avg, update_time, status, account_type, account_mode, account_source, account_status)
  101. values
  102. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  103. """
  104. try:
  105. cls.pqClient.update(
  106. sql=sql,
  107. params=(
  108. data['gh_id'],
  109. data['position'],
  110. data['account_name'],
  111. data['fans'],
  112. data['avg_read'],
  113. data['avg_like'],
  114. data['update_time'],
  115. 1,
  116. data['account_type'],
  117. data['account_mode'],
  118. data['account_source'],
  119. data['account_status']
  120. )
  121. )
  122. log(
  123. task="updateAccountAvgDaily",
  124. function="insertIntoMysql",
  125. message="数据插入成功",
  126. data=data
  127. )
  128. except Exception as e:
  129. log(
  130. task="updateAccountAvgDaily",
  131. function="insertIntoMysql",
  132. message="数据插入失败, 失败原因是: {}".format(e),
  133. status="fail",
  134. data=data
  135. )
  136. @classmethod
  137. def getAllAvgRead(cls):
  138. """
  139. :return:
  140. """
  141. L = []
  142. record_list = cls.getAccountList()
  143. dt_str = datetime.today().__str__().split(" ")[0]
  144. for item in tqdm(record_list):
  145. index_list = [i for i in range(1, 9)]
  146. for index in index_list:
  147. try:
  148. account_name = item['accountName']
  149. avg_read, avg_like = cls.getArticleByFilter(
  150. account_name=account_name,
  151. index=index,
  152. min_time=int(time.time()) - 31 * 24 * 3600,
  153. max_time=int(time.time())
  154. )
  155. obj = {
  156. "account_name": account_name,
  157. "gh_id": item['ghId'],
  158. "fans": item.get('fans', 0),
  159. "position": index,
  160. "avg_read": avg_read if str(avg_read) != "nan" else 0,
  161. "avg_like": avg_like if str(avg_like) != "nan" else 0,
  162. "update_time": dt_str,
  163. "account_type": item['accountType'],
  164. "account_mode": item['accountMode'],
  165. "account_source": item['accountSource'],
  166. "account_status": item['accountStatus']
  167. }
  168. cls.insertIntoMysql(obj)
  169. L.append(obj)
  170. except Exception as e:
  171. log(
  172. task="updateAccountAvgDaily",
  173. function="getAllAvgRead",
  174. status="fail",
  175. message="更新单个账号单个位置的账号均值失败, 失败原因是: {}".format(e)
  176. )
  177. with open("new_account_avg_v3.json", "w", encoding="utf-8") as f:
  178. f.write(json.dumps(L, ensure_ascii=False, indent=4))
  179. log(
  180. task="updateAccountAvgDaily",
  181. function="getAllAvgRead",
  182. message="账号均值数据写入文件成功"
  183. )
  184. update_sql = f"""
  185. UPDATE account_avg_info_v2
  186. SET status = %s
  187. where update_time != '{dt_str}';
  188. """
  189. try:
  190. cls.pqClient.update(sql=update_sql, params=0)
  191. log(
  192. task="updateAccountAvgDaily",
  193. function="getAllAvgRead",
  194. message="修改非当日数据状态为 0 成功"
  195. )
  196. except Exception as e:
  197. bot(
  198. title="账号均值表,更新非当日数据状态失败",
  199. detail={
  200. "task": "updateAccountAvgDaily"
  201. }
  202. )
  203. log(
  204. task="updateAccountAvgDaily",
  205. function="getAllAvgRead",
  206. status="fail",
  207. message="修改非当日数据状态为 0 失败, 报错为 {}".format(e)
  208. )
  209. @classmethod
  210. def getEachAvgRead(cls, account_name, index):
  211. """
  212. :return:
  213. """
  214. keys = [
  215. "appMsgId",
  216. "title",
  217. "Type",
  218. "updateTime",
  219. "ItemIndex",
  220. "ContentUrl",
  221. "show_view_count",
  222. "show_like_count",
  223. ]
  224. sql = f"""
  225. SELECT {", ".join(keys)}
  226. FROM official_articles_v2
  227. WHERE accountName = '{account_name}' and ItemIndex = {index};"""
  228. result = cls.pqClient.select(sql=sql)
  229. return DataFrame(result, columns=keys)
  230. @classmethod
  231. def getArticleByFilter(
  232. cls,
  233. account_name,
  234. index,
  235. min_time=None,
  236. max_time=None,
  237. msg_type=None,
  238. ):
  239. """
  240. :param account_name:
  241. :param index: index ranges from 1 to 8
  242. :param min_time: earliest time
  243. :param max_time: latest time
  244. :param msg_type: msg_type
  245. :return:
  246. """
  247. if not msg_type:
  248. msg_type = "9"
  249. if not min_time:
  250. min_time = 0
  251. if not max_time:
  252. # 2099年
  253. max_time = 4088051123
  254. articleDataFrame = cls.getEachAvgRead(account_name=account_name, index=index)
  255. filterDataFrame = articleDataFrame[
  256. (articleDataFrame["Type"] == msg_type)
  257. & (min_time <= articleDataFrame["updateTime"])
  258. & (articleDataFrame["updateTime"] <= max_time)
  259. ]
  260. # 过滤异常值
  261. finalDF = filter_outlier_data(filterDataFrame)
  262. return finalDF['show_view_count'].mean(), finalDF['show_like_count'].mean()
  263. def updateAvgJob():
  264. """
  265. :return:
  266. """
  267. S = UpdateAvgDaily()
  268. S.getAllAvgRead()
  269. if __name__ == "__main__":
  270. schedule.every().day.at("22:00").do(Functions().job_with_thread, updateAvgJob)
  271. while True:
  272. schedule.run_pending()
  273. time.sleep(1)
  274. # log(
  275. # task="updateAccountAvgDaily",
  276. # function="main",
  277. # message="更新账号均值任务正常执行"
  278. # )