Explorar o código

账号质量分析

luojunhui hai 3 meses
pai
achega
c9ed0ac0cf

+ 1 - 1
applications/pipeline/crawler_pipeline.py

@@ -60,7 +60,7 @@ class CrawlerPipeline(AsyncApolloApi):
 
     async def save_item_to_database(self, media_type: str, item: dict, trace_id: str):
         """deal function"""
-        item['trace_id'] = trace_id
+        item["trace_id"] = trace_id
         match media_type:
             case "video":
                 await self.save_single_record(media_type, item)

+ 3 - 0
applications/service/__init__.py

@@ -1,2 +1,5 @@
 # 日志服务
 from .log_service import LogService
+
+# 前端交互
+from .task_manager_service import TaskManagerService

+ 109 - 0
applications/service/task_manager_service.py

@@ -0,0 +1,109 @@
+from typing import Optional
+
+
+def _build_where(id_eq=None, date_string=None, trace_id=None, task_status=None):
+    conds, params = [], []
+
+    if id_eq is not None:
+        conds.append("id = %s")
+        params.append(id_eq)
+
+    if date_string:  # 字符串非空
+        conds.append("date_string = %s")
+        params.append(date_string)
+
+    if trace_id:
+        conds.append("trace_id LIKE %s")
+        # 如果调用方已经传了 %,就原样用;否则自动做包含匹配
+        params.append(trace_id if "%" in trace_id else f"%{trace_id}%")
+
+    if task_status is not None:
+        conds.append("task_status = %s")
+        params.append(task_status)
+
+    where_clause = " AND ".join(conds) if conds else "1=1"
+    return where_clause, params
+
+
+class TaskConst:
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    FINISHED_STATUS = 2
+    FAILED_STATUS = 99
+    STATUS_TEXT = {0: "初始化", 1: "处理中", 2: "完成", 99: "失败"}
+
+    DEFAULT_PAGE = 1
+    DEFAULT_SIZE = 50
+
+
+class TaskManagerService(TaskConst):
+    def __init__(self, pool, data):
+        self.pool = pool
+        self.data = data
+
+    async def list_tasks(self):
+        page = self.data.get("page", self.DEFAULT_PAGE)
+        page_size = self.data.get("size", self.DEFAULT_SIZE)
+        sort_by = self.data.get("sort_by", "id")
+        sort_dir = self.data.get("sort_dir", "desc").lower()
+
+        # 过滤条件
+        id_eq: Optional[int] = self.data.get("id") and int(self.data.get("id"))
+        date_string: Optional[str] = self.data.get("date_string")
+        trace_id: Optional[str] = self.data.get("trace_id")
+        task_status: Optional[int] = self.data.get("task_status") and int(
+            self.data.get("task_status")
+        )
+
+        # 1) WHERE 子句
+        where_clause, params = _build_where(id_eq, date_string, trace_id, task_status)
+        sort_whitelist = {
+            "id",
+            "date_string",
+            "task_status",
+            "start_timestamp",
+            "finish_timestamp",
+        }
+        sort_by = sort_by if sort_by in sort_whitelist else "id"
+        sort_dir = "ASC" if str(sort_dir).lower() == "asc" else "DESC"
+
+        # 3) 分页(边界保护)
+        page = max(1, int(page))
+        page_size = max(1, min(int(page_size), 200))  # 适当限流
+        offset = (page - 1) * page_size
+
+        # 4) 统计总数(注意:WHERE 片段直接插入,值用参数化)
+        sql_count = f"""
+                SELECT COUNT(1) AS cnt
+                FROM long_articles_task_manager
+                WHERE {where_clause}
+            """
+        count_rows = await self.pool.async_fetch(query=sql_count, params=tuple(params))
+        total = count_rows[0]["cnt"] if count_rows else 0
+
+        # 5) 查询数据
+        sql_list = f"""
+                SELECT id, date_string, task_name, task_status, start_timestamp, finish_timestamp, trace_id
+                FROM long_articles_task_manager
+                WHERE {where_clause}
+                ORDER BY {sort_by} {sort_dir}
+                LIMIT %s OFFSET %s
+            """
+        list_params = (*params, page_size, offset)
+        rows = await self.pool.async_fetch(query=sql_list, params=list_params)
+
+        return {
+            "total": total,
+            "page": page,
+            "page_size": page_size,
+            "items": rows,
+        }
+
+    async def get_task(self, task_id: int):
+        pass
+
+    async def retry_task(self, task_id: int):
+        pass
+
+    async def cancel_task(self, task_id: int):
+        pass

+ 1 - 1
applications/tasks/crawler_tasks/__init__.py

@@ -7,5 +7,5 @@ __all__ = [
     "CrawlerToutiao",
     "WeixinAccountManager",
     "CrawlerGzhAccountArticles",
-    "CrawlerGzhSearchArticles"
+    "CrawlerGzhSearchArticles",
 ]

+ 5 - 3
applications/tasks/crawler_tasks/crawler_gzh.py

@@ -278,7 +278,9 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
     ):
         for article in article_list:
             url = article["url"]
-            detail_response = await get_article_detail(url, is_count=True, is_cache=False)
+            detail_response = await get_article_detail(
+                url, is_count=True, is_cache=False
+            )
             if not detail_response:
                 continue
 
@@ -344,6 +346,6 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
             title="公众号搜索任务执行完成",
             detail={
                 "strategy": strategy,
-                "execute_detail": await self.get_task_execute_result()
-            }
+                "execute_detail": await self.get_task_execute_result(),
+            },
         )

+ 3 - 4
applications/tasks/llm_tasks/candidate_account_process.py

@@ -69,6 +69,7 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
     description: 对候选账号池内的账号进行质量分析
                 对于满足质量的账号,添加到抓取账号表里面
     """
+
     def __init__(self, pool, log_client, trace_id):
         self.pool = pool
         self.log_client = log_client
@@ -83,7 +84,7 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
             from crawler_candidate_account_pool
             where avg_score is null and status = %s and title_list is not null;
         """
-        response = await self.pool.async_fetch(query=query, params=(self.INIT_STATUS, ))
+        response = await self.pool.async_fetch(query=query, params=(self.INIT_STATUS,))
         await self.log_client.log(
             contents={
                 "trace_id": self.trace_id,
@@ -231,7 +232,5 @@ class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
             "new_mining_account": execute_response[0]["new_mining_account"],
         }
         await feishu_robot.bot(
-            title="执行账号质量分析任务",
-            detail=detail,
-            mention=False
+            title="执行账号质量分析任务", detail=detail, mention=False
         )

+ 3 - 1
applications/tasks/llm_tasks/process_title.py

@@ -485,7 +485,9 @@ class TitleRewrite(TitleProcess):
             from publish_single_video_source
             where title_rewrite_status = %s;
         """
-        article_list = await self.pool.async_fetch(query=query, params=(self.TITLE_REWRITE_LOCK_STATUS, ))
+        article_list = await self.pool.async_fetch(
+            query=query, params=(self.TITLE_REWRITE_LOCK_STATUS,)
+        )
         if article_list:
             blocked_id_list = [
                 i["id"]

+ 6 - 2
applications/tasks/task_handler.py

@@ -144,10 +144,14 @@ class TaskHandler(TaskMapper):
         strategy = self.data.get("strategy")
         match crawl_mode:
             case "account":
-                task = CrawlerGzhAccountArticles(self.db_client, self.log_client, self.trace_id)
+                task = CrawlerGzhAccountArticles(
+                    self.db_client, self.log_client, self.trace_id
+                )
                 await task.deal(account_method, strategy)
             case "search":
-                task = CrawlerGzhSearchArticles(self.db_client, self.log_client, self.trace_id)
+                task = CrawlerGzhSearchArticles(
+                    self.db_client, self.log_client, self.trace_id
+                )
                 await task.deal(strategy)
             case _:
                 raise ValueError(f"Unsupported crawl mode {crawl_mode}")

+ 1 - 1
applications/tasks/task_scheduler.py

@@ -184,7 +184,7 @@ class TaskScheduler(TaskHandler):
             # 抓取账号管理
             "crawler_account_manager": self._crawler_account_manager_handler,
             # 微信公众号文章抓取
-            "crawler_gzh_articles": self._crawler_gzh_article_handler
+            "crawler_gzh_articles": self._crawler_gzh_article_handler,
         }
 
         if task_name not in handlers:

+ 7 - 4
applications/utils/async_mysql_utils.py

@@ -10,13 +10,16 @@ async def get_top_article_title_list(pool) -> List[Dict]:
     return await pool.async_fetch(query=query, params=("TOP100",))
 
 
-async def get_hot_titles(pool, date_string, position, read_times_threshold) -> List[str]:
+async def get_hot_titles(
+    pool, date_string, position, read_times_threshold
+) -> List[str]:
     """get titles of hot articles"""
     query = """
         select distinct title
         from datastat_sort_strategy
         where position < %s and read_rate >= %s and date_str >= %s;
     """
-    response = await pool.async_fetch(query=query, params=(position, read_times_threshold, date_string))
-    return [i['title'] for i in response]
-
+    response = await pool.async_fetch(
+        query=query, params=(position, read_times_threshold, date_string)
+    )
+    return [i["title"] for i in response]

+ 9 - 0
routes/blueprint.py

@@ -3,6 +3,8 @@ from applications.ab_test import GetCoverService
 from applications.utils import generate_task_trace_id
 
 from applications.tasks import TaskScheduler
+from applications.service import TaskManagerService
+
 
 server_blueprint = Blueprint("api", __name__, url_prefix="/api")
 
@@ -28,4 +30,11 @@ def server_routes(pools, log_service):
         # data = await request.get_json()
         return jsonify({"message": "hello world"})
 
+    @server_blueprint.route("/tasks", methods=["POST"])
+    async def task_list():
+        data = await request.get_json()
+        TMS = TaskManagerService(pool=pools, data=data)
+        res = await TMS.list_tasks()
+        return jsonify(res)
+
     return server_blueprint

+ 2 - 11
task_app.py

@@ -1,8 +1,7 @@
-import asyncio
 import logging
 
+from quart_cors import cors
 from quart import Quart
-from aiomonitor import start_monitor
 from applications.config import aliyun_log_config
 from applications.database import mysql_manager
 from applications.service import LogService
@@ -11,20 +10,15 @@ from routes import server_routes
 log_service = LogService(**aliyun_log_config)
 
 app = Quart(__name__)
+app = cors(app, allow_origin="*")
 routes = server_routes(mysql_manager, log_service)
 app.register_blueprint(routes)
 
 logging.basicConfig(level=logging.INFO)
 
-_monitor = None
-
 
 @app.before_serving
 async def startup():
-    global _monitor
-    loop = asyncio.get_event_loop()
-    _monitor = start_monitor(loop=loop, host="127.0.0.1", port=50101)
-    logging.info(f"Monitor started at {_monitor}")
     logging.info("Starting application...")
     await mysql_manager.init_pools()
     logging.info("Mysql pools init successfully")
@@ -39,6 +33,3 @@ async def shutdown():
     logging.info("Mysql pools close successfully")
     await log_service.stop()
     logging.info("aliyun log service stop successfully")
-    if _monitor:
-        _monitor.close()
-        logging.info("Monitor stopped successfully")

+ 48 - 0
ui/src/api/task.ts

@@ -0,0 +1,48 @@
+// src/api/task.ts
+import axios from "axios";
+
+const api = axios.create({
+  baseURL: import.meta.env.VITE_API_BASE || "/", // 例如 http://localhost:6060
+  timeout: 15000,
+});
+
+export interface TaskItem {
+  id: number;
+  date_string: string | null;
+  task_name: string | null;
+  task_status: number;
+  start_timestamp: number | null;
+  finish_timestamp: number | null;
+  trace_id: string | null;
+  data: string | null;
+  status_text?: string;
+  data_json?: any;
+}
+
+export interface TaskListResp {
+  items: TaskItem[];
+  total: number;
+  page: number;
+  page_size: number;
+}
+
+export async function fetchTasks(params: Record<string, any>) {
+  const { data } = await api.post<TaskListResp>("/api/tasks", params);
+  return data;
+}
+
+
+export async function fetchTaskDetail(id: number) {
+  const { data } = await api.get<TaskItem>(`/api/tasks/${id}`);
+  return data;
+}
+
+export async function retryTask(id: number) {
+  const { data } = await api.post(`/api/tasks/${id}/retry`);
+  return data;
+}
+
+export async function cancelTask(id: number) {
+  const { data } = await api.post(`/api/tasks/${id}/cancel`);
+  return data;
+}

+ 12 - 0
ui/src/router/index.ts

@@ -0,0 +1,12 @@
+import { createRouter, createWebHistory } from 'vue-router'
+import TaskManager from '../views/TaskManager.vue'
+
+const router = createRouter({
+  history: createWebHistory(import.meta.env.BASE_URL),
+  routes: [
+      { path: '/', redirect: '/tasks' },
+      { path: '/tasks', name: 'TaskManager', component: TaskManager },
+  ],
+})
+
+export default router

+ 262 - 0
ui/src/views/TaskManager.vue

@@ -0,0 +1,262 @@
+<template>
+  <div class="task-page">
+    <el-card class="task-card">
+      <!-- 工具栏 -->
+      <div class="toolbar" ref="toolbarRef">
+        <!-- 你的筛选表单(保持不变) -->
+        <el-form :inline="true" :model="filters" label-width="90px" @keyup.enter.native="onSearch">
+          <el-form-item label="ID">
+            <el-input v-model.number="filters.id" placeholder="精确 ID" clearable />
+          </el-form-item>
+          <el-form-item label="日期">
+            <el-date-picker v-model="filters.date_string" type="date" value-format="YYYY-MM-DD" placeholder="YYYY-MM-DD" clearable />
+          </el-form-item>
+          <el-form-item label="Trace ID">
+            <el-input v-model="filters.trace_id" placeholder="模糊匹配" clearable />
+          </el-form-item>
+          <el-form-item label="状态">
+            <el-select v-model="filters.task_status" placeholder="全部" clearable style="width: 160px">
+              <el-option v-for="s in statusOptions" :key="s.value" :label="s.label" :value="s.value" />
+            </el-select>
+          </el-form-item>
+          <el-form-item>
+            <el-button type="primary" @click="onSearch">查询</el-button>
+            <el-button @click="onReset">重置</el-button>
+          </el-form-item>
+          <el-form-item>
+            <el-switch v-model="autoRefresh" @change="toggleAutoRefresh" />
+            <span class="ml-8">自动刷新(5s)</span>
+          </el-form-item>
+        </el-form>
+      </div>
+
+      <!-- 表格 -->
+      <div class="table-wrapper">
+        <el-table
+          :data="rows"
+          border
+          stripe
+          :height="tableHeight"
+          @sort-change="onSortChange"
+          class="custom-table"
+          v-loading="loading"
+          element-loading-text="加载中…"
+        >
+          <el-table-column prop="id" label="ID" sortable="custom" width="100" />
+          <el-table-column prop="date_string" label="日期" sortable="custom" width="140" />
+          <el-table-column prop="task_name" label="任务名称" min-width="200" :show-overflow-tooltip="true" />
+          <el-table-column prop="task_status" label="状态" sortable="custom" width="140">
+            <template #default="{ row }">
+              <el-tag :type="statusType(row.task_status)">{{ row.status_text || row.task_status }}</el-tag>
+            </template>
+          </el-table-column>
+          <el-table-column prop="start_timestamp" label="开始时间" sortable="custom" width="180">
+            <template #default="{ row }">{{ formatTs(row.start_timestamp) }}</template>
+          </el-table-column>
+          <el-table-column prop="finish_timestamp" label="结束时间" sortable="custom" width="180">
+            <template #default="{ row }">{{ formatTs(row.finish_timestamp) }}</template>
+          </el-table-column>
+          <el-table-column prop="trace_id" label="Trace ID" min-width="220" :show-overflow-tooltip="true" />
+          <el-table-column label="操作" fixed="right" width="240">
+            <template #default="{ row }">
+              <el-button size="small" @click="openDetail(row)">详情</el-button>
+              <el-button size="small" type="warning" @click="onRetry(row)" :disabled="row.task_status === 1">重试</el-button>
+              <el-button size="small" type="danger" @click="onCancel(row)" :disabled="row.task_status === 2">取消</el-button>
+            </template>
+          </el-table-column>
+        </el-table>
+      </div>
+
+      <!-- 分页 -->
+      <div class="pager" ref="pagerRef">
+        <el-pagination
+          background
+          layout="total, sizes, prev, pager, next, jumper"
+          :page-sizes="[10, 20, 50, 100]"
+          :page-size="query.page_size"
+          :total="total"
+          :current-page="query.page"
+          @current-change="(p:number)=>{query.page=p; load()}"
+          @size-change="(s:number)=>{query.page_size=s; query.page=1; load()}"
+        />
+      </div>
+    </el-card>
+
+    <!-- 详情 -->
+    <el-drawer v-model="detailOpen" size="52%" title="任务详情">
+      <div class="detail">
+        <el-descriptions :column="2" border>
+          <el-descriptions-item label="ID">{{ detail?.id }}</el-descriptions-item>
+          <el-descriptions-item label="日期">{{ detail?.date_string }}</el-descriptions-item>
+          <el-descriptions-item label="状态">{{ detail?.task_status }}({{ detail?.status_text }})</el-descriptions-item>
+          <el-descriptions-item label="Trace ID">{{ detail?.trace_id }}</el-descriptions-item>
+          <el-descriptions-item label="开始">{{ formatTs(detail?.start_timestamp) }}</el-descriptions-item>
+          <el-descriptions-item label="结束">{{ formatTs(detail?.finish_timestamp) }}</el-descriptions-item>
+          <el-descriptions-item label="任务名" :span="2">{{ detail?.task_name }}</el-descriptions-item>
+        </el-descriptions>
+
+        <h4 class="block-title">请求参数(data)</h4>
+        <pre class="code-block">{{ pretty(detail?.data_json || detail?.data) }}</pre>
+      </div>
+    </el-drawer>
+  </div>
+</template>
+
+<script setup lang="ts">
+import { onMounted, onBeforeUnmount, reactive, ref, nextTick } from 'vue'
+import { ElMessage, ElMessageBox } from 'element-plus'
+import { fetchTasks, fetchTaskDetail, retryTask, cancelTask, type TaskItem } from '../api/task'
+
+const statusOptions = [
+  { label: '初始化(0)', value: 0 },
+  { label: '处理中(1)', value: 1 },
+  { label: '完成(2)', value: 2 },
+  { label: '失败(99)', value: 99 },
+]
+
+const filters = reactive<{ id: number | null; date_string: string | null; trace_id: string; task_status: number | null }>({
+  id: null,
+  date_string: null,
+  trace_id: '',
+  task_status: null,
+})
+
+const query = reactive({ page: 1, page_size: 20, sort_by: 'id', sort_dir: 'desc' as 'asc' | 'desc' })
+const rows = ref<TaskItem[]>([])
+const total = ref(0)
+const loading = ref(false)
+
+/* 自动刷新 */
+let timer: ReturnType<typeof setInterval> | null = null
+const autoRefresh = ref(false)
+const toggleAutoRefresh = () => {
+  if (autoRefresh.value) {
+    timer = setInterval(load, 5000)
+  } else if (timer) {
+    clearInterval(timer)
+    timer = null
+  }
+}
+
+/* 精准表格高度计算 */
+const toolbarRef = ref<HTMLElement | null>(null)
+const pagerRef = ref<HTMLElement | null>(null)
+const tableHeight = ref(400)
+const calcTableHeight = () => {
+  const winH = window.innerHeight
+  const toolbarH = toolbarRef.value?.offsetHeight ?? 0
+  const pagerH = pagerRef.value?.offsetHeight ?? 0
+  const outerPadding = 16 * 2         // .task-page 上下 padding
+  const cardPadding = 20 * 2          // el-card 内部上下 padding(大约值)
+  const gap = 12 + 12                 // toolbar 下方间距 + pager 上方 padding
+  tableHeight.value = Math.max(
+    260,
+    winH - (outerPadding + cardPadding + toolbarH + pagerH + gap)
+  )
+}
+
+/* 加载列表(POST) */
+const load = async () => {
+  loading.value = true
+  try {
+    const params: any = { ...query }
+    if (filters.id) params.id = filters.id
+    if (filters.date_string) params.date_string = filters.date_string
+    if (filters.trace_id) params.trace_id = filters.trace_id
+    if (filters.task_status !== null && filters.task_status !== undefined) params.task_status = filters.task_status
+
+    const resp = await fetchTasks(params)
+    rows.value = resp.items
+    total.value = resp.total
+    await nextTick()
+    calcTableHeight()
+  } finally {
+    loading.value = false
+  }
+}
+
+/* 交互 */
+const onSearch = () => { query.page = 1; load() }
+const onReset = () => {
+  Object.assign(filters, { id: null, date_string: null, trace_id: '', task_status: null })
+  query.page = 1
+  load()
+}
+const onSortChange = (e: any) => {
+  query.sort_by = e.prop || 'id'
+  query.sort_dir = e.order === 'ascending' ? 'asc' : 'desc'
+  load()
+}
+
+/* 工具 */
+const statusType = (s: number) => (s === 0 ? 'info' : s === 1 ? 'warning' : s === 2 ? 'success' : s === 99 ? 'danger' : '')
+const formatTs = (ts?: number | null) => (!ts ? '-' : new Date(ts * 1000).toLocaleString())
+const pretty = (v: any) => { try { return typeof v === 'string' ? v : JSON.stringify(v, null, 2) } catch { return String(v) } }
+
+/* 详情 */
+const detailOpen = ref(false)
+const detail = ref<TaskItem | null>(null)
+const openDetail = async (row: TaskItem) => {
+  const d = await fetchTaskDetail(row.id)
+  detail.value = { ...row, ...d }
+  detailOpen.value = true
+}
+
+/* 行操作 */
+const onRetry = async (row: TaskItem) => {
+  await ElMessageBox.confirm(`确认将任务 #${row.id} 置为初始化并重试?`, '重试确认', { type: 'warning' })
+  await retryTask(row.id)
+  ElMessage.success('已触发重试')
+  load()
+}
+const onCancel = async (row: TaskItem) => {
+  await ElMessageBox.confirm(`确认取消任务 #${row.id}(置为失败)?`, '取消确认', { type: 'warning' })
+  await cancelTask(row.id)
+  ElMessage.success('已取消')
+  load()
+}
+
+/* 生命周期 */
+onMounted(() => {
+  load()
+  window.addEventListener('resize', calcTableHeight)
+})
+onBeforeUnmount(() => {
+  window.removeEventListener('resize', calcTableHeight)
+  if (timer) clearInterval(timer)
+})
+</script>
+
+<style scoped>
+.task-page {
+  padding: 16px;
+  background: #f5f6fa;
+  height: 100vh;          /* 让页面可用高度=视口高度 */
+  box-sizing: border-box;
+}
+.task-card {
+  display: flex;
+  flex-direction: column;
+  height: 100%;
+  box-shadow: 0 6px 18px rgba(0,0,0,0.06);
+  border-radius: 10px;
+}
+.toolbar { margin-bottom: 12px; }
+.table-wrapper { flex: 1; overflow: hidden; }
+.custom-table { border-radius: 8px; overflow: hidden; }
+.pager {
+  display: flex;
+  justify-content: flex-end;
+  padding: 12px 0 0 0;   /* 顶部 12px,底部 0,避免留白 */
+  background: #fff;
+  border-top: 1px solid #f0f2f5;
+  margin: 0;             /* 清除可能的外边距 */
+}
+.detail { padding: 8px 0; }
+.block-title { margin: 14px 0 8px; font-weight: 600; }
+.code-block {
+  background: #0b1021; color: #d1e7ff; padding: 12px;
+  border-radius: 8px; overflow: auto; line-height: 1.5;
+}
+.ml-8 { margin-left: 8px; }
+</style>