Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

point-streaming: further development based on PR #88 #215

Open
wants to merge 8 commits into
base: kinetic-devel
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,14 @@ class JointTrajectoryInterface
*/
virtual void jointTrajectoryCB(const trajectory_msgs::JointTrajectoryConstPtr &msg);

/**
* \brief Callback function registered to joint_command topic subscriber.
* Specific method is implemented in JointTrajectoryStreamer class.
*
* \param msg JointTrajectory message
*/
virtual void jointCommandCB(const trajectory_msgs::JointTrajectoryConstPtr &msg) = 0;

/**
* \brief Callback function registered to ROS stopMotion service
* Sends stop-motion command to robot.
Expand Down Expand Up @@ -342,6 +350,9 @@ class JointTrajectoryInterface
SmplMsgConnection* connection_;
ros::Subscriber sub_cur_pos_; // handle for joint-state topic subscription
ros::Subscriber sub_joint_trajectory_; // handle for joint-trajectory topic subscription

ros::Subscriber sub_joint_command_; // handle for joint-trajectory topic subscription

ros::ServiceServer srv_joint_trajectory_; // handle for joint-trajectory service
ros::Subscriber sub_joint_trajectory_ex_; // handle for joint-trajectory topic subscription
ros::ServiceServer srv_joint_trajectory_ex_; // handle for joint-trajectory service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <map>
#include <vector>
#include <string>
#include <queue>

namespace industrial_robot_client
{
Expand All @@ -51,7 +52,7 @@ namespace TransferStates
{
enum TransferState
{
IDLE = 0, STREAMING = 1 // ,STARTING, //, STOPPING
IDLE = 0, STREAMING = 1, POINT_STREAMING = 2 // ,STARTING, //, STOPPING
};
}
typedef TransferStates::TransferState TransferState;
Expand Down Expand Up @@ -116,6 +117,8 @@ class JointTrajectoryStreamer : public JointTrajectoryInterface

virtual void jointTrajectoryCB(const motoman_msgs::DynamicJointTrajectoryConstPtr &msg);

virtual void jointCommandCB(const trajectory_msgs::JointTrajectoryConstPtr &msg);

virtual bool trajectory_to_msgs(const trajectory_msgs::JointTrajectoryConstPtr &traj, std::vector<SimpleMessage>* msgs);

virtual bool trajectory_to_msgs(const motoman_msgs::DynamicJointTrajectoryConstPtr &traj, std::vector<SimpleMessage>* msgs);
Expand All @@ -134,6 +137,9 @@ class JointTrajectoryStreamer : public JointTrajectoryInterface
TransferState state_;
ros::Time streaming_start_;
int min_buffer_size_;

int ptstreaming_seq_count_; // sequence count for point streaming (--> JointTrajPtFull::sequence_)
std::queue<SimpleMessage> ptstreaming_queue_; // message queue for point streaming
};

} // namespace joint_trajectory_streamer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class MotomanJointTrajectoryStreamer : public JointTrajectoryStreamer
virtual void streamingThread();

protected:
static const double pos_stale_time_ = 1.0; // max time since last "current position" update, for validation (sec)
static const double start_pos_tol_ = 5e-4; // max difference btwn start & current position, for validation (rad)

int robot_id_;
MotomanMotionCtrl motion_ctrl_;
Expand All @@ -143,6 +145,11 @@ class MotomanJointTrajectoryStreamer : public JointTrajectoryStreamer
static bool VectorToJointData(const std::vector<double> &vec,
industrial::joint_data::JointData &joints);

// variables for point streaming
double time_ptstreaming_last_point_; // time at which the last point was received
double dt_ptstreaming_points_; // elapsed time between two received points
static const double ptstreaming_timeout_ = 3.0; // seconds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this a parameter that defaults to 3.0 seconds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can but I guess it is already covered by "Force a transition to the IDLE state if we get a message with no-points" right?


/**
* \brief Service used to disable the robot controller. When disabled,
* all incoming goals are ignored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ bool JointTrajectoryInterface::init(SmplMsgConnection* connection, const std::ve
this->sub_cur_pos_ = this->node_.subscribe(
"joint_states", 1, &JointTrajectoryInterface::jointStateCB, this);

this->sub_joint_command_ = this->node_.subscribe("joint_command", 0, &JointTrajectoryInterface::jointCommandCB, this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should just double check that an infinite queue size makes sense for point streaming. Should we drop commands if the streaming node can cycle fast enough, or should we continue to fill up the command queue?


return true;
}

Expand Down Expand Up @@ -733,4 +735,3 @@ void JointTrajectoryInterface::jointStateCB(

} // namespace joint_trajectory_interface
} // namespace industrial_robot_client

Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,73 @@ void JointTrajectoryStreamer::jointTrajectoryCB(const trajectory_msgs::JointTraj
send_to_robot(new_traj_msgs);
}

void JointTrajectoryStreamer::jointCommandCB(const trajectory_msgs::JointTrajectoryConstPtr &msg)
{
// read current state value (should be atomic)
int state = this->state_;

ROS_DEBUG("Current state is: %d", state);

//If current state is idle, set to POINT_STREAMING
if (TransferStates::IDLE == state)
{
this->mutex_.lock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could get in an undesired state transition if we get a new msg with empty points and we are we are currently in IDLE. Maybe we should check if the msg->points.empty() and if true report a warning, and return.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be handled above on line 185.

this->state_ = TransferStates::POINT_STREAMING;
this->current_point_ = 0;
this->ptstreaming_seq_count_ = 0;
this->streaming_start_ = ros::Time::now();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reset / clear the queue to prevent sending old trajectory points:

this->ptstreaming_queue_ = std::queue<SimpleMessage>();

this->mutex_.unlock();
state = TransferStates::POINT_STREAMING;
ROS_INFO("First joint point received. Starting on-the-fly streaming.");
}

//if current state is POINT_STREAMING, process incoming point.
if (TransferStates::POINT_STREAMING == state)
{
if (msg->points.empty())
{
ROS_INFO("Empty point received, cancelling current trajectory");
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Force a transition to the IDLE state if we get a message with no-points, this functionality will allow users publishing to the joint_command topic to "instantly" stop POINT_STREAMING without waiting for timeout.

this->mutex_.lock();
trajectoryStop();
this->mutex_.unlock();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a good addition I think.

}

//Else, Push point into queue
SimpleMessage message;
trajectory_msgs::JointTrajectoryPoint rbt_pt, xform_pt;

// select / reorder joints for sending to robot
if (!select(msg->joint_names, msg->points[0], this->all_joint_names_, &rbt_pt))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does select(..) also check that all required joints are present in the incoming msg?

If not, such a check would probably need to be added.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function select(.) internally checks whether any element of msg->joint_names is empty or whether a matching "ROS element" can be found for the name of each joint in the message. If either of those two conditions are false, the function returns false. So I think that would be the check for all required joints being present.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function select(.) internally checks whether any element of msg->joint_names is empty or whether a matching "ROS element" can be found for the name of each joint in the message. If either of those two conditions are false, the function returns false. So I think that would be the check for all required joints being present.

I agree with you on this. I think checking the return on select is adequate.

return;

// transform point data (e.g. for joint-coupling)
if (!transform(rbt_pt, &xform_pt))
return;

// convert trajectory point to ROS message
if (!create_message(this->ptstreaming_seq_count_, xform_pt, &message))
return;

//Points get pushed into queue here. They will be popped in the Streaming Thread and sent to controller.
this->mutex_.lock();
this->ptstreaming_queue_.push(message);
this->ptstreaming_seq_count_++;
this->mutex_.unlock();
}

//Else, cannot splice. Cancel current motion.
else
{
if (msg->points.empty())
ROS_INFO("Empty trajectory received, canceling current trajectory");
else
ROS_ERROR("Trajectory splicing not yet implemented, stopping current motion.");

this->mutex_.lock();
trajectoryStop();
this->mutex_.unlock();
return;
}
}

bool JointTrajectoryStreamer::send_to_robot(const std::vector<SimpleMessage>& messages)
{
ROS_INFO("Loading trajectory, setting state to streaming");
Expand Down Expand Up @@ -276,8 +343,47 @@ void JointTrajectoryStreamer::streamingThread()
ROS_WARN("Failed sent joint point, will try again");

break;

case TransferStates::POINT_STREAMING:

// if no points in queue, streaming complete, set to idle.
if (this->ptstreaming_queue_.empty())
{
ROS_INFO("Point streaming complete, setting state to IDLE");
this->state_ = TransferStates::IDLE;
break;
}
// if not connected, reconnect.
if (!this->connection_->isConnected())
{
ROS_DEBUG("Robot disconnected. Attempting reconnect...");
connectRetryCount = 5;
break;
}
// otherwise, send point to robot.
tmpMsg = this->ptstreaming_queue_.front();
this->ptstreaming_queue_.pop();
msg.init(tmpMsg.getMessageType(), CommTypes::SERVICE_REQUEST,
ReplyTypes::INVALID, tmpMsg.getData()); // set commType=REQUEST

ROS_DEBUG("Sending joint trajectory point");
if (this->connection_->sendAndReceiveMsg(msg, reply, false))
{
ROS_INFO("Point[%d] sent to controller", this->current_point_);
this->current_point_++;
}
else
ROS_WARN("Failed sent joint point, will try again");

break;
// TODO Consider checking for controller point starvation here. use a
// timer to check if the state is popping in and out of
// POINT_STREAMING mode, indicating something is trying to send
// streaming points, but is doing so too slowly. It may, in fact, not
// matter other than motion won't be smooth.

default:
ROS_ERROR("Joint trajectory streamer: unknown state");
ROS_ERROR("Joint trajectory streamer: unknown state, %d", this->state_);
this->state_ = TransferStates::IDLE;
break;
}
Expand All @@ -298,4 +404,3 @@ void JointTrajectoryStreamer::trajectoryStop()

} // namespace joint_trajectory_streamer
} // namespace industrial_robot_client

85 changes: 76 additions & 9 deletions motoman_driver/src/joint_trajectory_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ bool MotomanJointTrajectoryStreamer::disableRobotCB(std_srvs::Trigger::Request &

trajectoryStop();

bool ret = motion_ctrl_.setTrajMode(false);
bool ret = motion_ctrl_.setTrajMode(false);
res.success = ret;

if (!res.success) {
res.message="Motoman robot was NOT disabled. Please re-examine and retry.";
ROS_ERROR_STREAM(res.message);
Expand All @@ -139,7 +139,7 @@ bool MotomanJointTrajectoryStreamer::disableRobotCB(std_srvs::Trigger::Request &
res.message="Motoman robot is now disabled and will NOT accept motion commands.";
ROS_WARN_STREAM(res.message);
}


return true;

Expand All @@ -148,9 +148,9 @@ bool MotomanJointTrajectoryStreamer::disableRobotCB(std_srvs::Trigger::Request &
bool MotomanJointTrajectoryStreamer::enableRobotCB(std_srvs::Trigger::Request &req,
std_srvs::Trigger::Response &res)
{
bool ret = motion_ctrl_.setTrajMode(true);
bool ret = motion_ctrl_.setTrajMode(true);
res.success = ret;

if (!res.success) {
res.message="Motoman robot was NOT enabled. Please re-examine and retry.";
ROS_ERROR_STREAM(res.message);
Expand All @@ -165,7 +165,7 @@ bool MotomanJointTrajectoryStreamer::enableRobotCB(std_srvs::Trigger::Request &r
}



// override create_message to generate JointTrajPtFull message (instead of default JointTrajPt)
bool MotomanJointTrajectoryStreamer::create_message(int seq, const trajectory_msgs::JointTrajectoryPoint &pt, SimpleMessage *msg)
{
Expand Down Expand Up @@ -364,7 +364,7 @@ bool MotomanJointTrajectoryStreamer::VectorToJointData(const std::vector<double>
}
return true;
}

// override send_to_robot to provide controllerReady() and setTrajMode() calls
bool MotomanJointTrajectoryStreamer::send_to_robot(const std::vector<SimpleMessage>& messages)
{
Expand Down Expand Up @@ -428,7 +428,7 @@ void MotomanJointTrajectoryStreamer::streamingThread()

tmpMsg = this->current_traj_[this->current_point_];
msg.init(tmpMsg.getMessageType(), CommTypes::SERVICE_REQUEST,
ReplyTypes::INVALID, tmpMsg.getData()); // set commType=REQUEST
ReplyTypes::INVALID, tmpMsg.getData());

if (!this->connection_->sendAndReceiveMsg(msg, reply, false))
ROS_WARN("Failed sent joint point, will try again");
Expand Down Expand Up @@ -460,6 +460,74 @@ void MotomanJointTrajectoryStreamer::streamingThread()
}
}
break;

case TransferStates::POINT_STREAMING:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shaun-edwards: does this need to be added twice? Once here and once in the industrial_robot_client extension part of this package?

// if no points in queue, streaming complete, set to idle.
if (this->ptstreaming_queue_.empty())
{
if (this->dt_ptstreaming_points_ < this->ptstreaming_timeout_)
{
this->dt_ptstreaming_points_ = ros::Time::now().toSec() - this->time_ptstreaming_last_point_;
ros::Duration(0.005).sleep();
ROS_DEBUG("Time since last point: %f", this->dt_ptstreaming_points_);
break;
}
else
{
ROS_INFO("Point streaming complete, setting state to IDLE");
this->state_ = TransferStates::IDLE;
break;
}
}
// if not connected, reconnect.
if (!this->connection_->isConnected())
{
ROS_DEBUG("Robot disconnected. Attempting reconnect...");
connectRetryCount = 5;
break;
}
// otherwise, send point to robot.
tmpMsg = this->ptstreaming_queue_.front();
msg.init(tmpMsg.getMessageType(), CommTypes::SERVICE_REQUEST,
ReplyTypes::INVALID, tmpMsg.getData());

ROS_INFO("Sending joint trajectory point");
if (this->connection_->sendAndReceiveMsg(msg, reply, false))
{
MotionReplyMessage reply_status;
if (!reply_status.init(reply))
{
ROS_ERROR("Aborting point stream operation.");
this->state_ = TransferStates::IDLE;
break;
}
if (reply_status.reply_.getResult() == MotionReplyResults::SUCCESS)
{
ROS_DEBUG("Point[%d] sent to controller", this->current_point_);
this->current_point_++;
this->dt_ptstreaming_points_ = 0.0;
this->time_ptstreaming_last_point_ = ros::Time::now().toSec();
this->ptstreaming_queue_.pop();
}
else if (reply_status.reply_.getResult() == MotionReplyResults::BUSY)
{
ROS_DEBUG("silently resending.");
break; // silently retry sending this point
}
else
{
ROS_ERROR_STREAM("Aborting point stream operation. Failed to send point"
<< " (#" << this->current_point_ << "): "
<< MotomanMotionCtrl::getErrorString(reply_status.reply_));
this->state_ = TransferStates::IDLE;
break;
}
}
else
ROS_WARN("Failed sent joint point, will try again");

break;

default:
ROS_ERROR("Joint trajectory streamer: unknown state");
this->state_ = TransferStates::IDLE;
Expand Down Expand Up @@ -544,4 +612,3 @@ bool MotomanJointTrajectoryStreamer::is_valid(const motoman_msgs::DynamicJointTr

} // namespace joint_trajectory_streamer
} // namespace motoman