Kafka MirrorMaker2是用于集群间数据同步的工具。
任务配置
基本的任务配置如下所示:
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = KAFKA1,KAFKA2
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
KAFKA1.bootstrap.servers = 172.0.1.101:9092, 172.0.1.102:9092, 172.0.1.103:9092
KAFKA1.security.protocol=SASL_PLAINTEXT
KAFKA1.sasl.mechanism=GSSAPI
KAFKA1.sasl.kerberos.service.name=kafka
KAFKA1.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/var/keytabs/kafka1.keytab" principal="kafka1/node1.com@KAFKA1TEST.COM";
KAFKA2.bootstrap.servers = 172.0.0.1:9092, 172.0.0.2:9092, 172.0.0.3:9092
KAFKA2.security.protocol=SASL_PLAINTEXT
KAFKA2.sasl.mechanism=GSSAPI
KAFKA2.sasl.kerberos.service.name=hadoop
KAFKA2.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/var/keytabs/kafka2.keytab" principal="kafka2/node1.com@KAFKA1TEST.COM";
# enable and configure individual replication flows
KAFKA1->KAFKA2.enabled = true
KAFKA1->KAFKA2.sync.group.offsets.enabled=true
# regex which defines which topics gets replicated. For eg "foo-.*"
KAFKA1->KAFKA2.topics = test.*
#KAFKA1->KAFKA2.topics = __consumer_offsets
KAFKA2->KAFKA1.enabled = false
KAFKA2->KAFKA1.topics = .*
# Setting replication factor of newly created remote topics
replication.factor=3
############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats", "KAFKA2.checkpoints.internal" and
# "mm2-offset-syncs.KAFKA2.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
# The replication factor for connect internal topics "mm2-configs.KAFKA2.internal", "mm2-offsets.KAFKA2.internal" and
# "mm2-status.KAFKA2.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
sync.group.offsets.enabled=true
MirrorMakerConfig配置信息
如下所示:
INFO MirrorMakerConfig values:
clusters = [KAFKA1, KAFKA2]
config.providers = []
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
sasl.server.principal.host.map =
security.protocol = PLAINTEXT
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
(org.apache.kafka.connect.mirror.MirrorMakerConfig:372)
主要是配置了clusters,也就是mirrormaker涉及的集群名称列表
DistributedConfig配置信息
如果使用MirrorMaker2的Dedicated模式,配置如下所示:
INFO DistributedConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
bootstrap.servers = [172.0.0.1:9092, 172.0.0.2:9092, 172.0.0.3:9092]
client.dns.lookup = use_all_dns_ips
client.id =
config.providers = []
config.storage.replication.factor = 3
config.storage.topic = mm2-configs.KAFKA1.internal
connect.protocol = sessioned
connections.max.idle.ms = 540000
connector.client.config.override.policy = None
group.id = CDH-mm2
header.converter = class org.apache.kafka.connect.converters.ByteArrayConverter
heartbeat.interval.ms = 3000
inter.worker.key.generation.algorithm = HmacSHA256
inter.worker.key.size = null
inter.worker.key.ttl.ms = 3600000
inter.worker.signature.algorithm = HmacSHA256
inter.worker.verification.algorithms = [HmacSHA256]
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.converters.ByteArrayConverter
listeners = null
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 60000
offset.flush.timeout.ms = 5000
offset.storage.partitions = 25
offset.storage.replication.factor = 3
offset.storage.topic = mm2-offsets.KAFKA1.internal
plugin.path = null
rebalance.timeout.ms = 60000
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
response.http.headers.config =
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
rest.host.name = null
rest.port = 8083
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = hadoop
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
sasl.server.principal.host.map =
scheduled.rebalance.max.delay.ms = 300000
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
status.storage.partitions = 5
status.storage.replication.factor = 3
status.storage.topic = mm2-status.KFAKF1.internal
task.shutdown.graceful.timeout.ms = 5000
topic.creation.enable = true
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.converters.ByteArrayConverter
worker.sync.timeout.ms = 3000
worker.unsync.backoff.ms = 300000
(org.apache.kafka.connect.runtime.distributed.DistributedConfig:372)
这里是connect的配置信息
ProducerConfig配置信息
ProducerConfig配置了目的端的kafka连接信息
INFO ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [172.0.0.1:9092, 172.0.0.2:9092, 172.0.0.3:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-2
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 2147483647
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = hadoop
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
sasl.server.principal.host.map =
security.protocol = SASL_PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig:372)
Producer往目的端写数据
ConsumerConfig配置信息
ConsumerConfig配置了源端的kafka信息
INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [172.0.1.101:9092, 172.0.1.102:9092, 172.0.1.103:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-KAFKA2-mm2-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = TBDS-mm2
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
sasl.server.principal.host.map =
security.protocol = SASL_PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:372)
通过Consumer从源端读取数据