Skip to content

Commit

Permalink
tests fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
MalteUniOldenburg committed Apr 9, 2024
1 parent f8e170b commit 2a9ba67
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 68 deletions.
133 changes: 77 additions & 56 deletions mango_library/negotiation/winzent/winzent_base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ def __init__(self, container, ttl, time_to_sleep=3, send_message_paths=False, et
# store other agents as neighbors in a list
self.neighbors = {}

#TODO:Test events
self._acknowledgment_event = asyncio.Event()
self._solution_found_event = asyncio.Event()

# some parameters necessary for a negotiation
self._solution_found = False # True if there is already a solution for the current problem
self._negotiation_running = False # is currently a negotiation running
Expand All @@ -69,20 +73,13 @@ def __init__(self, container, ttl, time_to_sleep=3, send_message_paths=False, et
self._acknowledgements_sent = [] # the id of the acceptance notifications to with an acknowledgement was sent
self._list_of_acknowledgements_sent = []
self._waiting_for_acknowledgements = False # True if the agent is still waiting for acknowledgements
self.negotiation_done = None # True if the negotiation is done
self.negotiation_done = asyncio.Future() # True if the negotiation is done
self._own_request = None # the agent stores its own request when it starts a negotiation
self._current_ttl = ttl # the current time to live for messages, indicates how far messages will be forwarded
self._time_to_sleep = time_to_sleep # time to sleep between regular tasks
self._lock = asyncio.Lock()
# tasks which should be triggered regularly
self.tasks = []
task_settings = [
(self.trigger_solver),
]
for trigger_fkt in task_settings:
t = asyncio.create_task(trigger_fkt())
t.add_done_callback(self.raise_exceptions)
self.tasks.append(t)

@property
def solution_found(self):
Expand Down Expand Up @@ -146,6 +143,8 @@ async def start_negotiation(self, start_dates, values):
:param start_dates: timespan for the negotiation
:param values: power value to negotiate about
"""
if self.aid == "agent0":
print(f"start neg for {self.aid}")
self.governor.power_balance_strategy.start_time = start_dates[0]
values = [math.ceil(value) for value in values]
self._solution_found = False
Expand Down Expand Up @@ -175,50 +174,53 @@ async def trigger_solver(self):
The time_to_sleep needs to be set according to the network size of the
agents.
"""
while not self._stopped.done():
await asyncio.sleep(0.1)
if self._negotiation_running:
await asyncio.sleep(self._time_to_sleep)
now = datetime.now()

current_time = now.strftime("%H:%M:%S")
logger.debug(f"{self.aid}: Timer ran out at = {str(current_time)}")
# After sleeping, the solver is triggered. This is necessary
# in case when not the complete negotiation problem can be
# solved. The solver is triggered after the timeout to
# determine the solution according to the power that
# is available.
self.governor.triggered_due_to_timeout = True
await self.solve()
self._negotiation_running = False
if self._waiting_for_acknowledgements:
await asyncio.sleep(self._time_to_sleep)
# Time for waiting for acknowledgements is done, therefore
# do not wait for acknowledgements anymore
logger.debug(
f"*** {self.aid} did not receive all acknowledgements. Negotiation was not successful."
)
# self.final = {}
self._waiting_for_acknowledgements = False
for acc_msg in self._curr_sent_acceptances:
withdrawal = WinzentMessage(time_span=acc_msg.time_span,
is_answer=True, answer_to=acc_msg.id,
msg_type=xboole.MessageType.WithdrawalNotification,
ttl=self._current_ttl, receiver=acc_msg.receiver,
value=[acc_msg.value[0]],
id=str(uuid.uuid4()),
sender=self.aid
)
if self.send_message_paths:
await self.send_message(withdrawal, msg_path=self.negotiation_connections[acc_msg.receiver])
else:
await self.send_message(withdrawal, receiver=acc_msg.receiver)
logger.info(f"{self.aid} reset because the waiting time for the remaining acknowledgements"
f" is over.")
for acc in self._curr_sent_acceptances:
logger.info(f"{self.aid}: {acc.value[0]} from {acc.receiver} not received.")
self._curr_sent_acceptances = []
await self.reset()
await asyncio.sleep(0.1)
try:
await asyncio.wait_for(self._solution_found_event.wait(), timeout=self._time_to_sleep)
except asyncio.TimeoutError:
now = datetime.now()

current_time = now.strftime("%H:%M:%S")
logger.debug(f"{self.aid}: Timer ran out at = {str(current_time)}")
# After sleeping, the solver is triggered. This is necessary
# in case when not the complete negotiation problem can be
# solved. The solver is triggered after the timeout to
# determine the solution according to the power that
# is available.
self.governor.triggered_due_to_timeout = True
await self.solve()
self._negotiation_running = False
try:
await asyncio.wait_for(self._acknowledgment_event.wait(), timeout=self._time_to_sleep)
except asyncio.TimeoutError:
# Time for waiting for acknowledgements is done, therefore
# do not wait for acknowledgements anymore
logger.debug(
f"*** {self.aid} did not receive all acknowledgements. Negotiation was not successful."
)
# self.final = {}
self._waiting_for_acknowledgements = False
for acc_msg in self._curr_sent_acceptances:
withdrawal = WinzentMessage(time_span=acc_msg.time_span,
is_answer=True, answer_to=acc_msg.id,
msg_type=xboole.MessageType.WithdrawalNotification,
ttl=self._current_ttl, receiver=acc_msg.receiver,
value=[acc_msg.value[0]],
id=str(uuid.uuid4()),
sender=self.aid
)
if self.send_message_paths:
await self.send_message(withdrawal, msg_path=self.negotiation_connections[acc_msg.receiver])
else:
await self.send_message(withdrawal, receiver=acc_msg.receiver)
logger.info(f"{self.aid} reset because the waiting time for the remaining acknowledgements"
f" is over.")
print(f"{self.aid} reset because the waiting time for the remaining acknowledgements"
f" is over.")
for acc in self._curr_sent_acceptances:
logger.info(f"{self.aid}: {acc.value[0]} from {acc.receiver} not received.")
self._curr_sent_acceptances = []
await self.reset()

async def handle_internal_request(self, requirement):
"""
Expand All @@ -231,9 +233,6 @@ async def handle_internal_request(self, requirement):
"""
message = requirement.message
values = self.get_flexibility_for_interval(time_span=message.time_span, msg_type=message.msg_type)
#print(message.time_span)
#print(message.value)
#print(values)
# for each value to negotiate about, check whether the request could be fulfilled internally completely.
for idx in range(len(values)):
if abs(message.value[idx]) - abs(values[idx]) <= 0:
Expand Down Expand Up @@ -297,7 +296,17 @@ async def handle_internal_request(self, requirement):
)
self._own_request = requirement.message
self._negotiation_running = True

task_settings = [
(self.trigger_solver),
]
for trigger_fkt in task_settings:
t = asyncio.create_task(trigger_fkt())
t.add_done_callback(self.raise_exceptions)
self.tasks.append(t)

logger.debug(f"{self.aid} sends negotiation start notification")
print(f"{self.aid} sends negotiation start notification")
await self.send_message(neg_msg)

def get_flexibility_for_interval(self, time_span, msg_type=6):
Expand Down Expand Up @@ -539,6 +548,8 @@ async def handle_initial_reply(self, requirement, message_path):
# to find a new solution. Therefore, trigger solver.
if not self._solution_found:
self.governor.power_balance.add(requirement)
if self.aid == "agent0":
print(f"{self.aid}: added {requirement.message.value} to power balance")
if not self.governor.solver_triggered:
self.governor.triggered_due_to_timeout = False
# Save the established connection
Expand Down Expand Up @@ -613,6 +624,8 @@ async def handle_acceptance_acknowledgement_reply(self, reply):
self.governor.solution_journal.remove_message(reply.answer_to)
if self.acknowledgement_valid(reply):
self.save_accepted_values(reply)
if self.aid == "agent0":
print(f"{self.aid}: Ack received from {reply.sender} over {reply.value}")
else:
logger.debug(
f"{self.aid} received an AcceptanceAcknowledgement (from {reply.sender} with value {reply.value}) "
Expand All @@ -633,6 +646,9 @@ async def handle_acceptance_acknowledgement_reply(self, reply):
if self.governor.solution_journal.is_empty():
# PGASC changed logger.info to logging
logger.debug(f'\n*** {self.aid} received all Acknowledgements. ***')
print(f'\n*** {self.aid} received all Acknowledgements. ***')
self._waiting_for_acknowledgements = False
self._acknowledgment_event.set()
await self.reset()

async def handle_withdrawal_reply(self, reply):
Expand Down Expand Up @@ -826,7 +842,7 @@ async def answer_requirements(self, final, afforded_values, initial_req):
await self.no_solution_after_timeout()
self.governor.triggered_due_to_timeout = False
return

self._solution_found_event.set()
i = 0
zero_indeces = []
for k, idx_v in self.final.items():
Expand Down Expand Up @@ -885,6 +901,8 @@ async def answer_requirements(self, final, afforded_values, initial_req):

# store acceptance message
self.governor.solution_journal.add(msg)
if self.aid == "agent0":
print(f"{self.aid}: sending acceptance to {msg.receiver}")
await self.send_message(msg)
for key in zero_indeces:
del self.final[key]
Expand Down Expand Up @@ -924,6 +942,9 @@ async def solve(self):
logger.debug(f"{self.aid} starts solver now.")
try:
final, afforded_values, initial_req = self.governor.try_balance()
print(f"afforded_values: {list(afforded_values.values())}")
print(f"initial req: {initial_req.message.value}")
print(initial_req.message.value == list(afforded_values.values()))
except Exception as e:
logger.debug(f"EXCEPTION: {e}")
if final:
Expand Down
7 changes: 3 additions & 4 deletions mango_library/negotiation/winzent/winzent_ethical_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,16 @@ async def handle_initial_reply(self, requirement, message_path):
return
# If there is no solution found already, the reply is considered
# to find a new solution. Therefore, trigger solver.
if self.aid == "agent0":
print(f"{self.aid}: received reply from {requirement.message.sender} over {requirement.message.value}")
print(f"{self._solution_found}")
if not self._solution_found:
self.governor.power_balance.add(requirement)
if not self.governor.solver_triggered:
self.governor.triggered_due_to_timeout = False
if not self.first_initial_reply_received:
self.first_initial_reply_received = True
await asyncio.sleep(self.reply_processing_waiting_time)
if self.aid == "agent0":
print(f"{self.aid}: Solver triggered with following offers:")
for req in self.governor.power_balance._ledger[0]:
print(req.message.values)
logger.debug(f"{self.aid}: Solver triggered!")
await self.solve()
self.first_initial_reply_received = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async def test_get_correct_solution_through_restarts():
second_interval = 900
time_span = [first_interval, second_interval]
# this variable controls the amount of allowed restarts
number_of_restarted_negotiations_allowed = 3
number_of_restarted_negotiations_allowed = 5

agent_a, agent_b, agent_c, agent_d, agent_e, agent_f, container = await create_six_ethical_agents(
agent_a_ethics_score=2,
Expand Down
19 changes: 12 additions & 7 deletions tests/unit_test/winzent/test_ethics_single_interval.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from typing import Dict

import datetime
import pytest

from util_functions import shutdown, create_six_ethical_agents
Expand All @@ -19,7 +19,7 @@ async def test_hightest_priority_agents_get_provided_by_highest_ethics_score():
first_interval = 0
time_span = [first_interval]
# this variable controls the amount of allowed restarts
number_of_restarted_negotiations_allowed = 3
number_of_restarted_negotiations_allowed = 5

agent_a, agent_b, agent_c, agent_d, agent_e, agent_f, container = await create_six_ethical_agents(
agent_a_ethics_score=2,
Expand Down Expand Up @@ -101,7 +101,7 @@ async def test_hightest_priority_agents_get_provided_by_highest_ethics_score():
assert agent_e.flex[first_interval] == [0, 0]
assert agent_f.flex[first_interval] == [0, 0]

assert agent_a.ethics_score == 2.18
assert agent_a.ethics_score == 2.28
assert agent_b.ethics_score == 3.08
assert agent_c.ethics_score == 4.0

Expand All @@ -121,7 +121,7 @@ async def test_one_agent_provides_most_flexibility_and_has_to_prioritise():
first_interval = 0
time_span = [first_interval]
# this variable controls the amount of allowed restarts
number_of_restarted_negotiations_allowed = 5
number_of_restarted_negotiations_allowed = 3

agent_a, agent_b, agent_c, agent_d, agent_e, agent_f, container = await create_six_ethical_agents(
agent_a_ethics_score=2,
Expand Down Expand Up @@ -157,9 +157,11 @@ async def test_one_agent_provides_most_flexibility_and_has_to_prioritise():
while len(agents_with_started_negotiation) > 0:
agent = agents_with_started_negotiation.pop(0)
try:
if agent.aid == "agent0":
print(f"starting to wait for {agent.aid}: {datetime.datetime.now()}")
await asyncio.wait_for(agent.negotiation_done, timeout=5)
except asyncio.TimeoutError:
print(f"{agent.aid} could not finish its negotiation in time. Result is set to zero.")
print(f"{agent.aid} could not finish its negotiation in time. Result is set to zero: {datetime.datetime.now()}.")
agent.result = {}
# restart unsuccessful negotiations
# only allow a restricted number of restarts
Expand All @@ -173,18 +175,21 @@ async def test_one_agent_provides_most_flexibility_and_has_to_prioritise():

negotiation_successful = sum(agent_result_sum) >= sum(rounded_load_values[agent.aid])
if not negotiation_successful:
print(f"Negotiation of {agent.aid} was not successful. His result: "
f"{agent_result_sum} is lower than what he needed: {rounded_load_values[agent.aid]}")
if number_of_restarted_negotiations_allowed > 0:
# get sum of already negotiated values for this agent
# negotiation was not fully successful, therefore restart
agents_with_started_negotiation.append(agent)
# restart the negotiation with the missing value
rounded_load_values[agent.aid] = [a - b for a, b in zip(rounded_load_values[agent.aid], agent_result_sum)]
await agent.start_negotiation(
start_dates=[first_interval],
values=[a - b for a, b in zip(rounded_load_values[agent.aid], agent_result_sum)],
values=rounded_load_values[agent.aid],
)
print(
f"{agent.aid} restarted negotiation for value "
f"of {[a - b for a, b in zip(rounded_load_values[agent.aid], agent_result_sum)]}"
f"of {rounded_load_values[agent.aid]}"
)
print(agent.calculate_new_ethics_score(negotiation_successful))
number_of_restarted_negotiations_allowed -= 1
Expand Down

0 comments on commit 2a9ba67

Please sign in to comment.