def forward_serial_to_tcp(ser, conn):
    """Read data from the serial port and send it to the TCP client.

    Polls ``ser.in_waiting`` in an endless loop. Whenever bytes are pending
    they are read and handed to ``conn.sendall``; otherwise the loop sleeps
    for 10 ms before polling again. The loop has no normal exit: it stops
    only when an exception is raised.

    Args:
        ser: Object exposing ``in_waiting`` and ``read(size)``
            (presumably a ``serial.Serial`` instance -- confirm with caller).
        conn: Object exposing ``sendall(data)`` (presumably a connected
            TCP socket -- confirm with caller).

    Any ``Exception`` raised while polling, reading or sending is caught
    and printed rather than propagated; an exit message is always printed.
    """
    try:
        while True:
            # Sample the pending-byte count once so the check and the
            # requested read size always agree.
            pending = ser.in_waiting
            if pending > 0:
                conn.sendall(ser.read(pending))
            else:
                # Nothing to forward; back off briefly instead of spinning.
                time.sleep(0.01)
    except Exception as e:
        print(f"[Serial-TCP] Error: {e}")
    finally:
        print("Serial-TCP thread exiting.")
def forward_tcp_to_serial(ser, conn):
    """Receive data from the TCP client and write it to the serial port.

    Repeatedly calls ``conn.recv(1024)`` and passes each non-empty chunk to
    ``ser.write``. The loop ends when ``recv`` returns an empty value or
    when an exception is raised.

    Args:
        ser: Object exposing ``write(data)`` (presumably a ``serial.Serial``
            instance -- confirm with caller).
        conn: Object exposing ``recv(bufsize)`` (presumably a connected
            TCP socket -- confirm with caller).

    Any ``Exception`` raised while receiving or writing is caught and
    printed rather than propagated; an exit message is always printed.
    """
    try:
        while True:
            data = conn.recv(1024)
            if not data:
                # Empty result from recv: stop forwarding.
                break
            ser.write(data)
    except Exception as e:
        print(f"[TCP-Serial] Error: {e}")
    finally:
        print("TCP-Serial thread exiting.")
def main():
打开串口
try:
ser = serial.Serial(
port=SERIAL_PORT,
baudrate=BAUDRATE,
timeout=0, 非阻塞模式
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS
)
print(fSerial port {SERIAL_PORT} opened at {BAUDRATE} baud.)
except serial.SerialException as e:
print(fFailed to open serial port: {e})
sys.exit(1)
创建 TCP 服务器
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((TCP_HOST, TCP_PORT))
server_socket.listen(1)
print(fTCP server listening on {TCP_HOST}:{TCP_PORT})
try:
while True:
print(Waiting for TCP client...)
conn, addr = server_socket.accept()
print(fClient connected from {addr})