服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|编程技术|正则表达式|C/C++|

服务器之家 - 编程语言 - JAVA教程 - spring boot整合spring-kafka实现发送接收消息实例代码

spring boot整合spring-kafka实现发送接收消息实例代码

2020-11-26 13:37honway JAVA教程

这篇文章主要给大家介绍了关于spring-boot整合spring-kafka实现发送接收消息的相关资料,文中介绍的非常详细,对大家具有一定的参考学习价值,需要的朋友们下面跟着小编一起来看看吧。

前言

由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用.

没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大.功能一直没实现.文档写的也不是很漂亮,也可能是刚起步,有很多的问题.我这里只能放弃了,使用了spring-kafka.

实现方法

pom.xml文件如下

 
?
1
 
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 
 <groupId>org.linuxsogood.sync</groupId>
 <artifactId>linuxsogood-sync</artifactId>
 <version>1.0.0-SNAPSHOT</version>
 
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.4.0.RELEASE</version>
 </parent>
 
 <properties>
  <java.version>1.8</java.version>
  <!-- 依赖版本 -->
  <mybatis.version>3.3.1</mybatis.version>
  <mybatis.spring.version>1.2.4</mybatis.spring.version>
  <mapper.version>3.3.6</mapper.version>
  <pagehelper.version>4.1.1</pagehelper.version>
 </properties>
 
 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-jdbc</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-aop</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-freemarker</artifactId>
  </dependency>
  <!--<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-integration</artifactId>
   <scope>compile</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-kafka</artifactId>
   <version>2.0.1.RELEASE</version>
   <scope>compile</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-core</artifactId>
   <version>4.3.1.RELEASE</version>
   <scope>compile</scope>
  </dependency>-->
  <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.1.0.RELEASE</version>
  </dependency>
  <!--<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka-test</artifactId>
   <version>1.1.0.RELEASE</version>
  </dependency>-->
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.assertj</groupId>
   <artifactId>assertj-core</artifactId>
   <version>3.5.2</version>
  </dependency>
  <dependency>
   <groupId>org.hamcrest</groupId>
   <artifactId>hamcrest-all</artifactId>
   <version>1.3</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.mockito</groupId>
   <artifactId>mockito-all</artifactId>
   <version>1.9.5</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-test</artifactId>
   <version>4.2.3.RELEASE</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
  </dependency>
  <dependency>
   <groupId>com.microsoft.sqlserver</groupId>
   <artifactId>sqljdbc4</artifactId>
   <version>4.0.0</version>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>druid</artifactId>
   <version>1.0.11</version>
  </dependency>
 
  <!--Mybatis-->
  <dependency>
   <groupId>org.mybatis</groupId>
   <artifactId>mybatis</artifactId>
   <version>${mybatis.version}</version>
  </dependency>
  <dependency>
   <groupId>org.mybatis</groupId>
   <artifactId>mybatis-spring</artifactId>
   <version>${mybatis.spring.version}</version>
  </dependency>
  <!--<dependency>
   <groupId>org.mybatis.spring.boot</groupId>
   <artifactId>mybatis-spring-boot-starter</artifactId>
   <version>1.1.1</version>
  </dependency>-->
  <!-- Mybatis Generator -->
  <dependency>
   <groupId>org.mybatis.generator</groupId>
   <artifactId>mybatis-generator-core</artifactId>
   <version>1.3.2</version>
   <scope>compile</scope>
   <optional>true</optional>
  </dependency>
  <!--分页插件-->
  <dependency>
   <groupId>com.github.pagehelper</groupId>
   <artifactId>pagehelper</artifactId>
   <version>${pagehelper.version}</version>
  </dependency>
  <!--通用Mapper-->
  <dependency>
   <groupId>tk.mybatis</groupId>
   <artifactId>mapper</artifactId>
   <version>${mapper.version}</version>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>fastjson</artifactId>
   <version>1.2.17</version>
  </dependency>
 </dependencies>
 <repositories>
  <repository>
   <id>repo.spring.io.milestone</id>
   <name>Spring Framework Maven Milestone Repository</name>
   <url>https://repo.spring.io/libs-milestone</url>
  </repository>
 </repositories>
 <build>
  <finalName>mybatis_generator</finalName>
  <plugins>
   <plugin>
    <groupId>org.mybatis.generator</groupId>
    <artifactId>mybatis-generator-maven-plugin</artifactId>
    <version>1.3.2</version>
    <configuration>
     <verbose>true</verbose>
     <overwrite>true</overwrite>
    </configuration>
   </plugin>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
     <mainClass>org.linuxsogood.sync.Starter</mainClass>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>

orm层使用了MyBatis,又使用了通用Mapper和分页插件.

kafka消费端配置

 
?
1
 
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
 
 @Value("${kafka.broker.address}")
 private String brokerAddress;
 
 @Bean
 KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
 ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
 factory.setConsumerFactory(consumerFactory());
 factory.setConcurrency(3);
 factory.getContainerProperties().setPollTimeout(3000);
 return factory;
 }
 
 @Bean
 public ConsumerFactory<String, String> consumerFactory() {
 return new DefaultKafkaConsumerFactory<>(consumerConfigs());
 }
 
 @Bean
 public Map<String, Object> consumerConfigs() {
 Map<String, Object> propsMap = new HashMap<>();
 propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
 propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
 propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
 propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 return propsMap;
 }
 
 @Bean
 public Listener listener() {
 return new Listener();
 }
}

生产者的配置.

 
?
1
 
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
@EnableKafka
public class KafkaProducerConfig {
 
 @Value("${kafka.broker.address}")
 private String brokerAddress;
 
 @Bean
 public ProducerFactory<String, String> producerFactory() {
 return new DefaultKafkaProducerFactory<>(producerConfigs());
 }
 
 @Bean
 public Map<String, Object> producerConfigs() {
 Map<String, Object> props = new HashMap<>();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
 props.put(ProducerConfig.RETRIES_CONFIG, 0);
 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
 props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 return props;
 }
 
 @Bean
 public KafkaTemplate<String, String> kafkaTemplate() {
 return new KafkaTemplate<String, String>(producerFactory());
 }
}

监听,监听里面,写的就是业务逻辑了,从kafka里面得到数据后,具体怎么去处理. 如果需要开启kafka处理消息的广播模式,多个监听要监听不同的group,即方法上的注解@KafkaListener里的group一定要不一样.如果多个监听里的group写的一样,就会造成只有一个监听能处理其中的消息,另外监听就不能处理消息了.也即是kafka的分布式消息处理方式.

在同一个group里的监听,共同处理接收到的消息,会根据一定的算法来处理.如果不在一个组,但是监听的是同一个topic的话,就会形成广播模式

 
?
1
 
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
 
import java.util.List;
import java.util.Optional;
 
public class Listener {
 
 private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);
 
 @Autowired
 private StoreMapper storeMapper;
 
 /**
  * 监听kafka消息,如果有消息则消费,同步数据到新烽火的库
  * @param record 消息实体bean
  */
 @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
 public void listen(ConsumerRecord<?, ?> record) {
  Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  if (kafkaMessage.isPresent()) {
   Object message = kafkaMessage.get();
   try {
    MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
    CupMessageType type = messageWrapper.getType();
    //判断消息的数据类型,不同的数据入不同的表
    if (CupMessageType.STORE == type) {
     proceedStore(messageWrapper);
    }
   } catch (Exception e) {
    LOGGER.error("将接收到的消息保存到数据库时异常, 消息:{}, 异常:{}",message.toString(),e);
   }
  }
 }
 
 /**
  * 消息是店铺类型,店铺消息处理入库
  * @param messageWrapper 从kafka中得到的消息
  */
 private void proceedStore(MessageWrapper messageWrapper) {
  Object data = messageWrapper.getData();
  Store cupStore = JSON.parseObject(data.toString(), Store.class);
  StoreExample storeExample = new StoreExample();
  String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName();
  storeExample.createCriteria().andStoreNameEqualTo(storeName);
  List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
  org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
  org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
  //如果查询不到记录则新增
  if (stores.size() == 0) {
   storeMapper.insert(store);
  } else {
   store.setStoreId(stores.get(0).getStoreId());
   storeMapper.updateByPrimaryKey(store);
  }
 }
 
}

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对服务器之家的支持。

原文链接:http://linuxsogood.org/1572.html

延伸 · 阅读

精彩推荐