Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
remove thread macanism because python does not support it well => go …
Browse files Browse the repository at this point in the history
…full asyncio
  • Loading branch information
Rodriguez committed Oct 22, 2023
1 parent 73d86c3 commit 2616598
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 56 deletions.
11 changes: 11 additions & 0 deletions platform/docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,14 @@ The platform can dynamically mount and unmount:
- Devices (mounting device will load attached interfaces, unmounting device will unload them)




## Starting sequence

- start the running event loop
- connect a primary client to the primary broker (defined in the platform admin config)
- mount device 'machine' for this server
- read actif tree and mount each device



65 changes: 65 additions & 0 deletions platform/panduza_platform/core/monitored_event_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import asyncio

# ---

class MonitoredEventLoop(asyncio.SelectorEventLoop):
"""Event loop that provide some load metric
"""

# ---

def __init__(self, log, *args, **kwargs):
super().__init__(*args, **kwargs)
self._total_time = 0
self._select_time = 0

self._before_select = None

self.log = log
self.perf_cycle_time = 2

self.log.info(f"EVENT LOOP UP !!")

# ---

# TOTAL TIME:
def run_forever(self):
self.ref_time = self.time()
try:
self.log.info(f"EVENT LOOP RUN")
super().run_forever()
finally:
finished = self.time()
# self._total_time = finished - started

# ---

# SELECT TIME:
def _run_once(self):
# print("_run_once")
self._before_select = self.time()
super()._run_once()

# ---

def _process_events(self, *args, **kwargs):
after_select = self.time()
self._select_time += after_select - self._before_select

# print("_process_events", args, kwargs)
super()._process_events(*args, **kwargs)

cycle_time = self.time() - self.ref_time
self.log.info(f"EVENT {cycle_time}")
if cycle_time >= self.perf_cycle_time:

work_time = cycle_time - self._select_time
self.load = round((work_time/self.perf_cycle_time) * 100.0, 3)
if self.load > 60:
self.log.info(f"loop load {self.load}% !!")
else:
self.log.info(f"{self.load}%")
self._select_time = 0
self.ref_time = self.time()

# ---
143 changes: 91 additions & 52 deletions platform/panduza_platform/core/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
import threading
import importlib
import socket
import aiomonitor
import asyncio


from sys import platform

from core.monitored_event_loop import MonitoredEventLoop

from .conf import PLATFORM_VERSION
from log.platform import platform_logger

Expand Down Expand Up @@ -68,6 +73,15 @@ def __init__(self, run_dir="/etc"):
#
self.force_log = False



# If true, it means that the platform event loop can continue to run and work
self.alive = True

# To enable or disable event loop monitoring
# Should be false for release
self.event_loop_debug = True

# ---

def run(self):
Expand All @@ -94,6 +108,8 @@ def mount_device(self, device_cfg):
pass

# device.attach_pclient(client)

# _PZA_DEV_mount_interfaces
# self.threads[0].attach_worker(interface)


Expand Down Expand Up @@ -189,7 +205,7 @@ def load_tree_overide(self, tree_filepath):
def panic(self):
"""
"""
self.__alive = False
self.alive = False

# ---

Expand Down Expand Up @@ -229,81 +245,104 @@ def get_interface_instance(self, bench, device, name):

# ---

async def __idle_task(self):



# Wait for ever
while(self.alive):
await asyncio.sleep(1)

# ---

def __oper_mode(self):
"""Run the operational mode
First all the static aspects are resolved.
The number of instances, clients and thread are fixed for the rest of the execution.
Then the dynamic aspect starts
Threads are started and worker are bring to life
This mode start the main event loop then the initialisation task
"""

try:
if os.path.isfile(STATUS_FILE_PATH):
os.remove(STATUS_FILE_PATH)

# Start the loop and monitor activity
# If the debug flag is enabled, start monitored event loop
if self.event_loop_debug:
self.event_loop = MonitoredEventLoop(self.log)
asyncio.set_event_loop(self.event_loop)
with aiomonitor.start_monitor(self.event_loop):
self.event_loop.run_until_complete(self.__idle_task())
else:
self.event_loop = asyncio.get_event_loop()
self.event_loop.run_until_complete(self.__idle_task())


self.__load_tree()


self.__load_devices()


# Device for the platform (need to be improved !!!)
device_machine = self.device_factory.produce_device({
"name": socket.gethostname(),
"ref": "Panduza.Machine",
})
self.devices.append(device_machine)
# try:
# if os.path.isfile(STATUS_FILE_PATH):
# os.remove(STATUS_FILE_PATH)

self.load_interface("server", device_machine, {
"name": "device",
"driver": "py.device"
})
self.load_interface("server", device_machine, {
"name": "py",
"driver": "py.platform"
})

# self.__load_tree()


# self.__load_devices()


# # Device for the platform (need to be improved !!!)
# device_machine = self.device_factory.produce_device({
# "name": socket.gethostname(),
# "ref": "Panduza.Machine",
# })
# self.devices.append(device_machine)

# self.load_interface("server", device_machine, {
# "name": "device",
# "driver": "py.device"
# })
# self.load_interface("server", device_machine, {
# "name": "py",
# "driver": "py.platform"
# })


# modify interfaces with tree bench configs
# # modify interfaces with tree bench configs

# create clients
# # create clients

client = PlatformClient("localhost", 1883)
# client = PlatformClient("localhost", 1883)

# attach interface to client
# # attach interface to client

# attach clients os.remove(myfile)to thread
# # attach clients os.remove(myfile)to thread


for interface in self.interfaces:
interface.attach_pclient(client)
# for interface in self.interfaces:
# interface.attach_pclient(client)

# Prepare interface internal data
for interface in self.interfaces:
interface.initialize()
# # Prepare interface internal data
# for interface in self.interfaces:
# interface.initialize()

# Create and start thread pool
t = PlatformThread(self)
self.threads.append(t)
# # Create and start thread pool
# t = PlatformThread(self)
# self.threads.append(t)

# attach clients to thread
self.threads[0].attach_worker(client)
# # attach clients to thread
# self.threads[0].attach_worker(client)

# attach interfaces to thread
for interface in self.interfaces:
self.threads[0].attach_worker(interface)
# # attach interfaces to thread
# for interface in self.interfaces:
# self.threads[0].attach_worker(interface)

#
for thr in self.threads:
thr.start()
# #
# for thr in self.threads:
# thr.start()

self.__alive = True
while self.__alive:
time.sleep(0.1)
# self.alive = True
# while self.alive:
# time.sleep(0.1)

self.__stop()
# self.__stop()

# # Run all the interfaces on differents threads
# thread_id=0
Expand Down Expand Up @@ -337,7 +376,7 @@ def __oper_mode(self):
def __stop(self):
"""To stop the entire platform
"""
self.__alive = False
self.alive = False

#
self.log.warning("Platform stopping...")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from core.platform_device import PlatformDevice


from devices.panduza.fake_bps.fake_bpc import DrvPanduzaFakeBpc

class DevicePanduzaFakeBps(PlatformDevice):

Expand All @@ -20,6 +21,8 @@ def _PZA_DEV_config(self):
def _PZA_DEV_mount_interfaces(self):
"""
"""
print('!!!!!ddd')
bpc = DrvPanduzaFakeBpc()
pass

# ---
Expand Down
4 changes: 1 addition & 3 deletions platform/panduza_platform/drivers/ammeter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@

from .drv_hanmatek_hm310t_ammeter import DriverHM310tAmmeter
from .drv_panduza_fake_ammeter import DriverFakeAmmeter


PZA_DRIVERS_LIST=[
DriverHM310tAmmeter,
DriverFakeAmmeter
DriverHM310tAmmeter
]


3 changes: 2 additions & 1 deletion platform/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def run(self):
'pyusb',
'PyHamcrest',
'grako',
'pyserial-asyncio==0.6'
'pyserial-asyncio==0.6',
'aiomonitor'
],

classifiers=[
Expand Down

0 comments on commit 2616598

Please sign in to comment.