updateAccountV3.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import schedule
  7. from tqdm import tqdm
  8. from datetime import datetime, timedelta
  9. from applications import longArticlesMySQL, PQMySQL, DeNetMysql, Functions
  10. class UpdateAccountInfoVersion3(object):
  11. """
  12. 更新账号信息 v3
  13. """
  14. lam = longArticlesMySQL()
  15. pq = PQMySQL()
  16. de = DeNetMysql()
  17. @classmethod
  18. def getAccountFans(cls):
  19. """
  20. :return:
  21. """
  22. sql = f"""
  23. select t1.date_str, t1.fans_count, t2.gh_id, t2.name
  24. from datastat_wx t1
  25. join publish_account t2 on t1.account_id = t2.id
  26. where t2.channel = 5 and t2.status = 1 and t1.date_str >= '2024-07-01' order by t1.date_str;"""
  27. result = cls.de.select(sql)
  28. D = {}
  29. for line in result:
  30. dt = line[0]
  31. fans = line[1]
  32. account_name = line[3]
  33. if D.get(account_name):
  34. D[account_name][dt] = fans
  35. else:
  36. D[account_name] = {dt: fans}
  37. # print(json.dumps(D, ensure_ascii=False, indent=4))
  38. return D
  39. @classmethod
  40. def getAccountRate(cls):
  41. """
  42. 获取账号阅读率
  43. :return:
  44. """
  45. sql = "select account_name, position, read_rate_avg from long_articles_read_rate;"
  46. result = cls.lam.select(sql)
  47. D = {}
  48. for line in result:
  49. if D.get(line[0]):
  50. D[line[0]][line[1]] = line[2]
  51. else:
  52. D[line[0]] = {line[1]: line[2]}
  53. return D
  54. @classmethod
  55. def reverseSingleDay(cls, dt, fans_dict, rate_dict):
  56. """
  57. :return:
  58. """
  59. sql = f"""select * from account_avg_info_v3 where update_time = '2024-09-09';"""
  60. result = cls.pq.select(sql)
  61. for line in tqdm(result):
  62. temp = list(line)
  63. temp[2] = dt
  64. temp[4] = fans_dict.get(temp[3], {}).get(dt, 0)
  65. temp[-1] = rate_dict.get(temp[3], {}).get(temp[1], 0)
  66. temp[5] = fans_dict.get(temp[3], {}).get(dt, 0) * rate_dict.get(temp[3], {}).get(temp[1], 0)
  67. temp[7] = 1
  68. usql = f"""
  69. INSERT INTO account_avg_info_v3
  70. (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
  71. values
  72. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  73. """
  74. try:
  75. cls.pq.update(
  76. sql=usql,
  77. params=tuple(temp)
  78. )
  79. except Exception as e:
  80. updateSQL = f"""
  81. UPDATE account_avg_info_v3
  82. set fans = %s, read_avg = %s, read_rate_avg = %s
  83. where account_name = %s and position = %s and update_time = %s
  84. """
  85. try:
  86. cls.pq.update(
  87. sql=updateSQL,
  88. params=(
  89. temp[4],
  90. temp[5],
  91. temp[-1],
  92. temp[3],
  93. temp[1],
  94. temp[2]
  95. )
  96. )
  97. except Exception as e:
  98. print(e)
  99. # 修改前一天的状态为 0
  100. uuu_sql = f"""
  101. UPDATE account_avg_info_v3
  102. SET status = %s
  103. where update_time != %s and account_name = %s and position = %s;
  104. """
  105. cls.pq.update(
  106. sql=uuu_sql,
  107. params=(
  108. 0, dt, temp[3], temp[1]
  109. )
  110. )
  111. def updateDaily():
  112. """
  113. main job
  114. :return:
  115. """
  116. Up = UpdateAccountInfoVersion3()
  117. fd = Up.getAccountFans()
  118. rd = Up.getAccountRate()
  119. dt_object = datetime.fromtimestamp(int(time.time()))
  120. one_day = timedelta(days=1)
  121. yesterday = dt_object - one_day
  122. yesterday_str = yesterday.strftime('%Y-%m-%d')
  123. # print(yesterday_str)
  124. Up.reverseSingleDay(yesterday_str, fd, rd)
  125. if __name__ == '__main__':
  126. # updateDaily()
  127. schedule.every().day.at("10:15").do(Functions().job_with_thread, updateDaily)
  128. schedule.every().day.at("10:30").do(Functions().job_with_thread, updateDaily)
  129. schedule.every().day.at("10:50").do(Functions().job_with_thread, updateDaily)
  130. while True:
  131. schedule.run_pending()
  132. time.sleep(1)