zookeeper
zookeeper安装
安装jdk:
yum -y install wget net-tools java-1.8.0-openjdk java-1.8.0-openjdk-devel
安装zookeeper:
tar xf apache-zookeeper-3.6.1-bin.tar.gz
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &> zk.log &
查看kafka信息:
bin/zookeeper-shell.sh localhost:2181 ls /
bin/zookeeper-shell.sh localhost:2181 get /
查看zookeeper服务状态:
for i in `seq 1 3` ;do echo $i ; ssh -p9922 zk0${i} "docker exec zk-${i} zkServer.sh status " ;done
kafka
kafka 2.x
启动服务器
Kafka 使用 ZooKeeper 如果你还没有ZooKeeper服务器,你需要先启动一个ZooKeeper服务器。 您可以通过与kafka打包在一起的便捷脚本来快速简单地创建一个单节点ZooKeeper实例。
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
启动Kafka服务器:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
创建topic
让我们创建一个名为“test”的topic,它有一个分区和一个副本:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
生产消息(producer)
Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。
运行 producer,然后在控制台输入一些消息以发送到服务器。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
消费消息(consumer)
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
kafka三节点集群搭建
复制多份配置文件:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
编辑配置文件:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id
属性是集群中每个节点的名称,这一名称是唯一且永久的。我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的代理尝试在同一个端口注册,或者覆盖彼此的数据。
启动两个新的节点:
nohup bin/kafka-server-start.sh config/server-1.properties &> kafka-1.log &
nohup bin/kafka-server-start.sh config/server-2.properties &> kafka-2.log &
创建一个副本为3的新topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看集群信息:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
以下是对输出信息的解释。第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。
leader
是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。replicas
是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。isr
是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader。
listeners和advertised配置
kafka配置listeners:
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
kafka配置advertised:
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
listeners
主要用来定义Kafka Broker的Listener的配置项,是kafka服务bind端口的地址。
advertised.listeners
将Broker的Listener信息发布到Zookeeper中,暴露外网访问的地址,如果没有设置,会用listeners。
inter.broker.listener.name
:专门用于Kafka集群中Broker之间的通信。
listener.security.protocol.map
:配置监听者的安全协议,比如 PLAINTEXT
、SSL
、SASL_PLAINTEXT
、SASL_SSL
第一种情况:
默认 listeners
和 advertised.listeners
都不配置的情况下:
listeners=PLAINTEXT://hostname:port
advertised.listeners=PLAINTEXT://hostname:port
通过zookeeper查看kafka地址:
第二种情况:
只配置 listeners
不配置 advertised.listeners
。
listeners=PLAINTEXT://172.17.0.5:9092
#advertised.listeners=PLAINTEXT://192.168.100.200:9092
通过zookeeper查看kafka地址:
第三种情况:
listeners=PLAINTEXT://172.17.0.5:9092
advertised.listeners=PLAINTEXT://192.168.100.200:9092
内外网分流:
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
listeners=EXTERNAL://172.17.0.5:9092,INTERNAL://172.17.0.5:9093
inter.broker.listener.name=INTERNAL
#advertised.listeners=PLAINTEXT://192.168.100.200:9092
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
listeners=EXTERNAL://172.17.0.5:9092,INTERNAL://172.17.0.5:9093
inter.broker.listener.name=INTERNAL
advertised.listeners=EXTERNAL://192.168.100.200:9092,INTERNAL://172.17.0.5:9093
在kafka部署到kubernetes时候
内网访问可以使用:172.17.0.5:9093
外网访问使用:192.168.100.200:9092
配置参数解析
ZooKeeper 允许你增加一个“chroot”路径,将集群中所有kafka数据存放在特定的路径下。当多个Kafka集群或者其他应用使用相同ZooKeeper集群时,可以使用这个方式设置数据存放路径。这种方式的实现可以通过这样设置连接字符串格式,如下所示: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path。
这样设置就将所有kafka集群数据存放在/chroot/path路径下。注意,在你启动broker之前,你必须创建这个路径,并且consumers必须使用相同的连接格式。
配置密码
Only one of inter.broker.listener.name and security.inter.broker.protocol should be set。
inter.broker.listener.name
和 security.inter.broker.protocol
只能配置一个。
# listener.security.protocol.map 安全协议,必须设置
# 默认值: PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=EXTERNAL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT
#listener.security.protocol.map=EXTERNAL:SASL_PLAINTEXT,INTERNAL:SASL_PLAINTEXT
listeners=EXTERNAL://172.17.0.5:9092,INTERNAL://172.17.0.5:9093
inter.broker.listener.name=INTERNAL
advertised.listeners=EXTERNAL://192.168.100.200:9092,INTERNAL://172.17.0.5:9093
#security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
[root@97e0b989e23a kafka_2.11-1.0.0]# cat kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpasswd"
user_admin="adminpasswd"
user_producer="producerpwd"
user_consumer="consumerpwd";
};
添加启动认证参数:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" # 默认配置,下面是添加配置
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/root/kafka_2.11-1.0.0/kafka_server_jaas.conf"
fi
kafka 3.x
#kafka 3文档
# 创建主题
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# 显示主题详细信息
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
# 将事件写入主题
# 运行控制台生产者客户端以将一些事件写入您的主题。默认情况下,您输入的每一行都会导致一个单独的事件被写入主题。
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
# 读取事件
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
docker搭建zookeeper
docker run --name zookeeper-2 --net host\
-v zookeeper-2-conf:/usr/zookeeper/conf:rw,z \
-v zookeeper-2-data:/usr/zookeeper/data:rw,z \
-v zookeeper-2-logs:/usr/zookeeper/logs:rw,z \
-e ZOO_SERVER_ID=2 \
-e ZOO_SERVERS="server.1=10.30.24.208:2888:3888;2181 server.2=10.30.24.212:2888:3888;2181 server.3=10.30.24.215:2888:3888;2181" \
--restart=always -it -d docker.ronzxy.com/zookeeper:3.6.2
评论区