之前的博客中用logstash-input-jdbc同步mysql數據到ElasticSearch,但是由於它的同步間隔最短也要一分鐘一次,無法滿足線上業務需求,所以只能自己實現一個。由於時間比較緊,這裡只做了一個簡單的實現。
思路:
網上有很多方案是利用mysql的binlog功能來實現的,但是我對mysql的了解實在有限,所以用了一個很笨的辦法:查詢mysql得到數據,再插入es。因為數據量不大,而且每10秒同步一次,效率還可以。為了避免服務器之間的時間差以及mysql更新與查詢之間產生的時間差,查詢時更新時間的條件是與上一次同步的開始時間比較,這樣不管數據多少、同步耗時多久,都不會漏掉數據,因為原則是同步時不漏掉任何數據。也可以多開幾個程序,並把時間差和間隔時間設置得各不相同;由於把mysql中的id當作es中的id,也就避免了重複數據。
使用:
只需要按照esconfig.py的格式寫配置文件,然後寫sql文件,最後直接執行mstes.py就可以了。這套配置形式也是參考logstash-input-jdbc的。
MsToEs
|----esconfig.py(配置文件)
|----mstes.py(同步程序)
|----sql_manage.py(數據庫管理)
|----aa.sql(需要用到sql文件)
|----bb.sql(需要用到sql文件)
sql_manage.py:
# -*-coding:utf-8 -*-
"""MySQL access helper used by the MySQL -> Elasticsearch sync script (mstes.py)."""
__author__ = "ZJL"

import functools
import traceback

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import QueuePool

import esconfig


def find(func):
    """Decorate a read-only session method: report errors and always close the session.

    Intended for operations that need no commit or rollback. If the wrapped call
    raises, the traceback is printed and its text is RETURNED (not raised), so
    callers must check the type of the result before using it.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception:
            tb_text = traceback.format_exc()
            print(tb_text)
            return tb_text
        finally:
            # Close the session after every call so the next call starts fresh.
            self.session.close()
    return wrapper


class MysqlManager(object):
    """Thin wrapper around a pooled SQLAlchemy session used to run raw SELECTs."""

    def __init__(self):
        # "user:password@host:port/database", read from esconfig.mysql
        mysql_connection_string = esconfig.mysql.get("mysql_connection_string")
        # pool_recycle=3600 replaces pooled connections older than one hour
        # (presumably to dodge MySQL's idle-connection timeout).
        self.engine = create_engine(
            'mysql+pymysql://' + mysql_connection_string + '?charset=utf8',
            poolclass=QueuePool,
            pool_recycle=3600,
        )
        self.DB_Session = sessionmaker(bind=self.engine, autocommit=False,
                                       autoflush=True, expire_on_commit=False)
        self.db = scoped_session(self.DB_Session)
        self.session = self.db()

    @find
    def select_all_dict(self, sql, keys):
        """Run *sql* and return each row as a dict keyed positionally by *keys*.

        Returns:
            A list of dicts (possibly empty); ``False`` if any row's column count
            differs from ``len(keys)``; or the traceback text if the query raised
            (see ``find``).
        """
        # NOTE(review): SQLAlchemy 2.x requires raw SQL to be wrapped in
        # sqlalchemy.text(); this call relies on 1.x string coercion.
        rows = self.session.execute(sql).fetchall()
        lists = []
        for row in rows:
            if len(keys) != len(row):
                return False
            lists.append(dict(zip(keys, row)))
        return lists

    def close(self):
        """Close the underlying session."""
        self.session.close()
aa.sql:
select
    CONVERT(c.`id`, CHAR) as id,
    c.`code` as code,
    c.`project_name` as project_name,
    c.`name` as name,
    date_format(c.`update_time`, '%Y-%m-%dT%H:%i:%s') as update_time
from `cc` c
-- compare the raw column (not date_format of it) so an index on update_time can be used
where c.`update_time` >= str_to_date('::datetime_now', '%Y-%m-%dT%H:%i:%s');
bb.sql:
select
    CONVERT(c.`id`, CHAR) as id,
    CONVERT(c.`age`, CHAR) as age,
    c.`code` as code,
    c.`name` as name,
    c.`project_name` as project_name,
    date_format(c.`update_time`, '%Y-%m-%dT%H:%i:%s') as update_time
from `bb` c
-- compare the raw column (not date_format of it) so an index on update_time can be used
where c.`update_time` >= str_to_date('::datetime_now', '%Y-%m-%dT%H:%i:%s');
esconfig.py:
# -*- coding: utf-8 -*-
#__author__="ZJL"
"""Configuration for the MySQL -> Elasticsearch sync script (mstes.py)."""

# Each SQL file's name matches the ES type it feeds.
mysql = {
    # MySQL connection info: user:password@host:port/database
    "mysql_connection_string": "root:123456@127.0.0.1:3306/xxx",
    # SQL files to run, each with the ES index and ES type it writes to
    "statement_filespath": [
        {
            "index": "a1",
            "sqlfile": "aa.sql",
            "type": "aa",
        },
        {
            "index": "a1",
            "sqlfile": "bb.sql",
            "type": "bb",
        },
    ],
}

# ES host and port
elasticsearch = {
    "hosts": "127.0.0.1:9200",
}

# ES field names, keyed by ES type. Values are assigned to these names by
# POSITION, so each tuple must list fields in exactly the same order as the
# SELECT columns of the corresponding SQL file.
db_field = {
    # aa.sql: id, code, project_name, name, update_time
    "aa": ("id", "code", "project_name", "name", "update_time", ),
    # bb.sql: id, age, code, name, project_name, update_time
    "bb": ("id", "age", "code", "name", "project_name", "update_time", ),
}

es_config = {
    # seconds to wait between two sync runs
    "sleep_time": 10,
    # seconds of tolerance for clock differences between servers
    "time_difference": 3,
    # print each bulk action sent to ES when True
    "show_json": False,
}
mstes.py:
# -*- coding: utf-8 -*-
#__author__="ZJL"
"""Periodically copy MySQL rows into Elasticsearch.

Each cycle reads every SQL file listed in ``esconfig.mysql["statement_filespath"]``,
replaces the ``::datetime_now`` placeholder with an incremental lower bound,
runs the query and bulk-indexes the rows into the configured ES index/type.
"""
from sql_manage import MysqlManager
from esconfig import mysql, elasticsearch, db_field, es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time

# Lower bound used until the first successful sync: effectively "everything".
INITIAL_DATETIME = "1999-01-01T00:00:00"
# Date and time are joined with "T" (a space caused ES date-parse errors), so
# Python-side timestamps use the same shape as the SQL date_format output.
ES_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
# Placeholder in the SQL files that is replaced by the lower bound.
DATETIME_PLACEHOLDER = "::datetime_now"


class TongBu(object):
    """MySQL -> Elasticsearch synchroniser driven by esconfig."""

    def __init__(self):
        # Print each bulk action to the console when True.
        self.show_json = es_config.get("show_json", False)
        # Seconds to sleep between two sync cycles.
        self.sleep_time = es_config.get("sleep_time", 10)
        # Safety margin in seconds subtracted from the last successful start
        # time, to tolerate clock differences between servers.
        self.time_difference = es_config.get("time_difference", 0)
        # Lower bound substituted for ::datetime_now in the current cycle.
        self.datetime_now = ""
        # Epoch start time of the last cycle that finished without error;
        # None until one succeeds (so the first run is a full sync).
        self.last_success_start = None
        # Connection failures propagate: continuing without self.es / self.mm
        # would only fail later with a confusing AttributeError.
        self.es = Elasticsearch(elasticsearch.get("hosts"))
        self.mm = MysqlManager()

    def _lower_bound(self):
        """Return the value to substitute for ::datetime_now in this cycle."""
        if self.last_success_start is None:
            return INITIAL_DATETIME
        return time.strftime(
            ES_TIME_FORMAT,
            time.localtime(self.last_success_start - self.time_difference))

    def _collect_actions(self):
        """Query every configured SQL file and build the ES bulk actions.

        Raises:
            ValueError: a SQL file's column count differs from its db_field entry.
            RuntimeError: a query failed (select_all_dict returned non-list data).
        """
        actions = []
        # One timestamp for every document of this cycle.
        timestamp = time.strftime(ES_TIME_FORMAT, time.localtime(time.time()))
        for filepath in mysql.get("statement_filespath", []):
            sqlfile = filepath.get("sqlfile")
            es_index = filepath.get("index")
            es_type = filepath.get("type")
            with open(sqlfile, "r") as opf:
                sqldatas = opf.read()
            # Incremental-update placeholder; a no-op if the file lacks it.
            sqldatas = sqldatas.replace(DATETIME_PLACEHOLDER, self.datetime_now)
            # ES field names, in the same order as the SELECT columns.
            dict_set = db_field.get(es_type)
            db_data_list = self.mm.select_all_dict(sqldatas, dict_set)
            if db_data_list is False:
                raise ValueError("column count of %s does not match db_field[%r]"
                                 % (sqlfile, es_type))
            if not isinstance(db_data_list, list):
                # select_all_dict reports query errors by returning traceback text.
                raise RuntimeError("query from %s failed:\n%s" % (sqlfile, db_data_list))
            for db_data in db_data_list:
                # helpers.bulk only indexes _source, so the timestamp must live there.
                source = dict(db_data)
                source.setdefault("@timestamp", timestamp)
                action = {
                    "_index": es_index,
                    "_type": es_type,
                    "_source": source,
                }
                # Reuse the MySQL id as the ES id (avoids duplicates); without
                # one, ES generates an id.
                es_id = db_data.get("id", "")
                if es_id:
                    action["_id"] = es_id
                if self.show_json:
                    print(action)
                actions.append(action)
        return actions

    def tongbu_es_mm(self):
        """Run one sync cycle; errors are printed, not raised.

        The lower bound advances only after a successful cycle, so a failed cycle
        is retried over the same time window and no rows are skipped.
        """
        start_time = time.time()
        print("start..............", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
        try:
            self.datetime_now = self._lower_bound()
            actions = self._collect_actions()
            if actions:
                helpers.bulk(self.es, actions)
        except Exception:
            print(traceback.format_exc())
        else:
            end_time = time.time()
            print("end...................", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)))
            self.last_success_start = start_time
        finally:
            # Always release the database session, success or failure.
            self.mm.close()


def main():
    """Run sync cycles forever, sleeping sleep_time seconds between them."""
    tb = TongBu()
    sleep_time = tb.sleep_time
    while True:
        tb.tongbu_es_mm()
        time.sleep(sleep_time)


if __name__ == '__main__':
    main()
以上這篇用python簡單實現mysql數據同步到ElasticSearch的教程就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
