# schedule_app.py
  1. from tqdm import tqdm
  2. from pymysql.cursors import DictCursor
  3. from applications.api import fetch_piaoquan_video_list_detail
  4. from applications.db import DatabaseConnector
  5. from config import denet_config, long_articles_config
  6. denet_client = DatabaseConnector(denet_config)
  7. denet_client.connect()
  8. lam_client = DatabaseConnector(long_articles_config)
  9. lam_client.connect()
  10. # step 1, get publish_content_id var publish_content_plan
  11. sql = f"""
  12. select id, crawler_channel_content_id, source_id, publish_stage_id, t2.channel_content_id
  13. from publish_content t1
  14. join produce_plan_exe_record t2 on t1.source_id = t2.plan_exe_id
  15. where t1.plan_id = '20250520112612679208238' and t1.status = 2;
  16. """
  17. success_task_list = denet_client.fetch(sql, cursor_type=DictCursor)
  18. ori_publish_id_list = []
  19. ori_publish_content_id_publish_stage_id_dict = {}
  20. for task in tqdm(success_task_list):
  21. # step2, get channel_channel_content_id var source_id
  22. channel_content_id = task['channel_content_id'].replace(task['crawler_channel_content_id'], '').replace('#', '')
  23. # step3, get ori_source_id as content_id
  24. video_id = task['publish_stage_id']
  25. if video_id:
  26. # continue
  27. try:
  28. pq_video_detail = fetch_piaoquan_video_list_detail([video_id])
  29. video_oss_path = pq_video_detail['data'][0]['ossVideoPath']
  30. title = pq_video_detail['data'][0]['title']
  31. update_query = f"""
  32. update ai_video_ab_test
  33. set title = %s, pq_video_id = %s, video_oss_path = %s, status = %s
  34. where new_channel_content_id = %s and status = %s;
  35. """
  36. affected_rows = lam_client.save(update_query, (title, video_id, video_oss_path, 1, channel_content_id, 0))
  37. print(affected_rows)
  38. except Exception as e:
  39. print(e)
  40. print(video_id)
  41. else:
  42. print(task)
  43. #
  44. # L = {}
  45. # for task in response:
  46. # publish_content_id = task['id']
  47. # source_id = task['source_id']
  48. # L[source_id] = ori_publish_content_id_publish_stage_id_dict[publish_content_id]
  49. #
  50. # print(json.dumps(L, indent=4, ensure_ascii=False))