updateAccountV3.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. from tqdm import tqdm
  7. from datetime import datetime, timedelta
  8. from argparse import ArgumentParser
  9. from pymysql.cursors import DictCursor
  10. from applications.const import UpdateAccountReadAvgTaskConst
  11. from applications.db import DatabaseConnector
  12. from applications.utils import fetch_account_fans
  13. from applications.utils import fetch_publishing_account_list
  14. from config import apolloConfig
  15. from config import long_articles_config, denet_config, piaoquan_crawler_config
  16. read_rate_table = "long_articles_read_rate"
  17. read_avg_table = "account_avg_info_v3"
  18. config = apolloConfig()
  19. const = UpdateAccountReadAvgTaskConst()
  20. unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
  21. touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
  22. backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
  23. class UpdateAccountInfoVersion3(object):
  24. """
  25. 更新账号的平均阅读率
  26. """
  27. def __init__(self):
  28. # init piaoquan crawler db client
  29. self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
  30. self.piaoquan_crawler_db_client.connect()
  31. # init long articles db client
  32. self.long_articles_db_client = DatabaseConnector(long_articles_config)
  33. self.long_articles_db_client.connect()
  34. # init aigc db client
  35. self.denet_db_client = DatabaseConnector(denet_config)
  36. self.denet_db_client.connect()
  37. def fetch_read_rate_avg_for_each_account(self, dt):
  38. """
  39. 从长文数据库获取账号阅读均值
  40. :return:
  41. """
  42. dt = int(dt.replace("-", ""))
  43. sql = f"""
  44. select gh_id, position, read_rate_avg
  45. from {read_rate_table}
  46. where dt_version = {dt};
  47. """
  48. fetch_response_list = self.long_articles_db_client.fetch(query=sql, cursor_type=DictCursor)
  49. account_read_rate_dict = {}
  50. for item in fetch_response_list:
  51. key = "{}_{}".format(item['gh_id'], item['position'])
  52. account_read_rate_dict[key] = item['read_rate_avg']
  53. return account_read_rate_dict
  54. def do_task_list(self, dt):
  55. """
  56. do it
  57. """
  58. # get fans dict from aigc
  59. fans_dict = fetch_account_fans(self.denet_db_client, dt)
  60. # get publishing account list from aigc
  61. account_list = fetch_publishing_account_list(self.denet_db_client)
  62. # fetch each account's read avg for each position
  63. read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
  64. for account in tqdm(account_list, desc=dt):
  65. gh_id = account["gh_id"]
  66. business_type = const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
  67. fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
  68. # use unauthorized account's fans if not found in aigc
  69. if not fans:
  70. fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
  71. # use backup account's fans if not found in aigc
  72. if not fans:
  73. fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
  74. if fans:
  75. for index in const.ARTICLE_INDEX_LIST:
  76. gh_id_position = "{}_{}".format(gh_id, index)
  77. if read_rate_avg_dict.get(gh_id_position):
  78. # fetch read rate avg
  79. read_rate_avg = read_rate_avg_dict[gh_id_position]
  80. # cal read avg
  81. read_avg = fans * read_rate_avg
  82. # insert into database
  83. insert_sql = f"""
  84. insert into {read_avg_table}
  85. (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
  86. values
  87. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  88. """
  89. try:
  90. self.piaoquan_crawler_db_client.save(
  91. query=insert_sql,
  92. params=(
  93. gh_id,
  94. index,
  95. dt,
  96. account['account_name'],
  97. fans,
  98. read_avg,
  99. const.DEFAULT_LIKE,
  100. const.USING_STATUS,
  101. account['account_type'],
  102. account['mode_type'],
  103. account['account_source'],
  104. account['status'],
  105. business_type,
  106. read_rate_avg
  107. )
  108. )
  109. except Exception as e:
  110. update_sql = f"""
  111. update {read_avg_table}
  112. set fans = %s, read_avg = %s, read_rate_avg = %s
  113. where gh_id = %s and position = %s and update_time = %s
  114. """
  115. try:
  116. self.piaoquan_crawler_db_client.save(
  117. query=update_sql,
  118. params=(
  119. fans,
  120. read_avg,
  121. read_rate_avg,
  122. account['gh_id'],
  123. index,
  124. dt
  125. )
  126. )
  127. except Exception as e:
  128. print(e)
  129. # 修改前一天的状态为 0
  130. update_status_sql = f"""
  131. update {read_avg_table}
  132. set status = %s
  133. where update_time != %s and gh_id = %s and position = %s;
  134. """
  135. self.piaoquan_crawler_db_client.save(
  136. query=update_status_sql,
  137. params=(
  138. const.NOT_USING_STATUS, dt, account['gh_id'], index
  139. )
  140. )
  141. def main():
  142. """
  143. main job
  144. :return:
  145. """
  146. parser = ArgumentParser()
  147. parser.add_argument("--run-date",
  148. help="Run only once for date in format of %Y-%m-%d. \
  149. If no specified, run as daily jobs.")
  150. args = parser.parse_args()
  151. update_account_read_avg_task = UpdateAccountInfoVersion3()
  152. if args.run_date:
  153. update_account_read_avg_task.do_task_list(dt=args.run_date)
  154. else:
  155. dt_object = datetime.fromtimestamp(int(time.time()))
  156. one_day = timedelta(days=1)
  157. yesterday = dt_object - one_day
  158. yesterday_str = yesterday.strftime('%Y-%m-%d')
  159. update_account_read_avg_task.do_task_list(dt=yesterday_str)
  160. if __name__ == '__main__':
  161. main()