generated from tnlmarsha/Best-README-Template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlan_connect.py
163 lines (127 loc) · 5.27 KB
/
lan_connect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
┌────────────────────────────────────────────────────────────────────────────┐
│                      Connect to Chroma Over Ethernet                       │
├────────────────────────────────────────────────────────────────────────────┤
│ This short script is an example that opens a socket, sends a query,        │
│ prints the returned message and closes the socket.                         │
│============================================================================│
│ Initialize instrument IP and port at the top as globals, in 'if __name__'  │
│ block at the bottom, or in any of the function calls using these params.   │
└────────────────────────────────────────────────────────────────────────────┘
"""
import socket
import sys
import time
# -----------------------------------------------------------------------------
# Chroma connection settings: edit these to match your instrument
# -----------------------------------------------------------------------------
INSTRUMENT_IP: str = "192.168.0.17"  # address the script connects to
INSTRUMENT_PORT: int = 5024  # the port number of the instrument service
BUFFER_SIZE: int = 4096  # maximum number of bytes read when the connection opens
def get_socket(instrument_ip:str, instrument_port:int, buffer_size:int) -> socket.socket:
    """get_socket opens a TCP connection to the instrument and returns the socket

    After connecting, one read of up to `buffer_size` bytes is performed and
    the result is printed (the first data the instrument sends on the new
    connection). This read blocks until the instrument sends something.

    Parameters
    ----------
    instrument_ip : str
        IP address of instrument
    instrument_port : int
        port of instrument
    buffer_size : int
        length of buffer (bytes) to read

    Returns
    -------
    socket.socket
        connected instrument socket indicated by passed arguments

    Raises
    ------
    SystemExit
        with exit status 1, after printing an error message, if the socket
        cannot be created or the connection / initial read fails
    """
    try:
        # create an AF_INET, STREAM socket (TCP)
        s: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except socket.error:
        print("Failed to create socket.")
        sys.exit(1)

    try:
        # Connect to remote server and read whatever it sends first
        s.connect((instrument_ip, instrument_port))
        info = s.recv(buffer_size)
    except socket.error:
        # Report the address that was actually requested, release the
        # half-open socket and stop: an unconnected socket is useless to callers.
        print("failed to connect to ip " + instrument_ip)
        s.close()
        sys.exit(1)

    print(info)
    return s
def query_socket(inst:socket.socket, scpi_command:"str | bytes", settle_time:float = 1.0, buffer_size:int = 4096) -> bytes:
    """query_socket sends a SCPI command over the provided socket and returns the reply

    NOTE: the return type is `bytes`
    NOTE: the command is sent exactly as given; no terminator is appended

    Parameters
    ----------
    inst : socket.socket
        your instrument socket
    scpi_command : str or bytes
        command to send to instrument; a `str` is encoded as ASCII before
        sending, `bytes` are sent unchanged
    settle_time : float, optional
        seconds to wait between sending the command and reading the reply
        (default 1.0)
    buffer_size : int, optional
        maximum number of bytes to read from the socket (default 4096)

    Returns
    -------
    bytes
        response from instrument (a single read of at most `buffer_size` bytes)

    Raises
    ------
    SystemExit
        with exit status 1, after printing "Send failed", if sending fails
    UnicodeEncodeError
        if `scpi_command` is a `str` containing non-ASCII characters
    OSError
        if reading the reply fails
    """
    # socket.sendall() only accepts bytes-like objects, so text must be encoded.
    if isinstance(scpi_command, str):
        payload = scpi_command.encode("ascii")
    else:
        payload = scpi_command

    try:
        inst.sendall(payload)
    except socket.error:
        # Send failed
        print("Send failed")
        sys.exit(1)

    # Pause before reading the reply.
    time.sleep(settle_time)
    response: bytes = inst.recv(buffer_size)
    return response
def socketClose(inst:socket.socket) -> None:
    """socketClose closes the instrument socket, then pauses for half a second

    Parameters
    ----------
    inst : socket.socket
        your instrument socket
    """
    pause_seconds = 0.5
    inst.close()
    time.sleep(pause_seconds)
def main(instrument_ip:str, instrument_port:int, buffer_size:int, scpi_command:bytes) -> None:
    """main initializes socket using ip, port and buffer size

    main goes on to query instrument 10 times using SCPI `scpi_command`,
    printing each numbered response, then closes the socket and waits for
    the user to press Enter.

    Parameters
    ----------
    instrument_ip : str
        IP address of instrument
    instrument_port : int
        port of instrument
    buffer_size : int
        length of buffer (bytes) to read when the connection is opened
    scpi_command : bytes
        SCPI command sent to the instrument on every query
    """
    instrument_socket_instance: socket.socket = get_socket(
        instrument_ip=instrument_ip,
        instrument_port=instrument_port,
        buffer_size=buffer_size
    )
    # -------------------------------------------------------------------------
    # NOTE: scpi_command is not string; it's bytes
    # NOTE: same is true of instrument_response
    # -------------------------------------------------------------------------
    try:
        for count in range(1, 11):
            instrument_response: bytes = query_socket(
                inst=instrument_socket_instance,
                scpi_command=scpi_command
            )
            print(f'{count} :: {str(instrument_response)}')
    finally:
        # Always release the connection, even if a query raises or exits.
        socketClose(instrument_socket_instance)
    input('Press "Enter" to exit')
if __name__ == "__main__":
    # -------------------------------------------------------------------------
    # Script entry point. The connection settings default to the constants
    # defined at the top of the file; override them here if you prefer.
    # -------------------------------------------------------------------------
    instrument_ip, instrument_port, buffer_size = (
        INSTRUMENT_IP,
        INSTRUMENT_PORT,
        BUFFER_SIZE,
    )
    scpi_command:bytes = b"*IDN?"

    main(
        instrument_ip=instrument_ip,
        instrument_port=instrument_port,
        buffer_size=buffer_size,
        scpi_command=scpi_command,
    )