Kafa MirrorMaker2 Dedicated模式


发布于 2024-12-12 / 4 阅读 / 0 评论 /
Kafka MirrorMaker2 Dedicated模式

Kafka MirrorMaker2是用于集群间数据同步的工具。

任务配置

基本的任务配置如下所示:

# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties 

# specify any number of cluster aliases
clusters = KAFKA1,KAFKA2

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
KAFKA1.bootstrap.servers = 172.0.1.101:9092, 172.0.1.102:9092, 172.0.1.103:9092
KAFKA1.security.protocol=SASL_PLAINTEXT
KAFKA1.sasl.mechanism=GSSAPI
KAFKA1.sasl.kerberos.service.name=kafka
KAFKA1.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/var/keytabs/kafka1.keytab" principal="kafka1/node1.com@KAFKA1TEST.COM";

KAFKA2.bootstrap.servers = 172.0.0.1:9092, 172.0.0.2:9092, 172.0.0.3:9092
KAFKA2.security.protocol=SASL_PLAINTEXT
KAFKA2.sasl.mechanism=GSSAPI
KAFKA2.sasl.kerberos.service.name=hadoop
KAFKA2.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/var/keytabs/kafka2.keytab" principal="kafka2/node1.com@KAFKA1TEST.COM";

# enable and configure individual replication flows
KAFKA1->KAFKA2.enabled = true
KAFKA1->KAFKA2.sync.group.offsets.enabled=true
# regex which defines which topics gets replicated. For eg "foo-.*"
KAFKA1->KAFKA2.topics = test.*
#KAFKA1->KAFKA2.topics = __consumer_offsets

KAFKA2->KAFKA1.enabled = false
KAFKA2->KAFKA1.topics = .*

# Setting replication factor of newly created remote topics
replication.factor=3

############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats", "KAFKA2.checkpoints.internal" and
# "mm2-offset-syncs.KAFKA2.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3

# The replication factor for connect internal topics "mm2-configs.KAFKA2.internal", "mm2-offsets.KAFKA2.internal" and
# "mm2-status.KAFKA2.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
sync.group.offsets.enabled=true

MirrorMakerConfig配置信息

如下所示:

INFO MirrorMakerConfig values: 
	clusters = [KAFKA1, KAFKA2]
	config.providers = []
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	sasl.server.principal.host.map = 
	security.protocol = PLAINTEXT
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
 (org.apache.kafka.connect.mirror.MirrorMakerConfig:372)

主要是配置了clusters,也就是mirrormaker涉及的集群名称列表

DistributedConfig配置信息

如果使用MirrorMaker2的Dedicated模式,配置如下所示:

INFO DistributedConfig values: 
	access.control.allow.methods = 
	access.control.allow.origin = 
	admin.listeners = null
	bootstrap.servers = [172.0.0.1:9092, 172.0.0.2:9092, 172.0.0.3:9092]
	client.dns.lookup = use_all_dns_ips
	client.id = 
	config.providers = []
	config.storage.replication.factor = 3
	config.storage.topic = mm2-configs.KAFKA1.internal
	connect.protocol = sessioned
	connections.max.idle.ms = 540000
	connector.client.config.override.policy = None
	group.id = CDH-mm2
	header.converter = class org.apache.kafka.connect.converters.ByteArrayConverter
	heartbeat.interval.ms = 3000
	inter.worker.key.generation.algorithm = HmacSHA256
	inter.worker.key.size = null
	inter.worker.key.ttl.ms = 3600000
	inter.worker.signature.algorithm = HmacSHA256
	inter.worker.verification.algorithms = [HmacSHA256]
	internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
	internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
	key.converter = class org.apache.kafka.connect.converters.ByteArrayConverter
	listeners = null
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	offset.flush.interval.ms = 60000
	offset.flush.timeout.ms = 5000
	offset.storage.partitions = 25
	offset.storage.replication.factor = 3
	offset.storage.topic = mm2-offsets.KAFKA1.internal
	plugin.path = null
	rebalance.timeout.ms = 60000
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 40000
	response.http.headers.config = 
	rest.advertised.host.name = null
	rest.advertised.listener = null
	rest.advertised.port = null
	rest.extension.classes = []
	rest.host.name = null
	rest.port = 8083
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = hadoop
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	sasl.server.principal.host.map = 
	scheduled.rebalance.max.delay.ms = 300000
	security.protocol = SASL_PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.client.auth = none
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	status.storage.partitions = 5
	status.storage.replication.factor = 3
	status.storage.topic = mm2-status.KFAKF1.internal
	task.shutdown.graceful.timeout.ms = 5000
	topic.creation.enable = true
	topic.tracking.allow.reset = true
	topic.tracking.enable = true
	value.converter = class org.apache.kafka.connect.converters.ByteArrayConverter
	worker.sync.timeout.ms = 3000
	worker.unsync.backoff.ms = 300000
 (org.apache.kafka.connect.runtime.distributed.DistributedConfig:372)

这里是connect的配置信息

ProducerConfig配置信息

ProducerConfig配置了目的端的kafka连接信息

INFO ProducerConfig values: 
	acks = -1
	batch.size = 16384
	bootstrap.servers = [172.0.0.1:9092, 172.0.0.2:9092, 172.0.0.3:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-2
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 2147483647
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = false
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 1
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = hadoop
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	sasl.server.principal.host.map = 
	security.protocol = SASL_PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig:372)

Producer往目的端写数据

ConsumerConfig配置信息

ConsumerConfig配置了源端的kafka信息

INFO ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [172.0.1.101:9092, 172.0.1.102:9092, 172.0.1.103:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-KAFKA2-mm2-1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = TBDS-mm2
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = kafka
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	sasl.server.principal.host.map = 
	security.protocol = SASL_PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:372)

通过Consumer从源端读取数据