|
@@ -6,6 +6,7 @@ import asyncio
|
|
|
from static.config import db_article, db_video, mysql_coroutines
|
|
|
from applications.functions.log import logging
|
|
|
from applications.functions.pqFunctions import *
|
|
|
+from applications.functions.apollo import Config
|
|
|
|
|
|
|
|
|
class MatchTask3(object):
|
|
@@ -13,11 +14,15 @@ class MatchTask3(object):
|
|
|
处理已经匹配过小程序的文章
|
|
|
"""
|
|
|
|
|
|
- def __init__(self, mysql_client):
|
|
|
+ def __init__(self, pq_client, denet_client, long_article_client):
|
|
|
"""
|
|
|
- :param mysql_client:
|
|
|
+ 初始化HistoryArticleMySQLClient
|
|
|
"""
|
|
|
- self.mysql_client = mysql_client
|
|
|
+ self.config = Config(env="prod")
|
|
|
+ self.pq_client = pq_client
|
|
|
+ self.denet_client = denet_client
|
|
|
+ self.long_article_client = long_article_client
|
|
|
+ self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category"))
|
|
|
|
|
|
async def getTaskList(self):
|
|
|
"""
|
|
@@ -44,7 +49,7 @@ class MatchTask3(object):
|
|
|
ORDER BY request_time_stamp
|
|
|
LIMIT {mysql_coroutines};
|
|
|
"""
|
|
|
- tasks = await self.mysql_client.async_select(sql=select_sql1)
|
|
|
+ tasks = await self.pq_client.async_select(sql=select_sql1)
|
|
|
task_obj_list = [
|
|
|
{
|
|
|
"trace_id": item[0],
|
|
@@ -75,7 +80,7 @@ class MatchTask3(object):
|
|
|
FROM {db_video}
|
|
|
where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
|
|
|
"""
|
|
|
- content_videos = await self.mysql_client.async_select(select_sql)
|
|
|
+ content_videos = await self.pq_client.async_select(select_sql)
|
|
|
video_list = [
|
|
|
{
|
|
|
"title": line[0],
|
|
@@ -102,7 +107,7 @@ class MatchTask3(object):
|
|
|
FROM {db_article}
|
|
|
WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
|
|
|
"""
|
|
|
- info = await self.mysql_client.async_select(sql=select_sql)
|
|
|
+ info = await self.pq_client.async_select(sql=select_sql)
|
|
|
kimi_title = info[0]
|
|
|
video_id_list = await getNewVideoIds(video_info_list)
|
|
|
vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
|
|
@@ -118,7 +123,7 @@ class MatchTask3(object):
|
|
|
WHERE trace_id = %s
|
|
|
"""
|
|
|
|
|
|
- await self.mysql_client.async_insert(
|
|
|
+ await self.pq_client.async_insert(
|
|
|
sql=update_sql,
|
|
|
params=(
|
|
|
kimi_title,
|
|
@@ -136,14 +141,114 @@ class MatchTask3(object):
|
|
|
trace_id=trace_id
|
|
|
)
|
|
|
|
|
|
+ async def get_content_pool_level(self, content_id) -> str:
|
|
|
+ """
|
|
|
+ 获取文章的内容池等级
|
|
|
+ :param content_id:
|
|
|
+ """
|
|
|
+ select_sql = f"""
|
|
|
+ SELECT produce_plan.plan_tag
|
|
|
+ FROM produce_plan
|
|
|
+ JOIN produce_plan_exe_record
|
|
|
+ ON produce_plan.id = produce_plan_exe_record.plan_id
|
|
|
+ WHERE produce_plan_exe_record.plan_exe_id = '{content_id}';
|
|
|
+ """
|
|
|
+ result = await self.denet_client.async_select(sql=select_sql)
|
|
|
+ if result:
|
|
|
+ return result[0][0]
|
|
|
+ else:
|
|
|
+ logging(
|
|
|
+ code="5858",
|
|
|
+ function="task3.get_content_pool_level",
|
|
|
+ info="没有找到该文章的内容池等级",
|
|
|
+ data={'content_id': content_id}
|
|
|
+ )
|
|
|
+ return "ERROR"
|
|
|
+
|
|
|
+ async def check_title_category(self, content_id, gh_id, trace_id) -> bool:
|
|
|
+ """
|
|
|
+ 判断该文章的品类是否属于该账号的品类
|
|
|
+ :param trace_id:
|
|
|
+ :param content_id:
|
|
|
+ :param gh_id:
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ bad_category_list = self.account_negative_category.get(gh_id, [])
|
|
|
+ logging(
|
|
|
+ code="history1101",
|
|
|
+ info="该账号的 negative 类型列表",
|
|
|
+ trace_id=trace_id,
|
|
|
+ data=bad_category_list
|
|
|
+ )
|
|
|
+ if bad_category_list:
|
|
|
+ sql = f"""
|
|
|
+ SELECT category
|
|
|
+ FROM article_category
|
|
|
+ WHERE produce_content_id = '{content_id}';
|
|
|
+ """
|
|
|
+ result = await self.long_article_client.async_select(sql)
|
|
|
+ if result:
|
|
|
+ category = result[0][0]
|
|
|
+ logging(
|
|
|
+ code="history1102",
|
|
|
+ info="文章的品类-{}".format(category),
|
|
|
+ trace_id=trace_id
|
|
|
+ )
|
|
|
+ if category in bad_category_list:
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
async def processTask(self, params):
|
|
|
"""
|
|
|
异步执行
|
|
|
:param params:
|
|
|
:return:
|
|
|
"""
|
|
|
+ MISMATCH_STATUS = 95
|
|
|
+ TASK_INIT_STATUS = 0
|
|
|
content_id = params['content_id']
|
|
|
trace_id = params['trace_id']
|
|
|
+ gh_id = params['gh_id']
|
|
|
+ flow_pool_level = await self.get_content_pool_level(content_id)
|
|
|
+ flow_pool_level_list = flow_pool_level.split("/")
|
|
|
+ if "autoArticlePoolLevel4" in flow_pool_level_list:
|
|
|
+ # 判断文章的品类是否属于该账号的 negative 类型
|
|
|
+ negative_category_status = await self.check_title_category(
|
|
|
+ content_id=content_id,
|
|
|
+ gh_id=gh_id,
|
|
|
+ trace_id=trace_id
|
|
|
+ )
|
|
|
+ if negative_category_status:
|
|
|
+ # 修改状态为品类不匹配状态
|
|
|
+ logging(
|
|
|
+ code="history1002",
|
|
|
+ info="文章属于该账号的 negative 类型",
|
|
|
+ trace_id=trace_id
|
|
|
+ )
|
|
|
+ update_sql = f"""
|
|
|
+ UPDATE {db_article}
|
|
|
+ SET content_status = %s
|
|
|
+ WHERE trace_id = %s and content_status = %s;
|
|
|
+ """
|
|
|
+ affected_rows = await self.pq_client.async_insert(
|
|
|
+ sql=update_sql,
|
|
|
+ params=(
|
|
|
+ MISMATCH_STATUS,
|
|
|
+ trace_id,
|
|
|
+ TASK_INIT_STATUS
|
|
|
+ )
|
|
|
+ )
|
|
|
+ logging(
|
|
|
+ code="history1003",
|
|
|
+ info="已经修改该文章状态为 品类不匹配状态",
|
|
|
+ trace_id=trace_id
|
|
|
+ )
|
|
|
+ if affected_rows == 0:
|
|
|
+ print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
|
|
|
+ return
|
|
|
+ # 处理完成之后,直接return
|
|
|
+ return
|
|
|
+
|
|
|
# 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
|
|
|
oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
|
|
|
if oss_path_list:
|
|
@@ -174,4 +279,3 @@ class MatchTask3(object):
|
|
|
code="9008",
|
|
|
info="没有要处理的请求"
|
|
|
)
|
|
|
-
|