articleTools.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. import aiomysql
  6. from pandas import DataFrame
  7. from datetime import datetime
  8. class TaskMySQLClient(object):
  9. """
  10. Async MySQL
  11. """
  12. def __init__(self):
  13. self.mysql_pool = None
  14. async def init_pool(self):
  15. """
  16. 初始化连接
  17. :return:
  18. """
  19. self.mysql_pool = await aiomysql.create_pool(
  20. host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
  21. port=3306,
  22. user="crawler",
  23. password="crawler123456@",
  24. db="piaoquan-crawler",
  25. charset="utf8mb4",
  26. connect_timeout=120,
  27. )
  28. print("mysql init successfully")
  29. async def close_pool(self):
  30. """
  31. 关闭 mysql 连接
  32. :return:
  33. """
  34. self.mysql_pool.close()
  35. await self.mysql_pool.wait_closed()
  36. async def async_select(self, sql):
  37. """
  38. select method
  39. :param sql:
  40. :return:
  41. """
  42. async with self.mysql_pool.acquire() as conn:
  43. async with conn.cursor() as cursor:
  44. await cursor.execute(sql)
  45. result = await cursor.fetchall()
  46. return result
  47. async def async_insert(self, sql, params):
  48. """
  49. insert and update method
  50. :param params:
  51. :param sql:
  52. :return:
  53. """
  54. async with self.mysql_pool.acquire() as coon:
  55. async with coon.cursor() as cursor:
  56. await cursor.execute(sql, params)
  57. await coon.commit()
  58. class AccountAvgInfo:
  59. def __init__(self, gh_id, position, update_time, account_name, fans, read_avg, like_avg, status,
  60. account_type, account_mode, account_source, account_status, business_type, read_rate_avg):
  61. self.gh_id = gh_id
  62. self.position = position
  63. self.update_time = update_time
  64. self.account_name = account_name
  65. self.fans = fans
  66. self.read_avg = read_avg
  67. self.like_avg = like_avg
  68. self.status = status
  69. self.account_type = account_type
  70. self.account_mode = account_mode
  71. self.account_source = account_source
  72. self.account_status = account_status
  73. self.business_type = business_type
  74. self.read_rate_avg = read_rate_avg
  75. def __repr__(self):
  76. return f"<AccountAvgInfo {self.account_name}>"
  77. class ArticleDBTools(object):
  78. """
  79. 长文数据库相关功能
  80. """
  81. def __init__(self, mysql_client):
  82. """
  83. init mysql
  84. :param mysql_client:
  85. """
  86. self.mysql_client = mysql_client
  87. async def getAccountAvgInfo(self, account_name):
  88. """
  89. 获取单个账号历史均值
  90. """
  91. keys = [
  92. "gh_id",
  93. "position",
  94. "update_time",
  95. "account_name",
  96. "fans",
  97. "read_avg",
  98. "like_avg",
  99. "status",
  100. "account_type",
  101. "account_mode",
  102. "account_source",
  103. "account_status",
  104. "business_type",
  105. "read_rate_avg"
  106. ]
  107. sql = f"""
  108. SELECT {", ".join(keys)}
  109. FROM account_avg_info_v3
  110. WHERE account_name = '{account_name}'
  111. and position = 1;"""
  112. result = await self.mysql_client.async_select(sql=sql)
  113. account_avg_info_list = [AccountAvgInfo(*row) for row in result] if result else []
  114. return account_avg_info_list
  115. async def get_account_avg_info(self, account_avg_info_map, available_dates, timestamp):
  116. target_date = datetime.fromtimestamp(timestamp).date()
  117. # 尝试获取指定日期
  118. info = account_avg_info_map.get(target_date.isoformat())
  119. if info is not None:
  120. return info
  121. # 如果指定日期不存在,寻找最近日期
  122. closest_date = None
  123. for date in available_dates:
  124. if closest_date is None:
  125. closest_date = date
  126. continue
  127. days = abs((datetime.fromisoformat(date).date() - target_date).days)
  128. closest_days = abs((datetime.fromisoformat(closest_date).date() - target_date).days)
  129. if days < closest_days:
  130. closest_date = date
  131. elif days > closest_days:
  132. break
  133. return account_avg_info_map.get(closest_date) if closest_date else None
  134. async def getSingleAccountArticles(self, account_name):
  135. """
  136. 获取单个账号的历史文章
  137. :param gh_id:
  138. :return:
  139. appMsgId, title, Type, updateTime, ItemIndex, ContentUrl, show_view_count, show_like_count
  140. """
  141. keys = [
  142. "appMsgId",
  143. "title",
  144. "Type",
  145. "updateTime",
  146. "ItemIndex",
  147. "ContentUrl",
  148. "show_view_count",
  149. "show_like_count",
  150. ]
  151. sql = f"""
  152. SELECT {", ".join(keys)}
  153. FROM official_articles_v2
  154. WHERE accountName = '{account_name}';"""
  155. result = await self.mysql_client.async_select(sql=sql)
  156. return DataFrame(result, columns=keys)
  157. async def getArticleByFilter(
  158. self,
  159. account_name,
  160. view_count_filter=None,
  161. index_list=None,
  162. min_time=None,
  163. max_time=None,
  164. msg_type=None,
  165. ):
  166. """
  167. :param account_name:
  168. :param index_list: index ranges from 1 to 8
  169. :param min_time: earliest time
  170. :param max_time: latest time
  171. :param msg_type: msg_type
  172. :return:
  173. """
  174. if not index_list:
  175. index_list = [1]
  176. if not msg_type:
  177. msg_type = "9"
  178. if not min_time:
  179. min_time = 0
  180. if not max_time:
  181. # 2099年
  182. max_time = 4088051123
  183. articleDataFrame = await self.getSingleAccountArticles(account_name=account_name)
  184. filterDataFrame = articleDataFrame[
  185. (articleDataFrame["Type"] == msg_type)
  186. & (min_time < articleDataFrame["updateTime"])
  187. & (articleDataFrame["updateTime"] < max_time)
  188. & (articleDataFrame["ItemIndex"].isin(index_list))
  189. ]
  190. if view_count_filter:
  191. filterDataFrame = filterDataFrame[(filterDataFrame["show_view_count"] > view_count_filter)]
  192. return filterDataFrame
  193. async def get_good_bad_articles(self,
  194. account_name,
  195. interest_type,
  196. view_count_filter,
  197. rate=0.1,
  198. index_list=None,
  199. min_time=None,
  200. max_time=None,
  201. msg_type=None
  202. ):
  203. """
  204. 获取质量好和不好的视频
  205. :return:
  206. """
  207. article_data_frame = await self.getArticleByFilter(
  208. account_name=account_name,
  209. view_count_filter=view_count_filter,
  210. index_list=index_list,
  211. min_time=min_time,
  212. max_time=max_time,
  213. msg_type=msg_type
  214. )
  215. df_rows = len(article_data_frame)
  216. if df_rows > 0:
  217. match interest_type:
  218. case "top":
  219. sorted_df = article_data_frame.sort_values(by='show_view_count', reversed=True)
  220. topn = max(int(df_rows * rate), 1)
  221. top_df = sorted_df.head(topn)
  222. tail_df = sorted_df.tail(topn)
  223. return top_df, tail_df
  224. case "avg":
  225. avg_view = article_data_frame['show_view_count'].mean()
  226. good_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 + rate)]
  227. bad_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 - rate)]
  228. return good_df, bad_df
  229. case "account_avg":
  230. account_read_avg_list = await self.getAccountAvgInfo(
  231. account_name=account_name
  232. )
  233. account_avg_info_map = {info.update_time: info for info in account_read_avg_list}
  234. # 获取所有可用日期并排序
  235. available_dates = sorted(account_avg_info_map.keys())
  236. view_count_avg_list = []
  237. for index, row in article_data_frame.iterrows():
  238. update_time = row['updateTime']
  239. info = await self.get_account_avg_info(account_avg_info_map, available_dates, update_time)
  240. view_count_avg_list.append(info.read_avg)
  241. article_data_frame['view_count_avg'] = view_count_avg_list
  242. good_df = article_data_frame[(article_data_frame['show_view_count']) >
  243. (article_data_frame['view_count_avg']) * (1.0 + rate)]
  244. bad_df = article_data_frame[(article_data_frame['show_view_count']) >
  245. (article_data_frame['view_count_avg']) * (1.0 - rate)]
  246. return good_df, bad_df
  247. else:
  248. return None, None