articleTools.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. import aiomysql
  6. from pandas import DataFrame
  7. class TaskMySQLClient(object):
  8. """
  9. Async MySQL
  10. """
  11. def __init__(self):
  12. self.mysql_pool = None
  13. async def init_pool(self):
  14. """
  15. 初始化连接
  16. :return:
  17. """
  18. self.mysql_pool = await aiomysql.create_pool(
  19. host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
  20. port=3306,
  21. user="crawler",
  22. password="crawler123456@",
  23. db="piaoquan-crawler",
  24. charset="utf8mb4",
  25. connect_timeout=120,
  26. )
  27. print("mysql init successfully")
  28. async def close_pool(self):
  29. """
  30. 关闭 mysql 连接
  31. :return:
  32. """
  33. self.mysql_pool.close()
  34. await self.mysql_pool.wait_closed()
  35. async def async_select(self, sql):
  36. """
  37. select method
  38. :param sql:
  39. :return:
  40. """
  41. async with self.mysql_pool.acquire() as conn:
  42. async with conn.cursor() as cursor:
  43. await cursor.execute(sql)
  44. result = await cursor.fetchall()
  45. return result
  46. async def async_insert(self, sql, params):
  47. """
  48. insert and update method
  49. :param params:
  50. :param sql:
  51. :return:
  52. """
  53. async with self.mysql_pool.acquire() as coon:
  54. async with coon.cursor() as cursor:
  55. await cursor.execute(sql, params)
  56. await coon.commit()
  57. class ArticleDBTools(object):
  58. """
  59. 长文数据库相关功能
  60. """
  61. def __init__(self, mysql_client):
  62. """
  63. init mysql
  64. :param mysql_client:
  65. """
  66. self.mysql_client = mysql_client
  67. async def getSingleAccountArticles(self, account_name):
  68. """
  69. 获取单个账号的历史文章
  70. :param gh_id:
  71. :return:
  72. appMsgId, title, Type, updateTime, ItemIndex, ContentUrl, show_view_count, show_like_count
  73. """
  74. keys = [
  75. "appMsgId",
  76. "title",
  77. "Type",
  78. "updateTime",
  79. "ItemIndex",
  80. "ContentUrl",
  81. "show_view_count",
  82. "show_like_count",
  83. ]
  84. sql = f"""
  85. SELECT {", ".join(keys)}
  86. FROM official_articles
  87. WHERE accountName = '{account_name}';"""
  88. result = await self.mysql_client.async_select(sql=sql)
  89. return DataFrame(result, columns=keys)
  90. async def getArticleByFilter(
  91. self,
  92. account_name,
  93. index_list=None,
  94. min_time=None,
  95. max_time=None,
  96. msg_type=None,
  97. ):
  98. """
  99. :param account_name:
  100. :param index_list: index ranges from 1 to 8
  101. :param min_time: earliest time
  102. :param max_time: latest time
  103. :param msg_type: msg_type
  104. :return:
  105. """
  106. if not index_list:
  107. index_list = [1]
  108. if not msg_type:
  109. msg_type = "9"
  110. if not min_time:
  111. min_time = 0
  112. if not max_time:
  113. # 2099年
  114. max_time = 4088051123
  115. articleDataFrame = await self.getSingleAccountArticles(account_name=account_name)
  116. filterDataFrame = articleDataFrame[
  117. (articleDataFrame["Type"] == msg_type)
  118. & (min_time < articleDataFrame["updateTime"])
  119. & (articleDataFrame["updateTime"] < max_time)
  120. & (articleDataFrame["ItemIndex"].isin(index_list))
  121. ]
  122. return filterDataFrame
  123. async def get_good_bad_articles(self,
  124. account_name,
  125. method,
  126. rate=0.1,
  127. index_list=None,
  128. min_time=None,
  129. max_time=None,
  130. msg_type=None
  131. ):
  132. """
  133. 获取质量好和不好的视频
  134. :return:
  135. """
  136. article_data_frame = await self.getArticleByFilter(
  137. account_name=account_name,
  138. index_list=index_list,
  139. min_time=min_time,
  140. max_time=max_time,
  141. msg_type=msg_type
  142. )
  143. df_rows = len(article_data_frame)
  144. if df_rows > 0:
  145. match method:
  146. case "top":
  147. sorted_df = article_data_frame.sort_values(by='show_view_count', reversed=True)
  148. topn = max(int(df_rows * rate), 1)
  149. top_df = sorted_df.head(topn)
  150. tail_df = sorted_df.tail(topn)
  151. return top_df, tail_df
  152. case "avg":
  153. avg_view = article_data_frame['show_view_count'].mean()
  154. good_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 + rate)]
  155. bad_df = article_data_frame[(article_data_frame['show_view_count']) > avg_view * (1.0 - rate)]
  156. return good_df, bad_df
  157. else:
  158. return None, None