Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added heartbeat & custom header params. #27

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Dart",
"program": "bin/main.dart",
"request": "launch",
"type": "dart"
}
]
}
45 changes: 28 additions & 17 deletions lib/impl/plugin_vm.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,51 @@
library stomp_impl_plugin_vm;

import "dart:async";
import "dart:io";

import "plugin.dart" show BytesStompConnector;
import 'package:web_socket_channel/io.dart';
import "plugin.dart" show StringStompConnector;
import 'package:web_socket_channel/status.dart' as status;

/** The implementation on top of [Socket].
*/
class SocketStompConnector extends BytesStompConnector {
final Socket _socket;
class SocketStompConnector extends StringStompConnector {
final IOWebSocketChannel _socket;
StreamSubscription _listen;

SocketStompConnector(this._socket) {
_init();
}
void _init() {
_socket.listen((List<int> data) {
if (data != null && !data.isEmpty)
onBytes(data);
}, onError: (error, stackTrace) {
onError(error, stackTrace);
}, onDone: () {
_listen = _socket.stream.listen((data) {
if (data != null) {
final String sdata = data.toString();
if (sdata.isNotEmpty) onString(sdata);
}
});

_listen.onError((err) => onError(err, null));
_listen.onDone(() => onClose());

_socket.stream.handleError((error) => onError(error, null));

_socket.sink.done.then((v) {
onClose();
});
}

@override
Future close() {
_socket.destroy();
_listen.cancel();
_socket.sink.close(status.goingAway);
return new Future.value();
}

@override
Future writeStream_(Stream<List<int>> stream)
=> _socket.sink.addStream(stream);

@override
void writeBytes_(List<int> bytes) {
_socket.add(bytes);
void writeString_(String string) {
_socket.sink.add(string);
// TODO: implement writeString_
}
@override
Future writeStream_(Stream<List<int>> stream)
=> _socket.addStream(stream);
}
7 changes: 5 additions & 2 deletions lib/src/impl/util_write.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ part of stomp_impl_util;

//Commands//
const String CONNECT = "CONNECT";
const String STOMP = "STOMP";
const String STOMP = "CONNECT";
const String CONNECTED = "CONNECTED";
const String DISCONNECT = "DISCONNECT";
const String SEND = "SEND";
Expand Down Expand Up @@ -57,7 +57,6 @@ void writeDataFrame(StompConnector connector, String command,
Map<String, String> headers, String string,
[List<int> bytes]) {
writeHeaders(connector, command, headers, endOfHeaders: false);

if (headers == null || headers[CONTENT_LENGTH] == null) {
int len = 0;
if (bytes != null) {
Expand All @@ -78,6 +77,10 @@ void writeDataFrame(StompConnector connector, String command,
connector.writeEof();
}

void pongMessage(StompConnector connector) {
writeSimpleFrame(connector,SEND,null);
}

///Write a frame from the given stream
Future writeStreamFrame(StompConnector connector, String command,
Map<String, String> headers, Stream<List<int>> stream) {
Expand Down
8 changes: 6 additions & 2 deletions lib/src/stomp_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class _StompClient implements StompClient {
final _DisconnectCallback _onDisconnect;
final _ErrorCallback _onError;
final _FaultCallback _onFault;

DateTime lastMessageDate = new DateTime.now();
///<String subscription-id, _Subscriber>
final Map<String, _Subscriber> _subscribers = new HashMap();

Expand All @@ -81,6 +81,7 @@ class _StompClient implements StompClient {
static Future<StompClient> connect(
StompConnector connector,
String host,
Map<String,String> customHeaders,
String login,
String passcode,
List<int> heartbeat,
Expand All @@ -105,6 +106,7 @@ class _StompClient implements StompClient {
} else {
client.heartbeat[0] = client.heartbeat[1] = 0;
}
if(customHeaders != null) headers.addAll(customHeaders);
writeSimpleFrame(connector, STOMP, headers);

return client._connecting.future;
Expand All @@ -127,9 +129,11 @@ class _StompClient implements StompClient {

_connector
..onBytes = (List<int> data) {
lastMessageDate = DateTime.now();
_parser.addBytes(data);
}
..onString = (String data) {
lastMessageDate = DateTime.now();
_parser.addString(data);
}
..onError = (error, stackTrace) {
Expand All @@ -139,6 +143,7 @@ class _StompClient implements StompClient {
_disconnected = true;
_subscribers.clear();
_receipts.clear();
cleanTimers();
if (_onDisconnect != null) _onDisconnect(this);
};
}
Expand All @@ -155,7 +160,6 @@ class _StompClient implements StompClient {
Future disconnect({String receipt}) {
_checkSend();
_disconnected = true;

Completer completer;
Map<String, String> headers;

Expand Down
27 changes: 27 additions & 0 deletions lib/src/stomp_util.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ part of stomp;

const int _SUB_BYTES = 0, _SUB_STRING = 1, _SUB_JSON = 2, _SUB_BLOB = 3;

Timer outgoingTimer,incomingTimer;
///The information of a subscriber
class _Subscriber {
final String id;
Expand Down Expand Up @@ -79,10 +80,36 @@ void _handleHeartbeat(_StompClient client, String heartbeat) {
sy = int.parse(heartbeat.substring(i + 1));
client.heartbeat[0] = _calcHeartbeat(client.heartbeat[0], sy);
client.heartbeat[1] = _calcHeartbeat(client.heartbeat[1], sx);
final int ttlOutgoing = client.heartbeat[0];
if (ttlOutgoing != 0) {
outgoingTimer = Timer.periodic(new Duration(milliseconds: ttlOutgoing), (_) {
if (!client.isDisconnected) {
print("pong");
pongMessage(client._connector);
}
});
}
final int ttlIncoming = client.heartbeat[1];
if (ttlIncoming != 0) {
incomingTimer = Timer.periodic(new Duration(milliseconds: ttlIncoming), (_) {
int delta = new DateTime.now()
.difference(client.lastMessageDate)
.inMilliseconds;
if (delta > (ttlIncoming * 2)) {
client.disconnect();
}
});
}
} catch (ex) {
// ignore silently
}
}
}
cleanTimers(){
if(outgoingTimer != null && outgoingTimer.isActive)
outgoingTimer.cancel();
if(incomingTimer != null && incomingTimer.isActive)
incomingTimer.cancel();
}

int _calcHeartbeat(int a, int b) => a == 0 || b == 0 ? 0 : max(a, b);
3 changes: 2 additions & 1 deletion lib/stomp.dart
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ abstract class StompClient {
*/
static Future<StompClient> connect(StompConnector connector,
{String host,
Map<String,String> customHeaders,
String login,
String passcode,
List<int> heartbeat,
Expand All @@ -91,7 +92,7 @@ abstract class StompClient {
throw new ArgumentError(
"Required: connector. Use stomp_vm's connect() instead.");

return _StompClient.connect(connector, host, login, passcode, heartbeat,
return _StompClient.connect(connector, host, customHeaders,login, passcode, heartbeat,
onConnect, onDisconnect, onError, onFault);
}

Expand Down
36 changes: 28 additions & 8 deletions lib/vm.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
library stomp_vm;

import "dart:async";
import "dart:io";

import "stomp.dart" show StompClient;
import 'package:web_socket_channel/io.dart';
import "impl/plugin_vm.dart" show SocketStompConnector;

/** Connects a STOMP server, and instantiates a [StompClient]
Expand All @@ -32,13 +31,34 @@ import "impl/plugin_vm.dart" show SocketStompConnector;
* * [onFault] -- callback when an exception is received.
*/
Future<StompClient> connect(address, {int port: 61626,
String host, String login, String passcode, List<int> heartbeat,
String host,Map<String, String> customHeaders, String login, String passcode, List<int> heartbeat,
void onConnect(StompClient client, Map<String, String> headers),
void onDisconnect(StompClient client),
void onError(StompClient client, String message, String detail, Map<String, String> headers),
void onFault(StompClient client, error, stackTrace)})
=> Socket.connect(address, port).then((Socket socket)
=> StompClient.connect(new SocketStompConnector(socket),
host: host, login: login, passcode: passcode, heartbeat: heartbeat,
onConnect: onConnect, onDisconnect: onDisconnect,
onError: onError, onFault: onFault));
async => connectWith(await IOWebSocketChannel.connect(address),
host: host,
customHeaders: customHeaders,
login: login,
passcode: passcode,
heartbeat: heartbeat,
onConnect: onConnect,
onDisconnect: onDisconnect,
onError: onError,
onFault: onFault);

Future<StompClient> connectWith(IOWebSocketChannel channel,
{String host,
Map<String, String> customHeaders,
String login,
String passcode,
List<int> heartbeat,
void onConnect(StompClient client, Map<String, String> headers),
void onDisconnect(StompClient client),
void onError(StompClient client, String message, String detail,
Map<String, String> headers),
void onFault(StompClient client, error, stackTrace)})=>
StompClient.connect(new SocketStompConnector(channel),
host: host,customHeaders: customHeaders, login: login, passcode: passcode, heartbeat: heartbeat,
onConnect: onConnect, onDisconnect: onDisconnect,
onError: onError, onFault: onFault);
4 changes: 4 additions & 0 deletions lib/websocket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import "impl/plugin.dart" show StringStompConnector;
*/
Future<StompClient> connect(String url,
{String host,
Map<String, String> customHeaders,
String login,
String passcode,
List<int> heartbeat,
Expand All @@ -41,6 +42,7 @@ Future<StompClient> connect(String url,
void onFault(StompClient client, error, stackTrace)}) =>
connectWith(new WebSocket(url),
host: host,
customHeaders: customHeaders,
login: login,
passcode: passcode,
heartbeat: heartbeat,
Expand All @@ -56,6 +58,7 @@ Future<StompClient> connect(String url,
*/
Future<StompClient> connectWith(WebSocket socket,
{String host,
Map<String, String> customHeaders,
String login,
String passcode,
List<int> heartbeat,
Expand All @@ -67,6 +70,7 @@ Future<StompClient> connectWith(WebSocket socket,
_WSStompConnector.startWith(socket).then((_WSStompConnector connector) =>
StompClient.connect(connector,
host: host,
customHeaders: customHeaders,
login: login,
passcode: passcode,
heartbeat: heartbeat,
Expand Down
17 changes: 11 additions & 6 deletions test/_echo_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ part of echo_test;
/** It is part of both echo_vm_test.dart and echo_ws_test.dart
* so we can test it on both VM and browser.
*/
Future testEcho(address)
=> connect(address, onDisconnect: (_) {
print("Disconnected");
Future testEcho({address,headers,heartbeat})
=> connect(address,customHeaders: headers,heartbeat: heartbeat,
onConnect: ( client, Map<String, String> headers){
} , onDisconnect: (_) {
print("disconnect");
},onError: ( client, String message, String detail, Map<String, String> headers){

}).then((client) {
test("echo test", () {
test("echo test", () {
final String destination = "/foo";
final List<String> sends = ["1. apple", "2. orange\nand 2nd line", "3. mango"];
final List<String> sendExtraHeader = ["123", "abc:", "xyz"];
final List<String> receives = [], receiveExtraHeader = [];

/*
client.subscribeString("0", destination,
(headers, message) {
//print("<<received: $headers, $message");
Expand All @@ -34,9 +38,10 @@ Future testEcho(address)
expect(receives[i], sends[i]);
expect(receiveExtraHeader[i], sendExtraHeader[i]);
}

//client.unsubscribe("0"); //optional
client.disconnect();
});
*/
});
});
3 changes: 2 additions & 1 deletion test/echo_vm_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ library echo_test;

import "dart:async";
import "dart:io";
import 'package:stomp/stomp.dart';
import 'package:test/test.dart';

import 'package:stomp/vm.dart' show connect;
Expand All @@ -13,7 +14,7 @@ part "_echo_test.dart";

void main() {
final address = "127.0.0.1";
testEcho(address)
testEcho(address: address)
.catchError((ex) {
print("Unable to connect $address\n"
"Check if the server has been started\n\nCause:\n$ex");
Expand Down
4 changes: 2 additions & 2 deletions test/echo_ws_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import "dart:html";
import "dart:async";
import 'package:test/test.dart';

import 'package:stomp/webSocket.dart' show connect;
import 'package:stomp/websocket.dart' show connect;

part "_echo_test.dart";

void main() {
final address = "ws://localhost:8080";
testEcho(address)
testEcho(address: address)
.catchError((ex) {
print("Unable to connect $address\n"
"Check if the server has been started\n\nCause:\n$ex");
Expand Down
Loading