From 25b05292b3f9d01b6b0565efb3fe9cfe0b2d4252 Mon Sep 17 00:00:00 2001 From: Takao Nakaguchi Date: Tue, 3 Sep 2013 01:59:07 +0900 Subject: [PATCH 1/5] now you can log any object as JSON. --- .../java/org/fluentd/logger/FluentLogger.java | 4 +- .../java/org/fluentd/logger/sender/Event.java | 59 ++++++++++++++----- .../org/fluentd/logger/sender/NullSender.java | 5 +- .../logger/sender/RawSocketSender.java | 4 +- .../org/fluentd/logger/sender/Sender.java | 6 +- .../logger/sender/TestRawSocketSender.java | 8 +-- .../org/fluentd/logger/util/MockFluentd.java | 3 +- 7 files changed, 57 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/fluentd/logger/FluentLogger.java b/src/main/java/org/fluentd/logger/FluentLogger.java index 45bc41b..0ca381d 100644 --- a/src/main/java/org/fluentd/logger/FluentLogger.java +++ b/src/main/java/org/fluentd/logger/FluentLogger.java @@ -81,11 +81,11 @@ public boolean log(String tag, String key, Object value, long timestamp) { return log(tag, data, timestamp); } - public boolean log(String tag, Map data) { + public boolean log(String tag, Object data) { return log(tag, data, 0); } - public boolean log(String tag, Map data, long timestamp) { + public boolean log(String tag, Object data, long timestamp) { String concatTag = null; if (tagPrefix == null || tagPrefix.length() == 0) { concatTag = tag; diff --git a/src/main/java/org/fluentd/logger/sender/Event.java b/src/main/java/org/fluentd/logger/sender/Event.java index 3eb2425..98a801a 100644 --- a/src/main/java/org/fluentd/logger/sender/Event.java +++ b/src/main/java/org/fluentd/logger/sender/Event.java @@ -18,6 +18,9 @@ package org.fluentd.logger.sender; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.LinkedHashMap; import java.util.Map; import org.msgpack.MessageTypeException; @@ -31,16 +34,16 @@ public class Event { public long timestamp; - public Map data; + public Object data; public Event() { } - public Event(String tag, Map data) { + public Event(String tag, Object data) { this(tag, System.currentTimeMillis() / 1000, data); } - public Event(String tag, long timestamp, Map data) { + public Event(String tag, long timestamp, Object data) { this.tag = tag; this.timestamp = timestamp; this.data = data; @@ -68,19 +71,7 @@ public void write(Packer pk, Event v, boolean required) throws IOException { { Templates.TString.write(pk, v.tag, required); Templates.TLong.write(pk, v.timestamp, required); - pk.writeMapBegin(v.data.size()); - { - for (Map.Entry entry : v.data.entrySet()) { - Templates.TString.write(pk, entry.getKey(), required); - try { - pk.write(entry.getValue()); - } catch (MessageTypeException e) { - String val = entry.getValue().toString(); - Templates.TString.write(pk, val, required); - } - } - } - pk.writeMapEnd(); + writeRec(pk, v.data, required); } pk.writeArrayEnd(); } @@ -88,5 +79,41 @@ public void write(Packer pk, Event v, boolean required) throws IOException { public Event read(Unpacker u, Event to, boolean required) throws IOException { throw new UnsupportedOperationException("Don't need the operation"); } + + private void writeRec(Packer pk, Object data, boolean required) throws IOException { + Map map = null; + if(!(data instanceof Map)){ + Map ret = new LinkedHashMap(); + for(Method m : data.getClass().getMethods()){ + if(m.getDeclaringClass().equals(Object.class)) continue; + if(!m.getName().startsWith("get")) continue; + if(m.getParameterTypes().length != 0) continue; + String name = m.getName().substring(3); + if(name.length() == 0) continue; + name = name.substring(0, 1).toLowerCase() + (name.length() == 1 ? "" : name.substring(1)); + try { + ret.put(name, m.invoke(data)); + } catch (IllegalArgumentException e) { + } catch (IllegalAccessException e) { + } catch (InvocationTargetException e) { + } + } + map = ret; + } else{ + map = (Map)data; + } + pk.writeMapBegin(map.size()); + { + for (Map.Entry entry : map.entrySet()) { + Templates.TString.write(pk, entry.getKey().toString(), required); + try { + pk.write(entry.getValue()); + } catch (MessageTypeException e) { + writeRec(pk, entry.getValue(), required); + } + } + } + pk.writeMapEnd(); + } } } diff --git a/src/main/java/org/fluentd/logger/sender/NullSender.java b/src/main/java/org/fluentd/logger/sender/NullSender.java index de33679..4c5671c 100644 --- a/src/main/java/org/fluentd/logger/sender/NullSender.java +++ b/src/main/java/org/fluentd/logger/sender/NullSender.java @@ -17,7 +17,6 @@ // package org.fluentd.logger.sender; -import java.util.Map; public class NullSender implements Sender { @@ -25,12 +24,12 @@ public NullSender(String host, int port, int timeout, int bufferCapacity) { } @Override - public boolean emit(String tag, Map data) { + public boolean emit(String tag, Object data) { return emit(tag, System.currentTimeMillis() / 1000, data); } @Override - public boolean emit(String tag, long timestamp, Map data) { + public boolean emit(String tag, long timestamp, Object data) { return true; } diff --git a/src/main/java/org/fluentd/logger/sender/RawSocketSender.java b/src/main/java/org/fluentd/logger/sender/RawSocketSender.java index ba22fef..b7a300b 100644 --- a/src/main/java/org/fluentd/logger/sender/RawSocketSender.java +++ b/src/main/java/org/fluentd/logger/sender/RawSocketSender.java @@ -125,11 +125,11 @@ public void close() { } } - public boolean emit(String tag, Map data) { + public boolean emit(String tag, Object data) { return emit(tag, System.currentTimeMillis() / 1000, data); } - public boolean emit(String tag, long timestamp, Map data) { + public boolean emit(String tag, long timestamp, Object data) { return emit(new Event(tag, timestamp, data)); } diff --git a/src/main/java/org/fluentd/logger/sender/Sender.java b/src/main/java/org/fluentd/logger/sender/Sender.java index 9a6dc42..8a56250 100644 --- a/src/main/java/org/fluentd/logger/sender/Sender.java +++ b/src/main/java/org/fluentd/logger/sender/Sender.java @@ -17,12 +17,10 @@ // package org.fluentd.logger.sender; -import java.util.Map; - public interface Sender { - boolean emit(String tag, Map data); + boolean emit(String tag, Object data); - boolean emit(String tag, long timestamp, Map data); + boolean emit(String tag, long timestamp, Object data); void flush(); diff --git a/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java b/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java index 8130ae2..0789da4 100644 --- a/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java +++ b/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java @@ -67,14 +67,14 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { { Event e = elist.get(0); assertEquals("tag.label1", e.tag); - assertEquals("t1v1", e.data.get("t1k1")); - assertEquals("t1v2", e.data.get("t1k2")); + assertEquals("t1v1", ((Map)e.data).get("t1k1")); + assertEquals("t1v2", ((Map)e.data).get("t1k2")); } { Event e = elist.get(1); assertEquals("tag.label2", e.tag); - assertEquals("t2v1", e.data.get("t2k1")); - assertEquals("t2v2", e.data.get("t2k2")); + assertEquals("t2v1", ((Map)e.data).get("t2k1")); + assertEquals("t2v2", ((Map)e.data).get("t2k2")); } } diff --git a/src/test/java/org/fluentd/logger/util/MockFluentd.java b/src/test/java/org/fluentd/logger/util/MockFluentd.java index cf39540..f9be22a 100644 --- a/src/test/java/org/fluentd/logger/util/MockFluentd.java +++ b/src/test/java/org/fluentd/logger/util/MockFluentd.java @@ -4,6 +4,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.fluentd.logger.sender.Event; @@ -42,7 +43,7 @@ public Event read(Unpacker u, Event to, boolean required) throws IOException { for (int i = 0; i < size; i++) { String key = (String) toObject(u, u.readValue()); Object value = toObject(u, u.readValue()); - to.data.put(key, value); + ((Map)to.data).put(key, value); } } u.readMapEnd(); From 836c347c6510f5fec71e77a39d0c9736686d6139 Mon Sep 17 00:00:00 2001 From: Takao Nakaguchi Date: Tue, 3 Sep 2013 08:15:32 +0900 Subject: [PATCH 2/5] support logging various values i.e. Integer, Boolean, String... --- .../java/org/fluentd/logger/FluentLogger.java | 8 +++ .../java/org/fluentd/logger/sender/Event.java | 62 +++++++++++-------- 2 files changed, 45 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/fluentd/logger/FluentLogger.java b/src/main/java/org/fluentd/logger/FluentLogger.java index 0ca381d..5f8d278 100644 --- a/src/main/java/org/fluentd/logger/FluentLogger.java +++ b/src/main/java/org/fluentd/logger/FluentLogger.java @@ -81,6 +81,14 @@ public boolean log(String tag, String key, Object value, long timestamp) { return log(tag, data, timestamp); } + public boolean log(String tag, Map data) { + return log(tag, (Object)data, 0); + } + + public boolean log(String tag, Map data, long timestamp) { + return log(tag, (Object)data, timestamp); + } + public boolean log(String tag, Object data) { return log(tag, data, 0); } diff --git a/src/main/java/org/fluentd/logger/sender/Event.java b/src/main/java/org/fluentd/logger/sender/Event.java index 98a801a..ac1d4af 100644 --- a/src/main/java/org/fluentd/logger/sender/Event.java +++ b/src/main/java/org/fluentd/logger/sender/Event.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.math.BigInteger; import java.util.LinkedHashMap; import java.util.Map; @@ -71,7 +72,15 @@ public void write(Packer pk, Event v, boolean required) throws IOException { { Templates.TString.write(pk, v.tag, required); Templates.TLong.write(pk, v.timestamp, required); - writeRec(pk, v.data, required); + if(v.data instanceof Map){ + writeMap(pk, (Map)v.data, required); + } else{ + try{ + pk.write(v.data); + } catch (MessageTypeException e) { + writeObj(pk, v.data, required); + } + } } pk.writeArrayEnd(); } @@ -80,36 +89,39 @@ public Event read(Unpacker u, Event to, boolean required) throws IOException { throw new UnsupportedOperationException("Don't need the operation"); } - private void writeRec(Packer pk, Object data, boolean required) throws IOException { - Map map = null; - if(!(data instanceof Map)){ - Map ret = new LinkedHashMap(); - for(Method m : data.getClass().getMethods()){ - if(m.getDeclaringClass().equals(Object.class)) continue; - if(!m.getName().startsWith("get")) continue; - if(m.getParameterTypes().length != 0) continue; - String name = m.getName().substring(3); - if(name.length() == 0) continue; - name = name.substring(0, 1).toLowerCase() + (name.length() == 1 ? "" : name.substring(1)); - try { - ret.put(name, m.invoke(data)); - } catch (IllegalArgumentException e) { - } catch (IllegalAccessException e) { - } catch (InvocationTargetException e) { - } + private void writeObj(Packer pk, Object data, boolean required) throws IOException { + Map map = new LinkedHashMap(); + for(Method m : data.getClass().getMethods()){ + if(m.getDeclaringClass().equals(Object.class)) continue; + if(!m.getName().startsWith("get")) continue; + if(m.getParameterTypes().length != 0) continue; + String name = m.getName().substring(3); + if(name.length() == 0) continue; + name = name.substring(0, 1).toLowerCase() + (name.length() == 1 ? "" : name.substring(1)); + try { + map.put(name, m.invoke(data)); + } catch (IllegalArgumentException e) { + } catch (IllegalAccessException e) { + } catch (InvocationTargetException e) { } - map = ret; - } else{ - map = (Map)data; } + writeMap(pk, map, required); + } + + private void writeMap(Packer pk, Map map, boolean required) throws IOException { pk.writeMapBegin(map.size()); { for (Map.Entry entry : map.entrySet()) { Templates.TString.write(pk, entry.getKey().toString(), required); - try { - pk.write(entry.getValue()); - } catch (MessageTypeException e) { - writeRec(pk, entry.getValue(), required); + Object value = entry.getValue(); + if(value instanceof Map){ + writeMap(pk, (Map)value, required); + } else{ + try { + pk.write(entry.getValue()); + } catch (MessageTypeException e) { + writeObj(pk, entry.getValue(), required); + } } } } From 77a28e519847457b10a07473fbed8ecac2d556ad Mon Sep 17 00:00:00 2001 From: Takao Nakaguchi Date: Tue, 3 Sep 2013 08:26:57 +0900 Subject: [PATCH 3/5] remove unused import. --- src/main/java/org/fluentd/logger/sender/Event.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/fluentd/logger/sender/Event.java b/src/main/java/org/fluentd/logger/sender/Event.java index ac1d4af..8016785 100644 --- a/src/main/java/org/fluentd/logger/sender/Event.java +++ b/src/main/java/org/fluentd/logger/sender/Event.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.math.BigInteger; import java.util.LinkedHashMap; import java.util.Map; From 617bd36a7a24e6f5e0e49abd7ef9897f1319393e Mon Sep 17 00:00:00 2001 From: Takao Nakaguchi Date: Tue, 3 Sep 2013 22:55:06 +0900 Subject: [PATCH 4/5] support boolean property of object and add tests. --- .../java/org/fluentd/logger/sender/Event.java | 34 ++-- .../logger/sender/TestRawSocketSender.java | 148 ++++++++++++++++++ .../org/fluentd/logger/util/MockFluentd.java | 23 +-- 3 files changed, 182 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/fluentd/logger/sender/Event.java b/src/main/java/org/fluentd/logger/sender/Event.java index 8016785..a3daa5e 100644 --- a/src/main/java/org/fluentd/logger/sender/Event.java +++ b/src/main/java/org/fluentd/logger/sender/Event.java @@ -90,19 +90,29 @@ public Event read(Unpacker u, Event to, boolean required) throws IOException { private void writeObj(Packer pk, Object data, boolean required) throws IOException { Map map = new LinkedHashMap(); - for(Method m : data.getClass().getMethods()){ - if(m.getDeclaringClass().equals(Object.class)) continue; - if(!m.getName().startsWith("get")) continue; - if(m.getParameterTypes().length != 0) continue; - String name = m.getName().substring(3); - if(name.length() == 0) continue; - name = name.substring(0, 1).toLowerCase() + (name.length() == 1 ? "" : name.substring(1)); - try { - map.put(name, m.invoke(data)); - } catch (IllegalArgumentException e) { - } catch (IllegalAccessException e) { - } catch (InvocationTargetException e) { + Class clazz = data.getClass(); + while(!clazz.equals(Object.class)){ + for(Method m : clazz.getDeclaredMethods()){ + if(m.getDeclaringClass().equals(Object.class)) continue; + if(m.getParameterTypes().length != 0) continue; + String name = null; + if(m.getName().startsWith("get")){ + name = m.getName().substring(3); + } else if(m.getName().startsWith("is") && m.getReturnType().equals(boolean.class)){ + name = m.getName().substring(2); + } else{ + continue; + } + if(name.length() == 0) continue; + name = name.substring(0, 1).toLowerCase() + (name.length() == 1 ? "" : name.substring(1)); + try { + map.put(name, m.invoke(data)); + } catch (IllegalArgumentException e) { + } catch (IllegalAccessException e) { + } catch (InvocationTargetException e) { + } } + clazz = clazz.getSuperclass(); } writeMap(pk, map, required); } diff --git a/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java b/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java index 0789da4..2237d14 100644 --- a/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java +++ b/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java @@ -205,4 +205,152 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { assertEquals(counts[0], elists[0].size()); assertEquals(counts[1], elists[1].size()); } + + @Test + public void testNormal04_SOMapInSOMap() throws Exception { + List elist = new ArrayList(); + int port = 25225; + MockFluentd mock = createMock(elist, port); + try{ + // start senders + Sender sender = new RawSocketSender("localhost", port); + Map data = new HashMap(); + data.put("t1k1", "t1v1"); + data.put("t1k2", "t1v2"); + Map data2 = new HashMap(); + data2.put("t2k1", "t2v1"); + data2.put("t2k2", "t2v2"); + data.put("data2", data2); + sender.emit("tag.label1", data); + // close sender sockets + sender.close(); + } finally{ + // close mock server sockets + mock.close(); + } + + // wait for unpacking event data on fluentd + Thread.sleep(1000); + + // check data + assertEquals(1, elist.size()); + { + Event e = elist.get(0); + assertEquals("tag.label1", e.tag); + assertEquals("t1v1", ((Map)e.data).get("t1k1")); + assertEquals("t1v2", ((Map)e.data).get("t1k2")); + Map data2 = (Map)((Map)e.data).get("data2"); + assertEquals("t2v1", data2.get("t2k1")); + assertEquals("t2v2", data2.get("t2k2")); + } + } + + @Test + public void testNormal05_WrappersAndString() throws Exception { + List elist = new ArrayList(); + int port = 25225; + MockFluentd mock = createMock(elist, port); + try{ + // start senders + Sender sender = new RawSocketSender("localhost", port); + sender.emit("tag.label1", (short)1); + sender.emit("tag.label1", 2); + sender.emit("tag.label1", 3L); + sender.emit("tag.label1", 4.4f); + sender.emit("tag.label1", 5.5); + sender.emit("tag.label1", true); + sender.emit("tag.label1", 'A'); + sender.emit("tag.label1", "hello"); + // close sender sockets + sender.close(); + } finally{ + // close mock server sockets + mock.close(); + } + + // wait for unpacking event data on fluentd + Thread.sleep(1000); + + // check data + assertEquals(8, elist.size()); + { + int i = 0; + assertEquals((long)1, elist.get(i++).data); + assertEquals((long)2, elist.get(i++).data); + assertEquals((long)3, elist.get(i++).data); + assertEquals(4.4, (Double)elist.get(i++).data, 0.001); + assertEquals(5.5, (Double)elist.get(i++).data, 0.001); + assertEquals(true, elist.get(i++).data); + assertEquals((long)'A', elist.get(i++).data); + assertEquals("hello", elist.get(i++).data); + } + } + + public static class Msg{ + public Msg() { + } + public Msg(String name, int age, boolean live) { + this.name = name; + this.age = age; + this.live = live; + } + public int getAge() { + return age; + } + public String getName() { + return name; + } + public boolean isLive() { + return live; + } + + private String name; + private int age; + private boolean live; + } + @Test + public void testNormal06_Object() throws Exception { + List elist = new ArrayList(); + int port = 25225; + MockFluentd mock = createMock(elist, port); + try{ + Sender sender = new RawSocketSender("localhost", port); + sender.emit("tag.label1", new Msg("suzuki", 30, true)); + sender.close(); + } finally{ + mock.close(); + } + + Thread.sleep(1000); + + // check data + assertEquals(1, elist.size()); + { + Map m = (Map)elist.get(0).data; + assertEquals("suzuki", m.get("name")); + assertEquals((long)30, m.get("age")); + assertEquals(true, m.get("live")); + } + } + + private MockFluentd createMock(final List elist, int port) throws IOException{ + // start mock fluentd + MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() { + public void process(MessagePack msgpack, Socket socket) throws IOException { + BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); + try { + Unpacker unpacker = msgpack.createUnpacker(in); + while (true) { + Event e = unpacker.read(Event.class); + elist.add(e); + } + //socket.close(); + } catch (EOFException e) { + // ignore + } + } + }); + fluentd.start(); + return fluentd; + } } diff --git a/src/test/java/org/fluentd/logger/util/MockFluentd.java b/src/test/java/org/fluentd/logger/util/MockFluentd.java index f9be22a..0be023d 100644 --- a/src/test/java/org/fluentd/logger/util/MockFluentd.java +++ b/src/test/java/org/fluentd/logger/util/MockFluentd.java @@ -4,6 +4,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -11,6 +12,8 @@ import org.msgpack.MessagePack; import org.msgpack.packer.Packer; import org.msgpack.template.Templates; +import org.msgpack.type.ArrayValue; +import org.msgpack.type.RawValue; import org.msgpack.type.Value; import org.msgpack.unpacker.Unpacker; @@ -37,16 +40,7 @@ public Event read(Unpacker u, Event to, boolean required) throws IOException { { to.tag = Templates.TString.read(u, null, required); to.timestamp = Templates.TLong.read(u, null, required); - int size = u.readMapBegin(); - to.data = new HashMap(size); - { - for (int i = 0; i < size; i++) { - String key = (String) toObject(u, u.readValue()); - Object value = toObject(u, u.readValue()); - ((Map)to.data).put(key, value); - } - } - u.readMapEnd(); + to.data = toObject(u, u.readValue()); } u.readArrayEnd(); return to; @@ -65,7 +59,14 @@ private static Object toObject(Unpacker u, Value v) { } else if (v.isIntegerValue()) { return v.asIntegerValue().getLong(); // long only } else if (v.isMapValue()) { - throw new UnsupportedOperationException(); + Map map = new LinkedHashMap(); + Value[] vals = v.asMapValue().asMapValue().getKeyValueArray(); + for(int i = 0; i < (vals.length / 2); i++){ + String key = vals[i * 2].asRawValue().getString(); + Object value = toObject(u, vals[i * 2 + 1]); + map.put(key, value); + } + return map; } else if (v.isArrayValue()) { throw new UnsupportedOperationException(); } else { From d7bc9f84bd45d510af2fbd80da2d414b313e8c71 Mon Sep 17 00:00:00 2001 From: Takao Nakaguchi Date: Sun, 15 Sep 2013 13:14:16 +0900 Subject: [PATCH 5/5] Refactor template and divide map-style serialization function to MapStyleEventTemplate. --- .../logger/sender/DefaultEventTemplate.java | 12 +++ .../java/org/fluentd/logger/sender/Event.java | 94 ------------------- .../fluentd/logger/sender/EventTemplate.java | 38 ++++++++ .../logger/sender/MapStyleEventTemplate.java | 75 +++++++++++++++ .../logger/sender/RawSocketSender.java | 3 +- .../logger/sender/TestRawSocketSender.java | 1 + .../TestSenderFluentdDownOperation.java | 7 +- .../org/fluentd/logger/util/MockFluentd.java | 12 +-- 8 files changed, 131 insertions(+), 111 deletions(-) create mode 100644 src/main/java/org/fluentd/logger/sender/DefaultEventTemplate.java create mode 100644 src/main/java/org/fluentd/logger/sender/EventTemplate.java create mode 100644 src/main/java/org/fluentd/logger/sender/MapStyleEventTemplate.java diff --git a/src/main/java/org/fluentd/logger/sender/DefaultEventTemplate.java b/src/main/java/org/fluentd/logger/sender/DefaultEventTemplate.java new file mode 100644 index 0000000..9c24542 --- /dev/null +++ b/src/main/java/org/fluentd/logger/sender/DefaultEventTemplate.java @@ -0,0 +1,12 @@ +package org.fluentd.logger.sender; + +import java.io.IOException; + +import org.msgpack.packer.Packer; + +public class DefaultEventTemplate extends EventTemplate { + @Override + protected void doWriteData(Packer pk, Object data, boolean required) throws IOException { + pk.write(data); + } +} diff --git a/src/main/java/org/fluentd/logger/sender/Event.java b/src/main/java/org/fluentd/logger/sender/Event.java index a3daa5e..c107109 100644 --- a/src/main/java/org/fluentd/logger/sender/Event.java +++ b/src/main/java/org/fluentd/logger/sender/Event.java @@ -17,17 +17,6 @@ // package org.fluentd.logger.sender; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.LinkedHashMap; -import java.util.Map; - -import org.msgpack.MessageTypeException; -import org.msgpack.packer.Packer; -import org.msgpack.template.AbstractTemplate; -import org.msgpack.template.Templates; -import org.msgpack.unpacker.Unpacker; public class Event { public String tag; @@ -54,87 +43,4 @@ public String toString() { return String.format("Event{tag=%s,timestamp=%d,data=%s}", tag, timestamp, data.toString()); } - - public static class EventTemplate extends AbstractTemplate { - public static EventTemplate INSTANCE = new EventTemplate(); - - public void write(Packer pk, Event v, boolean required) throws IOException { - if (v == null) { - if (required) { - throw new MessageTypeException("Attempted to write null"); - } - pk.writeNil(); - return; - } - - pk.writeArrayBegin(3); - { - Templates.TString.write(pk, v.tag, required); - Templates.TLong.write(pk, v.timestamp, required); - if(v.data instanceof Map){ - writeMap(pk, (Map)v.data, required); - } else{ - try{ - pk.write(v.data); - } catch (MessageTypeException e) { - writeObj(pk, v.data, required); - } - } - } - pk.writeArrayEnd(); - } - - public Event read(Unpacker u, Event to, boolean required) throws IOException { - throw new UnsupportedOperationException("Don't need the operation"); - } - - private void writeObj(Packer pk, Object data, boolean required) throws IOException { - Map map = new LinkedHashMap(); - Class clazz = data.getClass(); - while(!clazz.equals(Object.class)){ - for(Method m : clazz.getDeclaredMethods()){ - if(m.getDeclaringClass().equals(Object.class)) continue; - if(m.getParameterTypes().length != 0) continue; - String name = null; - if(m.getName().startsWith("get")){ - name = m.getName().substring(3); - } else if(m.getName().startsWith("is") && m.getReturnType().equals(boolean.class)){ - name = m.getName().substring(2); - } else{ - continue; - } - if(name.length() == 0) continue; - name = name.substring(0, 1).toLowerCase() + (name.length() == 1 ? "" : name.substring(1)); - try { - map.put(name, m.invoke(data)); - } catch (IllegalArgumentException e) { - } catch (IllegalAccessException e) { - } catch (InvocationTargetException e) { - } - } - clazz = clazz.getSuperclass(); - } - writeMap(pk, map, required); - } - - private void writeMap(Packer pk, Map map, boolean required) throws IOException { - pk.writeMapBegin(map.size()); - { - for (Map.Entry entry : map.entrySet()) { - Templates.TString.write(pk, entry.getKey().toString(), required); - Object value = entry.getValue(); - if(value instanceof Map){ - writeMap(pk, (Map)value, required); - } else{ - try { - pk.write(entry.getValue()); - } catch (MessageTypeException e) { - writeObj(pk, entry.getValue(), required); - } - } - } - } - pk.writeMapEnd(); - } - } } diff --git a/src/main/java/org/fluentd/logger/sender/EventTemplate.java b/src/main/java/org/fluentd/logger/sender/EventTemplate.java new file mode 100644 index 0000000..44dd7a6 --- /dev/null +++ b/src/main/java/org/fluentd/logger/sender/EventTemplate.java @@ -0,0 +1,38 @@ +package org.fluentd.logger.sender; + +import java.io.IOException; + +import org.msgpack.MessageTypeException; +import org.msgpack.packer.Packer; +import org.msgpack.template.AbstractTemplate; +import org.msgpack.template.Templates; +import org.msgpack.unpacker.Unpacker; + +public abstract class EventTemplate extends AbstractTemplate { + public static EventTemplate INSTANCE = new DefaultEventTemplate(); + + public void write(Packer pk, Event v, boolean required) throws IOException { + if (v == null) { + if (required) { + throw new MessageTypeException("Attempted to write null"); + } + pk.writeNil(); + return; + } + + pk.writeArrayBegin(3); + { + Templates.TString.write(pk, v.tag, required); + Templates.TLong.write(pk, v.timestamp, required); + doWriteData(pk, v.data, required); + } + pk.writeArrayEnd(); + } + + protected abstract void doWriteData(Packer pk, Object data, boolean required) + throws IOException; + + public Event read(Unpacker u, Event to, boolean required) throws IOException { + throw new UnsupportedOperationException("Don't need the operation"); + } +} diff --git a/src/main/java/org/fluentd/logger/sender/MapStyleEventTemplate.java b/src/main/java/org/fluentd/logger/sender/MapStyleEventTemplate.java new file mode 100644 index 0000000..151d2a1 --- /dev/null +++ b/src/main/java/org/fluentd/logger/sender/MapStyleEventTemplate.java @@ -0,0 +1,75 @@ +package org.fluentd.logger.sender; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.msgpack.MessageTypeException; +import org.msgpack.packer.Packer; +import org.msgpack.template.Templates; + +public class MapStyleEventTemplate extends EventTemplate { + @Override + protected void doWriteData(Packer pk, Object data, boolean required) throws IOException { + if(data instanceof Map){ + writeMap(pk, (Map)data, required); + } else{ + try{ + pk.write(data); + } catch (MessageTypeException e) { + writeObj(pk, data, required); + } + } + } + + private void writeMap(Packer pk, Map map, boolean required) throws IOException { + pk.writeMapBegin(map.size()); + { + for (Map.Entry entry : map.entrySet()) { + Templates.TString.write(pk, entry.getKey().toString(), required); + Object value = entry.getValue(); + if(value instanceof Map){ + writeMap(pk, (Map)value, required); + } else{ + try { + pk.write(entry.getValue()); + } catch (MessageTypeException e) { + writeObj(pk, entry.getValue(), required); + } + } + } + } + pk.writeMapEnd(); + } + + private void writeObj(Packer pk, Object data, boolean required) throws IOException { + Map map = new LinkedHashMap(); + Class clazz = data.getClass(); + while(!clazz.equals(Object.class)){ + for(Method m : clazz.getDeclaredMethods()){ + if(m.getDeclaringClass().equals(Object.class)) continue; + if(m.getParameterTypes().length != 0) continue; + String name = null; + if(m.getName().startsWith("get")){ + name = m.getName().substring(3); + } else if(m.getName().startsWith("is") && m.getReturnType().equals(boolean.class)){ + name = m.getName().substring(2); + } else{ + continue; + } + if(name.length() == 0) continue; + name = name.substring(0, 1).toLowerCase() + (name.length() == 1 ? "" : name.substring(1)); + try { + map.put(name, m.invoke(data)); + } catch (IllegalArgumentException e) { + } catch (IllegalAccessException e) { + } catch (InvocationTargetException e) { + } + } + clazz = clazz.getSuperclass(); + } + writeMap(pk, map, required); + } +} diff --git a/src/main/java/org/fluentd/logger/sender/RawSocketSender.java b/src/main/java/org/fluentd/logger/sender/RawSocketSender.java index b7a300b..20b48be 100644 --- a/src/main/java/org/fluentd/logger/sender/RawSocketSender.java +++ b/src/main/java/org/fluentd/logger/sender/RawSocketSender.java @@ -23,7 +23,6 @@ import java.net.Socket; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.Map; import java.util.logging.Level; import org.msgpack.MessagePack; @@ -63,7 +62,7 @@ public RawSocketSender(String host, int port, int timeout, int bufferCapacity) { public RawSocketSender(String host, int port, int timeout, int bufferCapacity, Reconnector reconnector) { msgpack = new MessagePack(); - msgpack.register(Event.class, Event.EventTemplate.INSTANCE); + msgpack.register(Event.class, EventTemplate.INSTANCE); pendings = ByteBuffer.allocate(bufferCapacity); server = new InetSocketAddress(host, port); this.reconnector = reconnector; diff --git a/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java b/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java index 2237d14..f60691d 100644 --- a/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java +++ b/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java @@ -312,6 +312,7 @@ public boolean isLive() { public void testNormal06_Object() throws Exception { List elist = new ArrayList(); int port = 25225; + EventTemplate.INSTANCE = new MapStyleEventTemplate(); MockFluentd mock = createMock(elist, port); try{ Sender sender = new RawSocketSender("localhost", port); diff --git a/src/test/java/org/fluentd/logger/sender/TestSenderFluentdDownOperation.java b/src/test/java/org/fluentd/logger/sender/TestSenderFluentdDownOperation.java index abd33c7..1272b58 100644 --- a/src/test/java/org/fluentd/logger/sender/TestSenderFluentdDownOperation.java +++ b/src/test/java/org/fluentd/logger/sender/TestSenderFluentdDownOperation.java @@ -7,10 +7,7 @@ import java.util.HashMap; import java.util.Map; -import org.fluentd.logger.sender.RawSocketSender; -import org.fluentd.logger.sender.Sender; import org.fluentd.logger.util.MockFluentd; -import org.fluentd.logger.util.MockFluentd.MockProcess; import org.junit.Ignore; import org.junit.Test; import org.msgpack.MessagePack; @@ -25,7 +22,7 @@ public class TestSenderFluentdDownOperation { public void testFluentdDownOperation01() throws Exception { int port = 25225; MessagePack msgpack = new MessagePack(); - msgpack.register(Event.class, Event.EventTemplate.INSTANCE); + msgpack.register(Event.class, EventTemplate.INSTANCE); BufferPacker packer = msgpack.createBufferPacker(); long timestamp = System.currentTimeMillis() / 1000; @@ -60,7 +57,7 @@ public void testFluentdDownOperation01() throws Exception { public void testFluentdDownOperation02()throws Exception { int port = 25225; MessagePack msgpack = new MessagePack(); - msgpack.register(Event.class, Event.EventTemplate.INSTANCE); + msgpack.register(Event.class, EventTemplate.INSTANCE); BufferPacker packer = msgpack.createBufferPacker(); long timestamp = System.currentTimeMillis(); diff --git a/src/test/java/org/fluentd/logger/util/MockFluentd.java b/src/test/java/org/fluentd/logger/util/MockFluentd.java index 0be023d..102ee1d 100644 --- a/src/test/java/org/fluentd/logger/util/MockFluentd.java +++ b/src/test/java/org/fluentd/logger/util/MockFluentd.java @@ -3,17 +3,14 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import org.fluentd.logger.sender.DefaultEventTemplate; import org.fluentd.logger.sender.Event; import org.msgpack.MessagePack; -import org.msgpack.packer.Packer; import org.msgpack.template.Templates; -import org.msgpack.type.ArrayValue; -import org.msgpack.type.RawValue; import org.msgpack.type.Value; import org.msgpack.unpacker.Unpacker; @@ -23,13 +20,8 @@ public static interface MockProcess { public void process(MessagePack msgpack, Socket socket) throws IOException; } - public static class MockEventTemplate extends Event.EventTemplate { + public static class MockEventTemplate extends DefaultEventTemplate { public static MockEventTemplate INSTANCE = new MockEventTemplate(); - - public void write(Packer pk, Event v, boolean required) throws IOException { - throw new UnsupportedOperationException("don't need operation"); - } - public Event read(Unpacker u, Event to, boolean required) throws IOException { if (!required && u.trySkipNil()) { return null;