備案網(wǎng)站內(nèi)容說明seo關(guān)鍵詞排名優(yōu)化系統(tǒng)源碼
批量數(shù)據(jù)庫服務(wù)器連接測試與數(shù)據(jù)匯總:Python實現(xiàn)方案
作為數(shù)據(jù)庫服務(wù)器運維人員,我們經(jīng)常需要面對大量服務(wù)器的連接測試和數(shù)據(jù)匯總工作。本文將介紹一個使用Python實現(xiàn)的高效解決方案,可以幫助我們快速完成這些任務(wù)。
需求概述
- 從配置文件中讀取要測試的數(shù)據(jù)庫服務(wù)器IP地址列表。
- 批量測試數(shù)據(jù)庫服務(wù)器的連接情況。
- 在所有可連接的服務(wù)器上執(zhí)行相同的SQL查詢。
- 將查詢結(jié)果匯總到一個單獨的數(shù)據(jù)庫中,并包含對應(yīng)服務(wù)器的IP地址。
- 自動創(chuàng)建結(jié)果表,表名按日期隨機生成。
- 提供詳細(xì)的日志輸出,包括實時的處理進度。
實現(xiàn)方案
我們使用Python來實現(xiàn)這個方案,主要利用了以下庫和技術(shù):
pyodbc
: 用于數(shù)據(jù)庫連接和操作configparser
: 讀取配置文件concurrent.futures
: 實現(xiàn)并發(fā)處理logging
: 日志記錄- 多線程技術(shù):提高處理效率
代碼實現(xiàn)
以下是完整的Python代碼實現(xiàn):
import pyodbc
import logging
import configparser
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from datetime import datetime
import random
import string# 配置日志
def setup_logger():"""設(shè)置日志記錄器,同時輸出到文件和控制臺"""logger = logging.getLogger()logger.setLevel(logging.INFO)# 文件處理器file_handler = logging.FileHandler('db_query_aggregation.log')file_handler.setLevel(logging.INFO)file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')file_handler.setFormatter(file_formatter)# 控制臺處理器console_handler = logging.StreamHandler()console_handler.setLevel(logging.INFO)console_formatter = logging.Formatter('%(message)s')console_handler.setFormatter(console_formatter)logger.addHandler(file_handler)logger.addHandler(console_handler)return loggerlogger = setup_logger()def read_config(config_file):"""讀取配置文件:param config_file: 配置文件路徑:return: 包含配置信息的字典"""try:config = configparser.ConfigParser()config.read(config_file)return {'ip_list_file': config['Files']['ip_list_file'],'source_db_username': config['SourceDB']['username'],'source_db_password': config['SourceDB']['password'],'target_db_info': dict(config['TargetDB']),'max_workers': int(config['Settings']['max_workers']),'query': config['Query']['sql']}except Exception as e:logger.error(f"讀取配置文件時出錯: {e}")raisedef read_ip_list(file_path):"""從文件中讀取IP地址列表:param file_path: IP地址文件路徑:return: IP地址列表"""try:with open(file_path, 'r') as file:return [line.strip() for line in file if line.strip()]except IOError as e:logger.error(f"無法讀取IP地址文件: {e}")return []def create_connection(server, database, username, password):"""創(chuàng)建數(shù)據(jù)庫連接:param server: 服務(wù)器地址:param database: 數(shù)據(jù)庫名稱:param username: 用戶名:param password: 密碼:return: 數(shù)據(jù)庫連接對象,如果連接失敗則返回None"""try:conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'return pyodbc.connect(conn_str, timeout=5)except pyodbc.Error as e:logger.error(f"連接到服務(wù)器 {server} 失敗: {e}")return Nonedef execute_query(connection, query):"""執(zhí)行SQL查詢:param connection: 數(shù)據(jù)庫連接對象:param query: SQL查詢語句:return: 查詢結(jié)果列表"""try:cursor = connection.cursor()cursor.execute(query)return cursor.fetchall()except pyodbc.Error as e:logger.error(f"執(zhí)行查詢時出錯: {e}")return []def process_server(ip, username, password, query):"""處理單個服務(wù)器的查詢:param ip: 服務(wù)器IP地址:param username: 數(shù)據(jù)庫用戶名:param password: 數(shù)據(jù)庫密碼:param query: SQL查詢語句:return: 元組 (IP地址, 查詢結(jié)果)"""start_time = time.time()logger.info(f"開始處理服務(wù)器 {ip}")try:conn = create_connection(ip, 'master', username, password)if conn:results = execute_query(conn, query)conn.close()end_time = time.time()processing_time = end_time - start_timelogger.info(f"服務(wù)器 {ip} 處理完成. 獲取 {len(results)} 行數(shù)據(jù). 耗時 {processing_time:.2f} 秒")return ip, resultsexcept Exception as e:logger.error(f"處理服務(wù)器 {ip} 時發(fā)生錯誤: {e}")logger.info(f"服務(wù)器 {ip} 處理失敗")return ip, []def create_target_table(connection, table_name, columns):"""在目標(biāo)數(shù)據(jù)庫中創(chuàng)建表:param connection: 目標(biāo)數(shù)據(jù)庫連接對象:param table_name: 要創(chuàng)建的表名:param columns: 列定義列表"""try:cursor = connection.cursor()create_table_query = f"CREATE TABLE {table_name} (ServerIP VARCHAR(15), {', '.join(columns)})"cursor.execute(create_table_query)connection.commit()logger.info(f"成功創(chuàng)建表 {table_name}")except pyodbc.Error as e:logger.error(f"創(chuàng)建目標(biāo)表時出錯: {e}")raisedef insert_data(connection, table_name, data):"""將數(shù)據(jù)插入目標(biāo)數(shù)據(jù)庫:param connection: 目標(biāo)數(shù)據(jù)庫連接對象:param table_name: 目標(biāo)表名:param data: 要插入的數(shù)據(jù)列表:return: 插入的行數(shù)"""try:cursor = connection.cursor()placeholders = ', '.join(['?' for _ in range(len(data[0]))])insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"cursor.fast_executemany = Truecursor.executemany(insert_query, data)connection.commit()return cursor.rowcountexcept pyodbc.Error as e:logger.error(f"插入數(shù)據(jù)時出錯: {e}")connection.rollback()return 0def generate_table_name():"""生成隨機表名:return: 生成的表名"""date_str = datetime.now().strftime("%Y%m%d")random_str = ''.join(random.choices(string.ascii_lowercase, k=5))return f"QueryResults_{date_str}_{random_str}"def main():"""主函數(shù),協(xié)調(diào)整個數(shù)據(jù)查詢和匯總過程"""try:# 讀取配置config = read_config('config.ini')ip_list = read_ip_list(config['ip_list_file'])if not ip_list:logger.error("IP地址列表為空,程序終止")returnlogger.info(f"開始處理 {len(ip_list)} 個服務(wù)器")# 并發(fā)查詢所有服務(wù)器results = []with ThreadPoolExecutor(max_workers=config['max_workers']) as executor:future_to_ip = {executor.submit(process_server, ip, config['source_db_username'], config['source_db_password'], config['query']): ip for ip in ip_list}for future in as_completed(future_to_ip):ip, result = future.result()if result:results.extend([(ip,) + tuple(row) for row in result])if not results:logger.info("沒有查詢到數(shù)據(jù),程序終止")return# 連接目標(biāo)數(shù)據(jù)庫target_conn = create_connection(**config['target_db_info'])if not target_conn:logger.error("無法連接到目標(biāo)數(shù)據(jù)庫,程序終止")return# 創(chuàng)建目標(biāo)表并插入數(shù)據(jù)table_name = generate_table_name()columns = [f"Column{i} VARCHAR(100)" for i in range(len(results[0]) - 1)]create_target_table(target_conn, table_name, columns)rows_inserted = insert_data(target_conn, table_name, results)target_conn.close()logger.info(f"數(shù)據(jù)匯總完成。插入 {rows_inserted} 行到表 {table_name}")print(f"查詢結(jié)果已插入表: {table_name}")except Exception as e:logger.critical(f"程序執(zhí)行過程中發(fā)生嚴(yán)重錯誤: {e}")print(f"程序執(zhí)行過程中發(fā)生錯誤,請查看日志文件獲取詳細(xì)信息。")if __name__ == "__main__":main()
代碼說明
-
配置文件讀取:使用
configparser
模塊讀取配置文件,包括數(shù)據(jù)庫連接信息、查詢語句等。 -
多線程處理:使用
ThreadPoolExecutor
并發(fā)執(zhí)行查詢,提高效率。 -
異常處理:每個關(guān)鍵操作都包含了異常處理,確保程序的穩(wěn)定性。
-
模塊化設(shè)計:將不同功能分解為獨立的函數(shù),提高代碼的可讀性和可維護性。
-
日志記錄:使用
logging
模塊記錄詳細(xì)的操作日志,同時輸出到文件和控制臺。 -
動態(tài)表創(chuàng)建:在目標(biāo)數(shù)據(jù)庫中動態(tài)創(chuàng)建表,表名包含日期和隨機字符串。
-
數(shù)據(jù)匯總:將所有服務(wù)器的查詢結(jié)果匯總到一個列表中,包括服務(wù)器IP地址。
-
批量數(shù)據(jù)插入:使用
executemany
批量插入數(shù)據(jù)到目標(biāo)表。
使用說明
- 創(chuàng)建
config.ini
配置文件,包含以下內(nèi)容:
[Files]
ip_list_file = server_ip_list.txt[SourceDB]
username = your_source_username
password = your_source_password[TargetDB]
server = your_target_server
database = your_target_database
username = your_target_username
password = your_target_password[Settings]
max_workers = 50[Query]
sql = SELECT column1, column2 FROM your_table
-
準(zhǔn)備一個包含要查詢的服務(wù)器IP地址的文本文件(如
server_ip_list.txt
)。 -
運行腳本,它將并發(fā)查詢所有服務(wù)器,匯總結(jié)果(包括服務(wù)器IP),并插入到目標(biāo)數(shù)據(jù)庫的新表中。
-
腳本執(zhí)行完成后,會輸出生成的表名。
結(jié)論
這個Python腳本提供了一個高效、靈活的解決方案,可以批量測試數(shù)據(jù)庫服務(wù)器連接、執(zhí)行查詢并匯總結(jié)果。它具有以下優(yōu)點:
- 并發(fā)處理,大幅提高效率
- 詳細(xì)的日志記錄,便于監(jiān)控和調(diào)試
- 靈活的配置,易于適應(yīng)不同環(huán)境
- 異常處理完善,提高程序穩(wěn)定性
- 結(jié)果包含服務(wù)器IP,便于追蹤數(shù)據(jù)來源
對于需要管理大量數(shù)據(jù)庫服務(wù)器的運維人員來說,這個腳本可以顯著提高工作效率。您可以根據(jù)實際需求進一步調(diào)整和優(yōu)化這個腳本,例如添加更多的錯誤處理、優(yōu)化查詢性能,或者擴展功能以支持更復(fù)雜的操作。
通過使用這個腳本,您可以輕松地對多個數(shù)據(jù)庫服務(wù)器進行批量操作,并將結(jié)果匯總到一個中心位置,大大簡化了數(shù)據(jù)庫管理和監(jiān)控的工作流程。