record.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import time
  2. from uuid import uuid4
  3. from applications.log import logging
  4. class Record(object):
  5. """
  6. 搜索接口处理逻辑
  7. """
  8. def __init__(self, params, mysql_client, config):
  9. self.flow_pool_level = None
  10. self.content_id = None
  11. self.account_name = None
  12. self.contents = None
  13. self.title = None
  14. self.gh_id = None
  15. self.publish_flag = None
  16. self.channel_content_id = None
  17. self.params = params
  18. self.mysql_client = mysql_client
  19. self.article_match_video_table = config.article_match_video_table
  20. self.article_text_table = config.article_text_table
  21. self.trace_id = "search-{}-{}".format(str(uuid4()), str(int(time.time())))
  22. def check_params(self):
  23. """
  24. 检查请求params
  25. :return:
  26. """
  27. try:
  28. self.gh_id = self.params['ghId']
  29. self.title = self.params['title'].split("@@")[-1].replace("'", "")
  30. self.contents = self.params['content'].replace("'", "")
  31. self.account_name = self.params['accountName'].replace("'", "")
  32. self.content_id = self.params['articleId']
  33. self.flow_pool_level = self.params['flowPoolLevelTag']
  34. self.publish_flag = self.params.get('publishFlag', 1)
  35. self.channel_content_id = self.params.get("channelContentId")
  36. logging(
  37. code="1001",
  38. info="搜索视频内容接口请求成功, 参数校验成功",
  39. port="title_to_search",
  40. trace_id=self.trace_id,
  41. data=self.params
  42. )
  43. return None
  44. except Exception as e:
  45. result = {
  46. "status": "fail",
  47. "code": 1,
  48. "message": str(e),
  49. "info": "params check error"
  50. }
  51. logging(
  52. code="4001",
  53. info="搜索视频内容接口请求成功, 参数校验失败",
  54. port="title_to_search",
  55. trace_id=self.trace_id,
  56. data=self.params
  57. )
  58. return result
  59. async def input_into_article_match_video_table(self):
  60. """
  61. 把数据插入待处理队列
  62. :return:
  63. """
  64. request_time = int(time.time())
  65. insert_sql = f"""
  66. INSERT INTO {self.article_match_video_table}
  67. (trace_id, content_id, flow_pool_level, gh_id, account_name, request_timestamp, publish_flag)
  68. VALUES
  69. (%s, %s, %s, %s, %s, %s, %s);
  70. """
  71. await self.mysql_client.async_insert(
  72. sql=insert_sql,
  73. params=(
  74. self.trace_id,
  75. self.content_id,
  76. self.flow_pool_level,
  77. self.gh_id,
  78. self.account_name,
  79. request_time,
  80. self.publish_flag
  81. )
  82. )
  83. logging(
  84. code="1001",
  85. info="请求文章存储到 long_articles_match_videos中",
  86. function="Record",
  87. trace_id=self.trace_id
  88. )
  89. async def input_into_article_text_table(self):
  90. """
  91. :return:
  92. """
  93. insert_sql = f"""
  94. INSERT INTO {self.article_text_table} (content_id, article_title, article_text)
  95. values (%s, %s, %s);
  96. """
  97. try:
  98. await self.mysql_client.async_insert(
  99. sql=insert_sql,
  100. params=(
  101. self.content_id,
  102. self.title,
  103. self.contents
  104. )
  105. )
  106. logging(
  107. code="1002",
  108. info="请求文章存储到 long_articles_text中",
  109. function="Record",
  110. trace_id=self.trace_id
  111. )
  112. except Exception as e:
  113. logging(
  114. code="1002",
  115. info="请求文章 id 已经存储在 long_article_text中 {}".format(e),
  116. function="Record",
  117. trace_id=self.trace_id
  118. )
  119. async def input_channel_content_id(self):
  120. """
  121. 将content_id, channel_content_id 记录
  122. """
  123. insert_sql = f"""
  124. INSERT IGNORE INTO
  125. crawler_produce_id_map
  126. (channel_content_id, content_id)
  127. VALUES
  128. (%s, %s);
  129. """
  130. try:
  131. await self.mysql_client.async_insert(
  132. sql=insert_sql,
  133. params=(
  134. self.channel_content_id,
  135. self.content_id
  136. )
  137. )
  138. except Exception as e:
  139. logging(
  140. code="1002",
  141. info="insert channel_content_id_error: {}".format(e),
  142. function="Record",
  143. trace_id=self.trace_id
  144. )
  145. async def deal(self):
  146. """
  147. deal
  148. :return:
  149. """
  150. params_error = self.check_params()
  151. if params_error:
  152. return params_error
  153. else:
  154. # 记录数据
  155. await self.input_into_article_match_video_table()
  156. await self.input_into_article_text_table()
  157. # 判断是否传参数:channel_content_id
  158. if self.channel_content_id:
  159. await self.input_channel_content_id()
  160. res = {
  161. "status": "success input to article queue",
  162. "code": 0,
  163. "traceId": self.trace_id
  164. }
  165. return res