"""
Compute average read and like counts per account and article position.

@author: luojunhui
"""
- import json
- import threading
- from datetime import datetime
- from pandas import DataFrame
- from tqdm import tqdm
- from applications import PQMySQL
def filter_outlier_data(group, key='show_view_count', std_factor=2, mean_factor=5):
    """
    Drop outlier rows from ``group`` based on the values in column ``key``.

    Two passes are applied in order:

    1. keep rows whose value lies strictly within ``std_factor`` standard
       deviations of the column mean;
    2. keep rows whose value is strictly below ``mean_factor`` times the mean
       of the rows that survived pass 1.

    A pass that cannot be evaluated is skipped instead of rejecting every row:
    pass 1 is skipped when the standard deviation is zero or NaN (constant
    column, single row, empty frame); pass 2 is skipped when the recomputed
    mean is not positive (e.g. an all-zero column).

    :param group: DataFrame containing the column ``key``
    :param key: name of the numeric column to filter on
    :param std_factor: half-width of the accepted band, in standard deviations
    :param mean_factor: upper cut-off expressed as a multiple of the mean
    :return: DataFrame holding the rows of ``group`` that passed both filters
    """
    values = group[key]
    mean = values.mean()
    std = values.std()

    filtered_group = group
    # ``NaN > 0`` is False, so this single test covers both the zero-spread
    # and the undefined-spread case.
    if std > 0:
        spread = std_factor * std
        filtered_group = group[(values > mean - spread) & (values < mean + spread)]

    # Second pass: drop values that are many times larger than the typical one.
    new_mean = filtered_group[key].mean()
    if new_mean > 0:
        filtered_group = filtered_group[filtered_group[key] < new_mean * mean_factor]
    return filtered_group
class UpdateMsgDaily(object):
    """
    Daily article statistics for the configured accounts.

    Both class attributes are initialised when the class body executes:
    ``db_client`` is a ``PQMySQL`` instance and ``account_list`` is the parsed
    content of ``config/accountInfoV2.json``.
    """

    db_client = PQMySQL()
    with open("config/accountInfoV2.json", encoding="utf-8") as f:
        account_list = json.load(f)

    @staticmethod
    def _gh_id_of(record):
        """
        Return the account id stored in one config record.

        The methods of this class historically read the id under two different
        keys (``gh_id`` and ``ghId``); accept either so they agree with each
        other regardless of which spelling the config file uses.

        :param record: one entry of ``account_list``
        :return: the account id
        :raises KeyError: if the record has neither key
        """
        if 'gh_id' in record:
            return record['gh_id']
        return record['ghId']

    @classmethod
    def getAccountIdDict(cls):
        """
        Build a lookup of all configured accounts keyed by account id.

        :return: dict mapping account id -> full account record
        """
        return {cls._gh_id_of(line): line for line in cls.account_list}

    @classmethod
    def getAllAvgRead(
        cls,
        min_time=1716480000,
        max_time=1721836800,
        output_path="new_account_avg_v2.json",
    ):
        """
        Compute the average read/like counts for every configured account at
        every article position (1 through 8) and dump them to a JSON file.

        :param min_time: lower bound (inclusive) on ``updateTime``;
            presumably Unix epoch seconds -- confirm against the table schema
        :param max_time: upper bound (inclusive) on ``updateTime``
        :param output_path: path of the JSON file to write
        :return: None
        """
        records = []
        for item in tqdm(cls.account_list):
            # Per-account values do not depend on the position, so read them once.
            account_name = item['name']
            gh_id = cls._gh_id_of(item)
            fans = item.get('follower_count', 0)
            for index in range(1, 9):
                print(account_name, index)
                avg_read, avg_like = cls.getArticleByFilter(
                    account_name=account_name,
                    index=index,
                    min_time=min_time,
                    max_time=max_time
                )
                records.append(
                    {
                        "account_name": account_name,
                        "gh_id": gh_id,
                        "fans": fans,
                        "position": index,
                        "avg_read": avg_read,
                        "avg_like": avg_like
                    }
                )
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(json.dumps(records, ensure_ascii=False, indent=4))

    @classmethod
    def getEachAvgRead(cls, account_name, index):
        """
        Fetch the stored articles of one account at one position.

        :param account_name: value matched against the ``accountName`` column
        :param index: value matched against the ``ItemIndex`` column
        :return: DataFrame with one column per selected field
        """
        keys = [
            "appMsgId",
            "title",
            "Type",
            "updateTime",
            "ItemIndex",
            "ContentUrl",
            "show_view_count",
            "show_like_count",
        ]
        # NOTE(review): the values are interpolated directly into the SQL text,
        # so an account name containing a quote breaks the statement. Switch to
        # a parameterised query if the PQMySQL client supports one.
        sql = f"""
            SELECT {", ".join(keys)}
            FROM official_articles_v2
            WHERE accountName = '{account_name}' and ItemIndex = {index};"""
        result = cls.db_client.select(sql=sql)
        return DataFrame(result, columns=keys)

    @classmethod
    def getArticleByFilter(
            cls,
            account_name,
            index,
            min_time=None,
            max_time=None,
            msg_type=None,
    ):
        """
        Average read and like counts of one account's articles at one position,
        restricted by message type and update time, with outliers removed.

        :param account_name: account whose articles are evaluated
        :param index: article position, ranges from 1 to 8
        :param min_time: earliest ``updateTime`` (inclusive); falsy means 0
        :param max_time: latest ``updateTime`` (inclusive); falsy means a
            far-future timestamp
        :param msg_type: value the ``Type`` column must equal; falsy means "9"
        :return: tuple ``(mean show_view_count, mean show_like_count)``; both
            are NaN when no article survives the filters
        """
        if not msg_type:
            msg_type = "9"
        if not min_time:
            min_time = 0
        if not max_time:
            # A timestamp in the year 2099, i.e. effectively "no upper bound".
            max_time = 4088051123
        articleDataFrame = cls.getEachAvgRead(account_name=account_name, index=index)
        filterDataFrame = articleDataFrame[
            (articleDataFrame["Type"] == msg_type)
            & (min_time <= articleDataFrame["updateTime"])
            & (articleDataFrame["updateTime"] <= max_time)
        ]
        # Remove abnormal values before averaging.
        finalDF = filter_outlier_data(filterDataFrame)
        return finalDF['show_view_count'].mean(), finalDF['show_like_count'].mean()
def job_with_thread(job_func):
    """
    Run ``job_func`` in its own newly started thread.

    :param job_func: zero-argument callable to execute
    :return: the started ``threading.Thread``, so the caller can ``join()`` it
        or check ``is_alive()``; callers that ignore the return value are
        unaffected
    """
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
    return job_thread
if __name__ == "__main__":
    # Script entry point: compute the averages for every configured account.
    daily_updater = UpdateMsgDaily()
    daily_updater.getAllAvgRead()
|