網(wǎng)站創(chuàng)建域名臨沂網(wǎng)站建設(shè)方案服務(wù)
python實(shí)現(xiàn)TCP Socket控制測試儀器
- 說明
- 前提條件
- 代碼
說明
本例是用python實(shí)現(xiàn)TCP Socket,最終目的是為了控制LCR電容測試儀器。
前提條件
- 測試儀器 配備了以太網(wǎng)接口,支持TCP/IP協(xié)議棧
- 用戶可以通過IP地址和端口號連接到儀器,建立TCP通信
- Python及socket包
代碼
# 導(dǎo)入依賴包
import socket# 獲取socket套接字對象:供后續(xù)通信
def get_socket(ip_str, port):# 設(shè)置默認(rèn)超時(shí)時(shí)間:避免網(wǎng)絡(luò)通信卡死。若5秒內(nèi)未能完成操作(如連接、讀取數(shù)據(jù)),會(huì)拋出異常socket.setdefaulttimeout(5)# 創(chuàng)建一個(gè)基于 IPv4 的 TCP 套接字,支持點(diǎn)對點(diǎn)可靠連接tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# ip地址 和 端口 進(jìn)行連接server_addr = (ip_str, port) # 具體地址和端口得看儀器tcp_socket.connect(server_addr)# tcp_socket.recv_into()return tcp_socket# 發(fā)送儀器指令:socket套接字對象, 儀器命令字符串
def socket_send_cmd(tcp_socket, cmd):# 構(gòu)建待發(fā)送命令字符串,命令以"\r\n"結(jié)尾:LCR儀器通常以標(biāo)準(zhǔn)通信協(xié)議(如 SCPI)解析命令,要求命令以 \r\n(回車+換行)結(jié)束。send_str = cmd + "\r\n"# 使用sendall將數(shù)據(jù)完整發(fā)送至遠(yuǎn)端設(shè)備,即便數(shù)據(jù)量較大,也能確保發(fā)送完成tcp_socket.sendall(send_str.encode('utf-8'))# 發(fā)送空字節(jié)以確保發(fā)送結(jié)束tcp_socket.send(b"")# 獲取儀器返回結(jié)果:socket套接字對象
def socket_read_result(tcp_socket):while True: # 無限循環(huán),直到讀取到完整結(jié)果try:global read_buf_g # 聲明全局變量,用于存儲(chǔ)讀取的緩沖數(shù)據(jù)result = tcp_socket.recv(256).decode('utf-8') # 從套接字中讀取最多256字節(jié)數(shù)據(jù)if "\n" in result: # 判斷是否接收到完整行數(shù)據(jù)符號\n(行以“\n”換行符結(jié)束)# 緩沖“\n”后的信息留給下一次拼接完整信息tmp = result[result.find("\n")+1:] # +1:是為了去掉\n# 拼接完整返回信息:# 將前面緩沖的數(shù)據(jù) 跟 這次讀取數(shù)據(jù)‘\n’ 前面的數(shù)據(jù) 拼接result = read_buf_g + result[:result.find("\n")]# 把上面緩沖的信息放在全局變量中,方便下次獲取read_buf_g = tmp# 退出循環(huán),返回結(jié)果break# 如果本次接收信息還沒有完整信息標(biāo)志\n的話,則把信息加入緩存當(dāng)中read_buf_g += resultexcept Exception:print("34 read timeout") # 若發(fā)生異常(如超時(shí)),打印提示信息并繼續(xù)嘗試讀取continuereturn result# 本例LCR電容測試儀器的ip設(shè)為 169.254.1.22,端口為5025
ser_dev = get_socket("169.254.1.22",5025)
# 發(fā)生測量指令(SCPI指令)
socket_send_cmd(ser_dev, ":FETC?")
# 獲取結(jié)果
rd_data = socket_read_result(ser_dev)
# 打印結(jié)果
print(rd_data)