当前位置:首页 > 日记本 > 正文内容

python 从数据库导入数据到elasticsearch title有则更新,无则添加

zhangchap1年前 (2023-03-29)日记本166

1、常规方法,速度较慢

import mysql.connector
from elasticsearch import Elasticsearch
import time
# 连接 MySQL 数据库
mysql_conn = mysql.connector.connect(
    host="localhost",
    user="root",
    password="FiroRegePUE0000idB3",
    database="car"
)

# 连接 Elasticsearch
es = Elasticsearch(['http://localhost:9200'])

# 定义 MySQL 查询语句
query = 'SELECT title, content FROM yj_ask_1'

# 执行查询
cursor = None
while cursor is None:
    try:
        cursor = mysql_conn.cursor()
        cursor.execute(query)
    except mysql.connector.errors.OperationalError as e:
        if e.errno == mysql.connector.errorcode.CR_SERVER_LOST or e.errno == mysql.connector.errorcode.CR_CONN_HOST_ERROR:
            print("Reconnecting...")
            time.sleep(1)
            mysql_conn.ping(True)
        else:
            raise

# 遍历查询结果
for title, content in cursor:

    # 查询 Elasticsearch 中是否已存在该文章
    es_query = {
        "query": {
            "term": {"title.keyword": title}
        }
    }
    es_results = es.search(index='articles', body=es_query, request_timeout=30)['hits']['hits']

    if len(es_results) > 0:
        # 更新 Elasticsearch 中的文章数据
        es_id = es_results[0]['_id']
        es_body = {
            "content": content
        }
        es.update(index='articles', id=es_id, body={"doc": es_body}, request_timeout=30)
    else:
        # 在 Elasticsearch 中追加文章数据
        es_body = {
            "title": title,
            "content": content
        }
        es.index(index='articles', body=es_body, request_timeout=30)

# 关闭 MySQL 连接和 Elasticsearch 连接
cursor.close()
mysql_conn.close()
es.transport.close()

2、异步,速度有所提高

import mysql.connector
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import asyncio


async def update_article(title, content, es):
    # 查询 Elasticsearch 中是否已存在该文章
    es_query = {
        "query": {
            "term": {"title.keyword": title}
        }
    }
    es_results = es.search(index='articles', body=es_query, request_timeout=30)['hits']['hits']

    if len(es_results) > 0:
        # 更新 Elasticsearch 中的文章数据
        es_id = es_results[0]['_id']
        es_body = {
            "content": content
        }
        await es.update(index='articles', id=es_id, body={"doc": es_body}, request_timeout=30)
    else:
        # 在 Elasticsearch 中追加文章数据
        es_body = {
            "title": title,
            "content": content
        }
        await es.index(index='articles', body=es_body, request_timeout=30)


async def import_data():
    # 连接 MySQL 数据库
    mysql_conn = mysql.connector.connect(
        host="localhost",
        user="root",
        password="FiroRgePUE00idB3",
        database="car"
    )

    # 连接 Elasticsearch
    es = Elasticsearch(['http://localhost:9200'])

    # 定义 MySQL 查询语句
    query = 'SELECT title, content FROM yj_ask_1'

    # 执行查询
    cursor = mysql_conn.cursor()
    cursor.execute(query)

    # 将查询结果转换为 Elasticsearch 文档格式
    actions = []
    for title, content in cursor:
        action = {
            '_op_type': 'update',
            '_index': 'articles',
            '_id': title,
            'doc': {'content': content},
            'doc_as_upsert': True
        }
        actions.append(action)

    # 使用 Elasticsearch 的 bulk API 一次性索引或更新多个文档
    tasks = [update_article(action['_id'], action['doc']['content'], es) for action in actions]
    await asyncio.gather(*tasks)

    # 关闭 MySQL 连接和 Elasticsearch 连接
    cursor.close()
    mysql_conn.close()
    es.transport.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(import_data())


分享给朋友:

相关文章

更换服务器需要设置的几点

此设置只针对个人习惯,因为需要开通的网站及服务器较多,做个记录!就不公开了 此设置只针对个人习惯,因为需要开通的网站及服务器较多,做个记录!就不公开了 此设置只针对个人习惯,因为需要开...

火狐添加自定义搜索引擎

直接网址搜索自定义添加:https://mycroftproject.com/...

宝塔重启服务器后,Redis就启动不了解决方案

宝塔重启服务器后,Redis就启动不了解决方案

1.更改权限 chown -R redis.redis /www/server/redis/ 2.设置持久化...

Nginx+PHP,PHP如何优化配置?

具体修改FPM配置文件参数: 若你的php日志出现: WARNING: [pool www] seems busy (you may need to increase pm.sta...

python补全网址代码示例

from urllib.parse import urljoin absurl = urljoin(backend,url) #backend:根...

python 随机生成时间戳写入txt文件/运行sql语句

import time from random import randint with open('time.txt', ...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。