知乎专栏 |
http://flume.apache.org/
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
cd /usr/local/src wget http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz tar zvf apache-flume-1.7.0-bin.tar.gz mv apache-flume-1.7.0-bin /srv/apache-flume-1.7.0 ln -s /srv/apache-flume-1.7.0 /srv/apache-flume cp /srv/apache-flume/conf/flume-env.sh.template /srv/apache-flume/conf/flume-env.sh cp /srv/apache-flume/conf/flume-conf.properties.template /srv/apache-flume/conf/flume-conf.properties
# Define a memory channel called ch1 on agent1 agent1.channels.ch1.type = memory # Define an Avro source called avro-source1 on agent1 and tell it # to bind to 0.0.0.0:41414. Connect it to channel ch1. agent1.sources.avro-source1.channels = ch1 agent1.sources.avro-source1.type = avro agent1.sources.avro-source1.bind = 0.0.0.0 agent1.sources.avro-source1.port = 41414 # Define a logger sink that simply logs all events it receives # and connect it to the other end of the same channel. agent1.sinks.log-sink1.channel = ch1 agent1.sinks.log-sink1.type = logger # Finally, now that we've defined all of our components, tell # agent1 which ones we want to activate. agent1.channels = ch1 agent1.sources = avro-source1 agent1.sinks = log-sink1
在agent的机器上执行以下命令启动flume server
$ bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1
在client的机器上执行以下命令接收日志
$ bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console
cp flume-mysql-sink-1.x.x.jar /srv/apache-flume/lib cp /usr/share/java/mysql-connector-java.jar /srv/apache-flume/lib
DROP TABLE IF EXISTS flume; CREATE TABLE flume ( ROW_KEY BIGINT, timeid BIGINT, systemid INT, functionid INT, bussinessid TEXT, bussinessType INT, nodeid INT, userid INT, logtype INT, timeout INT, detail TEXT, PRIMARY KEY (ROW_KEY) ) ENGINE=INNODB DEFAULT CHARSET=utf8;
a1.sources = source1 a1.sinks = sink1 a1.channels = channel1 # Describe/configure source1 a1.sources.source1.type = avro a1.sources.source1.bind = 0.0.0.0 a1.sources.source1.port = 44444 # Use a channel which buffers events in memory a1.channels.channel1.type = memory a1.channels.channel1.capacity = 1000 a1.channels.channel1.transactionCapactiy = 100 # Bind the source and sink to the channel a1.sources.source1.channels = channel1 a1.sinks.sink1.channel = channel1 a1.sinks.sink1.type=org.flume.mysql.sink.RegexMysqlSink a1.sinks.sink1.hostname=192.168.10.94 a1.sinks.sink1.databaseName=logging a1.sinks.sink1.port=3306 a1.sinks.sink1.user=flume a1.sinks.sink1.password=flume a1.sinks.sink1.regex=^([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)$ a1.sinks.sink1.tableName=flume a1.sinks.sink1.colNames=ROW_KEY,timeid,systemid,functionid,bussinessid,bussinessType,nodeid,userid,logtype,timeout,detail a1.sinks.sink1.colDataTypes=LONG,LONG,INT,INT,TEXT,INT,INT,INT,INT,INT,TEXT a1.sinks.sink1.batchSize=100
启动
[root@netkiller]/srv/apache-flume# bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
配置conf/flume.conf # Define a memory channel called ch1 on agent1 agent1.channels.ch1.type = memory # Define an Avro source called avro-source1 on agent1 and tell it # to bind to 0.0.0.0:41414. Connect it to channel ch1. agent1.sources.spooldir-source1.channels = ch1 agent1.sources.spooldir-source1.type = spooldir agent1.sources.spooldir-source1.spoolDir=/opt/hadoop/flume/tmpData agent1.sources.spooldir-source1.bind = 0.0.0.0 agent1.sources.spooldir-source1.port = 41414 # Define a logger sink that simply logs all events it receives # and connect it to the other end of the same channel. agent1.sinks.hdfs-sink1.channel = ch1 agent1.sinks.hdfs-sink1.type = hdfs agent1.sinks.hdfs-sink1.hdfs.path = hdfs://master:9000/flume agent1.sinks.hdfs-sink1.hdfs.filePrefix = events- agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true agent1.sinks.hdfs-sink1.hdfs.round = true agent1.sinks.hdfs-sink1.hdfs.roundValue = 10 # Finally, now that we've defined all of our components, tell # agent1 which ones we want to activate. agent1.channels = ch1 agent1.sources = spooldir-source1 agent1.sinks = hdfs-sink1
启动agent
bin/flume-ng agent --conf ./conf/ -f ./conf/flume.conf --name agent1 -Dflume.root.logger=DEBUG,console
查看结果
到Hadoop提供的WEB GUI界面可以看到刚刚上传的文件是否成功。GUI界面地址为:http://master:50070/explorer.html#/test 其中,master为Hadoop的Namenode所在的机器名。