Knative 实战:基于 Kafka 实现消息推送

导读:当前在 Knative 中已经提供了对 Kafka 事件源的支持,那么如何基于 Kafka 实现消息推送呢?本文作者将以阿里云 Kafka 产品为例,给大家解锁这一新的姿势。

作者 | 元毅 阿里云智能事业群高级开发工程师

背景

消息队列 for Apache Kafka 是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列 for Apache Kafka 广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。结合 Knative 中提供了 KafkaSource 事件源的支持, 可以方便的对接 Kafka 消息服务。另外也可以安装社区 Kafka 2.0.0 及以上版本使用。

在阿里云上创建 Kafka 实例

创建 Kafka 实例

登录消息队列 Kafka 控制台, 选择【购买实例】。由于当前 Knative 中 Kafka 事件源支持 2.0.0 及以上版本,在阿里云上创建 Kafka 实例需要选择包年包月、专业版本进行购买,购买之后升级到 2.0.0 即可。


部署实例并绑定 VPC

购买完成之后,进行部署,部署时设置 Knative 集群所在的 VPC 即可:

创建 Topic 和 Consumer Group

接下来我们创建 Topic 和消费组。进入【Topic 管理】,点击 创建 Topic, 这里我们创建名称为 demo 的 topic:

进入【Consumer Group 管理】,点击 创建 Consumer Group, 这里我们创建名称为 demo-consumer 的消费组:

部署 Kafka 数据源

部署 Kafka addon 组件

登录容器服务控制台,进入【Knative 组件管理】,部署 Kafka addon 组件。

创建 KafkaSource 实例
首先创建用于接收事件的服务 event-display:

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
spec:
  template:
    spec:
      containers:
      - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/eventing-sources-cmd-event_display:bf45b3eb1e7fc4cb63d6a5a6416cf696295484a7662e0cf9ccdf5c080542c21d

接下来创建 KafkaSource:

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
  name: alikafka-source
spec:
  consumerGroup: demo-consumer
  # Broker URL. Replace this with the URLs for your kafka cluster,
  # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092.
  bootstrapServers: 192.168.0.6x:9092,192.168.0.7x:9092,192.168.0.8x:9092
  topics: demo
  sink:
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    name: event-display

说明:

  • bootstrapServers: Kafka VPC 访问地址
  • consumerGroup: 设置消费组
  • topics:设置 Topic

创建完成之后,我们可以查看对应的实例已经运行:

[root@iZ2zeae8wzyq0ypgjowzq2Z ~]# kubectl get pods
NAME                                    READY   STATUS    RESTARTS   AGE
alikafka-source-k22vz-db44cc7f8-879pj   1/1     Running   0          8h

验证

在 Kafka 控制台,选择 topic 发送消息,注意这里的消息格式必须是 json 格式:

我们可以看到已经接收到了发送过来的 Kafka 消息:

[root@iZ2zeae8wzyq0ypgjowzq2Z ~]# kubectl logs event-display-zl6m5-deployment-6bf9596b4f-8psx4 user-container

☁️  CloudEvent: valid ✅
Context Attributes,
  SpecVersion: 0.2
  Type: dev.knative.kafka.event
  Source: /apis/v1/namespaces/default/kafkasources/alikafka-source#demo
  ID: partition:7/offset:1
  Time: 2019-10-18T08:50:32.492Z
  ContentType: application/json
  Extensions: 
    key: demo
Transport Context,
  URI: /
  Host: event-display.default.svc.cluster.local
  Method: POST
Data,
  {
    "key": "test"
  }

小结

结合阿里云 Kafka 产品,通过事件驱动触发服务(函数)执行,是不是简单又高效?

这样我们利用 Knative 得以把云原生的能力充分释放出来,带给我们更多的想象空间。欢迎对 Knative 感兴趣的一起交流。

Image placeholder
gengxuliang
未设置
  95人点赞

没有讨论,发表一下自己的看法吧

推荐文章
基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送

#介绍 基于Hyperf+WebSocket+RabbitMQ实现的一个简单大屏幕的消息推送。 #思路 利用WebSocket协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端id。订阅发

Knative 驾驭篇:带你 '纵横驰骋' Knative 自动扩缩容实现

Knative中提供了自动扩缩容灵活的实现机制,本文从 三横两纵 的维度带你深入了解KPA自动扩缩容的实现机制。让你轻松驾驭Knative自动扩缩容。注:本文基于最新Knativev0.11.0版本代

HBase实战:记一次Safepoint导致长时间STW的踩坑之旅

本文记录了HBase中Safepoint导致长时间STW此问题的解决思路及办法。过程记录现象:小米有一个比较大的公共离线HBase集群,用户很多,每天有大量的MapReduce或Spark离线分析任务

Talos网卡负载优化:基于个性化一致性哈希的负载均衡

本文将详细介绍基于个性化一致性哈希的流量均衡方法。 目录  业务增长带来的流量均衡需求基于一致性哈希的调度策略个性化一致性哈希的负载均衡流量均衡在Talos中的实现前文《小米消息队列的实践》介绍了小米

分布式场景下Kafka消息顺序性的思考

在业务中使用kafka发送消息异步消费的场景,并且需要实现在消费时实现顺序消费,利用kafka在partition内消息有序的特点,实现消息消费时的有序性。1、在发送消息时,通过指定partition

迟来的干货 | Kafka权限管理实战

今年有很多小伙伴在公众号或者微信留言问能不能整一篇有关Kafka权限管理的文章,迫于工作关系,这个需求一直拖后。后来知道我的好友兼Kafka高玩——【一岁小宝】他一直在弄这一块的东西,所以我就厚着fa

如何基于 Kafka 构建一个关系型数据库

在这篇文章里,我将分享如何通过扩展KCache(https://github.com/rayokota/kcache)来实现一个全功能的关系型数据库,我把这个数据库叫作KarelDB(https://

数据库请求:基本操作 SQL 语句

下面是一系列常用的MySQL语句操作:创建数据库、选择数据库、创建数据表、增、删、改、查、清空数据表、删除数据表。 1)MySQL创建数据库 createdatabase语句用来创建一个数据库。 cr

[随笔] ios连接charles提示unknown

前提:ios移动端和charles处于同一局域网,且可以互相访问。charles配置点击Proxy->sslproxyingsettings->sslproxying.enablesslproxyin

Spring Cloud Stream整合Kafka

引入依赖 org.springframework.cloud spring-cloud-stream-binder-kafka 或 org.springframework.cloud spr

Kafka 优秀的架构设计!它的高性能是如何保证的?

应大部分的小伙伴的要求,今天这篇咱们用大白话带你认识Kafka。Kafka 基础消息系统的作用大部分小伙伴应该都清楚,这里用机油装箱举个例子:所以消息系统就是如上图我们所说的仓库,能在中间过程作为缓存

全网最通俗易懂的Kafka入门

前言只有光头才能变强。文本已收录至我的GitHub仓库,欢迎Star:https://github.com/ZhongFuCheng3y/3y众所周知,消息队列的产品有好几种,这里我选择学习Kafka

Kafka 如何优化内存缓冲机制造成的频繁 GC 问题?

目录1、Kafka的客户端缓冲机制2、内存缓冲造成的频繁GC问题3、Kafka设计者实现的缓冲池机制4、总结一下“ 这篇文章,给大家聊一个硬核的技术知识,我们通过Kafka内核源码中的一些设计思想,来

Kafka 集群在马蜂窝大数据平台的优化与应用扩展

马蜂窝技术原创文章,更多干货请订阅公众号:mfwtechKafka是当下热门的消息队列中间件,它可以实时地处理海量数据,具备高吞吐、低延时等特性及可靠的消息异步传递机制,可以很好地解决不同系统间数据的

通过一个示例了解kafka connect连接器

什么是kafka连接器connect在实际工作中使用kafka,有时候会有类似这样的场景。我们需要把某些数据源的数据导入到kafka,或者把kafka作为数据源导出数据。或者两种场景的需求都要。这算是

万亿级消息背后: 小米消息队列的实践

目录业务背景架构与关键问题性能与资源优化平台化效率小米消息中间件的规划与愿景前文《消息队列价值思考》讲述了消息中间件在企业IT架构中的重要价值,本文将呈现这些价值在落地小米业务过程中的遇到的问题和实践

深入探究 RocketMQ 事务机制的实现流程,为什么它能做到发送消息零丢失?

1、解决消息丢失的第一个问题:订单系统推送消息领丢失既然我们已经明确了消息在基于MQ传输的过程中可能丢失的几个地方,那么我们接着就得一步一步考虑如何去解决各个环节丢失消息的问题,首先要解决的第一个问题

如果有人再问你怎么实现分布式延时消息,这篇文章丢给他

1.背景上篇文章介绍了RocketMQ整体架构和原理有兴趣的可以阅读一下,在这篇文章中的延时消息部分,我写道开源版的RocketMQ只提供了18个层级的消息队列延时,这个功能在开源版中显得特别鸡肋,但

基于Tcp协议与基于Http协议的RPC简介笔记

前言:之前对于RPC方面的学习多限于对RMI原理的学习,直到今天在看陈康贤前辈的《大型分布式网站架构-设计与实践》这本书的时候,才发现原来RPC可以基于TCP协议也可以基于HTTP协议(这里所说的TC

基于JWT规范实现的认证微服务

本文由公众号EAWorld翻译发表,转载需注明出处。作者:MarceloFonseca译者:白小白 原题:Buildinganauthenticationmicro-servicewithJWTsta

一个简单的基于 Redis 的分布式任务调度器 —— Java 语言实现

折腾了一周的JavaQuartz集群任务调度,很遗憾没能搞定,网上的相关文章也少得可怜,在多节点(多进程)环境下Quartz似乎无法动态增减任务,恼火。无奈之下自己撸了一个简单的任务调度器,结果只花了

基于Redis实现Spring Cloud Gateway的动态管理

引言:SpringCloudGateway是当前使用非常广泛的一种API网关。它本身能力并不能完全满足企业对网关的期望,人们希望它可以提供更多的服务治理能力。但SpringCloudGateway并不

基于 Zookeeper 的分布式锁实现

1.背景最近在学习Zookeeper,在刚开始接触Zookeeper的时候,完全不知道Zookeeper有什么用。且很多资料都是将Zookeeper描述成一个“类Unix/Linux文件系统”的中间件

基于Pandas+ECharts的金融大数据可视化实现方案

前言最近无意中看到一篇文章,介绍的是在IPythonNotebook里实现ECharts的可视化效果。我个人对ECharts一直是推崇有加,是baidu发布的开源项目中我比较喜欢的一个,绝对是良心之作

基于Webpack的css sprites实现方案

一、前言关于csssprites(雪碧图/精灵图)的几种实现方案可以参考浅谈CSSSprites雪碧图应用。本文主要讨论基于webpack的csssprites实现方案。由于使用webpack时会涉及