【Kafka】Zookeeper和Kafka集群的安装和配置

news/2024/7/9 21:32:35 标签: JDK, zookeeper, kafka, 集群

一、集群环境说明

1. 虚拟机:192.168.223.101/103/105

2. 系统版本:CentOS 7.9

3. JDK版本:11.0.18.0.1

4. Zookeeper版本:3.7.1

5. Kafka版本:2.13-2.8.2

备注:无论是ZK,还是Kafka的安装,都需要用到JDK,上面给出的ZK和Kafka版本,都已经支持JDK11(JDK 11 Supported)。这三者之间的兼容关系,感兴趣的可以去对应的官网上查询官方Docs,这里就不做赘述了。

二、集群组件部署

2.1 安装JDK

使用root用户安装JDK11,JDK目录为:/usr/jdk-11.0.18.0.1

cd /usr
tar -xzf jdk-11.0.18.0.1_linux-x64_bin.tar.gz
rm -f jdk-11.0.18.0.1_linux-x64_bin.tar.gz
# 目录授权,供其他用户和组调用
chmod -R 755 jdk-11.0.18.0.1

2.2 创建用户和组

由于Zookeeper和Kafka的安装和运行无需root用户,因此从安全角度考虑,我们为其安装和运行创建普通用户和组(app:apps)。

groupadd apps
useradd -d /app -g apps app
chmod -R 755 /app
chown -R app:apps /app

2.3配置Java环境变量

首先,我们切换到app用户下,然后vi .bash_profile这个文件(如果不存在可以直接vi创建即可)。然后将下面的内容黏贴到文件中,并保存退出。最后使用source .bash_profile使配置生效即可。

# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
        . ~/.bashrc
fi

# User specific environment and startup programs
JAVA_HOME=/usr/jdk-11.0.18.0.1
export JAVA_HOME

CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar
export CLASSPATH

PATH=$JAVA_HOME/bin:$PATH:/usr/local/bin:$HOME/bin
export PATH

2.4 安装Zookeeper集群

首先,我们将zk的安装包使用rz命令上传到app用户目录下,然后解压,按照下面的配置编辑配置文件即可。

cd /app
tar -xzf apache-zookeeper-3.7.1-bin.tar.gz
rm -f ./apache-zookeeper-3.7.1-bin.tar.gz
mv apache-zookeeper-3.7.1-bin/ zookeeper
cd /app/zookeeper/conf 
vi zoo.cfg

192.168.223.101/103/105三台服务器上的zoo.cfg配置一致,这里需要注意dataDir,需要自定义目录,目录需要提前创建好。

# The number of milliseconds of each tick
tickTime=2000

# The number of ticks that the initial synchronization phase can take
initLimit=10

# The number of ticks that can pass between sending a request and getting an acknowledgement
syncLimit=5

# the directory where the snapshot is stored.do not use /tmp for storage, /tmp here is just example sakes.
dataDir=/app/zookeeper/data

# the port at which the clients will connect
clientPort=2181

# the maximum number of client connections.increase this if you need to handle more clients
maxClientCnxns=60

# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.

# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3

# Purge task interval in hours Set to "0" to disable auto purge feature
autopurge.purgeInterval=1

## Metrics Providers
# https://prometheus.io Metrics Exporter
metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
metricsProvider.httpPort=17000
metricsProvider.exportJvmInfo=true

server.1=192.168.223.101:2888:3888
server.2=192.168.223.103:2888:3888
server.3=192.168.223.105:2888:3888

这里不同的配置为myid,配置如下:

# 192.168.223.101
cd /app/zookeeper/data
echo 1 > myid

# 192.168.223.103
cd /app/zookeeper/data
echo 2 > myid

# 192.168.223.105
cd /app/zookeeper/data
echo 3 > myid

最后依次启动三台服务器上的zookeeper即可,并检查当前节点的状态,三个节点中,2个为follower,1个为leader。

cd /app/zookeeper/bin
./zkServer.sh start
./zkServer.sh status

到这里,zookeeper集群的安装配置就完成了,下面我们来安装配置Kafka集群

2.5 安装Kafka集群

首先,还是先将Kafka的二进制包上传的app用户目录下,然后执行解压、配置和服务启动。

cd /app
tar -xzf kafka_2.13-2.8.2.tgz
rm -f kafka_2.13-2.8.2.tgz
mv kafka_2.13-2.8.2 kafka
cd  kafka/config
mv server.properties server.properties.bak
vi server.properties

三台服务器(192.168.223.101/103/105)的server.properties配置分别如下:

# Server - 192.168.223.101
############################# Server Basics #############################
# 每个节点的broker-id不能一样,需要修改
broker.id=1

############################# Socket Server Settings #############################
# 每个节点的listeners,需要修改IP
listeners=PLAINTEXT://192.168.223.101:9092

num.network.threads=3
num.io.threads=8

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600


############################# Log Basics #############################
# 自定义log目录路径
log.dirs=/app/kafka/logs

num.partitions=3
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=192.168.223.101:2181,192.168.223.103:2181,192.168.223.105:2181
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
# Server - 192.168.223.103
############################# Server Basics #############################
# 每个节点的broker-id不能一样,需要修改
broker.id=2

############################# Socket Server Settings #############################
# 每个节点的listeners,需要修改IP
listeners=PLAINTEXT://192.168.223.103:9092

num.network.threads=3
num.io.threads=8

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600


############################# Log Basics #############################
# 自定义log目录路径
log.dirs=/app/kafka/logs

num.partitions=3
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=192.168.223.101:2181,192.168.223.103:2181,192.168.223.105:2181
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
# Server - 192.168.223.105
############################# Server Basics #############################
# 每个节点的broker-id不能一样,需要修改
broker.id=3

############################# Socket Server Settings #############################
# 每个节点的listeners,需要修改IP
listeners=PLAINTEXT://192.168.223.105:9092

num.network.threads=3
num.io.threads=8

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600


############################# Log Basics #############################
# 自定义log目录路径
log.dirs=/app/kafka/logs

num.partitions=3
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=192.168.223.101:2181,192.168.223.103:2181,192.168.223.105:2181
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

然后,我们依次启动三台服务器上的Kafka即可

cd /app/kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties

备注:-daemon为后台启动,这样就无需在启动命令中写nohup …… &这样的字符了。

三、Kafka使用测试

到此为止,三节点的Kafka集群就已经部署完毕,部分配置参数还有待调优,这里就不做扩展说明了,生产环境上的配置,以本地的配置为准。

$ cd /app/kafka

# 创建topic
$ sh ./bin/kafka-topics.sh --create --zookeeper 192.168.223.103:2181 --replication-factor 1 --partitions 1 --topic my-topic
Created topic my-topic.

# 浏览所有topic
$ ./bin/kafka-topics.sh --list --zookeeper 192.168.223.103:2181
my-topic

$ ./bin/kafka-topics.sh --list --zookeeper 192.168.223.101:2181
my-topic

$ ./bin/kafka-topics.sh --list --zookeeper 192.168.223.105:2181
my-topic

# 浏览指定topic
$ ./bin/kafka-topics.sh --describe --zookeeper 192.168.223.103:2181 --topic my-topic
Topic: my-topic TopicId: wfI9VvK1QAyCP9ReZrDlIQ PartitionCount: 1       ReplicationFactor: 1    Configs: 
        Topic: my-topic Partition: 0    Leader: 2       Replicas: 2     Isr: 2

# 生产console信息
$ ./bin/kafka-console-producer.sh --broker-list 192.168.223.103:9092 --topic my-topic

# 消费Console消息:
$ ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.223.103:9092 --topic my-topic --from-beginning

$ cd /app/zookeeper/bin
$ ./zkCli.sh -server 192.168.223.103:2181
[zk: 192.168.223.103:2181(CONNECTED) 6] ls /brokers/topics
[__consumer_offsets, my-topic]


http://www.niftyadmin.cn/n/60663.html

相关文章

计算机组成原理(五)

3.理解主存储器与CPU的连接原理;   主存通过数据总线、地址总线和控制总线与CPU连接;   数据总线的位数与工作频率的乘积正比于数据传输率;   地址总线的位数决定了可寻址的最大内存空间;   控制总线(读/写&am…

数据机构笔记哈夫曼编码

1.什么是哈夫曼树?哈夫曼树经典问题:合并果堆问题:如果有三个果堆,其质量分别是1,2,3,我们现在需要将这三堆合并成一堆果堆,合并过程消耗体力等于两堆果堆的质量之和,求最小体力消耗值&#xff…

性能优化方向

性能怎么样就看io的应用,网络和数据库要好好设计,能一次查出来就一次。 对外接口尽量不要多创建对象, 少用bean复制 少用getbean(.class) 缓存不要频繁操作,最好异步 循环不要调用数据库,调用接口最好批量 Compon…

龙曲良 Tensorflow —— tensorflow高级操作(自用)

目录 一、合并与分割 1.1 tf.concat (合并) 1.2 tf.stack (增加新维度) 1.3 tf.unstack (一个一个拆分) 1.4 tf.split (均分拆分) 二、数据统计 2.1 tf.norm(默认二范数) 2…

应对新的挑战!ChatGPT将如何改变多域作战?

​公众号博主推送内容,未经许可,不得转载或者引用。 原文:Exploring the Possibilities of ChatGPT in Rugged Military AI Applications 《ChatGPT:利用最先进的技术支撑多域作战》 ChatGPT是一款基于GPT-3大型自然语言模型的…

蓝牙安全(AES-CCM)

目录 AES-CCM CCM规范加密过程 CCM规范解密认证过程 formatting函数 counter generation函数 蓝牙AES-CCM加密流程 参考文献 AES-CCM Advanced Encryption Standard-Counter with Cipher Block Chaining-Message Authentication Code 自蓝牙4.1起蓝牙的加密算法开始采…

k8s之apiserver

1、Kube-APIServer 启动APIServer 启动采用 Cobra 命令行,解析相关 flags 参数,经过 Complete(填充默认值)->Validate(校验) 逻辑后,通过 Run 启动服务。在 Run 函数中,按序分别初始化 APIServer 链(APIExtensionsServer、Kube…

Spring MVC注解Controller源码流程解析--HandlerAdapter执行流程--上

Spring MVC注解Controller源码流程解析--HandlerAdapter执行流程--上引言RequestMappingHandlerAdapter方法参数解析器方法参数名解析器类型转换体系简单的使用演示数据绑定器工厂定制化修改DataBinder获取泛型参数ControllerAdvice与InitBinder注解控制器方法执行流程Controll…