菜单 学习猿地 - LMONKEY

VIP

开通学习猿地VIP

尊享10项VIP特权 持续新增

知识通关挑战

打卡带练!告别无效练习

接私单赚外块

VIP优先接,累计金额超百万

学习猿地私房课免费学

大厂实战课仅对VIP开放

你的一对一导师

每月可免费咨询大牛30次

领取更多软件工程师实用特权

入驻
356
0

Kafka

原创
05/13 14:22
阅读数 88720

kafka
Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。
通过集群来提供实时的消费;

节点信息
kafka1   192.168.1.31  broker.id=1 
Kafka2   192.168.1.32  broker.id=2
kafka3   192.168.1.33  broker.id=3
端口 9092

版本 kafka_2.11-0.10.1.0.tgz

下载安装:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
# mkdir /opt/kafka_2.11-0.10.1.0/logs

配置信息:

修改配置文件示例
# cat config/server.properties

broker.id=1
#每个服务的 标识ID (每个服务都是唯一的)

listeners=PLAINTEXT://192.168.1.31:9092
#备注:listeners一定要配置成为IP地址;如果配置为localhost或服务器的hostname,在使用java发送数据时就会抛出异 ;也可以设置hostname

message.max.bytes=10000000
#服务器可以接收的最大的消息大小

default.replication.factor=1
#默认副本

replica.lag.time.max.ms=10000
#在此窗口时间内没有收到follower的fetch请求,leader会将其从ISR(in-sync replicas)中移除
num.network.threads=3
num.io.threads=8
#服务器执行读写的IO线程数 (可以根据服务器磁盘数量)

socket.send.buffer.bytes=102400
#发送字节缓冲最大值

socket.receive.buffer.bytes=102400
#接收缓冲区最大字节

socket.request.max.bytes=104857600
#服务器允许请求的最大值,防止内存溢出,小于内存实际大小;

num.partitions=1
#默认partition数量,如果topic在创建时没有指定partition数量,默认使用此值,建议改为5

num.recovery.threads.per.data.dir=1
#在启动时用于日志恢复和关闭时刷新的每个数据目录的线程数。

log.retention.hours=168
#日志保留时长

log.segment.bytes=1073741824
#日志字段大小;

log.retention.check.interval.ms=300000
#日志保留间隔时间;

log.dirs= /opt/kafka_2.11-0.10.1.0/logs
#zk 配置 
zookeeper.connect=192.168.1.31:2181,192.168.1.32:2181,192.168.1.33:2181
zookeeper.connection.timeout.ms=6000

新增Kafka环境变量:(配置kafka内存)

添加环境变量

cat /etc/profile
export KAFKA_HEAP_OPTS="-Xms30g -Xmx30g -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:InitiatingHeapOccupancyPercent=45"

source /etc/profile

启动:

nohup 后台运行
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

 


 

发表评论

0/200
356 点赞
0 评论
收藏