updateAccountV3.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. from tqdm import tqdm
  7. from datetime import datetime, timedelta
  8. from argparse import ArgumentParser
  9. from applications import PQMySQL, DeNetMysql, longArticlesMySQL
  10. TOULIU_ACCOUNTS = {
  11. 'gh_93e00e187787',
  12. 'gh_ac43e43b253b',
  13. 'gh_68e7fdc09fe4',
  14. 'gh_77f36c109fb1',
  15. 'gh_b181786a6c8c',
  16. 'gh_1ee2e1b39ccf'
  17. }
  18. ARTICLES_DAILY = 1
  19. TOULIU = 2
  20. class UpdateAccountInfoVersion3(object):
  21. """
  22. 更新账号信息 v3
  23. """
  24. def __init__(self):
  25. self.pq = PQMySQL()
  26. self.de = DeNetMysql()
  27. self.lam = longArticlesMySQL()
  28. def get_account_position_read_rate(self):
  29. """
  30. 从长文数据库获取账号阅读均值
  31. :return:
  32. """
  33. sql = f"""
  34. SELECT
  35. gh_id, position, read_rate_avg
  36. FROM
  37. long_articles_read_rate
  38. WHERE is_delete = 0;
  39. """
  40. result = self.lam.select(sql)
  41. account_read_rate_dict = {}
  42. for item in result:
  43. gh_id = item[0]
  44. position = item[1]
  45. rate = item[2]
  46. key = "{}_{}".format(gh_id, position)
  47. account_read_rate_dict[key] = rate
  48. return account_read_rate_dict
  49. def get_publishing_accounts(self):
  50. """
  51. 获取每日正在发布的账号
  52. :return:
  53. """
  54. sql = f"""
  55. SELECT DISTINCT
  56. t3.`name`,
  57. t3.gh_id,
  58. t3.follower_count,
  59. t6.account_source_name,
  60. t6.mode_type,
  61. t6.account_type,
  62. t6.`status`
  63. FROM
  64. publish_plan t1
  65. JOIN publish_plan_account t2 ON t1.id = t2.plan_id
  66. JOIN publish_account t3 ON t2.account_id = t3.id
  67. LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
  68. LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
  69. LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
  70. WHERE
  71. t1.plan_status = 1
  72. AND t3.channel = 5
  73. GROUP BY t3.id;
  74. """
  75. account_list = self.de.select(sql)
  76. result_list = [
  77. {
  78. "account_name": i[0],
  79. "gh_id": i[1],
  80. "fans": i[2],
  81. "account_source_name": i[3],
  82. "mode_type": i[4],
  83. "account_type": i[5],
  84. "status": i[6]
  85. } for i in account_list
  86. ]
  87. return result_list
  88. def do_task_list(self, dt):
  89. """
  90. do it
  91. """
  92. account_list = self.get_publishing_accounts()
  93. rate_dict = self.get_account_position_read_rate()
  94. for account in tqdm(account_list):
  95. business_type = TOULIU if account['gh_id'] in TOULIU_ACCOUNTS else ARTICLES_DAILY
  96. fans = account['fans']
  97. if fans:
  98. for index in range(1, 9):
  99. gh_id_position = "{}_{}".format(account['gh_id'], index)
  100. if rate_dict.get(gh_id_position):
  101. rate = rate_dict[gh_id_position]
  102. read_avg = fans * rate
  103. insert_sql = f"""
  104. INSERT INTO account_avg_info_v3
  105. (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
  106. values
  107. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  108. """
  109. try:
  110. self.pq.update(
  111. sql=insert_sql,
  112. params=(
  113. account['gh_id'],
  114. index,
  115. dt,
  116. account['account_name'],
  117. fans,
  118. read_avg,
  119. 0,
  120. 1,
  121. account['account_type'],
  122. account['mode_type'],
  123. account['account_source_name'],
  124. account['status'],
  125. business_type,
  126. rate
  127. )
  128. )
  129. except Exception as e:
  130. updateSQL = f"""
  131. UPDATE account_avg_info_v3
  132. set fans = %s, read_avg = %s
  133. where gh_id = %s and position = %s and update_time = %s
  134. """
  135. try:
  136. self.pq.update(
  137. sql=updateSQL,
  138. params=(
  139. fans,
  140. read_avg,
  141. account['gh_id'],
  142. index,
  143. dt
  144. )
  145. )
  146. print("update success")
  147. except Exception as e:
  148. print(e)
  149. # 修改前一天的状态为 0
  150. uuu_sql = f"""
  151. UPDATE account_avg_info_v3
  152. SET status = %s
  153. where update_time != %s and gh_id = %s and position = %s;
  154. """
  155. self.pq.update(
  156. sql=uuu_sql,
  157. params=(
  158. 0, dt, account['gh_id'], index
  159. )
  160. )
  161. print("修改成功")
  162. def main():
  163. """
  164. main job
  165. :return:
  166. """
  167. parser = ArgumentParser()
  168. parser.add_argument("--run-date",
  169. help="Run only once for date in format of %Y-%m-%d. \
  170. If no specified, run as daily jobs.")
  171. args = parser.parse_args()
  172. Up = UpdateAccountInfoVersion3()
  173. if args.run_date:
  174. Up.do_task_list(dt=args.run_date)
  175. else:
  176. dt_object = datetime.fromtimestamp(int(time.time()))
  177. one_day = timedelta(days=1)
  178. yesterday = dt_object - one_day
  179. yesterday_str = yesterday.strftime('%Y-%m-%d')
  180. Up.do_task_list(dt=yesterday_str)
  181. if __name__ == '__main__':
  182. main()