-
Notifications
You must be signed in to change notification settings - Fork 0
/
AMQPTesterMultiplexing.java
353 lines (276 loc) · 12.9 KB
/
AMQPTesterMultiplexing.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
//This tester checks if multiplexing works over multiple channels,
//i.e. the client is expected to subscribe to two queues and the tester will
//deliver payload frames over multiple channels at the same time.
//The RabbitMQ behaviour is to never multiplex payloads, even though it is
//perfectly fine according to the specifications of the protocol
import java.util.*;
//Tester that acts as the server side of an AMQP connection and, once the client
//has consumers on two different channels, queues interleaved deliveries for both
//channels so that the client's handling of multiplexed content can be observed.
public class AMQPTesterMultiplexing extends AMQPTester {
    //Frames received from the client that have not been processed yet
    LinkedList<AMQPFrame> queue_incoming = new LinkedList<AMQPFrame>();

    //Frames waiting to be handed out through getFrame()
    LinkedList<AMQPFrame> queue_outgoing = new LinkedList<AMQPFrame>();

    //Tester state enumeration
    public enum State {
        INITIALIZING,       //Connection.Start, Connection.Tune
        HANDSHAKE_COMPLETE, //Connection.Start and Tune complete
        SUBSCRIBED_1,       //One consumer registered (first channel known)
        SUBSCRIBED_2,       //Two consumers registered (both channels known)
    }

    //Associated AMQPConnection
    AMQPConnection amqpConnection;

    //Current state this tester is in
    public State state = State.INITIALIZING;

    //Channels of the first and second Basic.Consume we have seen (null until seen)
    public AShortUInt channel_1;
    public AShortUInt channel_2;

    //Constructor: stores the connection and queues the Connection.Start frame
    //(class 10, method 10) as the first outgoing frame.
    AMQPTesterMultiplexing(AMQPConnection amqpConnection) {
        this.amqpConnection = amqpConnection;

        //Contents of the server-properties field table
        //FIXME: Include more headers?
        LinkedHashMap<AShortString, AMQPNativeType> server_props = new LinkedHashMap<AShortString, AMQPNativeType>();
        server_props.put(new AShortString("copyright"), new ALongString("Hello World Inc."));

        //Arguments of Connection.Start, in wire order
        LinkedHashMap<AShortString, AMQPNativeType> start_arg = new LinkedHashMap<AShortString, AMQPNativeType>();
        start_arg.put(new AShortString("version-major"), new AOctet(0x00));
        start_arg.put(new AShortString("version-minor"), new AOctet(0x09));
        start_arg.put(new AShortString("server-properties"), new AFieldTable(server_props));
        //"AMQPLAIN" is the mechanism name clients look for (was misspelled "AMQPPLAIN")
        start_arg.put(new AShortString("mechanisms"), new ALongString("PLAIN AMQPLAIN"));
        //AMQP 0-9-1 requires the server to support the "en_US" locale (was "en-US")
        start_arg.put(new AShortString("locales"), new ALongString("en_US"));

        //Build the complete frame and queue it up to be sent to the client
        AMQPFrame start_frame = AMQPMethodFrame.build(10, 10, start_arg);
        queue_outgoing.add(start_frame);
        System.out.println("Sending Connection.Start");
    }

    //True if the method frame carries the given class id and method id
    private static boolean isMethod(AMQPMethodFrame inner, int classId, int methodId) {
        return inner.amqpClass.toInt() == classId && inner.amqpMethod.toInt() == methodId;
    }

    //Sets the channel on an outgoing frame and queues it for sending
    private void sendOnChannel(AMQPFrame outgoing, AShortUInt channel) {
        outgoing.channel = channel;
        queue_outgoing.add(outgoing);
    }

    //Called when a frame is received and we are still initializing.
    //Processes at most one queued frame: answers Connection.Start-OK with
    //Connection.Tune, and moves to HANDSHAKE_COMPLETE on Connection.Tune-OK.
    //Any non-method frame marks the connection for disconnect.
    public void updateInitializing() {
        if (queue_incoming.size() == 0) {
            return;
        }
        AMQPFrame frame = queue_incoming.pop();

        //We are not expecting any non-method frames during the handshake
        if (frame.amqpFrameType != AMQPFrame.AMQPFrameType.METHOD) {
            amqpConnection.status = AMQPConnection.AMQPConnectionState.DISCONNECT;
            System.out.println("AMQPTesterMultiplexing: Received bad frame during initialization");
            return;
        }

        //The inner frame of a METHOD frame is an AMQPMethodFrame
        AMQPMethodFrame inner = (AMQPMethodFrame) frame.innerFrame;
        System.out.println("Received: " + inner.toString());

        if (isMethod(inner, 10, 11)) {
            //Connection.Start-OK received: reply with Connection.Tune
            LinkedHashMap<AShortString, AMQPNativeType> arguments = new LinkedHashMap<AShortString, AMQPNativeType>();
            arguments.put(new AShortString("channel-max"), new AShortUInt(0)); //No specific channel limit
            arguments.put(new AShortString("frame-max"), new ALongUInt(1024)); //Max frame length
            arguments.put(new AShortString("heartbeat"), new AShortUInt(0)); //Don't care about heartbeat
            queue_outgoing.add(AMQPMethodFrame.build(10, 30, arguments));
            System.out.println("Sending Connection.Tune");
        } else if (isMethod(inner, 10, 31)) {
            //Connection.Tune-OK received: handshake is now considered complete
            state = State.HANDSHAKE_COMPLETE;
            System.out.println("Handshake phase complete");
        }
    }

    //Quickly build a basic.deliver frame (60/60) for the given channel and routing key
    public static AMQPFrame basicDeliver(AShortUInt channel, String routingKey) {
        LinkedHashMap<AShortString, AMQPNativeType> arguments = new LinkedHashMap<AShortString, AMQPNativeType>();
        //Same tag that is returned in Basic.Consume-OK
        arguments.put(new AShortString("consumer-tag"), new AShortString("amq.HelloWorld"));
        arguments.put(new AShortString("delivery-tag"), new ALongLongUInt(1));
        arguments.put(new AShortString("redelivered"), new ABoolean(false));
        arguments.put(new AShortString("exchange"), new AShortString("default"));
        arguments.put(new AShortString("routing-key"), new AShortString(routingKey));

        //Build the frame and set channel
        AMQPFrame outgoing = AMQPMethodFrame.build(60, 60, arguments);
        outgoing.channel = channel;
        return outgoing;
    }

    //Quickly build a content header frame announcing a body of `length`
    public static AMQPFrame headerFrame(AShortUInt channel, int length) {
        AShortUInt basicClassId = new AShortUInt(60); //Class ID 60 (Basic)
        ALongLongUInt bodyLength = new ALongLongUInt(length);
        return AMQPHeaderFrame.build(channel, basicClassId, bodyLength);
    }

    //Quickly build a body frame carrying `message` on the given channel
    public static AMQPFrame bodyFrame(AShortUInt channel, String message) {
        return AMQPBodyFrame.build(channel, message);
    }

    //Periodical update: once two consumers on different channels are known,
    //queues one delivery per channel with method, header and body frames
    //interleaved between the two channels.
    public void periodical() {
        //Throttle outgoing socket writes (original note: max 10 bytes per 100ms).
        //This is applied on every call, also before both consumers exist.
        Server.DELAYED_WRITE = true;
        Server.SOCKET_BUFFER_SIZE = 10;

        //Only send periodical messages once we have two consumers on different
        //channels connected
        if (state != State.SUBSCRIBED_2) {
            return;
        }

        //Basic.Deliver for both channels
        queue_outgoing.add(basicDeliver(channel_1, "test_1"));
        queue_outgoing.add(basicDeliver(channel_2, "test_2"));

        //Content headers for both channels
        //NOTE(review): each header announces 4 bytes, but only three one-character
        //body frames per channel are queued below - confirm whether this mismatch
        //is intentional for the test.
        queue_outgoing.add(headerFrame(channel_1, 4));
        queue_outgoing.add(headerFrame(channel_2, 4));

        //Payload, alternating between the two channels
        for (int i = 0; i < 3; i++) {
            queue_outgoing.add(bodyFrame(channel_1, "1"));
            queue_outgoing.add(bodyFrame(channel_2, "2"));
        }

        //Disabled variants kept for manual experiments:
        //queue_outgoing.add(bodyFrame(channel_1, "1"));
        //queue_outgoing.add(bodyFrame(channel_2, "2"));
        //queue_outgoing.add(bodyFrame(new AShortUInt(123), "2")); //Channel nobody consumes on
        //...but both of these are not valid length (many libraries accept these anyway!)
        //queue_outgoing.add(bodyFrame(channel_1, "aa"));
        //queue_outgoing.add(bodyFrame(channel_2, "bb"));
        System.out.println("Sent body");
    }

    //Currently triggered upon modifying the incoming frame queue
    //May be periodically triggered in the future.
    //Processes at most one queued incoming frame per call.
    public void updateState() {
        //Initialization is handled separately to keep this method readable
        if (state == State.INITIALIZING) {
            updateInitializing();
            return;
        }

        //Make sure that we have incoming data
        if (queue_incoming.size() == 0) {
            return;
        }
        AMQPFrame frame = queue_incoming.pop();

        if (frame.amqpFrameType == AMQPFrame.AMQPFrameType.METHOD) {
            dispatchMethodFrame(frame);
        } else if (frame.amqpFrameType == AMQPFrame.AMQPFrameType.HEADER) {
            System.out.println("Received header frame in TesterMultiplexing");
            System.out.println(frame.innerFrame.toString());
        } else if (frame.amqpFrameType == AMQPFrame.AMQPFrameType.BODY) {
            System.out.println("Received body frame (full size: " + frame.toWire().length() + ") in TesterMultiplexing, data:");
            System.out.println(frame.innerFrame.toString());
        }
    }

    //Answers a single method frame received after the handshake.
    //Methods that are not listed here are logged and otherwise ignored.
    private void dispatchMethodFrame(AMQPFrame frame) {
        //Get the inner frame that contains all important frame data
        AMQPMethodFrame inner = (AMQPMethodFrame) frame.innerFrame;
        System.out.println("Frame received (full size: " + frame.size() + "): " + inner.toString());

        if (isMethod(inner, 10, 40)) {
            //Connection.Open -> Connection.Open-OK
            //Maybe check the path in the future if needed?
            //The supplied octet is the reserved field
            queue_outgoing.add(AMQPMethodFrame.build(10, 41, new AOctet(0x00)));
            System.out.println("Sending Connection.Open-OK");
        } else if (isMethod(inner, 10, 50)) {
            //Connection.Close -> Connection.Close-OK (carries no arguments)
            queue_outgoing.add(AMQPMethodFrame.build(10, 51));
            System.out.println("Sending Connection.Close-OK");
        } else if (isMethod(inner, 20, 10)) {
            //Channel.Open -> Channel.Open-OK on the same channel
            //Arguments: class, method, args (arg in this case is reserved)
            sendOnChannel(AMQPMethodFrame.build(20, 11, new ALongUInt(0)), frame.channel);
            System.out.println("Sending Channel.Open-OK (for channel " + frame.channel + ")");
        } else if (isMethod(inner, 20, 40)) {
            //Channel.Close -> Channel.Close-OK on the same channel.
            //Close-OK has no fields in AMQP 0-9-1, so no argument is attached.
            sendOnChannel(AMQPMethodFrame.build(20, 41), frame.channel);
            System.out.println("Sending Channel.Close-OK");
        } else if (isMethod(inner, 50, 10)) {
            onQueueDeclare(frame, inner);
        } else if (isMethod(inner, 60, 20)) {
            onBasicConsume(frame);
        }
    }

    //Queue.Declare -> Queue.Declare-OK on the same channel
    private void onQueueDeclare(AMQPFrame frame, AMQPMethodFrame inner) {
        //Queue name received from the client
        AShortString queue_name = (AShortString) inner.getArg("queue-name");

        //The client may leave the queue name empty and let the server pick one.
        //A missing (null) argument is treated the same way instead of crashing.
        if (queue_name == null || queue_name.toString().equals("")) {
            //Generate a queue name based on the channel it was received on
            queue_name = new AShortString("amq.autoName_" + frame.channel.toString());
            System.out.println("*** No queue name specified, returning: " + queue_name.toString());
        }

        //Arguments to be returned
        LinkedHashMap<AShortString, AMQPNativeType> arguments = new LinkedHashMap<AShortString, AMQPNativeType>();
        arguments.put(new AShortString("queue"), queue_name);
        arguments.put(new AShortString("message-count"), new ALongUInt(0));
        arguments.put(new AShortString("consumer-count"), new ALongUInt(1));

        //Build frame, set same channel and queue it
        sendOnChannel(AMQPMethodFrame.build(50, 11, arguments), frame.channel);
        System.out.println("Sending Queue.Declare-OK");
    }

    //Basic.Consume -> Basic.Consume-OK on the same channel, then remember the
    //channel as first or second consumer. Further consumers are acknowledged
    //but not tracked.
    private void onBasicConsume(AMQPFrame frame) {
        sendOnChannel(AMQPMethodFrame.build(60, 21, new AShortString("amq.HelloWorld")), frame.channel);
        System.out.println("Sending Basic.Consume-OK");

        if (channel_1 == null) {
            //No consumer yet: this is the first channel
            channel_1 = frame.channel;
            state = State.SUBSCRIBED_1;
            System.out.println("First subscriber connected");
        } else if (channel_2 == null) {
            //First channel already taken: this is the second channel
            channel_2 = frame.channel;
            state = State.SUBSCRIBED_2;
            System.out.println("Second subscriber connected");
        }
    }

    //Called when a frame has been received and decoded over the wire
    public void deliverFrame(AMQPFrame amqpFrame) {
        //Add frame to queue, then trigger a state update
        queue_incoming.add(amqpFrame);
        updateState();
    }

    //Get a frame from the internal outgoing queue
    //Returns null if no frames are available
    public AMQPFrame getFrame() {
        //poll() removes the head like pop(), but yields null on an empty queue
        return queue_outgoing.poll();
    }
}