引入依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
或
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
发送(Spring Kafka)
/** Template used to publish records; assigned once by the constructor. */
private final KafkaTemplate<String, Object> kafkaTemplate;

/**
 * Creates the controller with the template it publishes through.
 *
 * @param kafkaTemplate template used to send records
 */
@Autowired
public KafkaController(KafkaTemplate<String, Object> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

/**
 * Builds a {@code Person} from the request and sends it to "test-topic".
 *
 * <p>The mapping declares the {@code {name}} template variable so that
 * {@code @PathVariable} has something to bind to; with a bare "/send" the
 * variable could never be resolved.
 *
 * @param name value taken from the request path and stored on the person
 * @return the person that was sent
 */
@GetMapping("/send/{name}")
public Person send(@PathVariable String name) {
    Person person = new Person();
    person.setId(System.currentTimeMillis());
    person.setName(name);
    kafkaTemplate.send("test-topic", person);
    return person;
}
接收(Spring Kafka)
/**
 * Prints each {@code Person} received from "test-topic" to standard output.
 *
 * @param person payload handed to the listener
 */
@KafkaListener(topics = "test-topic")
public void consume(Person person) {
    String text = person.toString();
    System.out.println(text);
}
//生产者端错误信息 There was an unexpected error (type=Internal Server Error, status=500). Can't convert value of class com.service.Person to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
消费者端错误信息 nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.service.Person]
KafkaProperties-> Consumer->valueDeserializer
// 解决办法 KafkaProperties-> Producer->valueSerializer
spring:
  kafka:
    producer:
      valueSerializer: com.service.kafka.ObjectSerializer # register the custom serializer
    consumer:
      groupId: test
      valueDeserializer: com.service.kafka.ObjectDeSerializer

/**
 * Kafka value serializer that writes a {@link Serializable} object using
 * Java object serialization.
 */
public class ObjectSerializer implements Serializer<Serializable> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        // Nothing to configure.
    }

    /**
     * Serializes the value with an {@link ObjectOutputStream}.
     *
     * @param s            topic the record is being sent to
     * @param serializable value to serialize
     * @return the serialized bytes
     * @throws org.apache.kafka.common.errors.SerializationException if the
     *         value cannot be written; a failure is reported instead of
     *         returning {@code null}, which would be sent as a null value
     */
    @Override
    public byte[] serialize(String s, Serializable serializable) {
        System.out.printf("topic:%s, data:%s", s, serializable);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(serializable);
        } catch (IOException e) {
            throw new org.apache.kafka.common.errors.SerializationException(
                    "Failed to serialize value for topic " + s, e);
        }
        // Read the buffer only after the object stream has been closed, so
        // everything it buffered has been flushed into bos.
        return bos.toByteArray();
    }

    @Override
    public void close() {
        // No resources to release.
    }
}

/**
 * Kafka value deserializer that reads a {@link Serializable} object using
 * Java object serialization.
 *
 * <p>NOTE(review): native Java deserialization of bytes that come from a
 * topic is unsafe when producers are not fully trusted; consider an
 * {@code ObjectInputFilter} or a data format such as JSON.
 */
public class ObjectDeSerializer implements Deserializer<Serializable> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        // Nothing to configure.
    }

    /**
     * Deserializes the value with an {@link ObjectInputStream}.
     *
     * @param s     topic the record was read from
     * @param bytes serialized value, may be {@code null}
     * @return the deserialized object, or {@code null} when {@code bytes} is {@code null}
     * @throws org.apache.kafka.common.errors.SerializationException if the
     *         bytes cannot be read back into an object
     */
    @Override
    public Serializable deserialize(String s, byte[] bytes) {
        if (bytes == null) {
            // ByteArrayInputStream rejects a null array; pass null through.
            return null;
        }
        Serializable result;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            result = (Serializable) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new org.apache.kafka.common.errors.SerializationException(
                    "Failed to deserialize value from topic " + s, e);
        }
        System.out.printf("topic:%s, data:%s", s, result);
        return result;
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
发送(Spring Cloud Stream Kafka)
/**
 * Builds a {@code Person} from the path variable and sends it through the
 * output channel of {@code source}.
 *
 * @param name value taken from the request path and stored on the person
 * @return the person that was sent
 */
@GetMapping("/stream/{name}")
public Person streamSend(@PathVariable String name) {
    Person payload = new Person();
    payload.setId(System.currentTimeMillis());
    payload.setName(name);
    source.output().send(MessageBuilder.withPayload(payload).build());
    return payload;
}
自定义source
/**
 * Binding interface that declares a single output channel named by
 * {@link #TOPIC}.
 */
public interface PersonSource {

    /**
     * Name of the output channel.
     */
    String TOPIC = "test-topic";

    /**
     * @return output channel
     */
    @Output(PersonSource.TOPIC)
    MessageChannel source();
}

// Add the annotation, listing PersonSource alongside Source
@EnableBinding(value = {Source.class,PersonSource.class})

// Replace source with the newly defined personSource
MessageChannel mc = personSource.source();
消费
// 使用如下方式会报错 @KafkaListener(topics = "test-topic") public void consume(Person person){ System.out.println(person.toString()); } // 如下方式正常 @StreamListener("test-topic") public void streamConsumer(Person person){ System.out.println(person.toString()); }
是否能通过给数据加入Header的方式解决问题
// Send the person with an extra "Content-Type" header set on the message
mc.send(MessageBuilder.withPayload(person).setHeader("Content-Type","application/bean").build());
通过加入header的方式依然不能反序列化成功.
注意
虽然Spring Cloud Stream Binder 中存在Spring Kafka的整合,但是Spring Kafka和Spring Cloud Stream Kafka在处理数据的生产与消费时是存在差异的,所以在使用上一定要配套使用。
当Spring Cloud Stream Kafka 发送消息包含头信息时,Kafka DeSerializer在实现方法回调的时候并不会处理。
一般情况可以通过StreamListener 来监听数据(主体),如果需要处理消息的header信息的话可以通过SubscribableChannel来处理
/**
 * Registers a runner that subscribes to the input channel of
 * {@code personSink} and prints the headers and payload of every message.
 *
 * @return the runner performing the subscription
 */
@Bean
public ApplicationRunner createRunner() {
    return args -> {
        personSink.input().subscribe(incoming ->
                System.out.printf("receive message, header:%s, body:%s",
                        incoming.getHeaders(), incoming.getPayload()));
    };
}
但是如果上述代码与如下代码同时存在,那么它们会轮流执行
/**
 * Prints each {@code Person} delivered on "test-topic" to standard output.
 *
 * @param person payload handed to the listener
 */
@StreamListener("test-topic")
public void streamConsumer(Person person) {
    String text = person.toString();
    System.out.println(text);
}
Input注解
- 对应 - SubscribableChannel
Output注解
- 对应 - MessageChannel
两者均屏蔽了Stream的具体实现。无论是@Input还是@Output,它们的value都不允许重复(bean不允许重复),可以通过destination来声明topic。
spring:
  cloud:
    stream:
      bindings:
        test-topic-provider:
          destination: test-topic
        test-topic-consume:
          group: test02
          destination: test-topic

/**
 * Name of the output channel.
 */
String TOPIC = "test-topic-provider";

/**
 * @return output channel
 */
@Output(PersonSource.TOPIC)
MessageChannel source();

/**
 * Input channel name.
 */
String INPUT = "test-topic-consume";

/**
 * @return input channel.
 */
@Input(INPUT)
SubscribableChannel input();

// NOTE(review): this listener references PersonSource.TOPIC, which is the
// name given to the @Output channel above; the @Input channel is named by
// INPUT. Confirm which channel is intended here.
@StreamListener(PersonSource.TOPIC)
public void streamConsumer(Person person){
    System.out.println(person.toString());
}
SubscribableChannel与@StreamListener
两者实现存在着差异:SubscribableChannel会触发kafka的自定义反序列化;而@StreamListener则由Spring Cloud Stream Kafka将对象序列化成JSON,再通过JSON反序列化成对象(不经过自定义kafka的Serializer/DeSerializer)。
© 著作权归作者所有
发表评论