我在工作的時候,在測試環境下使用的數據庫跟生產環境的數據庫不一致,當我們的測試環境下的數據庫完成測試準備更新到生產環境上的數據庫時候,需要準備更新腳本,真是一不小心沒記下來就會忘了改了哪里,哪里添加了什么,這個真是非常讓人頭疼。因此我就試著用Python來實現自動的生成更新腳本,以免我這爛記性,記不住事。
主要操作如下:
1.在原先 basedao.py 中添加如下方法,這樣舊能很方便的獲取數據庫的數據,為測試數據庫和生產數據庫做對比打下了基礎。
def select_database_struts(self): ''' 查找當前連接配置中的數據庫結構以字典集合 ''' sql = '''SELECT COLUMN_NAME, IS_NULLABLE, COLUMN_TYPE, COLUMN_KEY, COLUMN_COMMENT FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA="%s" AND TABLE_NAME="{0}" '''%(self.__database) struts = {} for k in self.__primaryKey_dict.keys(): self.__cursor.execute(sql.format(k)) results = self.__cursor.fetchall() struts[k] = {} for result in results: struts[k][result[0]] = {} struts[k][result[0]]["COLUMN_NAME"] = result[0] struts[k][result[0]]["IS_NULLABLE"] = result[1] struts[k][result[0]]["COLUMN_TYPE"] = result[2] struts[k][result[0]]["COLUMN_KEY"] = result[3] struts[k][result[0]]["COLUMN_COMMENT"] = result[4] return self.__config, struts
2.編寫對比的Python腳本
''' 數據庫遷移腳本, 目前支持一下幾種功能: 1.生成舊數據庫中沒有的數據庫表執行 SQL 腳本(支持是否帶表數據),生成的 SQL 腳本在 temp 目錄下(表名.sql)。 2.生成添加列 SQL 腳本,生成的 SQL 腳本統一放在 temp 目錄下的 depoyed.sql 中。 3.生成修改列屬性 SQL 腳本,生成的 SQL 腳本統一放在 temp 目錄下的 depoyed.sql 中。 4.生成刪除列 SQL 腳本,生成的 SQL 腳本統一放在 temp 目錄下的 depoyed.sql 中。 ''' import json, os, sys from basedao import BaseDao temp_path = sys.path[0] + "/temp" if not os.path.exists(temp_path): os.mkdir(temp_path) def main(old, new, has_data=False): ''' @old 舊數據庫(目標數據庫) @new 最新的數據庫(源數據庫) @has_data 是否生成結構+數據的sql腳本 ''' clear_temp() # 先清理 temp 目錄 old_config, old_struts = old new_config, new_struts = new for new_table, new_fields in new_struts.items(): if old_struts.get(new_table) is None: gc_sql(new_config["user"], new_config["password"], new_config["database"], new_table, has_data) else: cmp_table(old_struts[new_table], new_struts[new_table], new_table) def cmp_table(old, new, table): ''' 對比表結構生成 sql ''' old_fields = old new_fields = new sql_add_column = "ALTER TABLE `{TABLE}` ADD COLUMN `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';\n" sql_change_column = "ALTER TABLE `{TABLE}` CHANGE `{COLUMN_NAME}` `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';\n" sql_del_column = "ALTER TABLE `{TABLE}` DROP {COLUMN_NAME};" if old_fields != new_fields: f = open(sys.path[0] + "/temp/deploy.sql", "a", encoding="utf8") content = "" for new_field, new_field_dict in new_fields.items(): old_filed_dict = old_fields.get(new_field) if old_filed_dict is None: # 生成添加列 sql content += sql_add_column.format(TABLE=table, **new_field_dict) else: # 生成修改列 sql if old_filed_dict != new_field_dict: content += sql_change_column.format(TABLE=table, **new_field_dict) pass # 生成刪除列 sql for old_field, old_field_dict in old_fields.items(): if new_fields.get(old_field) is None: content += sql_del_column.format(TABLE=table, COLUMN_NAME=old_field) f.write(content) f.close() def gc_sql(user, pwd, db, table, has_data): ''' 生成 sql 文件 ''' if has_data: sys_order = "mysqldump -u%s -p%s %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table) else: sys_order = "mysqldump -u%s -p%s -d %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table) os.system(sys_order) def clear_temp(): ''' 每次執行的時候調用這個,先清理下temp目錄下面的舊文件 ''' if os.path.exists(temp_path): files = os.listdir(temp_path) for file in files: f = os.path.join(temp_path, file) if os.path.isfile(f): os.remove(f) print("臨時文件目錄清理完成") if __name__ == "__main__": test1_config = { "user" : "root", "password" : "root", "database" : "test1", } test2_config = { "user" : "root", "password" : "root", "database" : "test2", } test1_dao = BaseDao(**test1_config) test1_struts = test1_dao.select_database_struts() test2_dao = BaseDao(**test2_config) test2_struts = test2_dao.select_database_struts() main(test2_struts, test1_struts)
目前只支持了4種SQL腳本的生成。
以上這篇Python 實現數據庫更新腳本的生成方法就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
