Skip to content

Commit

Permalink
[ISSUE #405]update cloudevents examples (#688)
Browse files Browse the repository at this point in the history
[Minor #405] update cloudevents examples
  • Loading branch information
li-xiao-shuang authored Jan 4, 2022
1 parent f170786 commit fa9622d
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,40 @@

@Slf4j
public class AsyncPublishInstance {

// This messageSize is also used in SubService.java (Subscriber)
public static int messageSize = 5;

public static final int MESSAGE_SIZE = 5;

public static final String DEFAULT_IP_PORT = "127.0.0.1:10105";

public static final String FILE_NAME = "application.properties";

public static final String IP_KEY = "eventmesh.ip";

public static final String PORT_KEY = "eventmesh.http.port";

public static final String TEST_TOPIC = "TEST-TOPIC-HTTP-ASYNC";

public static final String TEST_GROUP = "EventMeshTest-producerGroup";

public static final String CONTENT_TYPE = "application/cloudevents+json";


public static void main(String[] args) throws Exception {
Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final String eventMeshHttpPort = properties.getProperty("eventmesh.http.port");

final String eventMeshIPPort;
if (StringUtils.isBlank(eventMeshIp) || StringUtils.isBlank(eventMeshHttpPort)) {
// if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
eventMeshIPPort = "127.0.0.1:10105";
} else {

Properties properties = Utils.readPropertiesFile(FILE_NAME);
final String eventMeshIp = properties.getProperty(IP_KEY);
final String eventMeshHttpPort = properties.getProperty(PORT_KEY);

// if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
String eventMeshIPPort = DEFAULT_IP_PORT;
if (StringUtils.isNotBlank(eventMeshIp) || StringUtils.isNotBlank(eventMeshHttpPort)) {
eventMeshIPPort = eventMeshIp + ":" + eventMeshHttpPort;
}

final String topic = "TEST-TOPIC-HTTP-ASYNC";

EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
.liteEventMeshAddr(eventMeshIPPort)
.producerGroup("EventMeshTest-producerGroup")
.producerGroup(TEST_GROUP)
.env("env")
.idc("idc")
.ip(IPUtils.getLocalAddress())
Expand All @@ -74,20 +86,21 @@ public static void main(String[] args) throws Exception {
.build();

try (EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig)) {
for (int i = 0; i < messageSize; i++) {
for (int i = 0; i < MESSAGE_SIZE; i++) {
Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");

CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(topic)
.withSubject(TEST_TOPIC)
.withSource(URI.create("/"))
.withDataContentType("application/cloudevents+json")
.withDataContentType(CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();
eventMeshHttpProducer.publish(event);
log.info("publish event success content:{}",content);
}
Thread.sleep(30000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ public class Utils {
/**
* Get local IP address
*
* @throws SocketException
*/
public static String getLocalIP() throws UnknownHostException, SocketException {
public static String getLocalIP() throws UnknownHostException {
if (isWindowsOS()) {
return InetAddress.getLocalHost().getHostAddress();
} else {
Expand All @@ -55,9 +54,8 @@ public static boolean isWindowsOS() {
* Get local IP address under Linux system
*
* @return IP address
* @throws SocketException
*/
private static String getLinuxLocalIp() throws SocketException {
private static String getLinuxLocalIp() {
String ip = "";
try {
for (Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements(); ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.config.RequestConfig;
Expand Down Expand Up @@ -57,17 +56,14 @@ public static String post(CloseableHttpClient client,
RequestParam requestParam) throws Exception {
final ResponseHolder responseHolder = new ResponseHolder();
final CountDownLatch countDownLatch = new CountDownLatch(1);
post(client, null, uri, requestParam, new ResponseHandler<String>() {
@Override
public String handleResponse(HttpResponse response) throws IOException {
responseHolder.response =
EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
countDownLatch.countDown();
if (log.isDebugEnabled()) {
log.debug("{}", responseHolder);
}
return responseHolder.response;
post(client, null, uri, requestParam, response -> {
responseHolder.response =
EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
countDownLatch.countDown();
if (log.isDebugEnabled()) {
log.debug("{}", responseHolder);
}
return responseHolder.response;
});

try {
Expand All @@ -85,17 +81,14 @@ public static String post(CloseableHttpClient client,
RequestParam requestParam) throws Exception {
final ResponseHolder responseHolder = new ResponseHolder();
final CountDownLatch countDownLatch = new CountDownLatch(1);
post(client, forwardAgent, uri, requestParam, new ResponseHandler<String>() {
@Override
public String handleResponse(HttpResponse response) throws IOException {
responseHolder.response =
EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
countDownLatch.countDown();
if (log.isDebugEnabled()) {
log.debug("{}", responseHolder);
}
return responseHolder.response;
post(client, forwardAgent, uri, requestParam, response -> {
responseHolder.response =
EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
countDownLatch.countDown();
if (log.isDebugEnabled()) {
log.debug("{}", responseHolder);
}
return responseHolder.response;
});

try {
Expand Down Expand Up @@ -203,17 +196,14 @@ public static String get(CloseableHttpClient client,
RequestParam requestParam) throws Exception {
final ResponseHolder responseHolder = new ResponseHolder();
final CountDownLatch countDownLatch = new CountDownLatch(1);
get(client, null, url, requestParam, new ResponseHandler<String>() {
@Override
public String handleResponse(HttpResponse response) throws IOException {
responseHolder.response =
EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
countDownLatch.countDown();
if (log.isDebugEnabled()) {
log.debug("{}", responseHolder);
}
return responseHolder.response;
get(client, null, url, requestParam, response -> {
responseHolder.response =
EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
countDownLatch.countDown();
if (log.isDebugEnabled()) {
log.debug("{}", responseHolder);
}
return responseHolder.response;
});

try {
Expand All @@ -231,17 +221,14 @@ public static String get(CloseableHttpClient client,
RequestParam requestParam) throws Exception {
final ResponseHolder responseHolder = new ResponseHolder();
final CountDownLatch countDownLatch = new CountDownLatch(1);
get(client, forwardAgent, url, requestParam, new ResponseHandler<String>() {
@Override
public String handleResponse(HttpResponse response) throws IOException {
responseHolder.response =
EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
countDownLatch.countDown();
if (log.isDebugEnabled()) {
log.debug("{}", responseHolder);
}
return responseHolder.response;
get(client, forwardAgent, url, requestParam, response -> {
responseHolder.response =
EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
countDownLatch.countDown();
if (log.isDebugEnabled()) {
log.debug("{}", responseHolder);
}
return responseHolder.response;
});

try {
Expand Down

0 comments on commit fa9622d

Please sign in to comment.