# updateAccountV3.py
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. from tqdm import tqdm
  7. from datetime import datetime, timedelta
  8. from argparse import ArgumentParser
  9. from applications import PQMySQL, DeNetMysql, longArticlesMySQL
  10. from applications.const import updateAccountReadAvgTaskConst
  11. from config import apolloConfig
# Configuration accessor (apolloConfig comes from the project's config module);
# the two values below are fetched and JSON-decoded once, at import time.
config = apolloConfig()
# Decoded value of the "unauthorized_gh_id_fans" key. do_task_list looks it up
# by gh_id (``.get(gh_id, 0)``) as a fallback fan count when no daily fan
# statistic exists, so it is presumably a {gh_id: fans} mapping -- TODO confirm.
unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
# Decoded "touliu_gh_id_list" turned into a set; do_task_list uses membership
# in it to choose the TOULIU business type over ARTICLES_DAILY.
touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
  15. def get_account_fans_by_dt(db_client) -> dict:
  16. """
  17. 获取每个账号发粉丝,通过日期来区分
  18. :return:
  19. """
  20. sql = f"""
  21. SELECT
  22. t1.date_str,
  23. t1.fans_count,
  24. t2.gh_id
  25. FROM datastat_wx t1
  26. JOIN publish_account t2 ON t1.account_id = t2.id
  27. WHERE
  28. t2.channel = 5
  29. # AND t2.status = 1
  30. AND t1.fans_count > 0
  31. AND t1.date_str >= '2024-09-01'
  32. ORDER BY t1.date_str;
  33. """
  34. result = db_client.select(sql)
  35. D = {}
  36. for line in result:
  37. dt = line[0]
  38. fans = line[1]
  39. gh_id = line[2]
  40. if D.get(gh_id):
  41. D[gh_id][dt] = fans
  42. else:
  43. D[gh_id] = {dt: fans}
  44. return D
  45. class UpdateAccountInfoVersion3(object):
  46. """
  47. 更新账号信息 v3
  48. """
  49. def __init__(self):
  50. self.const = updateAccountReadAvgTaskConst()
  51. self.pq = PQMySQL()
  52. self.de = DeNetMysql()
  53. self.lam = longArticlesMySQL()
  54. def get_account_position_read_rate(self, dt):
  55. """
  56. 从长文数据库获取账号阅读均值
  57. :return:
  58. """
  59. dt = int(dt.replace("-", ""))
  60. sql = f"""
  61. SELECT
  62. gh_id, position, read_rate_avg
  63. FROM
  64. long_articles_read_rate_dev
  65. WHERE dt_version = {dt};
  66. """
  67. result = self.lam.select(sql)
  68. account_read_rate_dict = {}
  69. for item in result:
  70. gh_id = item[0]
  71. position = item[1]
  72. rate = item[2]
  73. key = "{}_{}".format(gh_id, position)
  74. account_read_rate_dict[key] = rate
  75. return account_read_rate_dict
  76. def get_publishing_accounts(self):
  77. """
  78. 获取每日正在发布的账号
  79. :return:
  80. """
  81. sql = f"""
  82. SELECT DISTINCT
  83. t3.`name`,
  84. t3.gh_id,
  85. t3.follower_count,
  86. t6.account_source_name,
  87. t6.mode_type,
  88. t6.account_type,
  89. t6.`status`
  90. FROM
  91. publish_plan t1
  92. JOIN publish_plan_account t2 ON t1.id = t2.plan_id
  93. JOIN publish_account t3 ON t2.account_id = t3.id
  94. LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
  95. LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
  96. LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
  97. WHERE
  98. t1.plan_status = 1
  99. AND t3.channel = 5
  100. GROUP BY t3.id;
  101. """
  102. account_list = self.de.select(sql)
  103. result_list = [
  104. {
  105. "account_name": i[0],
  106. "gh_id": i[1],
  107. "fans": i[2],
  108. "account_source_name": i[3],
  109. "mode_type": i[4],
  110. "account_type": i[5],
  111. "status": i[6]
  112. } for i in account_list
  113. ]
  114. return result_list
  115. def do_task_list(self, dt):
  116. """
  117. do it
  118. """
  119. fans_dict = get_account_fans_by_dt(db_client=self.de)
  120. account_list = self.get_publishing_accounts()
  121. rate_dict = self.get_account_position_read_rate(dt)
  122. for account in tqdm(account_list, desc=dt):
  123. gh_id = account["gh_id"]
  124. business_type = self.const.TOULIU if gh_id in touliu_accounts else self.const.ARTICLES_DAILY
  125. fans = fans_dict.get(gh_id, {}).get(dt, 0)
  126. if not fans:
  127. fans = int(unauthorized_account.get(gh_id, 0))
  128. if fans:
  129. for index in range(1, 9):
  130. gh_id_position = "{}_{}".format(gh_id, index)
  131. if rate_dict.get(gh_id_position):
  132. rate = rate_dict[gh_id_position]
  133. read_avg = fans * rate
  134. print(rate, read_avg)
  135. insert_sql = f"""
  136. INSERT INTO account_avg_info_v3_dev
  137. (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
  138. values
  139. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  140. """
  141. try:
  142. self.pq.update(
  143. sql=insert_sql,
  144. params=(
  145. gh_id,
  146. index,
  147. dt,
  148. account['account_name'],
  149. fans,
  150. read_avg,
  151. 0,
  152. 1,
  153. account['account_type'],
  154. account['mode_type'],
  155. account['account_source_name'],
  156. account['status'],
  157. business_type,
  158. rate
  159. )
  160. )
  161. except Exception as e:
  162. updateSQL = f"""
  163. UPDATE account_avg_info_v3_dev
  164. set fans = %s, read_avg = %s, read_rate_avg = %s
  165. where gh_id = %s and position = %s and update_time = %s
  166. """
  167. try:
  168. affected_rows = self.pq.update(
  169. sql=updateSQL,
  170. params=(
  171. fans,
  172. read_avg,
  173. rate,
  174. account['gh_id'],
  175. index,
  176. dt
  177. )
  178. )
  179. except Exception as e:
  180. print(e)
  181. # 修改前一天的状态为 0
  182. update_status_sql = f"""
  183. UPDATE account_avg_info_v3_dev
  184. SET status = %s
  185. where update_time != %s and gh_id = %s and position = %s;
  186. """
  187. rows_affected = self.pq.update(
  188. sql=update_status_sql,
  189. params=(
  190. 0, dt, account['gh_id'], index
  191. )
  192. )
  193. print("修改成功")
  194. def main():
  195. """
  196. main job
  197. :return:
  198. """
  199. parser = ArgumentParser()
  200. parser.add_argument("--run-date",
  201. help="Run only once for date in format of %Y-%m-%d. \
  202. If no specified, run as daily jobs.")
  203. args = parser.parse_args()
  204. Up = UpdateAccountInfoVersion3()
  205. if args.run_date:
  206. Up.do_task_list(dt=args.run_date)
  207. else:
  208. dt_object = datetime.fromtimestamp(int(time.time()))
  209. one_day = timedelta(days=1)
  210. yesterday = dt_object - one_day
  211. yesterday_str = yesterday.strftime('%Y-%m-%d')
  212. Up.do_task_list(dt=yesterday_str)
  213. if __name__ == '__main__':
  214. main()