Skip to content

Commit

Permalink
Symbol subjects
Browse files Browse the repository at this point in the history
Also fixed bug in publish in on_open.
  • Loading branch information
ohler55 committed May 12, 2018
1 parent 5263f34 commit dabd1c9
Show file tree
Hide file tree
Showing 17 changed files with 75 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ build-iPhoneSimulator/

test/log
log

misc/.yardoc
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# CHANGELOG

### 2.1.1 - 2018-05-11

- Subject can now be Symbols or any other object that responds top `#to_s`.

- Fixed bug where publishes from the `#on_open` callback broke the connection.

### 2.1.0 - 2018-05-10

- This is a minor release even though the API has changed. The changed API is the one for Rack based WebSocket and SSE connection upgrades. The PR for the spec addition is currently stalled but some suggestions for a stateless API are implemented in this release. The proposed Rack SPEC is [here](misc/SPEC). The PR is [here](https://github.com/rack/rack/pull/1272)
Expand Down
5 changes: 1 addition & 4 deletions example/config.ru
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# frozen_string_literal: true

require "rack"
require 'agoo'

class FlyHandler
def call(req)
[ 200, { }, [ "flying fish" ] ]
Expand All @@ -13,6 +10,6 @@ run FlyHandler.new

# A minimal startup of the Agoo rack handle using rackup. Note this does not
# allow for loading any static assets.
# $ bundle exec rackup
# $ bundle exec rackup -r agoo -s agoo

# Make requests on port 9292 to received responses.
7 changes: 2 additions & 5 deletions example/push/config.ru
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@

require 'rack'
require 'agoo'

# The websocket.html and sse.html are used for this example. After starting
# open a URL of http://localhost:9292/websocket.html or
# http://localhost:9292/sse.html.
Expand Down Expand Up @@ -37,7 +34,7 @@ class Clock

def on_message(client, data)
puts "--- on_message #{data}"
client.write("echo: #{data}")
client.write("Handler says #{data}")
end

# A simple clock publisher of sorts. It writes the current time every second
Expand Down Expand Up @@ -98,6 +95,6 @@ run Listen.new

# A minimal startup of the Agoo rack handle using rackup. Note this does not
# allow for loading any static assets.
# $ bundle exec rackup
# $ bundle exec rackup -r agoo -s agoo

# Make requests on port 9292 to received responses.
2 changes: 1 addition & 1 deletion example/push/push.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def on_drained(client)

def on_message(client, data)
puts "--- on_message #{data}"
client.write("echo: #{data}")
client.write("Handler says #{data}")
end

# A simple clock publisher of sorts. It writes the current time every second
Expand Down
1 change: 1 addition & 0 deletions example/push/websocket.html
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
}
sock.onopen = function() {
document.getElementById("status").textContent = "connected";
sock.send("Connected!")
}
sock.onmessage = function(msg) {
document.getElementById("message").textContent = msg.data;
Expand Down
15 changes: 9 additions & 6 deletions ext/agoo/agoo.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ ragoo_shutdown(VALUE self) {
*
* call-seq: publish(subject, message)
*
* Publish a message on the given subject.
* Publish a message on the given subject. A subject is normally a String but
* Symbols can also be used as can any other object that responds to #to_s.
*/
VALUE
ragoo_publish(VALUE self, VALUE subject, VALUE message) {
rb_check_type(subject, T_STRING);
rb_check_type(message, T_STRING);
int slen;
const char *subj = extract_subject(subject, &slen);

queue_push(&the_server.pub_queue, pub_publish(StringValuePtr(subject), (int)RSTRING_LEN(subject),
StringValuePtr(message), (int)RSTRING_LEN(message)));
rb_check_type(message, T_STRING);
queue_push(&the_server.pub_queue, pub_publish(subj, slen, StringValuePtr(message), (int)RSTRING_LEN(message)));

return Qnil;
}
Expand All @@ -55,7 +56,9 @@ ragoo_publish(VALUE self, VALUE subject, VALUE message) {
*
* call-seq: unsubscribe(subject)
*
* Unsubscribes on client listeners on the specified subject.
* Unsubscribes on client listeners on the specified subject. Subjects are
* normally Strings but Symbols can also be used as can any other object that
* responds to #to_s.
*/
static VALUE
ragoo_unsubscribe(VALUE self, VALUE subject) {
Expand Down
10 changes: 7 additions & 3 deletions ext/agoo/con.c
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,10 @@ con_ws_write(Con c) {
}
} else {
ws_req_close(c);
c->res_head = res->next;
if (res == c->res_tail) {
c->res_tail = NULL;
}
res_destroy(res);
return true;
}
Expand Down Expand Up @@ -764,7 +768,7 @@ con_write(Con c) {
default:
break;
}
if (kind != c->kind) {
if (kind != c->kind && CON_ANY != kind) {
c->kind = kind;
/*
if (CON_HTTP != kind && !remove) {
Expand Down Expand Up @@ -792,7 +796,7 @@ publish_pub(Pub pub) {
up->con->res_tail->next = res;
}
up->con->res_tail = res;
res->con_kind = up->con->kind;
res->con_kind = CON_ANY;
res_set_message(res, text_dup(pub->msg));
}
}
Expand Down Expand Up @@ -849,7 +853,7 @@ process_pub_con(Pub pub) {
up->con->res_tail->next = res;
}
up->con->res_tail = res;
res->con_kind = up->con->kind;
res->con_kind = CON_ANY;
res_set_message(res, pub->msg);
}
}
Expand Down
2 changes: 1 addition & 1 deletion ext/agoo/pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub_unsubscribe(Upgraded up, const char *subject, int slen) {
}

Pub
pub_publish(char *subject, int slen, const char *message, size_t mlen) {
pub_publish(const char *subject, int slen, const char *message, size_t mlen) {
Pub p = (Pub)malloc(sizeof(struct _Pub));

if (NULL != p) {
Expand Down
2 changes: 1 addition & 1 deletion ext/agoo/pub.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ typedef struct _Pub {
extern Pub pub_close(struct _Upgraded *up);
extern Pub pub_subscribe(struct _Upgraded *up, const char *subject, int slen);
extern Pub pub_unsubscribe(struct _Upgraded *up, const char *subject, int slen);
extern Pub pub_publish(char *subject, int slen, const char *message, size_t mlen);
extern Pub pub_publish(const char *subject, int slen, const char *message, size_t mlen);
extern Pub pub_write(struct _Upgraded *up, const char *message, size_t mlen, bool bin);
extern void pub_destroy(Pub pub);

Expand Down
13 changes: 8 additions & 5 deletions ext/agoo/text.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ text_create(const char *str, int len) {

Text
text_dup(Text t0) {
int len = t0->len;
Text t = (Text)malloc(sizeof(struct _Text) - TEXT_MIN_SIZE + len + 1);
Text t = (Text)malloc(sizeof(struct _Text) - TEXT_MIN_SIZE + t0->alen + 1);

if (NULL != t) {
DEBUG_ALLOC(mem_text, t)
t->len = len;
t->alen = len;
t->len = t0->len;
t->alen = t0->alen;
t->bin = false;
atomic_init(&t->ref_cnt, 0);
memcpy(t->text, t0->text, len + 1);
memcpy(t->text, t0->text, t0->len + 1);
}
return t;
}
Expand Down Expand Up @@ -99,10 +98,14 @@ text_prepend(Text t, const char *s, int len) {
if (t->alen <= t->len + len) {
long new_len = t->alen + len + t->alen / 2;
size_t size = sizeof(struct _Text) - TEXT_MIN_SIZE + new_len + 1;
#ifdef MEM_DEBUG
Text t0 = t;
#endif

if (NULL == (t = (Text)realloc(t, size))) {
return NULL;
}
DEBUG_REALLOC(mem_text, t0, t);
t->alen = new_len;
}
memmove(t->text + len, t->text, t->len + 1);
Expand Down
1 change: 1 addition & 0 deletions ext/agoo/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ typedef enum {
} Method;

typedef enum {
CON_ANY = '\0',
CON_HTTP = 'H',
CON_WS = 'W',
CON_SSE = 'S',
Expand Down
37 changes: 31 additions & 6 deletions ext/agoo/upgraded.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,28 @@ destroy(Upgraded up) {
free(up);
}

const char*
extract_subject(VALUE subject, int *slen) {
const char *subj;

switch (rb_type(subject)) {
case T_STRING:
subj = StringValuePtr(subject);
*slen = (int)RSTRING_LEN(subject);
break;
case T_SYMBOL:
subj = rb_id2name(rb_sym2id(subject));
*slen = strlen(subj);
break;
default:
subject = rb_funcall(subject, to_s_id, 0);
subj = StringValuePtr(subject);
*slen = (int)RSTRING_LEN(subject);
break;
}
return subj;
}

void
upgraded_release(Upgraded up) {
pthread_mutex_lock(&the_server.up_lock);
Expand Down Expand Up @@ -184,15 +206,18 @@ up_write(VALUE self, VALUE msg) {
* dot delimited string that can include a '*' character as a wild card that
* matches any set of characters. The '>' character matches all remaining
* characters. Examples: people.fred.log, people.*.log, people.fred.>
*
* Symbols can also be used as can any other object that responds to #to_s.
*/
static VALUE
up_subscribe(VALUE self, VALUE subject) {
Upgraded up;

rb_check_type(subject, T_STRING);
int slen;
const char *subj = extract_subject(subject, &slen);

if (NULL != (up = get_upgraded(self))) {
atomic_fetch_add(&up->pending, 1);
queue_push(&the_server.pub_queue, pub_subscribe(up, StringValuePtr(subject), RSTRING_LEN(subject)));
queue_push(&the_server.pub_queue, pub_subscribe(up, subj, slen));
}
return Qnil;
}
Expand All @@ -203,6 +228,8 @@ up_subscribe(VALUE self, VALUE subject) {
*
* Unsubscribes to messages on the provided subject. If the subject is nil
* then all subscriptions for the object are removed.
*
* Symbols can also be used as can any other object that responds to #to_s.
*/
static VALUE
up_unsubscribe(int argc, VALUE *argv, VALUE self) {
Expand All @@ -211,9 +238,7 @@ up_unsubscribe(int argc, VALUE *argv, VALUE self) {
int slen = 0;

if (0 < argc) {
rb_check_type(argv[0], T_STRING);
subject = StringValuePtr(argv[0]);
slen = (int)RSTRING_LEN(argv[0]);
subject = extract_subject(argv[0], &slen);
}
if (NULL != (up = get_upgraded(self))) {
atomic_fetch_add(&up->pending, 1);
Expand Down
2 changes: 2 additions & 0 deletions ext/agoo/upgraded.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ extern void upgraded_add_subject(Upgraded up, struct _Subject *subject);
extern void upgraded_del_subject(Upgraded up, struct _Subject *subject);
extern bool upgraded_match(Upgraded up, const char *subject);

extern const char* extract_subject(VALUE subject, int *slen);

#endif // __AGOO_UPGRADED_H__
2 changes: 0 additions & 2 deletions ext/agoo/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ ws_ping(Con c) {
if (NULL == (res = res_create(c))) {
log_cat(&error_cat, "Memory allocation of response failed on connection %llu.", c->id);
} else {
DEBUG_ALLOC(mem_res, res)
if (NULL == c->res_tail) {
c->res_head = res;
} else {
Expand All @@ -235,7 +234,6 @@ ws_pong(Con c) {
if (NULL == (res = res_create(c))) {
log_cat(&error_cat, "Memory allocation of response failed on connection %llu.", c->id);
} else {
DEBUG_ALLOC(mem_res, res)
if (NULL == c->res_tail) {
c->res_head = res;
} else {
Expand Down
2 changes: 1 addition & 1 deletion lib/agoo/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

module Agoo
# Agoo version.
VERSION = '2.1.0'
VERSION = '2.1.1'
end
3 changes: 2 additions & 1 deletion misc/push.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ browser.
The example is a bit more than a hello world but only enough to make it
interesting. A browser is used to connect to a Rack server that runs a clock,
On each tick of the clock the time is sent to the browser. Either an SSE and a
WebSocket page can be used.
WebSocket page can be used. That means you can connect with your mobile device
using SSE. Try it to see how easy it is.

First some web pages will be needed. Lets call them `websocket.html` and
`sse.html`. Notice how similar they look.
Expand Down

0 comments on commit dabd1c9

Please sign in to comment.