""" @author: luojunhui """ import asyncio import aiomysql from pandas import DataFrame from datetime import datetime class TaskMySQLClient(object): """ Async MySQL """ def __init__(self): self.mysql_pool = None async def init_pool(self): """ 初始化连接 :return: """ self.mysql_pool = await aiomysql.create_pool( host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com", port=3306, user="crawler", password="crawler123456@", db="piaoquan-crawler", charset="utf8mb4", connect_timeout=120, ) print("mysql init successfully") async def close_pool(self): """ 关闭 mysql 连接 :return: """ self.mysql_pool.close() await self.mysql_pool.wait_closed() async def async_select(self, sql): """ select method :param sql: :return: """ async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute(sql) result = await cursor.fetchall() return result async def async_insert(self, sql, params): """ insert and update method :param params: :param sql: :return: """ async with self.mysql_pool.acquire() as coon: async with coon.cursor() as cursor: await cursor.execute(sql, params) await coon.commit() class AccountAvgInfo: def __init__(self, gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg): self.gh_id = gh_id self.position = position self.update_time = update_time self.account_name = account_name self.fans = fans self.read_avg = read_avg self.like_avg = like_avg self.status = status self.account_type = account_type self.account_mode = account_mode self.account_source = account_source self.account_status = account_status self.business_type = business_type self.read_rate_avg = read_rate_avg def __repr__(self): return f"" class ArticleDBTools(object): """ 长文数据库相关功能 """ def __init__(self, mysql_client): """ init mysql :param mysql_client: """ self.mysql_client = mysql_client async def getAccountAvgInfo(self, account_name): """ 获取单个账号历史均值 """ keys = [ "gh_id", "position", "update_time", "account_name", "fans", "read_avg", "like_avg", "status", "account_type", "account_mode", "account_source", "account_status", "business_type", "read_rate_avg" ] sql = f""" SELECT {", ".join(keys)} FROM account_avg_info_v3 WHERE account_name = '{account_name}' and position = 1;""" result = await self.mysql_client.async_select(sql=sql) account_avg_info_list = [AccountAvgInfo(*row) for row in result] if result else [] return account_avg_info_list async def get_account_avg_info(self, account_avg_info_map, available_dates, timestamp): target_date = datetime.fromtimestamp(timestamp).date() # 尝试获取指定日期 info = account_avg_info_map.get(target_date.isoformat()) if info is not None: return info # 如果指定日期不存在,寻找最近日期 closest_date = None for date in available_dates: if closest_date is None: closest_date = date continue days = abs((datetime.fromisoformat(date).date() - target_date).days) closest_days = abs((datetime.fromisoformat(closest_date).date() - target_date).days) if days < closest_days: closest_date = date elif days > closest_days: break return account_avg_info_map.get(closest_date) if closest_date else None async def getSingleAccountArticles(self, account_name): """ 获取单个账号的历史文章 :param gh_id: :return: appMsgId, title, Type, updateTime, ItemIndex, ContentUrl, show_view_count, show_like_count """ keys = [ "appMsgId", "title", "Type", "updateTime", "ItemIndex", "ContentUrl", "show_view_count", "show_like_count", ] sql = f""" SELECT {", ".join(keys)} FROM official_articles_v2 WHERE accountName = '{account_name}';""" result = await self.mysql_client.async_select(sql=sql) return DataFrame(result, columns=keys) async def getArticleByFilter( self, account_name, view_count_filter=None, index_list=None, min_time=None, max_time=None, msg_type=None, ): """ :param account_name: :param index_list: index ranges from 1 to 8 :param min_time: earliest time :param max_time: latest time :param msg_type: msg_type :return: """ if not index_list: index_list = [1] if not msg_type: msg_type = "9" if not min_time: min_time = 0 if not max_time: # 2099年 max_time = 4088051123 articleDataFrame = await self.getSingleAccountArticles(account_name=account_name) filterDataFrame = articleDataFrame[ (articleDataFrame["Type"] == msg_type) & (min_time < articleDataFrame["updateTime"]) & (articleDataFrame["updateTime"] < max_time) & (articleDataFrame["ItemIndex"].isin(index_list)) ] if view_count_filter: filterDataFrame = filterDataFrame[(articleDataFrame["show_view_count"] > view_count_filter)] return filterDataFrame async def get_good_bad_articles(self, account_name, method, view_count_filter, rate=0.1, index_list=None, min_time=None, max_time=None, msg_type=None ): """ 获取质量好和不好的视频 :return: """ article_data_frame = await self.getArticleByFilter( account_name=account_name, view_count_filter=view_count_filter, index_list=index_list, min_time=min_time, max_time=max_time, msg_type=msg_type ) df_rows = len(article_data_frame) if df_rows > 0: match method: case "top": sorted_df = article_data_frame.sort_values(by='show_view_count', reversed=True) topn = max(int(df_rows * rate), 1) top_df = sorted_df.head(topn) tail_df = sorted_df.tail(topn) return top_df, tail_df case "avg": avg_view = article_data_frame['show_view_count'].mean() good_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 + rate)] bad_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 - rate)] return good_df, bad_df case "account_avg": account_read_avg_list = await self.getAccountAvgInfo( account_name=account_name ) account_avg_info_map = {info.update_time: info for info in account_read_avg_list} # 获取所有可用日期并排序 available_dates = sorted(account_avg_info_map.keys()) view_count_avg_list = [] for index, row in article_data_frame.iterrows(): update_time = row['updateTime'] info = await self.get_account_avg_info(account_avg_info_map, available_dates, update_time) view_count_avg_list.append(info.read_avg) article_data_frame['view_count_avg'] = view_count_avg_list good_df = article_data_frame[(article_data_frame['show_view_count']) > (article_data_frame['view_count_avg']) * (1.0 + rate)] bad_df = article_data_frame[(article_data_frame['show_view_count']) > (article_data_frame['view_count_avg']) * (1.0 - rate)] return good_df, bad_df else: return None, None