Integrating Spring Cloud Stream with Kafka

Add the dependencies
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
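
The examples pass a Person object as the message payload. A minimal sketch of that class, assuming only the id and name fields used below; it implements Serializable so the JDK-based serializer introduced later can handle it:

import java.io.Serializable;

public class Person implements Serializable {

    private Long id;
    private String name;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return "Person{id=" + id + ", name='" + name + "'}";
    }
}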
Sending (Spring Kafka)
@RestController
public class KafkaController {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public KafkaController(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // the path variable has to appear in the mapping
    @GetMapping("/send/{name}")
    public Person send(@PathVariable String name) {
        Person person = new Person();
        person.setId(System.currentTimeMillis());
        person.setName(name);
        kafkaTemplate.send("test-topic", person);
        return person;
    }
}
Receiving (Spring Kafka)
@KafkaListener(topics = "test-topic")
public void consume(Person person) {
    System.out.println(person.toString());
}
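
Both sides also need broker connection settings. A minimal application.yml, assuming a local single-node Kafka on the default port:

spring:
  kafka:
    bootstrap-servers: localhost:9092   # assumption: local broker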

// Producer-side error: There was an unexpected error (type=Internal Server Error, status=500). Can't convert value of class com.service.Person to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer

// Consumer-side error: nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.service.Person]

Fix: register a custom serializer/deserializer pair via KafkaProperties -> Producer -> valueSerializer and KafkaProperties -> Consumer -> valueDeserializer:

spring:
  kafka:
    producer:
      valueSerializer: com.service.kafka.ObjectSerializer   # register the custom serializer
    consumer:
      groupId: test
      valueDeserializer: com.service.kafka.ObjectDeSerializer
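
The same settings in Spring Boot's canonical kebab-case spelling (relaxed binding accepts either form):

spring:
  kafka:
    producer:
      value-serializer: com.service.kafka.ObjectSerializer
    consumer:
      group-id: test
      value-deserializer: com.service.kafka.ObjectDeSerializer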


import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class ObjectSerializer implements Serializer<Serializable> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String s, Serializable serializable) {
        System.out.printf("topic:%s, data:%s%n", s, serializable);

        // plain JDK serialization of the payload into a byte array
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] bytes = null;
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(serializable);
            oos.flush();               // make sure the block-data buffer is written out
            bytes = bos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return bytes;
    }

    @Override
    public void close() {
    }
}

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

public class ObjectDeSerializer implements Deserializer<Serializable> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public Serializable deserialize(String s, byte[] bytes) {
        // the reverse of ObjectSerializer: JDK deserialization from the byte array
        ByteArrayInputStream bs = new ByteArrayInputStream(bytes);
        Serializable result = null;
        try (ObjectInputStream os = new ObjectInputStream(bs)) {
            result = (Serializable) os.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
        System.out.printf("topic:%s, data:%s%n", s, result);
        return result;
    }

    @Override
    public void close() {
    }
}
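
A quick way to sanity-check the pair is a local round trip, no broker involved (a minimal sketch; Person is the payload class from above, and the demo class name is made up):

import java.io.Serializable;

public class SerializationRoundTripDemo {

    public static void main(String[] args) {
        Person person = new Person();
        person.setId(System.currentTimeMillis());
        person.setName("test");

        ObjectSerializer serializer = new ObjectSerializer();
        ObjectDeSerializer deserializer = new ObjectDeSerializer();

        // serialize exactly as the producer would, then read it back as the consumer would
        byte[] bytes = serializer.serialize("test-topic", person);
        Serializable restored = deserializer.deserialize("test-topic", bytes);

        System.out.println(restored);   // expect the same id and name
    }
}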
Sending (Spring Cloud Stream Kafka)
@GetMapping("/stream/{name}")
    public Person streamSend(@PathVariable String name){
        Person person = new Person();
        person.setId(System.currentTimeMillis());
        person.setName(name);
        MessageChannel mc =  source.output();
        mc.send(MessageBuilder.withPayload(person).build());
        return person;
    }
Custom Source
public interface PersonSource {

    /**
     * Name of the output channel.
     */
    String TOPIC = "test-topic";

    /**
     * @return output channel
     */
    @Output(PersonSource.TOPIC)
    MessageChannel source();
}


// Add the binding annotation
@EnableBinding(value = {Source.class, PersonSource.class})

// Replace source with the newly defined personSource
MessageChannel mc = personSource.source();
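
Wired together, the producer side looks roughly like the sketch below (the class name StreamKafkaController is an assumption; @EnableBinding stays on the application or another configuration class as shown above):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StreamKafkaController {

    private final PersonSource personSource;

    @Autowired
    public StreamKafkaController(PersonSource personSource) {
        // the binding interface is proxied and registered as a bean by @EnableBinding
        this.personSource = personSource;
    }

    @GetMapping("/stream/{name}")
    public Person streamSend(@PathVariable String name) {
        Person person = new Person();
        person.setId(System.currentTimeMillis());
        person.setName(name);
        MessageChannel mc = personSource.source();
        mc.send(MessageBuilder.withPayload(person).build());
        return person;
    }
}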
Consuming
// The following approach reports an error
@KafkaListener(topics = "test-topic")
public void consume(Person person) {
    System.out.println(person.toString());
}


// The following works
@StreamListener("test-topic")
public void streamConsumer(Person person) {
    System.out.println(person.toString());
}
Can the problem be solved by adding a header to the message?
mc.send(MessageBuilder.withPayload(person).setHeader("Content-Type","application/bean").build());

Adding the header still does not make deserialization succeed.

Notes
  • Although the Spring Cloud Stream binder does integrate Spring Kafka, Spring Kafka and Spring Cloud Stream Kafka differ in how they produce and consume data, so the producing and consuming sides must use matching stacks.

  • When Spring Cloud Stream Kafka sends a message that carries headers, the Kafka Deserializer callback does not process them.

  • In the common case, listen to the message body with @StreamListener; if you need the message headers, go through a SubscribableChannel instead, as in the runner below.

// personSink is an injected custom binding whose @Input method returns a SubscribableChannel
@Bean
public ApplicationRunner createRunner() {
    return (args) -> personSink.input().subscribe(message -> {
        MessageHeaders headers = message.getHeaders();
        Object obj = message.getPayload();
        System.out.printf("receive message, header:%s, body:%s%n", headers, obj);
    });
}

However, if the code above coexists with the code below, the two consumers take turns receiving messages: both subscribe to the same channel, and the dispatcher hands each message to only one subscriber, round-robin.

@StreamListener("test-topic")
public void streamConsumer(Person person) {
    System.out.println(person.toString());
}
The @Input annotation
  • maps to SubscribableChannel
The @Output annotation
  • maps to MessageChannel

Both hide the concrete implementation of the underlying stream. Whether @Input or @Output, the value must not be duplicated (the channel beans have to be unique); the actual topic is declared through destination:

spring:
  cloud:
    stream:
      bindings:
        test-topic-provider:
          destination: test-topic
        test-topic-consume:
          group: test02
          destination: test-topic


public interface PersonSource {

    /**
     * Name of the output channel.
     */
    String TOPIC = "test-topic-provider";

    /**
     * @return output channel
     */
    @Output(PersonSource.TOPIC)
    MessageChannel source();
}


// the sink-side binding (injected above as personSink; the interface name is assumed)
public interface PersonSink {

    /**
     * Input channel name.
     */
    String INPUT = "test-topic-consume";

    /**
     * @return input channel.
     */
    @Input(INPUT)
    SubscribableChannel input();
}


// listen on the consumer-side input binding (not the provider channel)
@StreamListener(PersonSink.INPUT)
public void streamConsumer(Person person) {
    System.out.println(person.toString());
}
SubscribableChannel vs. @StreamListener

The two paths behave differently: consuming through the SubscribableChannel triggers the custom Kafka deserializer, whereas Spring Cloud Stream Kafka itself serializes the object to JSON and converts it back from JSON, without going through the custom Kafka Serializer/Deserializer.
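
That JSON handling matches Spring Cloud Stream's default contentType (application/json in the 2.x line); it can also be made explicit per binding, as a sketch:

spring:
  cloud:
    stream:
      bindings:
        test-topic-provider:
          destination: test-topic
          content-type: application/json   # the 2.x default, shown here explicitly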
