Skip to content

Commit

Permalink
kafka-producer模块
Browse files Browse the repository at this point in the history
  • Loading branch information
guolanren committed Jul 29, 2020
1 parent 45d022d commit 2e62359
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 0 deletions.
5 changes: 5 additions & 0 deletions kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Spring Boot Kafka

## 概述

Spring Boot Kafka 概述...
48 changes: 48 additions & 0 deletions kafka/kafka-producer/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>name.guolanren</groupId>
<artifactId>kafka-producer</artifactId>
<version>1.0.0</version>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package name.guolanren;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* @author guolanren
*/
@SpringBootApplication
public class KafkaProducerApplication {

public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package name.guolanren.model;

import java.io.Serializable;
import java.math.BigDecimal;

/**
* @author guolanren
*/
public class OrderPaidEvent implements Serializable {

private String orderId;
private BigDecimal paidMoney;

public OrderPaidEvent() {
}

public OrderPaidEvent(String orderId, BigDecimal paidMoney) {
this.orderId = orderId;
this.paidMoney = paidMoney;
}

public String getOrderId() {
return orderId;
}

public void setOrderId(String orderId) {
this.orderId = orderId;
}

public BigDecimal getPaidMoney() {
return paidMoney;
}

public void setPaidMoney(BigDecimal paidMoney) {
this.paidMoney = paidMoney;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package name.guolanren.service;

import name.guolanren.model.OrderPaidEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.math.BigDecimal;
import java.util.concurrent.atomic.AtomicLong;

/**
* @author guolanren
*/
@Service
public class OrderPaidService {

private static final Logger logger = LoggerFactory.getLogger(OrderPaidService.class);

public static final String MY_TOPIC = "my-topic";

public static final AtomicLong ORDER_ID = new AtomicLong(0);

@Autowired
private KafkaTemplate<String, OrderPaidEvent> kafka;

public void send() {
ListenableFuture<SendResult<String, OrderPaidEvent>> result = kafka.send(MY_TOPIC,
new OrderPaidEvent(String.valueOf(ORDER_ID.incrementAndGet()), new BigDecimal(100.0)));
result.addCallback(new ResultHandle());
}

public void sendDefault() {
ListenableFuture<SendResult<String, OrderPaidEvent>> result = kafka.sendDefault(new OrderPaidEvent(String.valueOf(ORDER_ID.incrementAndGet()), new BigDecimal(100.0)));
result.addCallback(new ResultHandle());
}

private class ResultHandle<T> implements ListenableFutureCallback<T> {

@Override
public void onFailure(Throwable ex) {
logger.debug("MQ推送消息失败: {}", ex.getMessage());
}

@Override
public void onSuccess(T result) {
logger.info("MQ推送消息成功: {}", result);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
spring.application.name = kafka-producer
spring.kafka.bootstrap-servers = 192.168.0.1:9092,192.168.0.2:9092
spring.kafka.producer.value-serializer = org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.template.default-topic = my-topic

0 comments on commit 2e62359

Please sign in to comment.