@@ -0,0 +1,127 @@
+import time
+from typing import List
+
+import dashvector
+import torch
+from dashvector import Doc
+from odps import ODPS
+from transformers import BertModel, BertTokenizer
+
+# Alibaba Cloud vector database (DashVector) connection
+client = dashvector.Client(
+    api_key='sk-TbWSOiwIcp9FZkx0fyM9JRomTxmOtD796E4626C1411EEB3525A6F9FFB919B')
+# Index collection
+collection = client.get('video_title_performance_01')
+assert collection
+
+
+# Alibaba Cloud ODPS (MaxCompute) connection
+access_id = 'LTAIWYUujJAm7CbH'
+access_key = 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'
+endpoint = 'http://service.cn.maxcompute.aliyun.com/api'
+project_name = 'loghubods'
+
+odps = ODPS(
+    access_id=access_id,
+    secret_access_key=access_key,
+    project=project_name,
+    endpoint=endpoint
+)
+
+# Load the pre-trained BERT model and its tokenizer
+model_name = 'bert-base-chinese'
+model = BertModel.from_pretrained(model_name)
+tokenizer = BertTokenizer.from_pretrained(model_name)
+
+
+def insert_vector(docs: List[Doc]):
+    if len(docs) == 0:
+        return
+    # Batch-insert the dashvector.Doc objects
+    resp = collection.insert(docs=docs)
+    print(resp)
+
+
+def text_to_vector(text) -> List[float]:
+    # Tokenize the text into the format the model expects; only a single
+    # text is handled here, so no batch encoding is needed
+    inputs = tokenizer(text, return_tensors='pt')
+
+    # Run the input through the BERT model (no gradients needed at inference)
+    with torch.no_grad():
+        outputs = model(**inputs)
+
+    # Extract the embeddings: the hidden states of the last layer
+    embeddings = outputs.last_hidden_state
+
+    # Keep the first ([CLS]) token's vector and convert it to a plain list
+    embeddings = embeddings.detach().numpy().tolist()[0][0]
+
+    return embeddings
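+
+# Note: bert-base-chinese produces 768-dimensional vectors, so the
+# 'video_title_performance_01' collection is assumed to have been created
+# with dimension=768 (an assumption; the collection schema is not in this diff).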
+
+
+# Query video title performance (from Alibaba Cloud ODPS)
+def query_video_title_performance(start_idx, limit):
+    sql = f"SELECT * FROM video_perfermance_info_3 WHERE title IS NOT NULL AND title != '' ORDER BY videoid LIMIT {start_idx}, {limit};"
+    result = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        for record in reader:
+            # Collect each row of the result set
+            result.append(record)
+    return result
+
+
+
+# Assemble the title performance records into vector-database Doc objects
+def video_title_performance_to_vector(start_idx, limit) -> List[Doc]:
+    records = query_video_title_performance(start_idx, limit)
+    docs = []
+    for record in records:
+        # Read the field values (Chinese column names: 回流 = return visits,
+        # 分享 = shares, 曝光 = exposures, 播放 = plays; 次数/人数 = count/unique users)
+        videoid = str(record.videoid)
+        title = record.title
+        if title is None:
+            continue
+        rntCount = record.回流次数
+        rntHeadCount = record.回流人数
+        shareCount = record.分享次数
+        shareHeadCount = record.分享人数
+        exposureCount = record.曝光次数
+        exposureHeadCount = record.曝光人数
+        playCount = record.播放次数
+        playHeadCount = record.播放人数
+
+        # Convert the title text into an embedding vector
+        vector = text_to_vector(title)
+        # Combine the vector and the performance fields into a Doc object
+        doc = Doc(id=videoid, vector=vector, fields={
+            'title': title, 'rntCount': rntCount, 'rntHeadCount': rntHeadCount,
+            'shareCount': shareCount, 'shareHeadCount': shareHeadCount,
+            'exposureCount': exposureCount, 'exposureHeadCount': exposureHeadCount,
+            'playCount': playCount, 'playHeadCount': playHeadCount
+        })
+        docs.append(doc)
+    return docs
+
+
+def batch_insert():
+    # Page through offsets 100000..185000 in steps of 500
+    for i in range(100000, 185000, 500):
+        print(i)
+        # Measure the elapsed time of each batch
+        start = time.time()
+        docs = video_title_performance_to_vector(i, 500)
+        insert_vector(docs)
+        end = time.time()
+        print(f'{i} done in {end - start}s')
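+
+
+# Entry point. Assumption: this script is meant to be run directly to backfill
+# the collection; the guard below is a minimal sketch of that usage.
+if __name__ == '__main__':
+    batch_insert()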