Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Ethernet-exclusive connection functionality to Scpi_Psu Agent #509

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 36 additions & 9 deletions socs/agents/scpi_psu/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock

from socs.agents.scpi_psu.drivers import PsuInterface
from socs.agents.scpi_psu.ethernet_drivers import \
PsuInterface as PsuEthernetInterface
from socs.agents.scpi_psu.gpib_drivers import PsuInterface as PsuGpibInterface


class ScpiPsuAgent:
Expand Down Expand Up @@ -41,12 +43,23 @@ def init(self, session, params=None):
if not acquired:
return False, "Could not acquire lock"

try:
self.psu = PsuInterface(self.ip_address, self.gpib_slot)
self.idn = self.psu.identify()
except socket.timeout as e:
self.log.error(f"PSU timed out during connect: {e}")
return False, "Timeout"
if self.gpib_slot is not None:
print("Connecting to PSU via GPIB/Ethernet Connection")
self.gpib_slot = int(self.gpib_slot)
try:
self.psu = PsuGpibInterface(self.ip_address, self.gpib_slot)
except socket.timeout as e:
self.log.error("PSU timed out during connect")
return False, "Timeout"
else:
try:
print("Connecting to PSU via Ethernet Connection")
self.psu = PsuEthernetInterface(self.ip_address)
except socket.timeout as e:
self.log.error("PSU timed out during connect")
return False, "Timeout"

self.idn = self.psu.identify()
self.log.info("Connected to psu: {}".format(self.idn))

return True, 'Initialized PSU.'
Expand Down Expand Up @@ -190,8 +203,22 @@ def main(args=None):
args=args)

agent, runner = ocs_agent.init_site_agent(args)

p = ScpiPsuAgent(agent, args.ip_address, int(args.gpib_slot))
print("Connecting to IP: " + args.ip_address)
if args.gpib_slot is None:
print("Connecting to PSU via Ethernet-only Interface")
else:
print("Connecting to GPIB: " + args.gpib_slot)
print("Connecting to PSU via GPIB/Ethernet Interface")

# Test IP Formatting
try:
socket.inet_aton(args.ip_address)
# legal
except socket.error:
# Not legal
print("Invalid IP Format")

p = ScpiPsuAgent(agent, args.ip_address, args.gpib_slot)

agent.register_task('init', p.init)
agent.register_task('set_voltage', p.set_voltage)
Expand Down
116 changes: 116 additions & 0 deletions socs/agents/scpi_psu/ethernet_drivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import socket

print("Imported PSU-Ethernet Interface - If Your PSU uses GPIB verify args in default.yaml")


class PsuInterface:
def __init__(self, ip_address, port=5025, verbose=False, **kwargs):
self.verbose = verbose
self.ip_address = ip_address
self.port = port
self.sock = None
self.conn_socket()

def conn_socket(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.ip_address, self.port))
self.sock.settimeout(5)

def write(self, msg):
message = (msg + '\n').encode('utf-8')

try:
self.sock.send(message)
except socket.error as e:
print(e)
self.initialize()
self.sock.send(message)

def read(self):

try:
output = self.sock.recv(128).decode("utf-8").rstrip('\n').rstrip('\r')
except socket.error as e:
print(e)
s.initialize()
output = self.sock.recv(128).decode("utf-8").rstrip('\n').rstrip('\r')

return output

def enable(self, ch):
'''
Enables output for channel (1,2,3) but does not turn it on.
Depending on state of power supply, it might need to be called
before the output is set.
'''
self.set_chan(ch)
self.write('OUTP:ENAB ON')

def disable(self, ch):
'''
disabled output from a channel (1,2,3). once called, enable must be
called to turn on the channel again
'''
self.write('OUTP:ENAB OFF')

def set_chan(self, ch):
self.write('inst:nsel ' + str(ch))

def set_output(self, ch, out):
'''
set status of power supply channel
ch - channel (1,2,3) to set status
out - ON: True|1|'ON' OFF: False|0|'OFF'

Calls enable to ensure a channel can be turned on. We might want to
make them separate (and let us use disable as a safety feature) but
for now I am thinking we just want to thing to turn on when we tell
it to turn on.
'''
self.set_chan(ch)
self.enable(ch)
if isinstance(out, str):
self.write('CHAN:OUTP ' + out)
elif out:
self.write('CHAN:OUTP ON')
else:
self.write('CHAN:OUTP OFF')

def get_output(self, ch):
'''
check if the output of a channel (1,2,3) is on (True) or off (False)
'''
self.set_chan(ch)
self.write('CHAN:OUTP:STAT?')
out = bool(float(self.read()))
return out

def set_volt(self, ch, volt):
self.set_chan(ch)
self.write('volt ' + str(volt))
if self.verbose:
voltage = self.get_volt(ch)
print("CH " + str(ch) + " is set to " + str(voltage) + " V")

def set_curr(self, ch, curr):
self.set_chan(ch)
self.write('curr ' + str(curr))
if self.verbose:
current = self.get_curr(ch)
print("CH " + str(ch) + " is set to " + str(current) + " A")

def get_volt(self, ch):
self.set_chan(ch)
self.write('MEAS:VOLT? CH' + str(ch))
voltage = float(self.read())
return voltage

def get_curr(self, ch):
self.set_chan(ch)
self.write('MEAS:CURR? CH' + str(ch))
current = float(self.read())
return current

def identify(self):
self.write('*idn?')
return self.read()
File renamed without changes.