diff --git a/examples/tinyPDC.py b/examples/tinyPDC.py index d47c172..db907e3 100644 --- a/examples/tinyPDC.py +++ b/examples/tinyPDC.py @@ -1,6 +1,7 @@ from synchrophasor.pdc import Pdc from synchrophasor.frame import DataFrame - +import pickle +import socket """ tinyPDC will connect to pmu_ip:pmu_port and send request for header message, configuration and eventually @@ -10,23 +11,32 @@ if __name__ == "__main__": - pdc = Pdc(pdc_id=7, pmu_ip="127.0.0.1", pmu_port=1410) + pdc = Pdc(pdc_id=7, pmu_ip=socket.gethostbyname("rasp"), pmu_port=10000,method='udp') pdc.logger.setLevel("DEBUG") pdc.run() # Connect to PMU - header = pdc.get_header() # Get header message from PMU + # header = pdc.get_header() # Get header message from PMU config = pdc.get_config() # Get configuration from PMU pdc.start() # Request to start sending measurements - + timestamps=[] + i=0 while True: data = pdc.get() # Keep receiving data if type(data) == DataFrame: - print(data.get_measurements()) + data=data.get_measurements() + i+=1 + timestamps.append(data['time']) if not data: - pdc.quit() # Close connection + continue + if i==240: break + + pdc.stop() + pdc.quit() + with open("timestamps","wb+")as handle: + pickle.dump(timestamps,handle) diff --git a/synchrophasor/frame.py b/synchrophasor/frame.py index e92468e..119497c 100644 --- a/synchrophasor/frame.py +++ b/synchrophasor/frame.py @@ -29,6 +29,14 @@ __license__ = "BSD-3" __version__ = "1.0.0-alpha" +############################################################### + +# UDP connection was implemented by Yuri Poledna under the supervision of + +# Prof. Eduardo Parente in R&D project supported by Brazilian electric utility + +# Companhia Paranaense de Energia – COPEL. +############################################################### class CommonFrame(metaclass=ABCMeta): """ @@ -75,15 +83,17 @@ def __init__(self, frame_type, pmu_id_code, soc=None, frasec=None, version=1): :param int version: :return: """ - + self.set_frame_type(frame_type) self.set_version(version) self.set_id_code(pmu_id_code) - + if soc or frasec: self.set_time(soc, frasec) + def get_receivedData(): + return self.receivedData def set_frame_type(self, frame_type): """ ### set_frame_type() ### @@ -133,7 +143,7 @@ def get_frame_type(self): def extract_frame_type(byte_data): """This method will only return type of the frame. It shall be used for stream splitter - since there is no need to create instance of specific frame which will cause lower performance.""" + since there Phasor 5 Angle(rad): 0.6818265914916992is no need to create instance of specific frame which will cause lower performance.""" # Check if frame is valid if not CommandFrame._check_crc(byte_data): @@ -226,12 +236,11 @@ def set_time(self, soc=None, frasec=None): t = time() # Get current timestamp - if soc: + if soc is not None: self.set_soc(soc) else: self.set_soc(int(t)) # Get current timestamp - - if frasec: + if frasec is not None: if isinstance(frasec, collections.Sequence): self.set_frasec(*frasec) else: @@ -241,7 +250,6 @@ def set_time(self, soc=None, frasec=None): # overflow (24 bit number). self.set_frasec(int((((repr((t % 1))).split("."))[1])[0:6])) - def set_soc(self, soc): """ ### set_soc() ### @@ -370,8 +378,6 @@ def set_frasec(self, fr_seconds, leap_dir="+", leap_occ=False, leap_pen=False, t frasec |= fr_seconds # Bits 23-0: Fraction of second. self._frasec = frasec - - def get_frasec(self): return self._int2frasec(self._frasec) @@ -392,7 +398,7 @@ def _int2frasec(frasec_int): leap_occ = bool(leap_occ) leap_pen = bool(leap_pen) - fr_seconds = frasec_int & (2**23-1) + fr_seconds = frasec_int & (2**24-1) return fr_seconds, leap_dir, leap_occ, leap_pen, time_quality @@ -592,7 +598,6 @@ def _int2format(data_format): def _check_crc(byte_data): crc_calculated = crc16xmodem(byte_data[0:-2], 0xffff).to_bytes(2, "big") # Calculate CRC - if byte_data[-2:] != crc_calculated: return False @@ -632,7 +637,6 @@ def convert2bytes(self, byte_message): @abstractmethod def convert2frame(byte_data, cfg=None): - convert_method = { 0: DataFrame.convert2frame, 1: HeaderFrame.convert2frame, @@ -1114,7 +1118,7 @@ def _phunit2int(scale, phasor_type="v"): * ``scale`` **(int)** - scale factor. * ``phasor_type`` **(char)** - ``v`` - voltage, ``i`` - current. Default value: ``v``. - +phasor **Returns:** * ``int`` which represents phasor channels conversion factor. @@ -1847,14 +1851,15 @@ class DataFrame(CommonFrame): TRIGGER_REASON_WORDS = { code: word for word, code in TRIGGER_REASON.items() } - def __init__(self, pmu_id_code, stat, phasors, freq, dfreq, analog, digital, cfg, soc=None, frasec=None): + def __init__(self, pmu_id_code, stat, phasors, freq, dfreq, analog, digital, cfg, soc=None, frasec=None,receivedData1=b'0'): if not isinstance(cfg, ConfigFrame2): raise FrameError("CFG should describe current data stream (ConfigurationFrame2)") # Common frame for Configuration frame 2 with PMU simulator ID CODE which sends configuration frame. super().__init__("data", pmu_id_code, soc, frasec) - + + self.receivedData=receivedData1 self.cfg = cfg self.set_stat(stat) self.set_phasors(phasors) @@ -1864,6 +1869,8 @@ def __init__(self, pmu_id_code, stat, phasors, freq, dfreq, analog, digital, cfg self.set_digital(digital) + def getReceivedData(self): + return self.receivedData def set_stat(self, stat): if self.cfg._num_pmu > 1: @@ -2037,7 +2044,6 @@ def get_phasors(self, convert2polar=True): @staticmethod def _phasor2int(phasor, data_format): - if not isinstance(phasor, tuple): raise TypeError("Provide phasor measurement as tuple. Rectangular - (Re, Im); Polar - (Mg, An).") @@ -2045,16 +2051,11 @@ def _phasor2int(phasor, data_format): data_format = DataFrame._int2format(data_format) if data_format[0]: # Polar representation - if data_format[1]: # Floating Point - - if not -3.142 <= phasor[1] <= 3.142: - raise ValueError("Angle must be in range -3.14 <= ANGLE <= 3.14") - + # raise ValueError("Angle must be in range -3.14 <= ANGLE <= 3.14") mg = pack("!f", float(phasor[0])) an = pack("!f", float(phasor[1])) measurement = mg + an - else: # Polar 16-bit representations if not 0 <= phasor[0] <= 65535: @@ -2085,7 +2086,6 @@ def _phasor2int(phasor, data_format): re = pack("!h", phasor[0]) im = pack("!h", phasor[1]) measurement = re + im - return int.from_bytes(measurement, "big", signed=False) @@ -2141,15 +2141,12 @@ def _freq2int(freq, data_format): data_format = DataFrame._int2format(data_format) if data_format[3]: # FREQ/DFREQ floating point - if not -32.767 <= freq <= 32.767: - raise ValueError("FREQ must be in range -32.767 <= FREQ <= 32.767.") - + #raise ValueError("FREQ must be in range -32.767 <= FREQ <= 32.767.") freq = unpack("!I", pack("!f", float(freq)))[0] else: if not -32767 <= freq <= 32767: raise ValueError("FREQ must be 16-bit signed integer. -32767 <= FREQ <= 32767.") freq = unpack("!H", pack("!h", freq))[0] - return freq @@ -2368,12 +2365,12 @@ def get_measurements(self): "phasors": self.get_phasors(), "analog": self.get_analog(), "digital": self.get_digital(), - "frequency": self.cfg.get_fnom() + self.get_freq() / 1000, + #"frequency": self.cfg.get_fnom() + self.get_freq() / 1000, + "frequency": self.get_freq(), "rocof": self.get_dfreq() }) - data_frame = { "pmu_id": self._pmu_id_code, - "time": self.get_soc() + self.get_frasec()[0] / self.cfg.get_time_base(), + "time": (self.get_soc() + (self.get_frasec()[0] / self.cfg.get_time_base())), "measurements": measurements } return data_frame @@ -2414,7 +2411,8 @@ def convert2bytes(self): @staticmethod def convert2frame(byte_data, cfg): - + + try: if not CommonFrame._check_crc(byte_data): @@ -2429,7 +2427,6 @@ def convert2frame(byte_data, cfg): pmu_code = int.from_bytes(byte_data[4:6], byteorder="big", signed=False) soc = int.from_bytes(byte_data[6:10], byteorder="big", signed=False) frasec = CommonFrame._int2frasec(int.from_bytes(byte_data[10:14], byteorder="big", signed=False)) - start_byte = 14 if num_pmu > 1: @@ -2515,7 +2512,7 @@ def convert2frame(byte_data, cfg): digital.append(dig) start_byte += 2 - return DataFrame(pmu_code, stat, phasors, freq, dfreq, analog, digital, cfg, soc, frasec) + return DataFrame(pmu_code, stat, phasors, freq, dfreq, analog, digital, cfg, soc, frasec, byte_data) except Exception as error: raise FrameError("Error while creating Data frame: " + str(error)) diff --git a/synchrophasor/pdc.py b/synchrophasor/pdc.py index 8dc640a..ef45777 100644 --- a/synchrophasor/pdc.py +++ b/synchrophasor/pdc.py @@ -1,5 +1,7 @@ import logging import socket +import struct +import sys from sys import stdout from synchrophasor.frame import * @@ -10,6 +12,15 @@ __license__ = "BSD-3" __version__ = "1.0.0-alpha" +############################################################### + +# UDP connection was implemented by Yuri Poledna under the supervision of + +# Prof. Eduardo Parente in R&D project supported by Brazilian electric utility + +# Companhia Paranaense de Energia – COPEL. + +############################################################### class Pdc(object): @@ -21,8 +32,7 @@ class Pdc(object): handler.setFormatter(formatter) logger.addHandler(handler) - - def __init__(self, pdc_id=1, pmu_ip="127.0.0.1", pmu_port=4712, buffer_size=2048, method="tcp"): + def __init__(self, pdc_id=1, pmu_ip="127.0.0.1", pmu_port=4712, buffer_size=2048, method="tcp", config2=None): self.pdc_id = pdc_id self.buffer_size = buffer_size @@ -33,45 +43,77 @@ def __init__(self, pdc_id=1, pmu_ip="127.0.0.1", pmu_port=4712, buffer_size=2048 self.pmu_address = (pmu_ip, pmu_port) self.pmu_socket = None self.pmu_cfg1 = None - self.pmu_cfg2 = None + + #self.pmu_cfg2 = None + self.pmu_cfg2 = config2 + self.pmu_header = None + def get_cfg2(self, config2): + return self.pmu_cfg2 + + def set_cfg2(self, config2): + self.pmu_cfg2 = config2 def run(self): if self.pmu_socket: - self.logger.info("[%d] - PDC already connected to PMU (%s:%d)", self.pdc_id, self.pmu_ip, self.pmu_port) + self.logger.info("[%d] - PDC already connected to PMU (%s:%d)", + self.pdc_id, self.pmu_ip, self.pmu_port) else: try: # Connect to PMU - self.pmu_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.pmu_socket.connect(self.pmu_address) - self.logger.info("[%d] - PDC successfully connected to PMU (%s:%d)", - self.pdc_id, self.pmu_ip, self.pmu_port) + if(self.method == "tcp"): + self.pmu_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.pmu_socket.connect(self.pmu_address) + self.logger.info("[%d] - PDC successfully connected to PMU (%s:%d)as %s", + self.pdc_id, self.pmu_ip, self.pmu_port, self.method) + if(self.method == "udp"): + self.pmu_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # self.pmu_socket.connect(self.pmu_address) + self.pmu_socket.bind(('', self.pmu_port)) + self.logger.info("[%d] - PDC successfully connected to PMU (%s:%d) as %s", + self.pdc_id, self.pmu_ip, self.pmu_port, self.method) + if(self.method == "multicast"): + self.pmu_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.pmu_socket.bind(('', self.pmu_port)) + self.pmu_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, struct.pack( + '4sL', socket.inet_aton(self.pmu_ip), socket.INADDR_ANY)) + self.logger.info("[%d] - PDC successfully connected to PMU (%s:%d) as %s", + self.pdc_id, self.pmu_ip, self.pmu_port, self.method) except Exception as e: - self.logger.error("[%d] - Error while connecting to (%s:%d)", self.pdc_id, self.pmu_ip, self.pmu_port) + self.logger.error("[%d] - Error while connecting to (%s:%d)", + self.pdc_id, self.pmu_ip, self.pmu_port) self.logger.error(str(e)) - def start(self): """ Request from PMU to start sending data :return: NoneType """ start = CommandFrame(self.pdc_id, "start") - self.pmu_socket.sendall(start.convert2bytes()) - self.logger.info("[%d] - Requesting to start sending from PMU (%s:%d)", self.pdc_id, self.pmu_ip, self.pmu_port) - + if(self.method == "tcp"): + self.pmu_socket.sendall(start.convert2bytes()) + else: + self.pmu_socket.sendto(start.convert2bytes(), (self.pmu_ip, self.pmu_port)) + self.logger.info("[%d] - Requesting to start sending from PMU (%s:%d)", + self.pdc_id, self.pmu_ip, self.pmu_port) def stop(self): """ Request from PMU to start sending data :return: NoneType """ - start = CommandFrame(self.pdc_id, "stop") - self.pmu_socket.sendall(start.convert2bytes()) - self.logger.info("[%d] - Requesting to stop sending from PMU (%s:%d)", self.pdc_id, self.pmu_ip, self.pmu_port) - + if(self.method == "tcp"): + start = CommandFrame(self.pdc_id, "stop") + self.pmu_socket.sendall(start.convert2bytes()) + self.logger.info("[%d] - Requesting to stop sending from PMU (%s:%d)", + self.pdc_id, self.pmu_ip, self.pmu_port) + else: + start = CommandFrame(self.pdc_id, "stop") + self.pmu_socket.sendto(start.convert2bytes(), self.pmu_address) + self.logger.info("[%d] - Requesting to stop sending from PMU (%s:%d)", + self.pdc_id, self.pmu_ip, self.pmu_port) def get_header(self): """ @@ -81,7 +123,8 @@ def get_header(self): get_header = CommandFrame(self.pdc_id, "header") self.pmu_socket.sendall(get_header.convert2bytes()) - self.logger.info("[%d] - Requesting header message from PMU (%s:%d)", self.pdc_id, self.pmu_ip, self.pmu_port) + self.logger.info("[%d] - Requesting header message from PMU (%s:%d)", + self.pdc_id, self.pmu_ip, self.pmu_port) header = self.get() if isinstance(header, HeaderFrame): @@ -89,7 +132,6 @@ def get_header(self): else: raise PdcError("Invalid Header message received") - def get_config(self, version="cfg2"): """ Request for Configuration frame. @@ -97,18 +139,27 @@ def get_config(self, version="cfg2"): :return: ConfigFrame """ get_config = CommandFrame(self.pdc_id, version) - self.pmu_socket.sendall(get_config.convert2bytes()) - config = self.get() - if type(config) == ConfigFrame1: - self.pmu_cfg1 = config - elif type(config) == ConfigFrame2: - self.pmu_cfg2 = config + if(self.method == "udp"): + self.pmu_socket.sendto(get_config.convert2bytes(), self.pmu_address) + config = self.get() + if type(config) == ConfigFrame1: + self.pmu_cfg1 = config + elif type(config) == ConfigFrame2: + self.pmu_cfg2 = config + return config else: - raise PdcError("Invalid Configuration message received") + self.pmu_socket.sendall(get_config.convert2bytes()) - return config + config = self.get() + if type(config) == ConfigFrame1: + self.pmu_cfg1 = config + elif type(config) == ConfigFrame2: + self.pmu_cfg2 = config + else: + raise PdcError("Invalid Configuration message received") + return config def get(self): """ @@ -124,41 +175,46 @@ def get(self): Should get this in first iteration. FRAMESIZE is needed to determine when one complete message has been received. """ - - while len(received_data) < 4: - received_data += self.pmu_socket.recv(self.buffer_size) - - bytes_received = len(received_data) - total_frame_size = int.from_bytes(received_data[2:4], byteorder="big", signed=False) - # Keep receiving until every byte of that message is received - while bytes_received < total_frame_size: - message_chunk = self.pmu_socket.recv(min(total_frame_size - bytes_received, self.buffer_size)) - if not message_chunk: - break - received_data += message_chunk - bytes_received += len(message_chunk) - + if(self.method == "tcp"): + + received_data += self.pmu_socket.recv(4) + bytes_received = len(received_data) + total_frame_size = int.from_bytes(received_data[2:4], byteorder="big", signed=False) + # print(total_frame_size) + while bytes_received < total_frame_size: + message_chunk = self.pmu_socket.recv( + min(total_frame_size - bytes_received, self.buffer_size)) + if not message_chunk: + break + received_data += message_chunk + bytes_received += len(message_chunk) + + if((self.method == "udp") or (self.method == "multicast")): + received_data = self.pmu_socket.recv(1024) + total_frame_size = int.from_bytes(received_data[2:4], byteorder="big", signed=False) + # print(received_data) # If complete message is received try to decode it if len(received_data) == total_frame_size: + # print(self.pmu_cfg2) try: - received_message = CommonFrame.convert2frame(received_data, self.pmu_cfg2) # Try to decode received data - self.logger.debug("[%d] - Received %s from PMU (%s:%d)", self.pdc_id, type(received_message).__name__, - self.pmu_ip, self.pmu_port) + received_message = CommonFrame.convert2frame( + received_data, self.pmu_cfg2) # Try to decode received data + #self.logger.debug("[%d] - Received %s from PMU (%s:%d)", self.pdc_id, type(received_message).__name__,self.pmu_ip, self.pmu_port) except FrameError: self.logger.warning("[%d] - Received unknown message from PMU (%s:%d)", self.pdc_id, self.pmu_ip, self.pmu_port) return received_message - def quit(self): """ Close connection to PMU :return: NoneType """ self.pmu_socket.close() - self.logger.info("[%d] - Connection to PMU closed (%s:%d)", self.pdc_id, self.pmu_ip, self.pmu_port) + self.logger.info("[%d] - Connection to PMU closed (%s:%d)", + self.pdc_id, self.pmu_ip, self.pmu_port) class PdcError(BaseException): diff --git a/synchrophasor/pmu.py b/synchrophasor/pmu.py index 14e5518..47902f2 100644 --- a/synchrophasor/pmu.py +++ b/synchrophasor/pmu.py @@ -5,10 +5,10 @@ from threading import Thread from multiprocessing import Queue from multiprocessing import Process -from sys import stdout +from sys import stdout, exc_info from time import sleep from synchrophasor.frame import * - +from traceback import print_exception __author__ = "Stevan Sandi" __copyright__ = "Copyright (c) 2016, Tomo Popovic, Stevan Sandi, Bozo Krstajic" @@ -17,19 +17,25 @@ __version__ = "1.0.0-alpha" -class Pmu(object): +############################################################### - logger = logging.getLogger(__name__) - logger.setLevel(logging.INFO) - handler = logging.StreamHandler(stdout) - formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") - handler.setFormatter(formatter) - logger.addHandler(handler) +# UDP connection was implemented by Yuri Poledna under the supervision of +# Prof. Eduardo Parente in R&D project supported by Brazilian electric utility - def __init__(self, pmu_id=7734, data_rate=30, port=4712, ip="127.0.0.1", - method="tcp", buffer_size=2048, set_timestamp=True): +# Companhia Paranaense de Energia – COPEL. +############################################################### + +class Pmu(object): + + def __init__(self, pmu_id=7734, data_rate=30, port=4712, ip="127.0.0.1", method="tcp", buffer_size=2048, set_timestamp=True): + self.logger = logging.getLogger(__name__) + self.logger.setLevel(logging.INFO) + self.handler = logging.StreamHandler(stdout) + self.formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") + self.handler.setFormatter(self.formatter) + self.logger.addHandler(self.handler) self.port = port self.ip = ip @@ -67,7 +73,6 @@ def __init__(self, pmu_id=7734, data_rate=30, port=4712, ip="127.0.0.1", self.clients = [] self.client_buffers = [] - def set_id(self, pmu_id): self.cfg1.set_id_code(pmu_id) @@ -80,7 +85,6 @@ def set_id(self, pmu_id): self.logger.info("[%d] - PMU Id changed.", self.cfg2.get_id_code()) - def set_configuration(self, config=None): # If none configuration given IEEE sample configuration will be loaded @@ -107,7 +111,6 @@ def set_configuration(self, config=None): self.logger.info("[%d] - PMU configuration changed.", self.cfg2.get_id_code()) - def set_header(self, header=None): if isinstance(header, HeaderFrame): @@ -122,7 +125,6 @@ def set_header(self, header=None): self.logger.info("[%d] - PMU header changed.", self.cfg2.get_id_code()) - def set_data_rate(self, data_rate): self.cfg1.set_data_rate(data_rate) @@ -136,7 +138,6 @@ def set_data_rate(self, data_rate): self.logger.info("[%d] - PMU reporting data rate changed.", self.cfg2.get_id_code()) - def set_data_format(self, data_format): self.cfg1.set_data_format(data_format, self.cfg1.get_num_pmu()) @@ -149,7 +150,6 @@ def set_data_format(self, data_format): self.logger.info("[%d] - PMU data format changed.", self.cfg2.get_id_code()) - def send(self, frame): if not isinstance(frame, CommonFrame) and not isinstance(frame, bytes): @@ -158,25 +158,27 @@ def send(self, frame): for buffer in self.client_buffers: buffer.put(frame) - def send_data(self, phasors=[], analog=[], digital=[], freq=0, dfreq=0, stat=("ok", True, "timestamp", False, False, False, 0, "<10", 0), soc=None, frasec=None): # PH_UNIT conversion if phasors and self.cfg2.get_num_pmu() > 1: # Check if multistreaming: if not (self.cfg2.get_num_pmu() == len(self.cfg2.get_data_format()) == len(phasors)): - raise PmuError("Incorrect input. Please provide PHASORS as list of lists with NUM_PMU elements.") + raise PmuError( + "Incorrect input. Please provide PHASORS as list of lists with NUM_PMU elements.") for i, df in self.cfg2.get_data_format(): if not df[1]: # Check if phasor representation is integer - phasors[i] = map(lambda x: int(x / (0.00001 * self.cfg2.get_ph_units()[i])), phasors[i]) + phasors[i] = map(lambda x: int( + x / (0.00001 * self.cfg2.get_ph_units()[i])), phasors[i]) elif not self.cfg2.get_data_format()[1]: phasors = map(lambda x: int(x / (0.00001 * self.cfg2.get_ph_units())), phasors) # AN_UNIT conversion if analog and self.cfg2.get_num_pmu() > 1: # Check if multistreaming: if not (self.cfg2.get_num_pmu() == len(self.cfg2.get_data_format()) == len(analog)): - raise PmuError("Incorrect input. Please provide analog ANALOG as list of lists with NUM_PMU elements.") + raise PmuError( + "Incorrect input. Please provide analog ANALOG as list of lists with NUM_PMU elements.") for i, df in self.cfg2.get_data_format(): if not df[2]: # Check if analog representation is integer @@ -184,45 +186,74 @@ def send_data(self, phasors=[], analog=[], digital=[], freq=0, dfreq=0, elif not self.cfg2.get_data_format()[2]: analog = map(lambda x: int(x / self.cfg2.get_analog_units()), analog) - data_frame = DataFrame(self.cfg2.get_id_code(), stat, phasors, freq, dfreq, analog, digital, self.cfg2) + data_frame = DataFrame(self.cfg2.get_id_code(), stat, phasors, + freq, dfreq, analog, digital, self.cfg2) for buffer in self.client_buffers: buffer.put(data_frame) - def run(self): if not self.cfg1 and not self.cfg2 and not self.cfg3: raise PmuError("Cannot run PMU without configuration.") # Create TCP socket, bind port and listen for incoming connections - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.bind((self.ip, self.port)) - self.socket.listen(5) - - self.listener = Thread(target=self.acceptor) # Run acceptor thread to handle new connection + if(self.method == "tcp"): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind((self.ip, self.port)) + self.socket.listen(5) + if(self.method == "udp"): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind(("", self.port)) + # self.socket.listen(5) + + # Run acceptor thread to handle new connection + self.listener = Thread(target=self.acceptor, daemon=True, name="listenerPMUthread") self.listener.daemon = True self.listener.start() - def acceptor(self): + if(self.method == "tcp"): + while True: - while True: - - self.logger.info("[%d] - Waiting for connection on %s:%d", self.cfg2.get_id_code(), self.ip, self.port) - + self.logger.info("[%d] - Waiting for connection on %s:%d", + self.cfg2.get_id_code(), self.ip, self.port) + + # Accept a connection on the bound socket and fork a child process to handle it. + conn, address = self.socket.accept() + # Create Queue which will represent buffer for specific client and add it + # to list of all client buffers + buffer = Queue() + self.client_buffers.append(buffer) + + process = Process(target=self.pdc_handler, args=(conn, address, buffer, self.cfg2.get_id_code(), + self.cfg2.get_data_rate(), self.cfg1, self.cfg2, + self.cfg3, self.header, self.buffer_size, + self.set_timestamp, self.logger.level, self.method, self.logger), daemon=True, name="acceptorPMUprocess") + process.daemon = True + process.start() + self.clients.append(process) + + # Close the connection fd in the parent, since the child process has its own reference. + conn.close() + else: + self.logger.info("[%d] - Waiting for connection on %s:%d", + self.cfg2.get_id_code(), self.ip, self.port) # Accept a connection on the bound socket and fork a child process to handle it. - conn, address = self.socket.accept() - - # Create Queue which will represent buffer for specific client and add it o list of all client buffers + conn = self.socket + address = "0.0.0.0" + # Create Queue which will represent buffer for specific client and add it + # to list of all client buffers buffer = Queue() self.client_buffers.append(buffer) process = Process(target=self.pdc_handler, args=(conn, address, buffer, self.cfg2.get_id_code(), self.cfg2.get_data_rate(), self.cfg1, self.cfg2, self.cfg3, self.header, self.buffer_size, - self.set_timestamp, self.logger.level)) + self.set_timestamp, self.logger.level, self.method, self.logger), + daemon=True, name="acceptorPMUprocess") process.daemon = True process.start() self.clients.append(process) @@ -230,30 +261,29 @@ def acceptor(self): # Close the connection fd in the parent, since the child process has its own reference. conn.close() - def join(self): while self.listener.is_alive(): self.listener.join(0.5) - @staticmethod def pdc_handler(connection, address, buffer, pmu_id, data_rate, cfg1, cfg2, cfg3, header, - buffer_size, set_timestamp, log_level): - + buffer_size, set_timestamp, log_level, method, logger): + from time import time # Recreate Logger (handler implemented as static method due to Windows process spawning issues) - logger = logging.getLogger(address[0]+str(address[1])) - logger.setLevel(log_level) - handler = logging.StreamHandler(stdout) - formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") - handler.setFormatter(formatter) - logger.addHandler(handler) + # if method=="tcp": + # logger = logging.getLogger(address[0]+str(address[1])) + # logger.setLevel(log_level) + # handler = logging.StreamHandler(stdout) + # formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") + # handler.setFormatter(formatter) + # logger.addHandler(handler) - logger.info("[%d] - Connection from %s:%d", pmu_id, address[0], address[1]) + # logger.info("[%d] - Connection from %s:%d", pmu_id, address[0], address[1]) # Wait for start command from connected PDC/PMU to start sending sending_measurements_enabled = False - + currentTime = time() # Calculate delay between data frames if data_rate > 0: delay = 1.0 / data_rate @@ -261,99 +291,190 @@ def pdc_handler(connection, address, buffer, pmu_id, data_rate, cfg1, cfg2, cfg3 delay = -data_rate try: + address_list = [] while True: command = None received_data = b"" - readable, writable, exceptional = select([connection], [], [], 0) # Check for client commands - + readable, writable, exceptional = select( + [connection], [], [], 0) # Check for client commands + import datetime if readable: """ Keep receiving until SYNC + FRAMESIZE is received, 4 bytes in total. Should get this in first iteration. FRAMESIZE is needed to determine when one complete message has been received. """ - while len(received_data) < 4: - received_data += connection.recv(buffer_size) - - bytes_received = len(received_data) - total_frame_size = int.from_bytes(received_data[2:4], byteorder="big", signed=False) - - # Keep receiving until every byte of that message is received - while bytes_received < total_frame_size: - message_chunk = connection.recv(min(total_frame_size - bytes_received, buffer_size)) - if not message_chunk: - break - received_data += message_chunk - bytes_received += len(message_chunk) + if(method == "tcp"): + while len(received_data) < 4: + received_data += connection.recv(buffer_size) + + bytes_received = len(received_data) + total_frame_size = int.from_bytes( + received_data[2:4], byteorder="big", signed=False) + + # Keep receiving until every byte of that message is received + while bytes_received < total_frame_size: + message_chunk = connection.recv( + min(total_frame_size - bytes_received, buffer_size)) + if not message_chunk: + break + received_data += message_chunk + bytes_received += len(message_chunk) + + if(method == "udp"): + received_data, address = connection.recvfrom(1024) + total_frame_size = int.from_bytes( + received_data[2:4], byteorder="big", signed=False) # If complete message is received try to decode it if len(received_data) == total_frame_size: try: - received_message = CommonFrame.convert2frame(received_data) # Try to decode received data + received_message = CommonFrame.convert2frame( + received_data) # Try to decode received data if isinstance(received_message, CommandFrame): command = received_message.get_command() - logger.info("[%d] - Received command: [%s] <- (%s:%d)", pmu_id, command, - address[0], address[1]) + logger.debug("[%d] INFO - Received command: [%s] <- (%s:%d)" % + (pmu_id, command, address[0], address[1])) else: - logger.info("[%d] - Received [%s] <- (%s:%d)", pmu_id, - type(received_message).__name__, address[0], address[1]) + logger.debug("[%d] - Received [%s] <- (%s:%d)" % (pmu_id, + type(received_message).__name__, address[0], address[1])) except FrameError: - logger.warning("[%d] - Received unknown message <- (%s:%d)", pmu_id, address[0], address[1]) + logger.debug(("[%d] - Received unknown message <- (%s:%d)") % + (pmu_id, address[0], address[1])) else: - logger.warning("[%d] - Message not received completely <- (%s:%d)", pmu_id, address[0], address[1]) - - if command: - if command == "start": - sending_measurements_enabled = True - logger.info("[%d] - Start sending -> (%s:%d)", pmu_id, address[0], address[1]) - - elif command == "stop": - logger.info("[%d] - Stop sending -> (%s:%d)", pmu_id, address[0], address[1]) - sending_measurements_enabled = False - - elif command == "header": - if set_timestamp: header.set_time() - connection.sendall(header.convert2bytes()) - logger.info("[%d] - Requested Header frame sent -> (%s:%d)", - pmu_id, address[0], address[1]) - - elif command == "cfg1": - if set_timestamp: cfg1.set_time() - connection.sendall(cfg1.convert2bytes()) - logger.info("[%d] - Requested Configuration frame 1 sent -> (%s:%d)", - pmu_id, address[0], address[1]) - - elif command == "cfg2": - if set_timestamp: cfg2.set_time() - connection.sendall(cfg2.convert2bytes()) - logger.info("[%d] - Requested Configuration frame 2 sent -> (%s:%d)", - pmu_id, address[0], address[1]) - - elif command == "cfg3": - if set_timestamp: cfg3.set_time() - connection.sendall(cfg3.convert2bytes()) - logger.info("[%d] - Requested Configuration frame 3 sent -> (%s:%d)", - pmu_id, address[0], address[1]) - - if sending_measurements_enabled and not buffer.empty(): - - data = buffer.get() - if isinstance(data, CommonFrame): # If not raw bytes convert to bytes - if set_timestamp: data.set_time() - data = data.convert2bytes() - - sleep(delay) - connection.sendall(data) - logger.debug("[%d] - Message sent at [%f] -> (%s:%d)", - pmu_id, time(), address[0], address[1]) + logger.debug(("[%d] - Message not received completely <- (%s:%d)") % + (pmu_id, address[0], address[1])) + + if(method == "tcp"): + if command: + if command == "start": + sending_measurements_enabled = True + logger.debug(("[%d] - Start sending -> (%s:%d)"), + pmu_id, address[0], address[1]) + if address not in address_list: + address_list.append(address) + + elif command == "stop": + logger.debug("[%d] - Stop sending -> (%s:%d)", + pmu_id, address[0], address[1]) + if address in address_list: + address_list.pop(address_list.index(address)) + + if len(address_list)==0: + sending_measurements_enabled = False + + + elif command == "header": + if set_timestamp: + header.set_time() + connection.sendall(header.convert2bytes()) + logger.debug("[%d] - Requested Header frame sent -> (%s:%d)", + pmu_id, address[0], address[1]) + + elif command == "cfg1": + if set_timestamp: + cfg1.set_time() + connection.sendall(cfg1.convert2bytes()) + logger.debug("[%d] - Requested Configuration frame 1 sent -> (%s:%d)", + pmu_id, address[0], address[1]) + + elif command == "cfg2": + if set_timestamp: + cfg2.set_time() + connection.sendall(cfg2.convert2bytes()) + logger.debug("[%d] - Requested Configuration frame 2 sent -> (%s:%d)", + pmu_id, address[0], address[1]) + + elif command == "cfg3": + if set_timestamp: + cfg3.set_time() + connection.sendall(cfg3.convert2bytes()) + logger.debug("[%d] - Requested Configuration frame 3 sent -> (%s:%d)", + pmu_id, address[0], address[1]) + + if sending_measurements_enabled and not buffer.empty(): + + data = buffer.get(block=True) + + if isinstance(data, CommonFrame): # If not raw bytes convert to bytes + if set_timestamp: + data.set_time() + data = data.convert2bytes() + + # sleep(delay) + connection.sendall(data) + logger.debug("[%d] - Message sent at [%f] -> (%s:%d)", + pmu_id, time(), address[0], address[1]) + else: + if(time() - currentTime >= 60): + currentTime = time() + connection.sendto(cfg2.convert2bytes(), address) + # logger.debug(("[%d] - Requested Configuration frame 2 sent -> (%s:%d)")%(pmu_id, address[0], address[1])) + if command: + if command == "start": + sending_measurements_enabled = True + logger.debug("[%d] - Start sending -> (%s:%d)", + pmu_id, address[0], address[1]) + if address not in address_list: + address_list.append(address) + + elif command == "stop": + logger.debug("[%d] - Stop sending -> (%s:%d)", + pmu_id, address[0], address[1]) + + if address in address_list: + address_list.pop(address_list.index(address)) + + if len(address_list)==0: + sending_measurements_enabled = False + + elif command == "header": + if set_timestamp: + header.set_time() + connection.sendto(header.convert2bytes(), address) + logger.debug("[%d] - Requested Header frame sent -> (%s:%d)", + pmu_id, address[0], address[1]) + + elif command == "cfg1": + if set_timestamp: + cfg1.set_time() + connection.sendto(cfg1.convert2bytes(), address) + logger.debug("[%d] - Requested Configuration frame 1 sent -> (%s:%d)", + pmu_id, address[0], address[1]) + + elif command == "cfg2": + connection.sendto(cfg2.convert2bytes(), address) + logger.debug("[%d] - Requested Configuration frame 2 sent -> (%s:%d)", + pmu_id, address[0], address[1]) + + elif command == "cfg3": + if set_timestamp: + cfg3.set_time() + connection.sendto(cfg3.convert2bytes(), address) + logger.debug("[%d] - Requested Configuration frame 3 sent -> (%s:%d)", + pmu_id, address[0], address[1]) + + if sending_measurements_enabled: # and not buffer.empty(): + + data = buffer.get(block=True) + if isinstance(data, CommonFrame): # If not raw bytes convert to bytes + data = data.convert2bytes() + + # sleep(delay) + for address in address_list: + connection.sendto(data, address) + #self.logger.debug("[%d] - Message sent at [%f] -> (%s:%d)"%(pmu_id, time(), address[0], address[1])) except Exception as e: print(e) + exc_information = exc_info() finally: connection.close() - logger.info("[%d] - Connection from %s:%d has been closed.", pmu_id, address[0], address[1]) + logger.debug("[%d] - Connection from %s:%d has been closed.", pmu_id, address[0], address[1]) + print(print_exception(*exc_information)) + del exc_information class PmuError(BaseException): diff --git a/synchrophasor/splitter.py b/synchrophasor/splitter.py index e85a755..b04f317 100644 --- a/synchrophasor/splitter.py +++ b/synchrophasor/splitter.py @@ -9,6 +9,14 @@ __license__ = "BSD-3" __version__ = "1.0.0-aplha" +############################################################### + +# UDP connection was implemented by Yuri Poledna under the supervision of + +# Prof. Eduardo Parente in R&D project supported by Brazilian electric utility + +# Companhia Paranaense de Energia – COPEL. +############################################################### class StreamSplitter(object):