__init__.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. """
  2. @author: luojunhui
  3. """
  4. from datetime import datetime
  5. from applications.functions.video_item import VideoProducer
  6. from applications.functions.log import logging
  7. from applications.match_algorithm.rank import title_similarity_rank
  8. from .spiderAB import SearchABTest
  9. from .spiderSchedule import SearchMethod
  10. async def videoSender(video_obj, user, trace_id, platform, content_id, table, dbClient):
  11. """
  12. 异步处理微信 video_obj
  13. 公众号和站内账号一一对应
  14. :param dbClient:
  15. :param table:
  16. :param content_id:
  17. :param platform:
  18. :param user:
  19. :param trace_id:
  20. :param video_obj:
  21. :return:
  22. """
  23. Video = VideoProducer()
  24. if platform == "xg_search":
  25. mq_obj = Video.xg_video_producer(
  26. video_obj=video_obj,
  27. user=user,
  28. trace_id=trace_id,
  29. )
  30. elif platform == "baidu_search":
  31. mq_obj = Video.baidu_video_producer(
  32. video_obj=video_obj,
  33. user=user,
  34. trace_id=trace_id,
  35. )
  36. elif platform == "wx_search":
  37. mq_obj = Video.wx_video_producer(
  38. video_obj=video_obj,
  39. user=user,
  40. trace_id=trace_id,
  41. )
  42. elif platform == "dy_search":
  43. mq_obj = Video.dy_video_producer(
  44. video_obj=video_obj,
  45. user=user,
  46. trace_id=trace_id,
  47. )
  48. else:
  49. mq_obj = {}
  50. mq_obj['trace_id'] = trace_id
  51. mq_obj['content_id'] = content_id
  52. insert_sql = f"""
  53. INSERT INTO {table}
  54. (content_id, out_video_id, platform, video_title, play_count, like_count, publish_time, crawler_time, duration, video_url, cover_url, user_id, trace_id)
  55. values
  56. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  57. """
  58. await dbClient.asyncInsert(
  59. sql=insert_sql,
  60. params=(
  61. content_id,
  62. mq_obj['video_id'],
  63. platform,
  64. mq_obj['video_title'],
  65. mq_obj['play_cnt'],
  66. mq_obj['like_cnt'],
  67. datetime.fromtimestamp(mq_obj['publish_time_stamp']).strftime('%Y-%m-%d %H:%M:%S'),
  68. datetime.now().__str__(),
  69. mq_obj['duration'],
  70. mq_obj['video_url'],
  71. mq_obj['cover_url'],
  72. mq_obj['user_id'],
  73. trace_id
  74. )
  75. )
  76. async def searchVideos(info, ghIdMap, dbClient):
  77. """
  78. search and send msg to ETL
  79. :param dbClient:
  80. :param ghIdMap:
  81. :param info:
  82. :return:
  83. """
  84. SearchAB = SearchABTest(info=info, searchMethod=SearchMethod())
  85. # 启三个搜索,每个搜索都保证要搜索到, 分别用key1, key2, key3去搜索
  86. trace_id = info['traceId']
  87. gh_id = info['ghId']
  88. content_id = info['contentId']
  89. recall_list = await SearchAB.ab_5()
  90. logging(
  91. code="1006",
  92. info="搜索到{}条视频".format(len(recall_list)),
  93. data=recall_list,
  94. trace_id=info['traceId']
  95. )
  96. # 按照标题相似度排序
  97. ranked_list = title_similarity_rank(content_title=info['oriTitle'].split("@@")[-1], recall_list=recall_list)
  98. for recall_obj in ranked_list:
  99. if recall_obj:
  100. platform = recall_obj['platform']
  101. recall_video = recall_obj['result']
  102. if recall_video:
  103. await videoSender(
  104. video_obj=recall_video,
  105. user=ghIdMap.get(gh_id, 69637498),
  106. trace_id=trace_id,
  107. platform=platform,
  108. content_id=content_id,
  109. table=info['spider'],
  110. dbClient=dbClient
  111. )