EventMesh Java SDK 实现了 HTTP 异步消息的生产者和消费者。二者都需要一个 EventMeshHttpClientConfig
类实例来指定 EventMesh HTTP 客户端的配置信息。其中的 liteEventMeshAddr
、userName
和 password
字段需要和 EventMesh runtime eventmesh.properties
文件中的相匹配。
import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.ThreadUtils;
public class HTTP {
public static void main(String[] args) throws Exception {
EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
.liteEventMeshAddr("localhost:10105")
.producerGroup("TEST_PRODUCER_GROUP")
.env("env")
.idc("idc")
.ip(IPUtils.getLocalAddress())
.sys("1234")
.pid(String.valueOf(ThreadUtils.getPID()))
.userName("eventmesh")
.password("password")
.build();
/* ... */
}
}
类 EventMeshHttpConsumer
实现了 heartbeat
、subscribe
和 unsubscribe
方法。subscribe
方法接收一个 SubscriptionItem
对象的列表,其中定义了要订阅的话题和回调的 URL 地址。
import org.apache.eventmesh.client.http.consumer.EventMeshHttpConsumer;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import com.google.common.collect.Lists;
public class HTTP {
final String url = "http://localhost:8080/callback";
final List<SubscriptionItem> topicList = Lists.newArrayList(
new SubscriptionItem("eventmesh-async-topic", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC)
);
public static void main(String[] args) throws Exception {
/* ... */
eventMeshHttpConsumer = new EventMeshHttpConsumer(eventMeshClientConfig);
eventMeshHttpConsumer.heartBeat(topicList, url);
eventMeshHttpConsumer.subscribe(topicList, url);
/* ... */
eventMeshHttpConsumer.unsubscribe(topicList, url);
}
}
EventMesh runtime 将发送一个包含 CloudEvents 格式 信息的 POST 请求到这个回调的 URL 地址。类 SubController.java 实现了 Spring Boot controller,它将接收并解析回调信息。
类 EventMeshHttpProducer
实现了 publish
方法。publish
方法接收将被发布的消息和一个可选的 timeout 值。消息应是下列类的一个实例:
org.apache.eventmesh.common.EventMeshMessage
io.cloudevents.CloudEvent
io.openmessaging.api.Message
import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.utils.JsonUtils;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
public class HTTP {
public static void main(String[] args) throws Exception {
/* ... */
EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig);
Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject("eventmesh-async-topic")
.withSource(URI.create("/"))
.withDataContentType("application/cloudevents+json")
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();
eventMeshHttpProducer.publish(event);
}
}
也可以不通过Event Mesh SDK来体验事件的收发功能
curl -H "Content-Type:application/json" -X POST -d '{"name": "admin", "pass":"12345678"}' http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC
启动eventmesh运行时服务后,可以使用curl命令将事件用HTTP post方法发布到指定的主题,并且Body必须是JSON格式。发布事件的url类似于(http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC),您将获得成功发布的结果。
curl -H "Content-Type:application/json" -X POST -d '{"url": "http://127.0.0.1:8088/sub/test", "consumerGroup":"TEST-GROUP", "topic":[{"mode":"CLUSTERING","topic":"TEST-TOPIC-HTTP-ASYNC","type":"ASYNC"}]}' http://127.0.0.1:10105/eventmesh/subscribe/local
启动eventmesh运行时服务器后,可以使用curl命令用HTTP post方法订阅指定的主题列表,并且Body必须是JSON格式。订阅url类似于(http://127.0.0.1:10105/eventmesh/subscribe/local),您将获得订阅成功的结果。你应该注意Body中的`url`字段,这意味着你需要在指定的url上启动HTTP服务实现监听,你可以在`eventmesh-examples`模块中看到这个例子。