articleTools.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. import aiomysql
  6. from pandas import DataFrame
  7. from datetime import datetime
  8. class TaskMySQLClient(object):
  9. """
  10. Async MySQL
  11. """
  12. def __init__(self):
  13. self.mysql_pool = None
  14. async def init_pool(self):
  15. """
  16. 初始化连接
  17. :return:
  18. """
  19. self.mysql_pool = await aiomysql.create_pool(
  20. host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
  21. port=3306,
  22. user="crawler",
  23. password="crawler123456@",
  24. db="piaoquan-crawler",
  25. charset="utf8mb4",
  26. connect_timeout=120,
  27. )
  28. print("mysql init successfully")
  29. async def close_pool(self):
  30. """
  31. 关闭 mysql 连接
  32. :return:
  33. """
  34. self.mysql_pool.close()
  35. await self.mysql_pool.wait_closed()
  36. async def async_select(self, sql):
  37. """
  38. select method
  39. :param sql:
  40. :return:
  41. """
  42. async with self.mysql_pool.acquire() as conn:
  43. async with conn.cursor() as cursor:
  44. await cursor.execute(sql)
  45. result = await cursor.fetchall()
  46. return result
  47. async def async_insert(self, sql, params):
  48. """
  49. insert and update method
  50. :param params:
  51. :param sql:
  52. :return:
  53. """
  54. async with self.mysql_pool.acquire() as coon:
  55. async with coon.cursor() as cursor:
  56. await cursor.execute(sql, params)
  57. await coon.commit()
  58. class ArticleDBTools(object):
  59. """
  60. 长文数据库相关功能
  61. """
  62. def __init__(self, mysql_client):
  63. """
  64. init mysql
  65. :param mysql_client:
  66. """
  67. self.mysql_client = mysql_client
  68. async def getAccountAvgInfo(self, account_name):
  69. """
  70. 获取单个账号历史均值
  71. """
  72. keys = [
  73. "gh_id",
  74. "position",
  75. "update_time",
  76. "account_name",
  77. "fans",
  78. "read_avg",
  79. "like_avg",
  80. "status",
  81. "account_type",
  82. "account_mode",
  83. "account_source",
  84. "account_status",
  85. "business_type",
  86. "read_rate_avg"
  87. ]
  88. sql = f"""
  89. SELECT {", ".join(keys)}
  90. FROM account_avg_info_v3
  91. WHERE account_name = '{account_name}'
  92. and position = 1;"""
  93. result = await self.mysql_client.async_select(sql=sql)
  94. return result
  95. async def get_account_avg_info(self, account_avg_info_map, timestamp):
  96. target_date = datetime.fromtimestamp(timestamp).date()
  97. # 获取所有可用日期并排序
  98. available_dates = sorted(account_avg_info_map.keys())
  99. # 尝试获取指定日期
  100. info = account_avg_info_map.get(target_date.isoformat())
  101. if info is not None:
  102. return info
  103. # 如果指定日期不存在,寻找最近日期
  104. closest_date = None
  105. for date in reversed(available_dates):
  106. if (closest_date is None or abs((datetime.fromisoformat(date).date() - target_date).days) <
  107. abs((datetime.fromisoformat(closest_date).date() - target_date).days)):
  108. closest_date = date
  109. return account_avg_info_map.get(closest_date) if closest_date else None
  110. async def getSingleAccountArticles(self, account_name):
  111. """
  112. 获取单个账号的历史文章
  113. :param gh_id:
  114. :return:
  115. appMsgId, title, Type, updateTime, ItemIndex, ContentUrl, show_view_count, show_like_count
  116. """
  117. keys = [
  118. "appMsgId",
  119. "title",
  120. "Type",
  121. "updateTime",
  122. "ItemIndex",
  123. "ContentUrl",
  124. "show_view_count",
  125. "show_like_count",
  126. ]
  127. sql = f"""
  128. SELECT {", ".join(keys)}
  129. FROM official_articles_v2
  130. WHERE accountName = '{account_name}';"""
  131. result = await self.mysql_client.async_select(sql=sql)
  132. return DataFrame(result, columns=keys)
  133. async def getArticleByFilter(
  134. self,
  135. account_name,
  136. view_count_filter=None,
  137. index_list=None,
  138. min_time=None,
  139. max_time=None,
  140. msg_type=None,
  141. ):
  142. """
  143. :param account_name:
  144. :param index_list: index ranges from 1 to 8
  145. :param min_time: earliest time
  146. :param max_time: latest time
  147. :param msg_type: msg_type
  148. :return:
  149. """
  150. if not index_list:
  151. index_list = [1]
  152. if not msg_type:
  153. msg_type = "9"
  154. if not min_time:
  155. min_time = 0
  156. if not max_time:
  157. # 2099年
  158. max_time = 4088051123
  159. articleDataFrame = await self.getSingleAccountArticles(account_name=account_name)
  160. filterDataFrame = articleDataFrame[
  161. (articleDataFrame["Type"] == msg_type)
  162. & (min_time < articleDataFrame["updateTime"])
  163. & (articleDataFrame["updateTime"] < max_time)
  164. & (articleDataFrame["ItemIndex"].isin(index_list))
  165. ]
  166. if view_count_filter:
  167. filterDataFrame = filterDataFrame[(articleDataFrame["show_view_count"] > view_count_filter)]
  168. return filterDataFrame
  169. async def get_good_bad_articles(self,
  170. account_name,
  171. method,
  172. view_count_filter,
  173. rate=0.1,
  174. index_list=None,
  175. min_time=None,
  176. max_time=None,
  177. msg_type=None
  178. ):
  179. """
  180. 获取质量好和不好的视频
  181. :return:
  182. """
  183. article_data_frame = await self.getArticleByFilter(
  184. account_name=account_name,
  185. view_count_filter=view_count_filter,
  186. index_list=index_list,
  187. min_time=min_time,
  188. max_time=max_time,
  189. msg_type=msg_type
  190. )
  191. df_rows = len(article_data_frame)
  192. if df_rows > 0:
  193. match method:
  194. case "top":
  195. sorted_df = article_data_frame.sort_values(by='show_view_count', reversed=True)
  196. topn = max(int(df_rows * rate), 1)
  197. top_df = sorted_df.head(topn)
  198. tail_df = sorted_df.tail(topn)
  199. return top_df, tail_df
  200. case "avg":
  201. avg_view = article_data_frame['show_view_count'].mean()
  202. good_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 + rate)]
  203. bad_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 - rate)]
  204. return good_df, bad_df
  205. case "account_avg":
  206. account_read_avg_list = await self.getAccountAvgInfo(
  207. account_name=account_name
  208. )
  209. account_avg_info_map = {info[2]: info for info in account_read_avg_list}
  210. view_count_avg_list = []
  211. for index, row in article_data_frame.iterrows():
  212. update_time = row['updateTime']
  213. info = await self.get_account_avg_info(account_avg_info_map, update_time)
  214. view_count_avg_list.append(info[5])
  215. article_data_frame['view_count_avg'] = view_count_avg_list
  216. good_df = article_data_frame[(article_data_frame['show_view_count']) >
  217. (article_data_frame['view_count_avg']) * (1.0 + rate)]
  218. bad_df = article_data_frame[(article_data_frame['show_view_count']) >
  219. (article_data_frame['view_count_avg']) * (1.0 - rate)]
  220. return good_df, bad_df
  221. else:
  222. return None, None