record.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. import time
  2. from uuid import uuid4
  3. from applications.log import logging
  4. class Record(object):
  5. """
  6. 搜索接口处理逻辑
  7. """
  8. def __init__(self, params, mysql_client, config):
  9. self.video_pool_trace_id = None
  10. self.flow_pool_level = None
  11. self.content_id = None
  12. self.account_name = None
  13. self.contents = None
  14. self.title = None
  15. self.gh_id = None
  16. self.publish_flag = None
  17. self.channel_content_id = None
  18. self.params = params
  19. self.mysql_client = mysql_client
  20. self.article_match_video_table = config.article_match_video_table
  21. self.article_text_table = config.article_text_table
  22. self.trace_id = "search-{}-{}".format(str(uuid4()), str(int(time.time())))
  23. def check_params(self):
  24. """
  25. 检查请求params
  26. :return:
  27. """
  28. try:
  29. self.gh_id = self.params['ghId']
  30. self.title = self.params['title'].split("@@")[-1].replace("'", "")
  31. self.contents = self.params['content'].replace("'", "")
  32. self.account_name = self.params['accountName'].replace("'", "")
  33. self.content_id = self.params['articleId']
  34. self.flow_pool_level = self.params['flowPoolLevelTag']
  35. self.publish_flag = self.params.get('publishFlag', 1)
  36. self.channel_content_id = self.params.get("channelContentId")
  37. self.video_pool_trace_id = self.params.get("videoPoolTraceId")
  38. logging(
  39. code="1001",
  40. info="搜索视频内容接口请求成功, 参数校验成功",
  41. port="title_to_search",
  42. trace_id=self.trace_id,
  43. data=self.params
  44. )
  45. return None
  46. except Exception as e:
  47. result = {
  48. "status": "fail",
  49. "code": 1,
  50. "message": str(e),
  51. "info": "params check error"
  52. }
  53. logging(
  54. code="4001",
  55. info="搜索视频内容接口请求成功, 参数校验失败",
  56. port="title_to_search",
  57. trace_id=self.trace_id,
  58. data=self.params
  59. )
  60. return result
  61. async def input_into_article_match_video_table(self):
  62. """
  63. 把数据插入待处理队列
  64. :return:
  65. """
  66. request_time = int(time.time())
  67. insert_sql = f"""
  68. INSERT INTO {self.article_match_video_table}
  69. (trace_id, content_id, flow_pool_level, gh_id, account_name, request_timestamp, publish_flag, video_pool_trace_id)
  70. VALUES
  71. (%s, %s, %s, %s, %s, %s, %s, %s);
  72. """
  73. await self.mysql_client.async_insert(
  74. sql=insert_sql,
  75. params=(
  76. self.trace_id,
  77. self.content_id,
  78. self.flow_pool_level,
  79. self.gh_id,
  80. self.account_name,
  81. request_time,
  82. self.publish_flag,
  83. self.video_pool_trace_id
  84. )
  85. )
  86. logging(
  87. code="1001",
  88. info="请求文章存储到 long_articles_match_videos中",
  89. function="Record",
  90. trace_id=self.trace_id
  91. )
  92. async def input_into_article_text_table(self):
  93. """
  94. :return:
  95. """
  96. insert_sql = f"""
  97. INSERT INTO {self.article_text_table} (content_id, article_title, article_text)
  98. values (%s, %s, %s);
  99. """
  100. try:
  101. await self.mysql_client.async_insert(
  102. sql=insert_sql,
  103. params=(
  104. self.content_id,
  105. self.title,
  106. self.contents
  107. )
  108. )
  109. logging(
  110. code="1002",
  111. info="请求文章存储到 long_articles_text中",
  112. function="Record",
  113. trace_id=self.trace_id
  114. )
  115. except Exception as e:
  116. logging(
  117. code="1002",
  118. info="请求文章 id 已经存储在 long_article_text中 {}".format(e),
  119. function="Record",
  120. trace_id=self.trace_id
  121. )
  122. async def input_channel_content_id(self):
  123. """
  124. 将content_id, channel_content_id 记录
  125. """
  126. insert_sql = f"""
  127. INSERT IGNORE INTO
  128. crawler_produce_id_map
  129. (channel_content_id, content_id)
  130. VALUES
  131. (%s, %s);
  132. """
  133. try:
  134. await self.mysql_client.async_insert(
  135. sql=insert_sql,
  136. params=(
  137. self.channel_content_id,
  138. self.content_id
  139. )
  140. )
  141. except Exception as e:
  142. logging(
  143. code="1002",
  144. info="insert channel_content_id_error: {}".format(e),
  145. function="Record",
  146. trace_id=self.trace_id
  147. )
  148. async def deal(self):
  149. """
  150. deal
  151. :return:
  152. """
  153. params_error = self.check_params()
  154. if params_error:
  155. return params_error
  156. else:
  157. # 记录数据
  158. await self.input_into_article_match_video_table()
  159. await self.input_into_article_text_table()
  160. # 判断是否传参数:channel_content_id
  161. if self.channel_content_id:
  162. await self.input_channel_content_id()
  163. res = {
  164. "status": "success input to article queue",
  165. "code": 0,
  166. "traceId": self.trace_id
  167. }
  168. return res