kafka开启ssl加密通信
默认情况下,kafka间通信是明文的,容易被截获和篡改,可对kafka间的通信进行加密,即kafka对ssl的支持。
kafka本身支持ssl加密通信。
解决方案
通过以下步骤实现自签名的ssl证书配置kafka
第一步:创建CA
1.找一个kafka节点,执行以下命令,创建CA的认证证书。
mkdir -p /data/kafka/ssl-store;
cd /data/kafka/ssl-store;
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650第二步:制作客户端验证服务端的证书
cd /data/kafka/ssl-store;1.执行以下命令,生成server.keystore.jks证书的孵化器。
keytool -genkey -keystore server.keystore.jks -alias localhost -validity 3650 -keyalg EC -keysize 256 -dname "CN=kafka-client, OU=XXX, O=XXX, L=XXX, ST=XXX, C=XXX" -keypass 123456 -storepass 123456命令中“XXX”可自定义。
2.执行以下命令,从1中生成的server.keystore.jks中导出证书,并命名为“server.crt”。
keytool -keystore server.keystore.jks -alias localhost -certreq -file server.crt按照提示信息输入keystore密码。
3.执行以下命令,使用CA的私钥对server.crt进行签名,并为签名后的证书命名为“server-signed.crt”。
openssl x509 -req -CA ca-cert -CAkey ca-key -in server.crt -out server-signed.crt -days 3650 -CAcreateserial按照提示信息输入2中的PEM密码。
4.执行以下命令,将CA证书和“server-signed.crt”导入密钥仓库。
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file server-signed.crt按照提示信息输入keystore密码。
5.执行以下命令,使客户端信任服务端证书。
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert按照提示信息输入client.truststore.jks的密码。
6.将“client.truststore.jks”和“server.keystore.jks”证书推送到所有kafka节点的/data/kafka/ssl/目录。
第三步:制作服务端验证客户端的证书
cd /data/kafka/ssl-store;1.执行以下命令,生成client.keystore.jks证书的孵化器。
keytool -genkey -keystore client.keystore.jks -alias localhost -validity 3650 -keyalg EC -keysize 256 -dname "CN=kafka-server, OU=XXX, O=XXX, L=XXX, ST=XXX, C=XXX" -keypass 123456 -storepass 123456按照提示信息输入keystore密码。密码需要满足如下要求:
2.执行以下命令,从1中生成的client.keystore.jks中导出证书,并命名为“client.crt”。
keytool -keystore client.keystore.jks -alias localhost -certreq -file client.crt按照提示信息输入keystore密码。
3.执行以下命令,使用CA的私钥对client.crt进行签名,并为签名后的证书命名为“client-signed.crt”。
openssl x509 -req -CA ca-cert -CAkey ca-key -in client.crt -out client-signed.crt -days 3650 -CAcreateserial按照提示信息输入2中的PEM密码。
4.执行以下命令,将CA证书和“client-signed.crt”导入密钥仓库。
keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.keystore.jks -alias localhost -import -file client-signed.crt按照提示信息输入keystore密码。
5.执行以下命令,使服务端信任客户端证书。
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert按照提示信息输入server.truststore.jks的密码。
6.将“server.truststore.jks”和“client.keystore.jks”证书推送到所有kafka节点的/data/kafka/ssl/目录。
第四步:kafka服务端配置文件server.properties
ssl.endpoint.identification.algorithm=
ssl.truststore.location=/data/kafka/ssl/server.truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=/data/kafka/ssl/client.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456第五步:kafka客户端配置文件
ssl.endpoint.identification.algorithm=
ssl.keystore.location=/data/kafka/ssl/client.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/data/kafka/ssl/client.truststore.jks
ssl.truststore.password=123456无认证场景下ranger鉴权
正常情况下,如果一个集群只配置了无认证,则无需添加ranger鉴权,因为无认证表示并没有实际的用户,也就无法实现某个用户对某种资源的鉴权。
但是在多种认证场景的情况下,如果其中一种是无认证,且加了authorizer.class为ranger,则需要进行适配,否则无认证是无法使用的。
解决方案
无认证时,kafka会自动使用ANONYMOUS用户,如下所示:
public class PlaintextTransportLayer implements TransportLayer {
private final SelectionKey key;
private final SocketChannel socketChannel;
private final Principal principal = KafkaPrincipal.ANONYMOUS;
……
}调用authorize方法进行鉴权时,传入的用户即为ANONYMOUS用户。
为了ranger鉴权能通过,需要在ranger上为ANONYMOUS用户添加kafka资源的所有权限。
具体操作方法如下:
第一步:在ranger上添加ANONYMOUS用户组。
第二步:在ranger上添加ANONYMOUS用户,并绑定ANONYMOUS用户组。
第三步:在ranger上为ANONYMOUS用户添加对应的kafka service的所有资源的权限。
同时支持多个SASL_PLAINTEXT listeners
在sasl安全协议中,plain和gssapi是kafka中比较常用的认证机制,有些客户需要同时支持gssapi和plain。
在默认情况下,同一个kafka集群中,多个listeners之间安全协议必须不相同,否则会报以下错误:
Exiting Kafka due to fatal exception
java.lang.IllegalArgumentException: requirement failed: Each listener must have a different name, listeners: SASL_PLAINTEXT://10.0.1.101:9092,SASL_PLAINTEXT://10.0.1.101:9093解决方案
为实现此场景,需要在kafka的server.properties文件中添加以下配置
listener.security.protocol.map=SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_ANOTHER:SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://10.0.1.101:9092,SASL_ANOTHER://10.0.1.101:9093
listeners=SASL_PLAINTEXT://10.0.1.101:9092,SASL_ANOTHER://10.0.1.101:9093
sail.enabled.mechanisms=GSSAPI,PLAIN
sasl.mechanism.inter.broker.protocol=GSSAPI
security.inter.broker.protocol=SASL_PLAINTEXT
sail.kerberos.service.name=kafka这里如果broker间使用plain认证,则security.inter.broker.protocol需配置为SASL_ANOTHER。
除了调整配置,broker启动时加载的jaas文件也需要调整,内容如下:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/kafka.keytab"
principal="kafka/10.0.1.101@{kerberos realm}";
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="1234567890";
};jaas文件通过分号分隔不同的login module。
zookeeper模式下如何删除一个topic
场景说明
场景分析
解决方案
kafka如何接收多个网卡的访问
在多网卡的服务器中,如何实现不同网卡都能访问kafka服务
解决方案
将broker的listeners中的hostname配置为0.0.0.0,实现对所有IPV4网卡的绑定。
分区元数据不一致场景如何快速恢复
由于分区元数据不一致,导致副本同步失败,生产和消费都失败。
解决方案
清理分区元数据,重启broker节点,重建分区元数据即可。
单块磁盘硬件损坏如何快速恢复
场景说明
场景分析
解决方案
多块磁盘硬件损坏如何快速恢复
场景说明
场景分析
解决方案
单个Broker硬件问题无法快速恢复如何故障转移
场景说明
场景分析
解决方案
多个Broker硬件问题无法快速恢复如何故障转移
由于硬件问题导致某个broker不可用
解决方案
分为以下场景
(1)磁盘硬件问题
首先停止该broker节点;然后更换该磁盘文件,拷贝元数据文件到新磁盘;最后启动broker即可。
(2)非磁盘硬件问题
这种最简单,直接停止该broker节点;然后更换对应的故障硬件;最后启动该节点即可。
或者可以通过将kafka数据目录拷贝到一个新建的broker节点。
单个Broker元数据文件不一致快速修复
由于broker元数据或者是分区元数据导致某个broker中的元数据与zookeeper或者其他分区副本不一致,最终topic或者broker不可用。
解决方案
分为以下不同的场景,分别处理
(1)如果是分区副本元数据不一致
首先停止对应的broker节点;然后清理分区元数据,包括leader-epoch-checkpoint和partition.metadata文件;最后重启该broker节点。
(2)如果是broker级别元数据不一致
首先停止该节点;然后将元数据进行修复,涉及手动改动元数据文件;最后重启broker节点。
单个分区副本重建
某个副本因磁盘损坏,导致无法同步数据,被踢出ISR。
解决方案
方案一:数据重分布
停止该节点的kafka;然后启动分区重分布,副本在其他节点上进行重建;最后重启该节点的kafka
方案二:直接重建
停止该节点的kafka;然后手动删除副本目录下的所有文件;最后重启该节点,此时应该会重新同步数据。
经验证,删除某个分区副本目录或者分区副本目录下的所有文件,重启该节点后,该分区副本会同步leader副本,达成重建的效果。
GSSAPI认证下如何修改sasl.kerberos.service.name
集群迁移场景中,比如国内如火如荼的国产化替代潮流,如果迁移前后两个集群的sasl.kerberos.service.name不一致,那迁移前后就需要调整kafka客户端的配置,如果kafka客户端任务比较多或者有任务不支持配置的调整(必须代码层调整),那调整的工作量会比较大,因此往往倾向于修改sasl.kerberos.service.name配置。
解决方案
在GSSAPI认证下,sasl.kerberos.service.name表示kafka服务端principal中的用户名,即kafka服务端用户名。
如果要调整sasl.kerberos.service.name,不仅涉及配置的调整,还涉及用户和权限的调整。操作方法如下:
第一步:新增用户,该用户名用于组装kafka服务端的principal
第二步:在ranger上为该新建的用户添加kafka所有资源的所有权限。
第三步:kafka服务端server.properties配置文件中修改sasl.kerberos.service.name配置。
第四步:kafka服务端jaas配置文件对应的principal修改。
第五步:重启kafka即可。
TopicId不一致场景修复
使用低版本(小于2.8.0)的kafka客户端向高版本(大于或等于2.8.0)的kafka集群,通过zookeeper模式进行topic创建和分区的扩容后,topic会因为topicId不一致导致不可用。
Kafka 在 2.8 版本引入了 topicId 的概念,topic 在创建时会 controller 会生成一个 topicId(默认生成策略为 uuid)并存储到 zookeeper中,一个 topic 唯一映射一个 topicId。
在 Kafka 2.8.0 之前,Kafka 主要依赖 Topic 名称 来标识一个 Topic。但这在某些管理操作(如使用 MirrorMaker 2 进行跨集群同步)时可能带来歧义,例如当两个同名的 Topic 内容完全不同时。
topicId 的引入解决了这个问题:
唯一性:全局唯一的标识符。
持久性:一旦分配,在 Topic 的整个生命周期内不会改变。
内部管理:Kafka 内部使用 topicId 来管理元数据,使其与易变的 Topic 名称解耦。
解决方案
清理topicId不一致的分区目录下的元数据文件,然后重启kafka即可。
第一步:删除文件leader-epoch-checkpoint和partition.metadata,命令如下所示(注意替换“kafka数据目录”):
find {kafka数据目录} -type f -name "partition.metadata" | grep -v -E 'consumer_offsets|transaction_state'| xargs rm
find {kafka数据目录} -type f -name "leader-epoch-checkpoint" | grep -v -E 'consumer_offsets|transaction_state'| xargs rm第二步:重启kafka集群中涉及分区元数据重建的节点。