Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved Prophet Routing protocol #42

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/core/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class Message implements Comparable<Message> {

/** Application ID of the application that created the message */
private String appID;

/** Forwarding Counter of this message */
private int forwardingCounter;
static {
reset();
DTNSim.registerForReset(Message.class.getCanonicalName());
Expand Down Expand Up @@ -77,6 +78,7 @@ public Message(DTNHost from, DTNHost to, String id, int size) {
this.requestMsg = null;
this.properties = null;
this.appID = null;
this.forwardingCounter = 0;

Message.nextUniqueId++;
addNodeOnPath(from);
Expand Down Expand Up @@ -359,5 +361,19 @@ public String getAppID() {
public void setAppID(String appID) {
this.appID = appID;
}

/**
* Returns the number of times this message has been forwarded
* @return the forwarding counter
*/
public int getForwardingCounter() {
return this.forwardingCounter;
}
/**
* Increments the forwardingCounter each time a message is forwarded
*/
public void incrementForwardingCounter() {
this.forwardingCounter++;
}

}
23 changes: 23 additions & 0 deletions src/routing/ActiveRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ protected int startTransfer(Message m, Connection con) {
retVal = con.startTransfer(getHost(), m);
if (retVal == RCV_OK) { // started transfer
addToSendingConnections(con);
m.incrementForwardingCounter();
}
else if (deleteDelivered && retVal == DENIED_OLD &&
m.getTo() == con.getOtherNode(this.getHost())) {
Expand Down Expand Up @@ -460,6 +461,28 @@ protected Connection tryAllMessagesToAllConnections(){
return tryMessagesToConnections(messages, connections);
}

/**
* Tries to send selected messages that this router is carrying to all
* connections this node has. Messages are identified based on
* selection of routing protocol - epidemic or ImprovedProphet. Messages are
* ordered using the {@link MessageRouter#sortByQueueMode(List)}. See
* {@link #tryMessagesToConnections(List, List)} for sending details.
* @return The connections that started a transfer or null if no connection
* accepted a message.
*/

protected Connection trySelectedMessagesToAllConnections(List<Message> messages) {
List<Connection> connections = getConnections();
if (connections.size() == 0 || messages.size() == 0) {
return null;
}

this.sortByQueueMode(messages);

return tryMessagesToConnections(messages, connections);
}


/**
* Exchanges deliverable (to final recipient) messages between this host
* and all hosts this host is currently connected to. First all messages
Expand Down
312 changes: 312 additions & 0 deletions src/routing/ImprovedProphetRouter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
/*
* Copyright 2010 Aalto University, ComNet
* Released under GPLv3. See LICENSE.txt for details.
*/
package routing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import core.Connection;
import core.DTNHost;
import core.Message;
import core.Settings;
import core.SimClock;
import core.Tuple;

/**
* Implementation of PRoPHET router as described in
* <I>Probabilistic routing in intermittently connected networks</I> by
* Anders Lindgren et al.
*/
public class ImprovedProphetRouter extends ActiveRouter {
/** delivery predictability initialization constant*/
public static final double P_INIT = 0.75;
/** delivery predictability transitivity scaling constant default value */
public static final double DEFAULT_BETA = 0.25;
/** delivery predictability aging constant */
public static final double GAMMA = 0.98;

/** Prophet router's setting namespace ({@value})*/
public static final String PROPHET_NS = "ImprovedProphetRouter";
/**
* Number of seconds in time unit -setting id ({@value}).
* How many seconds one time unit is when calculating aging of
* delivery predictions. Should be tweaked for the scenario.*/
public static final String SECONDS_IN_UNIT_S ="secondsInTimeUnit";

/**
* Transitivity scaling constant (beta) -setting id ({@value}).
* Default value for setting is {@link #DEFAULT_BETA}.
*/
public static final String BETA_S = "beta";

/** the value of nrof seconds in time unit -setting */
private int secondsInTimeUnit;
/** value of beta setting */
private double beta;

/** delivery predictabilities */
private Map<DTNHost, Double> preds;
/** last delivery predictability update (sim)time */
private double lastAgeUpdate;

/**
* Constructor. Creates a new message router based on the settings in
* the given Settings object.
* @param s The settings object
*/
public ImprovedProphetRouter(Settings s) {
super(s);
Settings prophetSettings = new Settings(PROPHET_NS);
secondsInTimeUnit = prophetSettings.getInt(SECONDS_IN_UNIT_S);
if (prophetSettings.contains(BETA_S)) {
beta = prophetSettings.getDouble(BETA_S);
}
else {
beta = DEFAULT_BETA;
}


initPreds();
}

/**
* Copyconstructor.
* @param r The router prototype where setting values are copied from
*/
protected ImprovedProphetRouter(ImprovedProphetRouter r) {
super(r);
this.secondsInTimeUnit = r.secondsInTimeUnit;
this.beta = r.beta;
initPreds();
}

/**
* Initializes predictability hash
*/
private void initPreds() {
this.preds = new HashMap<DTNHost, Double>();
}

@Override
public void changedConnection(Connection con) {
if (con.isUp()) {
DTNHost otherHost = con.getOtherNode(getHost());
updateDeliveryPredFor(otherHost);
updateTransitivePreds(otherHost);
}
}

/**
* Updates delivery predictions for a host.
* <CODE>P(a,b) = P(a,b)_old + (1 - P(a,b)_old) * P_INIT</CODE>
* @param host The host we just met
*/
private void updateDeliveryPredFor(DTNHost host) {
double oldValue = getPredFor(host);
double newValue = oldValue + (1 - oldValue) * P_INIT;
preds.put(host, newValue);
}

/**
* Returns the current prediction (P) value for a host or 0 if entry for
* the host doesn't exist.
* @param host The host to look the P for
* @return the current P value
*/
public double getPredFor(DTNHost host) {
ageDeliveryPreds(); // make sure preds are updated before getting
if (preds.containsKey(host)) {
return preds.get(host);
}
else {
return 0;
}
}

/**
* Updates transitive (A->B->C) delivery predictions.
* <CODE>P(a,c) = P(a,c)_old + (1 - P(a,c)_old) * P(a,b) * P(b,c) * BETA
* </CODE>
* @param host The B host who we just met
*/
private void updateTransitivePreds(DTNHost host) {
MessageRouter otherRouter = host.getRouter();
assert otherRouter instanceof ImprovedProphetRouter : "PRoPHET only works " +
" with other routers of same type";

double pForHost = getPredFor(host); // P(a,b)
Map<DTNHost, Double> othersPreds =
((ImprovedProphetRouter)otherRouter).getDeliveryPreds();

for (Map.Entry<DTNHost, Double> e : othersPreds.entrySet()) {
if (e.getKey() == getHost()) {
continue; // don't add yourself
}

double pOld = getPredFor(e.getKey()); // P(a,c)_old
double pNew = pOld + ( 1 - pOld) * pForHost * e.getValue() * beta;
preds.put(e.getKey(), pNew);
}
}

/**
* Ages all entries in the delivery predictions.
* <CODE>P(a,b) = P(a,b)_old * (GAMMA ^ k)</CODE>, where k is number of
* time units that have elapsed since the last time the metric was aged.
* @see #SECONDS_IN_UNIT_S
*/
private void ageDeliveryPreds() {
double timeDiff = (SimClock.getTime() - this.lastAgeUpdate) /
secondsInTimeUnit;

if (timeDiff == 0) {
return;
}

double mult = Math.pow(GAMMA, timeDiff);
for (Map.Entry<DTNHost, Double> e : preds.entrySet()) {
e.setValue(e.getValue()*mult);
}

this.lastAgeUpdate = SimClock.getTime();
}

/**
* Returns a map of this router's delivery predictions
* @return a map of this router's delivery predictions
*/
private Map<DTNHost, Double> getDeliveryPreds() {
ageDeliveryPreds(); // make sure the aging is done
return this.preds;
}

@Override
public void update() {
super.update();
if (!canStartTransfer() ||isTransferring()) {
return; // nothing to transfer or is currently transferring
}

// try messages that could be delivered to final recipient
if (exchangeDeliverableMessages() != null) {
return;
}

tryOtherMessages();
}

/**
* Tries to send all other messages to all connected hosts ordered by
* their delivery probability
* @return The return value of {@link #tryMessagesForConnected(List)}
*/
private Tuple<Message, Connection> tryOtherMessages() {
List<Tuple<Message, Connection>> messages =
new ArrayList<Tuple<Message, Connection>>();

List<Message> epidemicMessages = new ArrayList<Message>();
Collection<Message> msgCollection = getMessageCollection();

/* for all connected hosts collect all messages that have a higher
probability of delivery by the other host */
for (Connection con : getConnections()) {
DTNHost other = con.getOtherNode(getHost());
ImprovedProphetRouter othRouter = (ImprovedProphetRouter)other.getRouter();

if (othRouter.isTransferring()) {
continue; // skip hosts that are transferring
}

for (Message m : msgCollection) {
if (m.getForwardingCounter() <= 5 && m.getHopCount() <= 0 ){
epidemicMessages.add(m);
}
else {

if (othRouter.hasMessage(m.getId())) {
continue; // skip messages that the other one has
}
if (othRouter.getPredFor(m.getTo()) > getPredFor(m.getTo())) {
// the other node has higher probability of delivery
messages.add(new Tuple<Message, Connection>(m,con));
}
}
}
}

if (messages.size() == 0 && epidemicMessages.size() == 0) {
return null;
}

// sort the message-connection tuples
Collections.sort(messages, new TupleComparator());
trySelectedMessagesToAllConnections(epidemicMessages);
return tryMessagesForConnected(messages); // try to send messages Ashfaq
}

/**
* Comparator for Message-Connection-Tuples that orders the tuples by
* their delivery probability by the host on the other side of the
* connection (GRTRMax)
*/
private class TupleComparator implements Comparator
<Tuple<Message, Connection>> {

public int compare(Tuple<Message, Connection> tuple1,
Tuple<Message, Connection> tuple2) {
// delivery probability of tuple1's message with tuple1's connection
double p1 = ((ImprovedProphetRouter)tuple1.getValue().
getOtherNode(getHost()).getRouter()).getPredFor(
tuple1.getKey().getTo());
// -"- tuple2...
double p2 = ((ImprovedProphetRouter)tuple2.getValue().
getOtherNode(getHost()).getRouter()).getPredFor(
tuple2.getKey().getTo());

// bigger probability should come first
if (p2-p1 == 0) {
/* equal probabilities -> let queue mode decide */
return compareByQueueMode(tuple1.getKey(), tuple2.getKey());
}
else if (p2-p1 < 0) {
return -1;
}
else {
return 1;
}
}
}

@Override
public RoutingInfo getRoutingInfo() {
ageDeliveryPreds();
RoutingInfo top = super.getRoutingInfo();
RoutingInfo ri = new RoutingInfo(preds.size() +
" delivery prediction(s)");

for (Map.Entry<DTNHost, Double> e : preds.entrySet()) {
DTNHost host = e.getKey();
Double value = e.getValue();

ri.addMoreInfo(new RoutingInfo(String.format("%s : %.6f",
host, value)));
}

top.addMoreInfo(ri);
return top;
}

@Override
public MessageRouter replicate() {
ImprovedProphetRouter r = new ImprovedProphetRouter(this);
return r;
}

}