| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 | 
							- """
 
- @author: luojunhui
 
- CREATE TABLE `account_avg_info_v2` (
 
-   `gh_id` varchar(32) NOT NULL COMMENT 'ghid',
 
-   `position` int(11) NOT NULL COMMENT '位置',
 
-   `account_name` varchar(255) DEFAULT NULL COMMENT '账号名称',
 
-   `fans` int(10) DEFAULT NULL COMMENT '粉丝量',
 
-   `read_avg` double(8,2) DEFAULT NULL COMMENT '阅读均值',
 
-   `like_avg` double(8,2) DEFAULT NULL COMMENT '点赞均值',
 
-   `update_time` datetime DEFAULT NULL COMMENT '更新时间 dt',
 
-   `status` int(1) DEFAULT NULL COMMENT ' 状态',
 
-   PRIMARY KEY (`gh_id`,`position`)
 
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 AVG_ROW_LENGTH=202 ROW_FORMAT=DYNAMIC
 
- """
 
- import json
 
- import time
 
- import schedule
 
- from datetime import datetime
 
- from pandas import DataFrame
 
- from tqdm import tqdm
 
- from applications import PQMySQL, DeNetMysql, Functions, log, bot
 
def filter_outlier_data(group, key='show_view_count'):
    """
    Drop outlier rows from ``group`` based on the numeric column ``key``.

    Two passes are applied in order:

    1. keep rows whose value lies within two standard deviations of the
       mean (bounds inclusive);
    2. keep rows whose value does not exceed five times the mean of the
       rows that survived pass 1.

    Degenerate groups are preserved instead of being wiped out: with fewer
    than two observations the sample standard deviation is NaN, and with
    identical values it is 0. Strict comparisons against such bounds would
    discard every row and turn a perfectly valid average into NaN.

    :param group: pandas DataFrame containing at least the column ``key``
    :param key: name of the column the outlier detection is based on
    :return: DataFrame holding the rows of ``group`` that passed both filters
    """
    if group.empty:
        return group

    values = group[key]

    # Pass 1: two-standard-deviation band. Skipped when std is undefined
    # (fewer than two non-null observations).
    if values.count() >= 2:
        mean = values.mean()
        std = values.std()
        lower, upper = mean - 2 * std, mean + 2 * std
        # Inclusive bounds so that a zero-variance group keeps its rows.
        group = group[(values >= lower) & (values <= upper)]

    # Pass 2: drop anything larger than five times the remaining mean.
    new_mean = group[key].mean()
    return group[group[key] <= new_mean * 5]
 
class UpdateAvgDaily(object):
    """
    Daily refresh of the per-account, per-position read/like averages that
    are stored in the ``account_avg_info_v2`` table.
    """

    # Database client wrappers imported from ``applications``. They are
    # instantiated once, when the class body is evaluated, and shared by all
    # classmethods below (only ``select`` and ``update`` are used here).
    pqClient = PQMySQL()
    deNetClient = DeNetMysql()

    @classmethod
    def getAccountList(cls):
        """
        Fetch the official accounts together with their gh_id, follower
        count, type, source, mode and status.

        Accounts named '口琴' or '二胡' and accounts whose type is '服务号'
        are excluded from the result.

        :return: list of dicts with the keys ``accountName``, ``ghId``,
                 ``fans``, ``accountType``, ``accountSource``,
                 ``accountMode`` and ``accountStatus``
        """
        sql = """
        SELECT  t1.`name`,
                t1.gh_id, 
                t1.follower_count, 
                t3.account_type, 
                t3.account_source_name, 
                t3.mode_type, 
                t3.status
        FROM `publish_account` t1
        JOIN wx_statistics_group_source_account t2
            on t1.id = t2.account_id
        JOIN wx_statistics_group_source t3
            on t2.group_source_name = t3.account_source_name;
        """
        response = cls.deNetClient.select(sql)
        log(
            task="updateAccountAvgDaily",
            function="getAccountList",
            message="获取账号成功,一共获取: {} 个账号".format(len(response))
        )

        # Field names in the same order as the columns of the SELECT above.
        fields = (
            "accountName",
            "ghId",
            "fans",
            "accountType",
            "accountSource",
            "accountMode",
            "accountStatus",
        )
        excluded_names = {'口琴', '二胡'}
        excluded_type = '服务号'

        account_list = []
        for row in response:
            account = dict(zip(fields, row))
            if account["accountName"] in excluded_names:
                continue
            if account["accountType"] == excluded_type:
                continue
            account_list.append(account)

        log(
            task="updateAccountAvgDaily",
            function="getAccountList",
            message="过滤账号成功,过滤后一共获取: {} 个账号".format(len(account_list))
        )
        return account_list

    @classmethod
    def insertIntoMysql(cls, data):
        """
        Insert one averaged record into ``account_avg_info_v2`` with
        ``status`` set to 1.

        Failures are logged and swallowed on purpose, so that one bad row
        does not abort the whole daily run.

        :param data: dict with the keys ``gh_id``, ``position``,
                     ``account_name``, ``fans``, ``avg_read``, ``avg_like``,
                     ``update_time``, ``account_type``, ``account_mode``,
                     ``account_source`` and ``account_status``
        :return: None
        """
        sql = """
        INSERT INTO account_avg_info_v2
        (gh_id, position, account_name, fans, read_avg, like_avg, update_time, status, account_type, account_mode, account_source, account_status)
        values 
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        params = (
            data['gh_id'],
            data['position'],
            data['account_name'],
            data['fans'],
            data['avg_read'],
            data['avg_like'],
            data['update_time'],
            1,  # status: freshly inserted rows are the active ones
            data['account_type'],
            data['account_mode'],
            data['account_source'],
            data['account_status'],
        )
        try:
            cls.pqClient.update(sql=sql, params=params)
            log(
                task="updateAccountAvgDaily",
                function="insertIntoMysql",
                message="数据插入成功",
                data=data
            )
        except Exception as e:
            log(
                task="updateAccountAvgDaily",
                function="insertIntoMysql",
                message="数据插入失败, 失败原因是: {}".format(e),
                status="fail",
                data=data
            )

    @classmethod
    def getAllAvgRead(cls):
        """
        Compute the read/like averages over the last 31 days for every
        account and every position (1 to 8), insert them into MySQL, dump
        them to ``new_account_avg_v3.json`` and finally set ``status`` to 0
        on every row whose ``update_time`` is not today.

        :return: None
        """
        account_list = cls.getAccountList()
        dt_str = datetime.today().strftime("%Y-%m-%d")

        # One fixed 31-day window for the whole run, so that every account
        # and position is averaged over exactly the same period.
        max_time = int(time.time())
        min_time = max_time - 31 * 24 * 3600

        result_list = []
        for item in tqdm(account_list):
            for index in range(1, 9):
                try:
                    account_name = item['accountName']
                    avg_read, avg_like = cls.getArticleByFilter(
                        account_name=account_name,
                        index=index,
                        min_time=min_time,
                        max_time=max_time
                    )
                    obj = {
                        "account_name": account_name,
                        "gh_id": item['ghId'],
                        "fans": item.get('fans', 0),
                        "position": index,
                        # The mean of an empty selection is NaN; store 0.
                        "avg_read": avg_read if str(avg_read) != "nan" else 0,
                        "avg_like": avg_like if str(avg_like) != "nan" else 0,
                        "update_time": dt_str,
                        "account_type": item['accountType'],
                        "account_mode": item['accountMode'],
                        "account_source": item['accountSource'],
                        "account_status": item['accountStatus']
                    }
                    cls.insertIntoMysql(obj)
                    result_list.append(obj)
                except Exception as e:
                    log(
                        task="updateAccountAvgDaily",
                        function="getAllAvgRead",
                        status="fail",
                        message="更新单个账号单个位置的账号均值失败, 失败原因是: {}".format(e)
                    )

        with open("new_account_avg_v3.json", "w", encoding="utf-8") as f:
            f.write(json.dumps(result_list, ensure_ascii=False, indent=4))
        log(
            task="updateAccountAvgDaily",
            function="getAllAvgRead",
            message="账号均值数据写入文件成功"
        )

        # Both values are bound as parameters (as a tuple, like the INSERT
        # above) instead of mixing string interpolation with a placeholder.
        update_sql = """
        UPDATE account_avg_info_v2
        SET status = %s
        where update_time != %s;
        """
        try:
            cls.pqClient.update(sql=update_sql, params=(0, dt_str))
            log(
                task="updateAccountAvgDaily",
                function="getAllAvgRead",
                message="修改非当日数据状态为 0 成功"
            )
        except Exception as e:
            bot(
                title="账号均值表,更新非当日数据状态失败",
                detail={
                    "task": "updateAccountAvgDaily"
                }
            )
            log(
                task="updateAccountAvgDaily",
                function="getAllAvgRead",
                status="fail",
                message="修改非当日数据状态为 0 失败, 报错为 {}".format(e)
            )

    @classmethod
    def getEachAvgRead(cls, account_name, index):
        """
        Load every article of one account at one position from
        ``official_articles_v2``.

        :param account_name: value matched against the ``accountName`` column
        :param index: value matched against the ``ItemIndex`` column
        :return: DataFrame with the columns ``appMsgId``, ``title``,
                 ``Type``, ``updateTime``, ``ItemIndex``, ``ContentUrl``,
                 ``show_view_count`` and ``show_like_count``
        """
        keys = [
            "appMsgId",
            "title",
            "Type",
            "updateTime",
            "ItemIndex",
            "ContentUrl",
            "show_view_count",
            "show_like_count",
        ]
        # NOTE(review): account_name is interpolated straight into the SQL
        # text, so a name containing a single quote breaks the statement.
        # Switch to bound parameters if the client's ``select`` supports
        # them -- TODO confirm against the PQMySQL implementation.
        sql = f"""
                    SELECT {", ".join(keys)}
                    FROM official_articles_v2
                    WHERE accountName = '{account_name}' and ItemIndex = {index};"""
        result = cls.pqClient.select(sql=sql)
        return DataFrame(result, columns=keys)

    @classmethod
    def getArticleByFilter(
            cls,
            account_name,
            index,
            min_time=None,
            max_time=None,
            msg_type=None,
    ):
        """
        Average read and like counts of one account/position, restricted to
        a message type and a time window, after outlier removal.

        :param account_name: account whose articles are evaluated
        :param index: article position, ranges from 1 to 8
        :param min_time: earliest ``updateTime`` (inclusive); defaults to 0
        :param max_time: latest ``updateTime`` (inclusive); defaults to a
                         timestamp in the year 2099
        :param msg_type: value compared with the ``Type`` column; defaults
                         to "9"
        :return: tuple ``(mean show_view_count, mean show_like_count)``;
                 both are NaN when no article is left after filtering
        """
        msg_type = msg_type or "9"
        min_time = min_time or 0
        # 4088051123 is a timestamp in the year 2099, i.e. "no upper bound".
        max_time = max_time or 4088051123

        articles = cls.getEachAvgRead(account_name=account_name, index=index)
        type_matches = articles["Type"] == msg_type
        in_window = (min_time <= articles["updateTime"]) & (articles["updateTime"] <= max_time)

        # Remove outliers before averaging.
        cleaned = filter_outlier_data(articles[type_matches & in_window])
        return cleaned['show_view_count'].mean(), cleaned['show_like_count'].mean()
 
def updateAvgJob():
    """
    Job entry point: run one complete refresh of the account averages.

    :return: None
    """
    UpdateAvgDaily().getAllAvgRead()
 
if __name__ == "__main__":
    # Register the refresh to fire every day at 22:00. The job is handed to
    # Functions().job_with_thread together with the callable to execute
    # (presumably it runs the job in a separate thread -- confirm in
    # applications.Functions).
    daily_at_22 = schedule.every().day.at("22:00")
    daily_at_22.do(Functions().job_with_thread, updateAvgJob)

    # Poll the scheduler once per second, forever.
    while True:
        schedule.run_pending()
        time.sleep(1)
 
 
  |