1.Flume传输系统配置
官方文档给出的配置案例如下所示:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
共分为以下五个部分。
1.1.配置Agent
主要涉及配置agent名称、source名称、channel名称、以及sink名称。配置格式为:
{agent_name}.sources = {source_name_list}
{agent_name}.sinks = {sink_name_list}
{agent_name}.channels = {channel_name_list}
1.2.配置Source
配置每个Source的属性,配置格式为:
{agent_name}.sources.{source_name}.{attribute_key} = {attribute_value}
其中attribute_key表示属性名,必配的属性为“type”,配置好type后,根据type的类型,去官方文档中找该类source需要配置的其他属性。不同类型的source需要配置的属性也不一致。官方文档地址例如https://flume.apache.org/releases/1.9.0/FlumeUserGuide.html#flume-sources。
2.Flume服务安装
从官方网站下载tar.gz包,拷贝conf/flume-env.sh.template文件为conf/flume-env.sh,并修改JVM_HOME配置即可。
3.启动flume agent进程
当配置号了flume传输系统后,可以启动flume服务,命令例如:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
或
bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
其中,conf为配置目录;conf-file为配置文件,那么表示agent的名称,-D表示系统参数。
4.停止flume agent进程
flume没有提供停止agent进程的服务,我们需要使用kill去杀进程。
但需要注意的是,直接使用“kill {agent_pid}”即可,千万不要使用“kill -9 {agent_pid}”。因为“kill -9 {agent_pid}”是强制停止命令,无法flume有大量的hook函数无法执行,可能导致数据不一致。而“kill {agent_pid}”允许flume agent执行完hook函数再停止agent进程,不会影响数据一致性。
5.自定义Interceptor
编写代码,引入依赖flume-ng-core,实现接口org.apache.flume.interceptor.Interceptor,并在当前类中新增Builder公共静态内部类,此静态类实现Interceptor.Builder接口。然后打包,并上传到flume的lib目录下,在flume任务中配置相应的拦截器即可使用。
6.自定义Source
自定义Source类需要继承AbstractSource类,并实现Configurable和PollableSource接口。以满足实际开发中的个性化需求。
7.自定义Sink
自定义Sink类需要继承AbstractSink类,并实现Configurable接口。基本逻辑实现在process方法中。具体信息可参考官方文档(https://flume.apache.org/releases/1.9.0/FlumeDeveloperGuide.html)