""" @author: luojunhui """ import asyncio import aiomysql from pandas import DataFrame class TaskMySQLClient(object): """ Async MySQL """ def __init__(self): self.mysql_pool = None async def init_pool(self): """ 初始化连接 :return: """ self.mysql_pool = await aiomysql.create_pool( host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com", port=3306, user="crawler", password="crawler123456@", db="piaoquan-crawler", charset="utf8mb4", connect_timeout=120, ) print("mysql init successfully") async def close_pool(self): """ 关闭 mysql 连接 :return: """ self.mysql_pool.close() await self.mysql_pool.wait_closed() async def async_select(self, sql): """ select method :param sql: :return: """ async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute(sql) result = await cursor.fetchall() return result async def async_insert(self, sql, params): """ insert and update method :param params: :param sql: :return: """ async with self.mysql_pool.acquire() as coon: async with coon.cursor() as cursor: await cursor.execute(sql, params) await coon.commit() class ArticleDBTools(object): """ 长文数据库相关功能 """ def __init__(self, mysql_client): """ init mysql :param mysql_client: """ self.mysql_client = mysql_client async def getSingleAccountArticles(self, account_name): """ 获取单个账号的历史文章 :param gh_id: :return: appMsgId, title, Type, updateTime, ItemIndex, ContentUrl, show_view_count, show_like_count """ keys = [ "appMsgId", "title", "Type", "updateTime", "ItemIndex", "ContentUrl", "show_view_count", "show_like_count", ] sql = f""" SELECT {", ".join(keys)} FROM official_articles WHERE accountName = '{account_name}';""" result = await self.mysql_client.async_select(sql=sql) return DataFrame(result, columns=keys) async def getArticleByFilter( self, account_name, index_list=None, min_time=None, max_time=None, msg_type=None, ): """ :param account_name: :param index_list: index ranges from 1 to 8 :param min_time: earliest time :param max_time: latest time :param msg_type: msg_type :return: """ if not index_list: index_list = [1] if not msg_type: msg_type = "9" if not min_time: min_time = 0 if not max_time: # 2099年 max_time = 4088051123 articleDataFrame = await self.getSingleAccountArticles(account_name=account_name) filterDataFrame = articleDataFrame[ (articleDataFrame["Type"] == msg_type) & (min_time < articleDataFrame["updateTime"]) & (articleDataFrame["updateTime"] < max_time) & (articleDataFrame["ItemIndex"].isin(index_list)) ] return filterDataFrame async def get_good_bad_articles(self, account_name, method, rate=0.1, index_list=None, min_time=None, max_time=None, msg_type=None ): """ 获取质量好和不好的视频 :return: """ article_data_frame = await self.getArticleByFilter( account_name=account_name, index_list=index_list, min_time=min_time, max_time=max_time, msg_type=msg_type ) df_rows = len(article_data_frame) if df_rows > 0: match method: case "top": sorted_df = article_data_frame.sort_values(by='show_view_count', reversed=True) topn = max(int(df_rows * rate), 1) top_df = sorted_df.head(topn) tail_df = sorted_df.tail(topn) return top_df, tail_df case "avg": avg_view = article_data_frame['show_view_count'].mean() good_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 + rate)] bad_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 - rate)] return good_df, bad_df else: return None, None