123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
- """
- @author: luojunhui
- CREATE TABLE `account_avg_info_v2` (
- `gh_id` varchar(32) NOT NULL COMMENT 'ghid',
- `position` int(11) NOT NULL COMMENT '位置',
- `account_name` varchar(255) DEFAULT NULL COMMENT '账号名称',
- `fans` int(10) DEFAULT NULL COMMENT '粉丝量',
- `read_avg` double(8,2) DEFAULT NULL COMMENT '阅读均值',
- `like_avg` double(8,2) DEFAULT NULL COMMENT '点赞均值',
- `update_time` datetime DEFAULT NULL COMMENT '更新时间 dt',
- `status` int(1) DEFAULT NULL COMMENT ' 状态',
- PRIMARY KEY (`gh_id`,`position`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 AVG_ROW_LENGTH=202 ROW_FORMAT=DYNAMIC
- """
- import json
- import time
- import schedule
- from datetime import datetime
- from pandas import DataFrame
- from tqdm import tqdm
- from applications import PQMySQL, DeNetMysql, Functions, log, bot
def filter_outlier_data(group, key='show_view_count'):
    """
    Drop outlier rows from ``group`` based on column ``key``.

    Two passes are applied:
      1. keep rows strictly within two sample standard deviations of the mean;
      2. of those, keep rows strictly below five times the new mean.

    A pass is skipped when its statistic is degenerate. If the standard
    deviation is 0 (all values equal) or NaN (fewer than two rows), the
    strict bounds would otherwise reject every row. The same holds if the
    mean of the remaining rows is 0 or NaN. This assumes ``key`` holds
    non-negative counts such as views or likes.

    :param group: DataFrame to filter.
    :param key: name of the numeric column used for filtering.
    :return: filtered DataFrame (a subset of ``group``'s rows).
    """
    values = group[key]
    mean = values.mean()
    std = values.std()
    # `std > 0` is False for both 0 and NaN, so degenerate groups are kept
    # intact instead of being emptied by the strict inequalities below.
    if std > 0:
        filtered_group = group[(values > mean - 2 * std) & (values < mean + 2 * std)]
    else:
        filtered_group = group
    # Drop values that are more than 5x the mean of the remaining rows.
    new_mean = filtered_group[key].mean()
    if new_mean > 0:
        filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
    return filtered_group
class UpdateAvgDaily(object):
    """
    Daily job that recomputes each account's average reads/likes per article
    position and stores them in ``account_avg_info_v2``.
    """
    pqClient = PQMySQL()
    deNetClient = DeNetMysql()

    # Accounts with these names are excluded from the daily update.
    EXCLUDED_ACCOUNT_NAMES = frozenset(['口琴', '二胡'])
    # Accounts of this type ('服务号', service account) are excluded as well.
    EXCLUDED_ACCOUNT_TYPE = '服务号'
    # Article positions (ItemIndex values) averaged for every account.
    POSITIONS = range(1, 9)
    # Look-back window for articles, in seconds (31 days).
    WINDOW_SECONDS = 31 * 24 * 3600

    @classmethod
    def getAccountList(cls):
        """
        Fetch official accounts with their gh_id, follower count and
        group-source metadata, then drop excluded accounts.

        :return: list of dicts with keys accountName, ghId, fans,
            accountType, accountSource, accountMode, accountStatus.
        """
        sql = """
            SELECT t1.`name`,
                   t1.gh_id,
                   t1.follower_count,
                   t3.account_type,
                   t3.account_source_name,
                   t3.mode_type,
                   t3.status
            FROM `publish_account` t1
            JOIN wx_statistics_group_source_account t2
              on t1.id = t2.account_id
            JOIN wx_statistics_group_source t3
              on t2.group_source_name = t3.account_source_name;
        """
        # NOTE(review): guard in case select() returns None on failure.
        response = cls.deNetClient.select(sql) or []
        log(
            task="updateAccountAvgDaily",
            function="getAccountList",
            message="获取账号成功,一共获取: {} 个账号".format(len(response))
        )
        accounts = []
        for item in response:
            account = {
                "accountName": item[0],
                "ghId": item[1],
                "fans": item[2],
                "accountType": item[3],
                "accountSource": item[4],
                "accountMode": item[5],
                "accountStatus": item[6]
            }
            if account["accountName"] in cls.EXCLUDED_ACCOUNT_NAMES:
                continue
            if account["accountType"] == cls.EXCLUDED_ACCOUNT_TYPE:
                continue
            accounts.append(account)
        log(
            task="updateAccountAvgDaily",
            function="getAccountList",
            message="过滤账号成功,过滤后一共获取: {} 个账号".format(len(accounts))
        )
        return accounts

    @classmethod
    def insertIntoMysql(cls, data):
        """
        Insert one average record into ``account_avg_info_v2`` with status 1.

        Failures are logged rather than raised.

        :param data: dict with keys gh_id, position, account_name, fans,
            avg_read, avg_like, update_time, account_type, account_mode,
            account_source, account_status.
        :return: True if the insert call completed, False if it raised.
        """
        sql = """
            INSERT INTO account_avg_info_v2
            (gh_id, position, account_name, fans, read_avg, like_avg, update_time, status, account_type, account_mode, account_source, account_status)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        try:
            cls.pqClient.update(
                sql=sql,
                params=(
                    data['gh_id'],
                    data['position'],
                    data['account_name'],
                    data['fans'],
                    data['avg_read'],
                    data['avg_like'],
                    data['update_time'],
                    1,
                    data['account_type'],
                    data['account_mode'],
                    data['account_source'],
                    data['account_status']
                )
            )
        except Exception as e:
            log(
                task="updateAccountAvgDaily",
                function="insertIntoMysql",
                message="数据插入失败, 失败原因是: {}".format(e),
                status="fail",
                data=data
            )
            return False
        log(
            task="updateAccountAvgDaily",
            function="insertIntoMysql",
            message="数据插入成功",
            data=data
        )
        return True

    @classmethod
    def getAllAvgRead(cls):
        """
        Recompute and store averages for every account and position.

        Averages cover the last 31 days. Results are also dumped to
        ``new_account_avg_v3.json``. Finally, rows whose update_time is not
        today are set to status 0, but only if at least one row was inserted
        today. Without that guard a failed run (no accounts, DB outage) would
        deactivate every row in the table.

        :return: None
        """
        results = []
        inserted_count = 0
        account_list = cls.getAccountList()
        dt_str = datetime.today().date().isoformat()
        # Fix the window once per run so every account/position uses the same
        # time range regardless of how long the loop takes.
        max_time = int(time.time())
        min_time = max_time - cls.WINDOW_SECONDS
        for item in tqdm(account_list):
            for index in cls.POSITIONS:
                try:
                    account_name = item['accountName']
                    avg_read, avg_like = cls.getArticleByFilter(
                        account_name=account_name,
                        index=index,
                        min_time=min_time,
                        max_time=max_time
                    )
                    obj = {
                        "account_name": account_name,
                        "gh_id": item['ghId'],
                        "fans": item.get('fans', 0),
                        "position": index,
                        # NaN means no articles survived the filters.
                        "avg_read": avg_read if str(avg_read) != "nan" else 0,
                        "avg_like": avg_like if str(avg_like) != "nan" else 0,
                        "update_time": dt_str,
                        "account_type": item['accountType'],
                        "account_mode": item['accountMode'],
                        "account_source": item['accountSource'],
                        "account_status": item['accountStatus']
                    }
                    if cls.insertIntoMysql(obj):
                        inserted_count += 1
                    results.append(obj)
                except Exception as e:
                    log(
                        task="updateAccountAvgDaily",
                        function="getAllAvgRead",
                        status="fail",
                        message="更新单个账号单个位置的账号均值失败, 失败原因是: {}".format(e)
                    )
        with open("new_account_avg_v3.json", "w", encoding="utf-8") as f:
            f.write(json.dumps(results, ensure_ascii=False, indent=4))
        log(
            task="updateAccountAvgDaily",
            function="getAllAvgRead",
            message="账号均值数据写入文件成功"
        )
        if not inserted_count:
            # Nothing was written for today: keep the previous rows active.
            bot(
                title="账号均值表,当日无数据写入成功,跳过更新非当日数据状态",
                detail={
                    "task": "updateAccountAvgDaily"
                }
            )
            log(
                task="updateAccountAvgDaily",
                function="getAllAvgRead",
                status="fail",
                message="当日无数据写入成功, 跳过修改非当日数据状态为 0"
            )
            return
        cls._markStaleRows(dt_str)

    @classmethod
    def _markStaleRows(cls, dt_str):
        """
        Set status = 0 on every row whose update_time differs from ``dt_str``.

        :param dt_str: today's date as 'YYYY-MM-DD'.
        :return: None
        """
        update_sql = """
            UPDATE account_avg_info_v2
            SET status = %s
            where update_time != %s;
        """
        try:
            cls.pqClient.update(sql=update_sql, params=(0, dt_str))
            log(
                task="updateAccountAvgDaily",
                function="getAllAvgRead",
                message="修改非当日数据状态为 0 成功"
            )
        except Exception as e:
            bot(
                title="账号均值表,更新非当日数据状态失败",
                detail={
                    "task": "updateAccountAvgDaily"
                }
            )
            log(
                task="updateAccountAvgDaily",
                function="getAllAvgRead",
                status="fail",
                message="修改非当日数据状态为 0 失败, 报错为 {}".format(e)
            )

    @classmethod
    def getEachAvgRead(cls, account_name, index):
        """
        Load all stored articles of one account at one position.

        :param account_name: account name matched against accountName.
        :param index: article position (ItemIndex).
        :return: DataFrame with the columns listed in ``keys``.
        """
        keys = [
            "appMsgId",
            "title",
            "Type",
            "updateTime",
            "ItemIndex",
            "ContentUrl",
            "show_view_count",
            "show_like_count",
        ]
        columns = ", ".join(keys)
        # The visible select() calls never pass params, so the name is
        # interpolated. Escape it so quotes/backslashes can't break or alter
        # the query.
        # NOTE(review): assumes MySQL's default backslash-escape mode; prefer a
        # parameterized select() if PQMySQL supports one.
        safe_name = str(account_name).replace("\\", "\\\\").replace("'", "\\'")
        sql = f"""
            SELECT {columns}
            FROM official_articles_v2
            WHERE accountName = '{safe_name}' and ItemIndex = {int(index)};"""
        result = cls.pqClient.select(sql=sql)
        return DataFrame(result, columns=keys)

    @classmethod
    def getArticleByFilter(
            cls,
            account_name,
            index,
            min_time=None,
            max_time=None,
            msg_type=None,
    ):
        """
        Average view and like counts of an account's articles at one position.

        Only articles whose Type equals ``msg_type`` and whose updateTime lies
        in [min_time, max_time] are used. Outliers are removed with
        ``filter_outlier_data`` before averaging.

        :param account_name: account name.
        :param index: position, ranging from 1 to 8.
        :param min_time: earliest updateTime (falsy -> 0).
        :param max_time: latest updateTime (falsy -> 4088051123, in 2099).
        :param msg_type: article Type to keep (falsy -> "9").
        :return: (mean show_view_count, mean show_like_count); NaN when no
            article remains.
        """
        if not msg_type:
            # NOTE(review): compared as a string; confirm the Type column is
            # returned as str, otherwise nothing matches.
            msg_type = "9"
        if not min_time:
            min_time = 0
        if not max_time:
            # Year 2099.
            max_time = 4088051123
        articleDataFrame = cls.getEachAvgRead(account_name=account_name, index=index)
        filterDataFrame = articleDataFrame[
            (articleDataFrame["Type"] == msg_type)
            & (min_time <= articleDataFrame["updateTime"])
            & (articleDataFrame["updateTime"] <= max_time)
        ]
        # Remove outliers before averaging.
        finalDF = filter_outlier_data(filterDataFrame)
        return finalDF['show_view_count'].mean(), finalDF['show_like_count'].mean()
def updateAvgJob():
    """
    Scheduler entry point: run one full refresh of the account averages.

    :return: None
    """
    UpdateAvgDaily().getAllAvgRead()
if __name__ == "__main__":
    # Register the refresh for 22:00 every day, wrapped by
    # Functions().job_with_thread (presumably runs it off the scheduler
    # thread). Then poll the scheduler once per second, forever.
    schedule.every().day.at("22:00").do(Functions().job_with_thread, updateAvgJob)
    while True:
        schedule.run_pending()
        time.sleep(1)
|