欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

+ python + pipelinedb 監控數據庫日志

系統 1881 0

1. 數據庫中創建stream以及視圖

            
-- PipelineDB stream (declared as a foreign table on SERVER pipelinedb) that
-- receives parsed PostgreSQL error-log events pushed by errorlog.py.
-- One row per error line: date, source host, timestamp, session identity,
-- client address, severity, and the raw error text.
CREATE FOREIGN TABLE t_error_log(
err_date date, 
hostname varchar(128), 
err_time timestamp without time zone, 
db_user varchar(128), 
db_name varchar(128), 
client_addr varchar(128), 
log_level varchar(128), 
err_log varchar
) server pipelinedb;

-- Continuous view: error counts at the finest grain (every dimension except
-- err_time/log_level), materialized incrementally as events arrive.
CREATE VIEW vw_error with (action=materialize) AS 
SELECT 
  err_date, hostname, db_name, db_user, client_addr, err_log,
  count(*) 
FROM t_error_log
GROUP BY err_date, hostname, db_name, db_user, client_addr, err_log;


-- Rollup: total errors per day.
CREATE VIEW vw_error_date with (action=materialize) AS 
SELECT 
  err_date,
  count(*) 
FROM t_error_log
GROUP BY err_date;


-- Rollup: errors per day per host.
CREATE VIEW vw_error_date_host with (action=materialize) AS 
SELECT 
  err_date, hostname,
  count(*) 
FROM t_error_log
GROUP BY err_date, hostname;


-- Rollup: errors per day per host per database.
CREATE VIEW vw_error_date_host_db with (action=materialize) AS 
SELECT 
  err_date, hostname, db_name,
  count(*) 
FROM t_error_log
GROUP BY err_date, hostname, db_name;

-- Rollup: errors per day per host per database per client address.
CREATE VIEW vw_error_date_host_db_cli with (action=materialize) AS 
SELECT 
  err_date, hostname, db_name, client_addr,
  count(*) 
FROM t_error_log
GROUP BY err_date, hostname, db_name, client_addr;

            
          

?

2. 從kafka中取數,插入到數據庫中的腳本 errorlog.py

            
#!/usr/bin/python
# coding:utf-8

from kafka import KafkaConsumer
from kafka import TopicPartition
import sys
import time
import json
import re
import psycopg2
import multiprocessing
import logging

# Kafka bootstrap brokers — one broker per list element.  (The old value was a
# one-element list holding a comma-joined string, inconsistent with the
# correct three-element list built inside get_data_kafka().)
KAFKA_LIST = ["mq01s.test.com:9092", "mq02s.test.com:9092", "mq03s.test.com:9092"]
CLIENT_ID = "test_pipelinedb"  # also used as the consumer group id
# PipelineDB connection settings; filled in by george_init() before use.
DATABASE_NAME = ''
HOST = ''
PORT = ''
USER_NAME = ''
PASSWORD = ''
CHAR_SET = ''

# send mail to dbops

def george_init():
    """Populate the module-level PipelineDB connection settings.

    Mutates the DATABASE_NAME / HOST / PORT / USER_NAME / PASSWORD /
    CHAR_SET globals that get_dbops_conn() reads.  Returns None.
    """
    global DATABASE_NAME
    DATABASE_NAME = 'pipelinedb'
    global HOST
    HOST = 'dbwtest03bc.test.com'
    global PORT
    PORT = '5432'
    global USER_NAME
    USER_NAME = 'mytest'
    # Bug fix: the original declared "global password" (wrong case) and then
    # assigned a misspelled local "pssword", so the PASSWORD global used by
    # psycopg2.connect() was never actually set here.
    global PASSWORD
    PASSWORD = ''
    global CHAR_SET
    CHAR_SET = 'utf8'

# conn log_parse
def get_dbops_conn():
    """Initialise the connection settings and open a psycopg2 connection.

    Calls george_init() first so the module-level HOST/PORT/... globals are
    populated, then returns a new connection built from them.
    """
    george_init()
    conn_params = {
        'host': HOST,
        'database': DATABASE_NAME,
        'user': USER_NAME,
        'password': PASSWORD,
        'port': PORT,
    }
    return psycopg2.connect(**conn_params)

def get_cursor(conn):
    """Return a fresh cursor obtained from the given DB connection."""
    cursor = conn.cursor()
    return cursor

# close connect
def conn_close(conn):
    """Close *conn* if it is a live handle; silently ignore None.

    Uses an identity check (`is not None`) instead of the original
    `!= None` equality comparison, per PEP 8.
    """
    if conn is not None:
        conn.close()

# close cursor
def cursor_close(cursor):
    """Close *cursor* if it is a live handle; silently ignore None.

    Uses an identity check (`is not None`) instead of the original
    `!= None` equality comparison, per PEP 8.
    """
    if cursor is not None:
        cursor.close()

# close all
def close(cursor, conn):
    """Release both DB handles: the cursor first, then its connection.

    Either argument may be None; the underlying helpers ignore None.
    """
    cursor_close(cursor)
    conn_close(conn)

# usage
def Usage():
    """Print a usage message and exit(1) when no topic argument was given.

    Fixes a SyntaxError in the original: the first print's string literal ran
    across several lines without triple quotes (the <topic_name> placeholder
    was evidently lost when the source was published as HTML).
    """
    if len(sys.argv) < 2:
        print('Usage: python %s <topic_name>' % sys.argv[0])
        print('       e.g: python %s test_data' % sys.argv[0])
        sys.exit(1)

# get the message of topic from kafka, return all_data
# client_id = zc_time
# get the message of topic from kafka, return all_data
# client_id = zc_time
def get_data_kafka():
    """Generator: yield raw record payloads (message values) from Kafka, forever.

    Connects to the hard-coded broker list / topic and yields each consumed
    record's value.  If the consumer's iterator ever ends, the outer loop
    rebuilds the consumer and keeps going, so callers see an endless stream.
    NOTE(review): the local KAFKA_LIST shadows the module-level constant of
    the same name; group_id equals client_id, so every run with this id
    shares one consumer group — confirm that is intended.
    """
    logger = logging.getLogger('[Parse-pglog]')
    KAFKA_LIST = ["mq01s.test.com:9092","mq02s.test.com:9092","mq03s.test.com:9092"]
    topic_name = "pglog_dd" # sys.argv[1]
    client_id = "test_pipeline" ##%s" % time.time()
#    consumer = KafkaConsumer(topic_name, bootstrap_servers=KAFKA_LIST, client_id=client_id, group_id=client_id, auto_offset_reset="earliest")
    while True:
        # A new consumer is created on every pass; "earliest" only applies
        # when the group has no committed offsets yet.
        consumer = KafkaConsumer(topic_name, bootstrap_servers=KAFKA_LIST, client_id=client_id, group_id=client_id, auto_offset_reset="earliest")
        for data in consumer:
            yield data.value
        logger.info('get kafka data for end!')
    # NOTE(review): unreachable — the `while True` above never exits.
    logger.info('get kafka data while end!')

# parse msg.value from get_data_kafka, and find the message.
# return error meesage
# data_v are from kafka data.
def find_err_message(data_v):
    """Return *data_v* unchanged if its "message" field is a real PG error.

    data_v is a JSON-encoded filebeat record.  The record is kept when the
    message contains one of the severe log-level markers and none of the
    known-noise phrases; otherwise None is returned.

    Fix: the original crashed with TypeError (`in` on None) whenever the
    record had no "message" key — now such records are simply skipped.
    """
    main_errors = ['WARNING:', 'ERROR:', 'FATAL:', 'PANIC:']
    exclude_errors = ['canceling statement due to statement timeout', 'recovery is in progress', 'nonstandard use of ', 'column "waiting" does not exist at character','cannot execute CREATE TABLE in a read-only transaction','cannot execute DROP TABLE in a read-only transaction','cannot execute SELECT INTO in a read-only transaction','duplicate key value violates unique constraint "t_notif_push_user_pkey"','database is read-only! (user social_rws)']
    j_data = json.loads(data_v)
    message = j_data.get("message")
    if not message:
        return None
    if any(err in message for err in main_errors):
        if not any(ex_err in message for ex_err in exclude_errors):
            return data_v
    return None

# get hostname from data
def get_host(err_data_v):
    """Extract the reporting host's name from a filebeat JSON record.

    Raises KeyError if the record lacks the beat.hostname fields.
    """
    record = json.loads(err_data_v)
    return record['beat']['hostname']

# match message to tuple
# return match message
# Compiled once at import time.  Groups: (timestamp, tz-abbrev, [pid],
# [session-line], user, db, client addr, log level, message text).
_ERR_LOG_RE = re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\S*\s*\d* (\w{3}) (\[\d+\]): (\[\d+-\d+\]) user=(\S*\w*).\s*\S*db=(\S*\w*) ([a-zA-Z0-9\-\.]+|\[local\]|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|[0-9a-fA-F:]+)?[:\d]* (LOG|WARNING|ERROR|FATAL|PANIC)?[:\d]*  (.*)')

def match_msg(err_data_v):
    """Split a filebeat JSON record's "message" into its PG log-line fields.

    Returns the 9-tuple of regex groups on success, or None when the record
    has no message or the message does not look like a PostgreSQL log line.

    Fixes: the original looped `for data in err_data_v`, which iterates the
    JSON *string character by character*, re-parsing the whole payload and
    re-compiling the regex on every character — pure wasted work, since the
    loop variable was never used.  The record is now parsed exactly once and
    the pattern is compiled once at module load.
    """
    j_data = json.loads(err_data_v)
    err_msg = j_data.get("message")
    if err_msg is None:
        return None
    msg = _ERR_LOG_RE.match(err_msg)
    if msg:
        return msg.groups()
    return None

# open file to append data return 
def open_file_for_a(file):
    """Open *file* in append mode and return the file object.

    The caller owns the handle and must close it (see close_file()).
    """
    handle = open(file, 'a')
    return handle

# close_file 
# fo = append_file(file)
# close_file(fo)
def close_file(file):
    """Close an open file object; returns whatever file.close() returns (None)."""
    result = file.close()
    return result

# insert into file
# fo = append_file(file)
# insert_err_to_file(fo, msg)
def insert_err_to_file(fo, hostname, err_msg):
    """Append one "host : error" record plus a separator line to *fo*.

    Debug/backup sink used instead of (or alongside) the DB insert.
    """
    record = '\n %s : %s ' % (hostname, err_msg)
    fo.write(record)
    separator = '\n*******************************'
    fo.write(separator)



# insert pg error log to db

def insert_pgerr_to_db(conn_insert, hostname, err_msg):
    """Insert one parsed PG error-log record into the t_error_log stream.

    conn_insert : open psycopg2 connection to PipelineDB.
    hostname    : source host of the log line.
    err_msg     : 9-tuple from match_msg() (timestamp, tz, pid, session,
                  user, db, client addr, level, text).

    Any failure (err_msg is None, bad tuple, DB error) is logged and
    swallowed, matching the original best-effort behaviour.

    Fixes: the SQL was built with %-string formatting — an injection hole
    that the old code even tried to paper over by rewriting ' to " in the
    log text.  A parameterized query lets the driver do the quoting, so the
    text is stored verbatim.  Also `except Exception,e` (Python-2-only
    syntax) is now `except Exception as e`.
    """
    logger = logging.getLogger('[Parse-pglog]')
    try:
        err_time = err_msg[0]
        db_user = err_msg[4]
        db_name = err_msg[5]
        client_addr = err_msg[6]
        err_log_level = err_msg[7]
        err_log = err_msg[8]
        err_date = err_time[:10]  # YYYY-MM-DD prefix of the timestamp
        sql_insert_err = '''insert into t_error_log(err_date, hostname, err_time, db_user, db_name, client_addr, log_level, err_log)
        values (%s, %s, %s, %s, %s, %s, %s, %s)'''
        params = (err_date, hostname, err_time, db_user, db_name, client_addr, err_log_level, err_log)
        cursor_in_err = get_cursor(conn_insert)
        cursor_in_err.execute(sql_insert_err, params)
        logger.info('Execute %s with %s', sql_insert_err, params)
        conn_insert.commit()
        logger.info('Commited!')
        cursor_close(cursor_in_err)
    except Exception as e:
        logger.error(e)


def main():
    """Consume filebeat records from Kafka forever, keep the PG error lines,
    and stream them into the PipelineDB t_error_log stream.

    Fixes: `except Exception,e` (Python-2-only) is now `except Exception as e`,
    and the DB connection is closed in a try/finally — the original placed
    conn_close() after the `while True`, where it could never execute.
    """
    logfile = '/root/test/201907/log_parse_pglog.log'
    logger = logging.getLogger('[Parse-pglog]')
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(logfile, mode='a')
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    logger.info('Start to parse pglog')

    # NOTE(review): the pool is created but never given any work; kept only so
    # behaviour (and the startup log line) stays unchanged — confirm removable.
    p = multiprocessing.Pool(8)
    logger.info('Open multiprocessing pool 8.')

    while True:
        datas_v = get_data_kafka()
        logger.info('Connect kafka.')
        conn_insert = get_dbops_conn()
        logger.info('Connect DB.')
        try:
            for data_v in datas_v:
                err_j_data = find_err_message(data_v)  # None unless a real error line
                if err_j_data is None:
                    continue
                hostname = get_host(err_j_data)
                match_err_msg = match_msg(err_j_data)  # 9-tuple or None
                insert_pgerr_to_db(conn_insert, hostname, match_err_msg)
                try:
                    logger.info('err_time:%s , hostname:%s , err_msg: %s' % (match_err_msg[0][:10], hostname, match_err_msg[8]))
                except Exception as e:
                    # match_err_msg may be None; logging must never kill the loop
                    logger.info(e)
            logger.info('for end!')
        finally:
            conn_close(conn_insert)

# Script entry point: run the Kafka -> PipelineDB forwarding loop.
if __name__ == "__main__":
    main()


              
            
          

3. 運行之后取查視圖中的數據

?


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 日本毛片爽看免费视频 | 久久久久久免费播放一级毛片 | 成人欧美一区二区三区在线播放 | 日本动漫三级 | 亚洲国产精品热久久 | 色阁阁日韩欧美在线 | 日本欧美一级 | 亚洲一二三四2021不卡 | 免费看一级视频 | 日本中文字幕网站 | 拍真实国产伦偷精品 | 亚洲国产第一区二区香蕉 | 国产人妻互换一区二区水牛影视 | 日韩一级欧美一级毛片在线 | 国内自拍第五一页 | 国产精品成人国产乱一区 | 久久精品这里是免费国产 | 日韩欧美一区二区三区不卡在线 | 中文字幕欧美在线 | 我的朋友丈夫 | 国产福利在线观看永久免费 | 波多野结衣全部系列在线观看 | 日本www视频在线观看 | 在线视频 中文字幕 | 欧美13videosex性极品 | 日韩在线观看网站 | 欧美一级黄视频 | 欧美成人一级 | 青娱乐免费视频在线观看 | 九九热这里 | 日韩亚洲欧美视频 | 丰满年轻岳中文字幕一区二区 | 国产精品入口麻豆 | 君岛美绪一区二区三区 | 日韩午夜在线 | 色喜亚洲美女沟沟炮交国模 | 亚洲视频毛片 | 久久国产精品一区二区三区 | 国产99一区二区 | 热re66久久精品国产99re | 逼逼网|