通过一个示例了解kafka connect连接器

什么是kafka连接器connect

在实际工作中使用 kafka ,有时候会有类似这样的场景。我们需要把某些数据源的数据导入到 kafka,或者把 kafka 作为数据源导出数据。或者两种场景的需求都要。

这算是一种 kafka 生产者,消费者模式的特殊使用场景。它主要服务于数据管道的场景,为此 kafka 在0.9版本开始增加了 connect 功能,这样可以很方便使得 kafka 可以作为数据管道各个数据段的大型缓冲区,有效的解耦了管道两端的生产者和消费者。

kafka connect包含两个组件,source connector 和 sink connector。顾名思义,前者可以数据源(比如数据库,文件系统)拉取数据塞入 kafka 的 topic中。而后者则从kafka消费数据到另一个数据源(比如Elasticsearch,Hadoop)。

连接器和客户端api有什么区别

连接器和普通的生产者消费者模式有什么区别呢?似乎两种方式都可以达到目的。可能第一次接触connect的人都会由此疑问。在《kafka权威指南》这本书里,作者给出了建议:

如果你是开发人员,你会使用 Kafka 客户端将应用程序连接到
Kafka ,井修改应用程序的代码,将数据推送到 Kafka 或者从 Kafka 读取数据。

如果要将 Kafka 连接到数据存储系统,可以使用 Connect,因为这些系统不是你开发的,
构建数据管道 I 10s你无能或者也不想修改它们的代码。 Connect 可以用于从外部数据存储系统读取数据, 或
者将数据推送到外部存储系统。如果数据存储系统提供了相应的连接器,那么非开发人员
就可以通过配置连接器的方式来使用 Connect。

如果你要连接的数据存储系统没有相应的连接器,那么可以考虑使用客户端 API 或
Connect API 开发一个应用程序。我们建议首选 Connect,因为它提供了一些开箱即用的
特性,比如配置管理、偏移量存储、井行处理、错误处理,而且支持多种数据类型和标准
的 REST 管理 API。开发一个连接 Kafka 和外部数据存储系统的小应用程序看起来很简单,
但其实还有很多细节需要处理,比如数据类型和配置选项,这些无疑加大了开发的复杂
性一一毛onnect 处理了大部分细节,让你可以专注于数据的传输。

一个连接器的应用示例

这个示例,我们用kafka自带的连接器进行演示,这样我们就省去了使用 connect api 去开发一个connect的麻烦。

首先启动 zk+kafka 的环境,然后启动connect进程,

./bin/connect-distributed.sh config/connect-distributed.properties

然后我们输入下面的命令来确认下是否启动成功。

在这里插入图片描述

接着我们启动一个文件数据源,

在这里插入图片描述

解释下,

其实就是把 echo 的内容作为http post的数据发送过去,-d @-表示从管道获取数据。在这个数据中,我们指定了连接器的名字 load-kafka-config,连接器的类名,使用的是自带的FileStream-Source,需要读取的数据源的路径,当然还有kafka的topic。

这条命令执行完后,文件的内容就被发送到kafka的topic上了,我们可以通过下面这个命令来查看下,

在这里插入图片描述

我们用消费者控制台读取了topic上的消息。可以发现每一个payload包含上面读取的文件的一行。

接着我们使用一个自带的连接器把topic里的数据读出来,并且导出到文件中保存(mydata.txt),保存的文件内容应该和前面读取的那个配置文件的内容是一样的。

在这里插入图片描述


参考:

《kafka权威指南》


关注公众号:思无邪了吗

个人博客:http://www.machengyu.net

csdn博客: https://blog.csdn.net/pony_ma...

思否: https://segmentfault.com/u/ma...

Image placeholder
strcpy
未设置
  84人点赞

没有讨论,发表一下自己的看法吧

推荐文章
SpringBoot连接多RabbitMQ源

在实际开发中,很多场景需要异步处理,这时就需要用到RabbitMQ,而且随着场景的增多程序可能需要连接多个RabbitMQ。SpringBoot本身提供了默认的配置可以快速配置连接RabbitMQ,但

理解Kubernetes网络:pods篇

这篇文章将试图揭开在kubernetes集群中运行的网络的多层神秘感。Kubernetes是一个功能强大的平台,其中包含许多智能的设计选择,但讨论交互的方式可能会造成混淆:Pod网络,服务网络,集群I

异常记录——Connection reset

连接重置Connectionreset异常java.net.SocketException:Connectionreset详细信息java.net.SocketException:Connection

如何基于 Kafka 构建一个关系型数据库

在这篇文章里,我将分享如何通过扩展KCache(https://github.com/rayokota/kcache)来实现一个全功能的关系型数据库,我把这个数据库叫作KarelDB(https://

你真的了解 React 吗?这里有 50 个 React 面试问题

如果你是一位有理想的前端开发人员,并且正在准备面试,那么这篇文章就是为你准备的。本文收集了React面试中最常见的50大问题,这是一份理想的指南,让你为React相关的面试做好充分的准备工作。首先我

Spring Cloud Stream整合Kafka

引入依赖 org.springframework.cloud spring-cloud-stream-binder-kafka 或 org.springframework.cloud spr

Kafka 优秀的架构设计!它的高性能是如何保证的?

应大部分的小伙伴的要求,今天这篇咱们用大白话带你认识Kafka。Kafka 基础消息系统的作用大部分小伙伴应该都清楚,这里用机油装箱举个例子:所以消息系统就是如上图我们所说的仓库,能在中间过程作为缓存

全网最通俗易懂的Kafka入门

前言只有光头才能变强。文本已收录至我的GitHub仓库,欢迎Star:https://github.com/ZhongFuCheng3y/3y众所周知,消息队列的产品有好几种,这里我选择学习Kafka

迟来的干货 | Kafka权限管理实战

今年有很多小伙伴在公众号或者微信留言问能不能整一篇有关Kafka权限管理的文章,迫于工作关系,这个需求一直拖后。后来知道我的好友兼Kafka高玩——【一岁小宝】他一直在弄这一块的东西,所以我就厚着fa

Knative 实战:基于 Kafka 实现消息推送

导读:当前在Knative中已经提供了对Kafka事件源的支持,那么如何基于Kafka实现消息推送呢?本文作者将以阿里云Kafka产品为例,给大家解锁这一新的姿势。作者| 元毅 阿里云智能事业群高级开

Kafka 如何优化内存缓冲机制造成的频繁 GC 问题?

目录1、Kafka的客户端缓冲机制2、内存缓冲造成的频繁GC问题3、Kafka设计者实现的缓冲池机制4、总结一下“ 这篇文章,给大家聊一个硬核的技术知识,我们通过Kafka内核源码中的一些设计思想,来

Kafka 集群在马蜂窝大数据平台的优化与应用扩展

马蜂窝技术原创文章,更多干货请订阅公众号:mfwtechKafka是当下热门的消息队列中间件,它可以实时地处理海量数据,具备高吞吐、低延时等特性及可靠的消息异步传递机制,可以很好地解决不同系统间数据的

分布式场景下Kafka消息顺序性的思考

在业务中使用kafka发送消息异步消费的场景,并且需要实现在消费时实现顺序消费,利用kafka在partition内消息有序的特点,实现消息消费时的有序性。1、在发送消息时,通过指定partition

Java 8 API 示例:字符串、数值、算术和文件

Java8API示例:字符串、数值、算术和文件 大量的教程和文章都涉及到Java8中最重要的改变,例如lambda表达式和函数式数据流。但是此外许多现存的类在JDK8API中也有所改进,带有一些实用的

使用Jupyter NoteBook进行IB查询和交易,以及使用算法交易示例

在搞好IB盈透接口后,试了下客户端交易,但是最终目的还是使用程序化交易。发现vnpy已经提供的Script_engine来支持JupyterNoteBook交易的,而且非常方便调用。 这里就用写了基于

Stylus系列——webpack-spritesmith配合stylus使用示例

一、前言基于Webpack的CSSSprites实现方案,若是直接在html中调用雪碧图图标已经很方便,但是实际开发过程可能遇到需要在伪元素中使用雪碧图,或者需要hover切换另一个图标,这种情况下就

IBM Spectrum Protect 8.1.7在AIX7.1上的安装和配置

                                                本文作者: 谷铁柏摘要:    本文章主要讲述IBMSpectrumProtect8.1.7版本在AIX

innerHTML与jquery里的html()区别?

innerHTML与jquery里的html()区别?●html()可以设置tbody、tr这些只读标签,而innerHTML在低版本IE下不行;jQuery的html()做了些容错处理,原生的Dom

打通“边-云”连接 英特尔携手腾讯加速5G&MEC落地

随着5G的到来,边缘计算在5G的应用场景中起到了很好的连接作用。首先实现了移动网络和应用的连接,可以使得业务在5G网络上得到更高质量的交互;其次,5G的边缘计算可以实现2B和2C业务的连接,2B业务能

MVVM原理(Object.defineProperty和订阅者模式)

想着去了解vue的mvvm数据驱动是怎么实现的,百度中看了这篇文章,demo很好。其他文章只是讲到defineProperty的set,get。彻底理解Vue中的Watcher、Observer、De

Python 教程-了解Python

什么是Python Python能干什么? 有什么特点? 什么是Python 官方介绍: Python是一个易于学习、功能强大的编程语言。它拥有高效高级的数据结构和一种简单有效的面向对象编程的

你了解Spring Data JPA的批量插入吗?

实现场景 有关拼团旅游的业务,里面有这样一个逻辑实现:一个拼团活动表Activity对应多个Travel旅游表,travel旅游表要分段存储前端传来的List,Travel表需将每一条数据保存入库,

前端不得不了解的 Flex 布局

背景 又双叒叕被老大拉来顶替前端小姐姐撸代码,接触到了Flex布局,以前只听过没用过,碰巧这次要揭露她的面纱,就记录一下。接触前端的同学都应该知道网页布局是CSS的一个重点,布局的传统方案都是基于盒

前端不得不了解的 Flex 布局

背景 又双叒叕被老大拉来顶替前端小姐姐撸代码,接触到了Flex布局,以前只听过没用过,碰巧这次要揭露她的面纱,就记录一下。接触前端的同学都应该知道网页布局是CSS的一个重点,布局的传统方案都是基于盒