|
@@ -0,0 +1,116 @@
|
|
|
|
+import ssl
|
|
|
|
+import datetime
|
|
|
|
+import time
|
|
|
|
+import requests
|
|
|
|
+import json
|
|
|
|
+from pymysql.cursors import DictCursor
|
|
|
|
+
|
|
|
|
+from elasticsearch import Elasticsearch, ApiError
|
|
|
|
+from elasticsearch import helpers
|
|
|
|
+from applications.db import DatabaseConnector
|
|
|
|
+from config import long_articles_config
|
|
|
|
+from config.es_mappings import index_name, mappings, settings
|
|
|
|
+
|
|
|
|
+db_client = DatabaseConnector(long_articles_config)
|
|
|
|
+db_client.connect()
|
|
|
|
+
|
|
|
|
+ctx = ssl.create_default_context(cafile="es_certs.crt")
|
|
|
|
+
|
|
|
|
+es_password = 'nkvvASQuQ0XUGRq5OLvm'
|
|
|
|
+es = Elasticsearch(
|
|
|
|
+ "https://192.168.205.85:9200",
|
|
|
|
+ # 如果启用了用户名密码
|
|
|
|
+ basic_auth=("elastic", es_password),
|
|
|
|
+ ssl_context=ctx
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def create_index():
|
|
|
|
+ # 2. 防守式删除旧索引(可选)
|
|
|
|
+ if es.indices.exists(index=index_name):
|
|
|
|
+ es.indices.delete(index=index_name)
|
|
|
|
+
|
|
|
|
+ # 3. 创建索引
|
|
|
|
+ try:
|
|
|
|
+ es.indices.create(
|
|
|
|
+ index=index_name,
|
|
|
|
+ settings=settings,
|
|
|
|
+ mappings=mappings
|
|
|
|
+ )
|
|
|
|
+ print(f"✅ Index <{index_name}> created.")
|
|
|
|
+ except ApiError as e:
|
|
|
|
+ # 打印 ES 返回的具体错误
|
|
|
|
+ print(f"❌ Failed: {e.meta.error['type']} – {e.meta.error['reason']}")
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def get_max_article_id():
|
|
|
|
+ response = es.search(
|
|
|
|
+ index=index_name,
|
|
|
|
+ size=1,
|
|
|
|
+ sort="article_id:desc",
|
|
|
|
+ _source=["article_id"]
|
|
|
|
+ )
|
|
|
|
+ # print(json.dumps(response.body, indent=4))
|
|
|
|
+ return response['hits']['hits'][0]['_source']['article_id']
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def get_articles(id_):
|
|
|
|
+ fetch_query = f"""
|
|
|
|
+ select article_id, platform, out_account_id, title
|
|
|
|
+ from crawler_meta_article
|
|
|
|
+ where status = 1 and article_id > %s
|
|
|
|
+ order by article_id limit 10000;
|
|
|
|
+ """
|
|
|
|
+ # 执行查询
|
|
|
|
+ results = db_client.fetch(fetch_query, cursor_type=DictCursor, params=(id_,))
|
|
|
|
+ docs = [
|
|
|
|
+ {
|
|
|
|
+ "_index": index_name,
|
|
|
|
+ "_id": item['article_id'],
|
|
|
|
+ "_source": {
|
|
|
|
+ "article_id": item['article_id'],
|
|
|
|
+ "platform": item['platform'],
|
|
|
|
+ "out_account_id": item['out_account_id'],
|
|
|
|
+ "title": item['title']
|
|
|
|
+ }
|
|
|
|
+ } for item in results
|
|
|
|
+]
|
|
|
|
+ return docs
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def search():
|
|
|
|
+ query = {
|
|
|
|
+ "query": {
|
|
|
|
+ "match": {
|
|
|
|
+ "title": "刘伯承元帅"
|
|
|
|
+ }
|
|
|
|
+ },
|
|
|
|
+ "_source": ["article_id", "title"]
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ a = time.time()
|
|
|
|
+ resp = es.search(index=index_name, body=query)
|
|
|
|
+ b = time.time()
|
|
|
|
+ for hit in resp["hits"]["hits"]:
|
|
|
|
+ print(hit["_source"])
|
|
|
|
+
|
|
|
|
+ print(b - a)
|
|
|
|
+
|
|
|
|
+def get_cluster_docs_stats():
|
|
|
|
+ """获取集群文档统计信息"""
|
|
|
|
+ stats = es.nodes.stats()
|
|
|
|
+ # print(stats)
|
|
|
|
+ # print(type(stats))
|
|
|
|
+ print(json.dumps(stats.body, indent=4, ensure_ascii=False))
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
+ max_id = get_max_article_id()
|
|
|
|
+ i = 0
|
|
|
|
+ while int(max_id) < 27492350:
|
|
|
|
+ articles = get_articles(max_id)
|
|
|
|
+ res = helpers.bulk(es, articles)
|
|
|
|
+ print(es.count(index=index_name))
|
|
|
|
+ max_id = get_max_article_id()
|
|
|
|
+ i += 1
|
|
|
|
+ print(i)
|