目 录CONTENT

文章目录

zookeeper-kafka

xlong
2024-04-12 / 0 评论 / 0 点赞 / 27 阅读 / 37067 字 / 正在检测是否收录...

zookeeper

zookeeper安装

安装jdk:

yum -y install wget net-tools java-1.8.0-openjdk java-1.8.0-openjdk-devel 

安装zookeeper:

tar xf apache-zookeeper-3.6.1-bin.tar.gz
​
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &> zk.log  & 

查看kafka信息:

bin/zookeeper-shell.sh localhost:2181 ls /
bin/zookeeper-shell.sh localhost:2181 get /

查看zookeeper服务状态:

for i in `seq 1 3` ;do echo $i ; ssh -p9922 zk0${i} "docker exec zk-${i} zkServer.sh status " ;done

kafka

kafka 2.x

启动服务器

Kafka 使用 ZooKeeper 如果你还没有ZooKeeper服务器,你需要先启动一个ZooKeeper服务器。 您可以通过与kafka打包在一起的便捷脚本来快速简单地创建一个单节点ZooKeeper实例。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

启动Kafka服务器:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

创建topic

让我们创建一个名为“test”的topic,它有一个分区和一个副本:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看topic:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

生产消息(producer)

Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。

运行 producer,然后在控制台输入一些消息以发送到服务器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

消费消息(consumer)

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

kafka三节点集群搭建

复制多份配置文件:

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

编辑配置文件:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的代理尝试在同一个端口注册,或者覆盖彼此的数据。

启动两个新的节点:

nohup  bin/kafka-server-start.sh config/server-1.properties &> kafka-1.log & 
nohup  bin/kafka-server-start.sh config/server-2.properties &> kafka-2.log & 

创建一个副本为3的新topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看集群信息:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1

以下是对输出信息的解释。第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。

  • leader 是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。

  • replicas 是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。

  • isr 是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader。

listeners和advertised配置

kafka配置listeners:

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

kafka配置advertised:

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

listeners 主要用来定义Kafka Broker的Listener的配置项,是kafka服务bind端口的地址

advertised.listeners 将Broker的Listener信息发布到Zookeeper中,暴露外网访问的地址,如果没有设置,会用listeners

inter.broker.listener.name专门用于Kafka集群中Broker之间的通信

listener.security.protocol.map配置监听者的安全协议,比如 PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL

第一种情况:

默认 listenersadvertised.listeners 都不配置的情况下:

listeners=PLAINTEXT://hostname:port

advertised.listeners=PLAINTEXT://hostname:port

通过zookeeper查看kafka地址:

image-20230208165236963.png

第二种情况:

只配置 listeners 不配置 advertised.listeners

listeners=PLAINTEXT://172.17.0.5:9092
#advertised.listeners=PLAINTEXT://192.168.100.200:9092

通过zookeeper查看kafka地址:

image-20230208171357736.png

第三种情况:

listeners=PLAINTEXT://172.17.0.5:9092
advertised.listeners=PLAINTEXT://192.168.100.200:9092
image-20230208171713622.png

内外网分流:

listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
listeners=EXTERNAL://172.17.0.5:9092,INTERNAL://172.17.0.5:9093
inter.broker.listener.name=INTERNAL
#advertised.listeners=PLAINTEXT://192.168.100.200:9092
image-20230208172846999.png
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
listeners=EXTERNAL://172.17.0.5:9092,INTERNAL://172.17.0.5:9093
inter.broker.listener.name=INTERNAL
advertised.listeners=EXTERNAL://192.168.100.200:9092,INTERNAL://172.17.0.5:9093
image-20230208173540310.png

在kafka部署到kubernetes时候

内网访问可以使用:172.17.0.5:9093

外网访问使用:192.168.100.200:9092

配置参数解析

名称

描述

类型

默认值

broker.id

broker的id,id是唯一的非负整数,集群的broker.id不能重复。

int

-1

zookeeper.connect

Zookeeper主机地址 hostname1:port1,hostname2:port2,hostname3:port3

string

log.dirs

kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。如果未设置将使用log.dir的配置。

string

/tmp/kafka-logs

listeners

监听器列表 - 使用逗号分隔URI列表和监听器名称。如果侦听器名称不是安全协议,则还必须设置 listener.security.protocol.map。指定主机名为0.0.0.0来绑定到所有接口。留空则绑定到默认接口上。合法监听器列表的示例:PLAINTEXT:// myhost:9092,SSL://:9091,CLIENT://0.0.0.0:9092,REPLICATION:// localhost:9093

string

null

advertised.listeners

监听器发布到ZooKeeper供客户端使用,如果与listeners配置不同。在IaaS环境,这可能需要与broker绑定不通的接口。如果没有设置,将使用listeners的配置。与listeners不同的是,配置0.0.0.0元地址是无效的。

string

null

delete.topic.enable

是否允许删除topic。如果关闭此配置,通过管理工具删除topic将不再生效。

boolean

true

listener.security.protocol.map

侦听器名称和安全协议之间的映射。必须定义为相同的安全协议可用于多个端口或IP。例如,即使两者都需要ssl,内部和外部流量也可以分开。具体的说,用户可以定义名字为INTERNAL和EXTERNAL的侦听器,这个属性为:internal:ssl,external:ssl。

string

PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

inter.broker.listener.name

broker间通讯的监听器名称。如果未设置,则侦听器名称由security.inter.broker.protocol 定义。不能同时设置此项和security.inter.broker.protocol属性。

string

null

security.inter.broker.protocol

broker之间的安全通讯协议,有效值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。同时设置此配置和inter.broker.listener.name属性会出错

string

PLAINTEXT

sasl.enabled.mechanisms

kafka服务器中启用的sasl机制的列表。 该列表可能包含安全提供程序可用的任何机制。默认情况下只有gssapi是启用的。

list

GSSAPI

sasl.mechanism.inter.broker.protocol

SASL机制,用于broker之间的通讯,默认是GSSAPI。

string

GSSAPI

authorizer.class.name

用于认证授权的程序类

string

""

Property

Default

Description

broker.id

broker的id,id是唯一的非负整数,集群的broker.id不能重复。

zookeeper.connect

null

hostname1:port1,hostname2:port2,hostname3:port3

kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。

ZooKeeper 允许你增加一个“chroot”路径,将集群中所有kafka数据存放在特定的路径下。当多个Kafka集群或者其他应用使用相同ZooKeeper集群时,可以使用这个方式设置数据存放路径。这种方式的实现可以通过这样设置连接字符串格式,如下所示: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path。

这样设置就将所有kafka集群数据存放在/chroot/path路径下。注意,在你启动broker之前,你必须创建这个路径,并且consumers必须使用相同的连接格式。

配置密码

Only one of inter.broker.listener.name and security.inter.broker.protocol should be set。

inter.broker.listener.namesecurity.inter.broker.protocol 只能配置一个。

# listener.security.protocol.map 安全协议,必须设置
# 默认值: PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=EXTERNAL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT
#listener.security.protocol.map=EXTERNAL:SASL_PLAINTEXT,INTERNAL:SASL_PLAINTEXT
​
listeners=EXTERNAL://172.17.0.5:9092,INTERNAL://172.17.0.5:9093
inter.broker.listener.name=INTERNAL
advertised.listeners=EXTERNAL://192.168.100.200:9092,INTERNAL://172.17.0.5:9093
​
#security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

[root@97e0b989e23a kafka_2.11-1.0.0]# cat kafka_server_jaas.conf 
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="adminpasswd"
    user_admin="adminpasswd"
    user_producer="producerpwd"
    user_consumer="consumerpwd";
};

添加启动认证参数:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" # 默认配置,下面是添加配置
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/root/kafka_2.11-1.0.0/kafka_server_jaas.conf"
fi

kafka 3.x

​
#kafka 3文档
​
# 创建主题
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
​
​
# 显示主题详细信息
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
​
​
# 将事件写入主题
# 运行控制台生产者客户端以将一些事件写入您的主题。默认情况下,您输入的每一行都会导致一个单独的事件被写入主题。
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
​
​
# 读取事件
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
​

docker搭建zookeeper

 docker run --name zookeeper-2 --net host\
    -v zookeeper-2-conf:/usr/zookeeper/conf:rw,z   \
    -v zookeeper-2-data:/usr/zookeeper/data:rw,z   \
    -v zookeeper-2-logs:/usr/zookeeper/logs:rw,z   \
    -e ZOO_SERVER_ID=2    \
    -e ZOO_SERVERS="server.1=10.30.24.208:2888:3888;2181 server.2=10.30.24.212:2888:3888;2181 server.3=10.30.24.215:2888:3888;2181"  \
    --restart=always     -it -d docker.ronzxy.com/zookeeper:3.6.2 
​
​
​


0

评论区