123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- """
- @author: luojunhui
- """
- import asyncio
- import aiomysql
- from pandas import DataFrame
- from datetime import datetime
class TaskMySQLClient(object):
    """
    Thin async wrapper around an aiomysql connection pool.

    Call ``init_pool()`` before ``async_select`` / ``async_insert`` and
    ``close_pool()`` when finished.
    """

    # NOTE(review): connection credentials are hard-coded in source. They
    # should come from configuration / environment variables instead; they are
    # kept here only as defaults so existing callers keep working.
    DEFAULT_POOL_CONFIG = {
        "host": "rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
        "port": 3306,
        "user": "crawler",
        "password": "crawler123456@",
        "db": "piaoquan-crawler",
        "charset": "utf8mb4",
        "connect_timeout": 120,
    }

    def __init__(self, **pool_config):
        """
        :param pool_config: optional keyword overrides passed to
            ``aiomysql.create_pool`` (e.g. ``host``, ``db``, ``password``);
            any key not given falls back to ``DEFAULT_POOL_CONFIG``.
        """
        self.mysql_pool = None
        # Merge into a new dict so the class-level defaults are never mutated.
        self.pool_config = {**self.DEFAULT_POOL_CONFIG, **pool_config}

    async def init_pool(self):
        """
        Initialize the connection pool.
        :return:
        """
        self.mysql_pool = await aiomysql.create_pool(**self.pool_config)
        print("mysql init successfully")

    async def close_pool(self):
        """
        Close the mysql connection pool. Safe to call when the pool was never
        initialized (or was already closed).
        :return:
        """
        if self.mysql_pool is None:
            return
        self.mysql_pool.close()
        await self.mysql_pool.wait_closed()
        # Drop the reference so a later init_pool() starts cleanly.
        self.mysql_pool = None

    def _require_pool(self):
        """Return the live pool or raise a clear error if it is missing."""
        if self.mysql_pool is None:
            raise RuntimeError(
                "mysql pool is not initialized; call init_pool() first"
            )
        return self.mysql_pool

    async def async_select(self, sql, params=None):
        """
        Run a SELECT and return all rows.
        :param sql: query text; may contain ``%s`` placeholders when
            ``params`` is given.
        :param params: optional query arguments. With ``None`` (the default)
            the query is executed verbatim, exactly as before.
        :return: the rows returned by ``cursor.fetchall()``.
        :raises RuntimeError: if ``init_pool()`` has not been called.
        """
        pool = self._require_pool()
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(sql, params)
                return await cursor.fetchall()

    async def async_insert(self, sql, params):
        """
        Run an INSERT/UPDATE statement and commit it.
        :param sql: statement text with ``%s`` placeholders.
        :param params: arguments bound to the placeholders.
        :return: None
        :raises RuntimeError: if ``init_pool()`` has not been called.
        """
        pool = self._require_pool()
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(sql, params)
            await conn.commit()
- class ArticleDBTools(object):
- """
- 长文数据库相关功能
- """
- def __init__(self, mysql_client):
- """
- init mysql
- :param mysql_client:
- """
- self.mysql_client = mysql_client
- async def getAccountAvgInfo(self, account_name):
- """
- 获取单个账号历史均值
- """
- keys = [
- "gh_id",
- "position",
- "update_time",
- "account_name",
- "fans",
- "read_avg",
- "like_avg",
- "status",
- "account_type",
- "account_mode",
- "account_source",
- "account_status",
- "business_type",
- "read_rate_avg"
- ]
- sql = f"""
- SELECT {", ".join(keys)}
- FROM account_avg_info_v3
- WHERE account_name = '{account_name}'
- and position = 1;"""
- result = await self.mysql_client.async_select(sql=sql)
- return result
- async def get_account_avg_info(self, account_avg_info_map, timestamp):
- target_date = datetime.fromtimestamp(timestamp).date()
- # 获取所有可用日期并排序
- available_dates = sorted(account_avg_info_map.keys())
- # 尝试获取指定日期
- info = account_avg_info_map.get(target_date.isoformat())
- if info is not None:
- return info
- # 如果指定日期不存在,寻找最近日期
- closest_date = None
- for date in reversed(available_dates):
- if (closest_date is None or abs((datetime.fromisoformat(date).date() - target_date).days) <
- abs((datetime.fromisoformat(closest_date).date() - target_date).days)):
- closest_date = date
- return account_avg_info_map.get(closest_date) if closest_date else None
- async def getSingleAccountArticles(self, account_name):
- """
- 获取单个账号的历史文章
- :param gh_id:
- :return:
- appMsgId, title, Type, updateTime, ItemIndex, ContentUrl, show_view_count, show_like_count
- """
- keys = [
- "appMsgId",
- "title",
- "Type",
- "updateTime",
- "ItemIndex",
- "ContentUrl",
- "show_view_count",
- "show_like_count",
- ]
- sql = f"""
- SELECT {", ".join(keys)}
- FROM official_articles_v2
- WHERE accountName = '{account_name}';"""
- result = await self.mysql_client.async_select(sql=sql)
- return DataFrame(result, columns=keys)
- async def getArticleByFilter(
- self,
- account_name,
- view_count_filter=None,
- index_list=None,
- min_time=None,
- max_time=None,
- msg_type=None,
- ):
- """
- :param account_name:
- :param index_list: index ranges from 1 to 8
- :param min_time: earliest time
- :param max_time: latest time
- :param msg_type: msg_type
- :return:
- """
- if not index_list:
- index_list = [1]
- if not msg_type:
- msg_type = "9"
- if not min_time:
- min_time = 0
- if not max_time:
- # 2099年
- max_time = 4088051123
- articleDataFrame = await self.getSingleAccountArticles(account_name=account_name)
- filterDataFrame = articleDataFrame[
- (articleDataFrame["Type"] == msg_type)
- & (min_time < articleDataFrame["updateTime"])
- & (articleDataFrame["updateTime"] < max_time)
- & (articleDataFrame["ItemIndex"].isin(index_list))
- ]
- if view_count_filter:
- filterDataFrame = filterDataFrame[(articleDataFrame["show_view_count"] > view_count_filter)]
- return filterDataFrame
- async def get_good_bad_articles(self,
- account_name,
- method,
- view_count_filter,
- rate=0.1,
- index_list=None,
- min_time=None,
- max_time=None,
- msg_type=None
- ):
- """
- 获取质量好和不好的视频
- :return:
- """
- article_data_frame = await self.getArticleByFilter(
- account_name=account_name,
- view_count_filter=view_count_filter,
- index_list=index_list,
- min_time=min_time,
- max_time=max_time,
- msg_type=msg_type
- )
- df_rows = len(article_data_frame)
- if df_rows > 0:
- match method:
- case "top":
- sorted_df = article_data_frame.sort_values(by='show_view_count', reversed=True)
- topn = max(int(df_rows * rate), 1)
- top_df = sorted_df.head(topn)
- tail_df = sorted_df.tail(topn)
- return top_df, tail_df
- case "avg":
- avg_view = article_data_frame['show_view_count'].mean()
- good_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 + rate)]
- bad_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 - rate)]
- return good_df, bad_df
- case "account_avg":
- account_read_avg_list = await self.getAccountAvgInfo(
- account_name=account_name
- )
- account_avg_info_map = {info[2]: info for info in account_read_avg_list}
- view_count_avg_list = []
- for index, row in article_data_frame.iterrows():
- update_time = row['updateTime']
- info = await self.get_account_avg_info(account_avg_info_map, update_time)
- view_count_avg_list.append(info[5])
- article_data_frame['view_count_avg'] = view_count_avg_list
- good_df = article_data_frame[(article_data_frame['show_view_count']) >
- (article_data_frame['view_count_avg']) * (1.0 + rate)]
- bad_df = article_data_frame[(article_data_frame['show_view_count']) >
- (article_data_frame['view_count_avg']) * (1.0 - rate)]
- return good_df, bad_df
- else:
- return None, None
|