欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

用python簡(jiǎn)單實(shí)現(xiàn)mysql數(shù)據(jù)同步到ElasticSearch的教程

系統(tǒng) 1611 0

之前博客有用logstash-input-jdbc同步mysql數(shù)據(jù)到ElasticSearch,但是由于同步時(shí)間最少是一分鐘一次,無(wú)法滿足線上業(yè)務(wù),所以只能自己實(shí)現(xiàn)一個(gè),但是時(shí)間比較緊,所以簡(jiǎn)單實(shí)現(xiàn)一個(gè)

思路:

網(wǎng)上有很多思路用什么mysql的binlog功能什么的,但是我對(duì)mysql了解實(shí)在有限,所以用一個(gè)很呆板的辦法查詢mysql得到數(shù)據(jù),再插入es,因?yàn)閿?shù)據(jù)量不大,而且10秒間隔同步一次,效率還可以,為了避免服務(wù)器之間的時(shí)間差和mysql更新和查詢產(chǎn)生的時(shí)間差,所以在查詢更新時(shí)間條件時(shí)是和上一次同步開(kāi)始時(shí)間比較,這樣不管數(shù)據(jù)多少,更新耗時(shí)多少都不會(huì)少數(shù)據(jù),因?yàn)樵瓌t是同步不漏掉任何數(shù)據(jù),也可以程序多開(kāi)將時(shí)間差和間隔時(shí)間差異化,因?yàn)橛胢ysql中一個(gè)id當(dāng)作es中的id,也避免了重復(fù)數(shù)據(jù)

使用:

只需要按照escongif.py寫(xiě)配置文件,然后寫(xiě)sql文件,最后直接執(zhí)行mstes.py就可以了,我這個(gè)也是參考logstash-input-jdbc的配置形式

MsToEs

|----esconfig.py(配置文件)

|----mstes.py(同步程序)

|----sql_manage.py(數(shù)據(jù)庫(kù)管理)

|----aa.sql(需要用到sql文件)

|----bb.sql(需要用到sql文件)

sql_manage.py:

            
# -*-coding:utf-8 -*-
__author__ = "ZJL"
from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
import traceback
import esconfig
# 用于不需要回滾和提交的操作
def find(func):
 def wrapper(self, *args, **kwargs):
  try:
   return func(self, *args, **kwargs)
  except Exception as e:
   print(traceback.format_exc())
   print(str(e))
   return traceback.format_exc()
  finally:
   self.session.close()
 return wrapper
class MysqlManager(object):
 def __init__(self):
  mysql_connection_string = esconfig.mysql.get("mysql_connection_string")
  self.engine = create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8', poolclass=QueuePool,
         pool_recycle=3600)
  # self.DB_Session = sessionmaker(bind=self.engine)
  # self.session = self.DB_Session()
  self.DB_Session = sessionmaker(bind=self.engine, autocommit=False, autoflush=True, expire_on_commit=False)
  self.db = scoped_session(self.DB_Session)
  self.session = self.db()
 @find
 def select_all_dict(self, sql, keys):
  a = self.session.execute(sql)
  a = a.fetchall()
  lists = []
  for i in a:
   if len(keys) == len(i):
    data_dict = {}
    for k, v in zip(keys, i):
     data_dict[k] = v
    lists.append(data_dict)
   else:
    return False
  return lists
 # 關(guān)閉
 def close(self):
  self.session.close()

          

aa.sql:

            
select 
 CONVERT(c.`id`,CHAR)    as id, 
 c.`code`   as code, 
 c.`project_name` as project_name, 
 c.`name`   as name, 
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')  as update_time, 
from `cc` c 
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now'; 
          

bb.sql:

            
select 
 CONVERT(c.`id`,CHAR)    as id, 
 CONVERT(c.`age`,CHAR)    as age, 
 c.`code`   as code, 
 c.`name`   as name, 
 c.`project_name` as project_name, 
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time, 
from `bb` c 
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now'; 
          

esconfig.py:

            
# -*- coding: utf-8 -*-
#__author__="ZJL"
# sql 文件名與es中的type名一致
mysql = {
 # mysql連接信息
 "mysql_connection_string": "root:123456@127.0.0.1:3306/xxx",
 # sql文件信息
 "statement_filespath":[
  # sql對(duì)應(yīng)的es索引和es類(lèi)型
  {
   "index":"a1",
   "sqlfile":"aa.sql",
   "type":"aa"
  },
  {
   "index":"a1",
   "sqlfile":"bb.sql",
   "type":"bb"
  },
 ],
}
# es的ip和端口
elasticsearch = {
 "hosts":"127.0.0.1:9200",
}
# 字段順序與sql文件字段順序一致,這是存進(jìn)es中的字段名,這里用es的type名作為標(biāo)識(shí)
db_field = {
  "aa":
   ("id",
   "code",
   "name",
   "project_name",
   "update_time",
   ),
 "bb":
  ("id",
   "code",
   "age",
   "project_name",
   "name",
   "update_time",
   ),
}
es_config = {
 # 間隔多少秒同步一次
 "sleep_time":10,
 # 為了解決服務(wù)器之間時(shí)間差問(wèn)題
 "time_difference":3,
 # show_json 用來(lái)展示導(dǎo)入的json格式數(shù)據(jù),
 "show_json":False,
}

          

mstes.py:

            
# -*- coding: utf-8 -*-
#__author__="ZJL"
from sql_manage import MysqlManager
from esconfig import mysql,elasticsearch,db_field,es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time
class TongBu(object):
 def __init__(self):
  try:
   # 是否展示json數(shù)據(jù)在控制臺(tái)
   self.show_json = es_config.get("show_json")
   # 間隔多少秒同步一次
   self.sleep_time = es_config.get("sleep_time")
   # 為了解決同步時(shí)數(shù)據(jù)更新產(chǎn)生的誤差
   self.time_difference = es_config.get("time_difference")
   # 當(dāng)前時(shí)間,留有后用
   self.datetime_now = ""
   # es的ip和端口
   es_host = elasticsearch.get("hosts")
   # 連接es
   self.es = Elasticsearch(es_host)
   # 連接mysql
   self.mm = MysqlManager()
  except :
   print(traceback.format_exc())
 def tongbu_es_mm(self):
  try:
   # 同步開(kāi)始時(shí)間
   start_time = time.time()
   print("start..............",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
   # 這個(gè)list用于批量插入es
   actions = []
   # 獲得所有sql文件list
   statement_filespath = mysql.get("statement_filespath",[])
   if self.datetime_now:
    # 當(dāng)前時(shí)間加上時(shí)間差(間隔時(shí)間加上執(zhí)行同步用掉的時(shí)間,等于上一次同步開(kāi)始時(shí)間)再字符串格式化
    # sql中格式化時(shí)間時(shí)年月日和時(shí)分秒之間不能空格,不然導(dǎo)入es時(shí)報(bào)解析錯(cuò)誤,所以這里的時(shí)間格式化也統(tǒng)一中間加一個(gè)T
    self.datetime_now = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time()-(self.sleep_time+self.time_difference)))
   else:
    self.datetime_now = "1999-01-01T00:00:00"
   if statement_filespath:
    for filepath in statement_filespath:
     # sql文件
     sqlfile = filepath.get("sqlfile")
     # es的索引
     es_index = filepath.get("index")
     # es的type
     es_type = filepath.get("type")
     # 讀取sql文件內(nèi)容
     with open(sqlfile,"r") as opf:
      sqldatas = opf.read()
      # ::datetime_now是一個(gè)自定義的特殊字符串用于增量更新
      if "::datetime_now" in sqldatas:
       sqldatas = sqldatas.replace("::datetime_now",self.datetime_now)
      else:
       sqldatas = sqldatas
      # es和sql字段的映射
      dict_set = db_field.get(es_type)
      # 訪問(wèn)mysql,得到一個(gè)list,元素都是字典,鍵是字段名,值是數(shù)據(jù)
      db_data_list = self.mm.select_all_dict(sqldatas, dict_set)
      if db_data_list:
       # 將數(shù)據(jù)拼裝成es的格式
       for db_data in db_data_list:
        action = {
         "_index": es_index,
         "_type": es_type,
         "@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time())),
         "_source": db_data
        }
        # 如果沒(méi)有id字段就自動(dòng)生成
        es_id = db_data.get("id", "")
        if es_id:
         action["_id"] = es_id
        # 是否顯示json再終端
        if self.show_json:
         print(action)
        # 將拼裝好的數(shù)據(jù)放進(jìn)list中
        actions.append(action)
   # list不為空就批量插入數(shù)據(jù)到es中
   if len(actions) > 0 :
    helpers.bulk(self.es, actions)
  except Exception as e:
   print(traceback.format_exc())
  else:
   end_time = time.time()
   print("end...................",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
   self.time_difference = end_time-start_time
  finally:
   # 報(bào)錯(cuò)就關(guān)閉數(shù)據(jù)庫(kù)
   self.mm.close()
def main():
 tb = TongBu()
 # 間隔多少秒同步一次
 sleep_time = tb.sleep_time
 # 死循環(huán)執(zhí)行導(dǎo)入數(shù)據(jù),加上時(shí)間間隔
 while True:
  tb.tongbu_es_mm()
  time.sleep(sleep_time)
if __name__ == '__main__':
 main()

          

以上這篇用python簡(jiǎn)單實(shí)現(xiàn)mysql數(shù)據(jù)同步到ElasticSearch的教程就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦?。?!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 99久久精品国产免看国产一区 | 青草视频在线观看免费资源 | 欧美一级二级三级 | 国产大陆精品另类xxxx | 日本在线观看中文字幕 | 国产乱色精品成人免费视频 | 欧美zzzz| 亚洲天堂中文字幕 | 成人高清视频免费观看 | av在线大全 | 91网址在线 | 精品国产一区二区三区香蕉沈先生 | 欧美国产激情二区三区 | 日本污视频在线观看 | 舒淇三级浴室洗澡在线观看 | 天天操夜夜爱 | 99精品国产高清在线观看 | 国产在线观看福利 | 久久亚洲国产精品五月天婷 | 男人用嘴添女人下身免费视频 | 久久精品视频99 | 国产精品永久免费视频 | 一区二区三区中文字幕 | 欧美级| 久草在线视频资源站 | 国产日韩久久久精品影院首页 | 亚洲欧美日韩国产精品26u | 91麻豆精品国产91久久久更新时间 | 亚洲男人天堂2021 | 久久久久国产一区二区三区四区 | 一级特黄特黄xxx视频 | 亚洲欧美v视色一区二区 | 成人亚洲欧美日韩在线 | 久久黄视频 | 霍元甲之精武天下 | 国产精品国产三级国产aⅴ无密码 | 久草国产视频 | 天天天天做夜夜夜夜做 | 日韩国产在线观看 | 欧美网站www| 日韩欧美一区二区三区久久 |