_utils.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. import json
  2. from typing import List
  3. from datetime import datetime, timedelta
  4. from odps.types import Record
  5. from app.infra.shared.tools import fetch_from_odps
  6. from app.infra.internal.piaoquan import fetch_piaoquan_video_list_detail
  7. class VideoDecodeUtils:
  8. # 获取检测的账号 list
  9. @staticmethod
  10. def get_top_head_videos(execute_dt):
  11. uv_threshold = 500
  12. query = f"""
  13. WITH top_video AS
  14. (
  15. SELECT dt, channel, 合作方名,包名,公众号名,hotsencetype, videoid, rootsourceid,title,`merge二级品类`
  16. ,COUNT(DISTINCT mid) AS 访问uv
  17. FROM loghubods.opengid_base_data
  18. WHERE dt = '{execute_dt}'
  19. AND hotsencetype != 1167
  20. AND videoid IS NOT NULL
  21. AND usersharedepth = 0
  22. AND channel IN ('公众号合作-即转-稳定','小程序投流-稳定','服务号合作-Daily-自选','群/企微合作-稳定','公众号买号','服务号买号','公众号投流-稳定','公众号代运营-Daily-系统','公众号合作-Daily-自选','服务号投流','公众号完全代投放','群/企微合作-稳定')
  23. GROUP BY dt
  24. ,channel
  25. ,合作方名
  26. ,包名
  27. ,公众号名
  28. ,hotsencetype
  29. ,videoid
  30. ,rootsourceid
  31. ,title
  32. ,`merge二级品类`
  33. ,推荐状态
  34. HAVING 访问uv >= {uv_threshold}
  35. )
  36. SELECT DISTINCT channel
  37. ,公众号名
  38. ,hotsencetype
  39. ,videoid
  40. ,rootsourceid
  41. ,title
  42. FROM top_video
  43. WHERE 访问uv >= {uv_threshold}
  44. ;
  45. """
  46. result = fetch_from_odps(query)
  47. return result
  48. @staticmethod
  49. def process_odps_data(odps_list: List[Record]):
  50. return [
  51. {
  52. "channel": i.channel,
  53. "account_name": i.公众号名,
  54. "hot_scene_type": i.hotsencetype, # 大数据单词写错。场景: scene
  55. "video_id": i.videoid,
  56. "root_source_id": i.rootsourceid,
  57. "title": i.title,
  58. }
  59. for i in odps_list
  60. ]
  61. @staticmethod
  62. def get_match_video_real_path(raw_match_videos: List, video_id: int):
  63. _response = raw_match_videos[0]["response"]
  64. match_video = json.loads(_response)
  65. for i in match_video:
  66. if i["videoId"] == video_id:
  67. return i["videoOss"]
  68. return None
  69. @staticmethod
  70. async def get_pq_video_real_path(video_id):
  71. response = await fetch_piaoquan_video_list_detail(video_list=[video_id])
  72. video_path = response["data"][0]["ossVideoPath"]
  73. return video_path
  74. @staticmethod
  75. def get_yesterday_dt():
  76. return (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")