I am using Flume (flume-ng 1.5.0, shipped with CDH 5.4) to collect logs from many servers and sink them to HDFS. Below is my configuration:
#Define Sources, Sinks, Channels
collector.sources = avro
collector.sinks = HadoopOut
collector.channels = fileChannel
# Define Avro Source
collector.sources.avro.type = avro
collector.sources.avro.bind = 0.0.0.0
collector.sources.avro.port = 1463
collector.sources.avro.threads = 5
collector.sources.avro.channels = fileChannel
collector.channels.fileChannel.type = file
collector.channels.fileChannel.checkpointDir = /channel/flume/collector/checkpoint
collector.channels.fileChannel.dataDirs = /channel/flume/collector/data
#collector.channels.fileChannel.transactionCapacity = 100000
#collector.channels.fileChannel.capacity = 1000000000
#Describe Hadoop Out
collector.sinks.HadoopOut.type = hdfs
collector.sinks.HadoopOut.channel = fileChannel
collector.sinks.HadoopOut.hdfs.path = /logfarm/%{game_studio}/%{product_code}/%Y-%m-%d/%{category}
collector.sinks.HadoopOut.hdfs.filePrefix = %{category}-%Y-%m-%d
collector.sinks.HadoopOut.hdfs.inUseSuffix = _current
collector.sinks.HadoopOut.hdfs.fileType = DataStream
collector.sinks.HadoopOut.hdfs.writeFormat = Text
collector.sinks.HadoopOut.hdfs.round = true
collector.sinks.HadoopOut.hdfs.roundValue = 10
collector.sinks.HadoopOut.hdfs.roundUnit = minute
#Max file size = 10 MB
collector.sinks.HadoopOut.hdfs.rollSize = 10000000
collector.sinks.HadoopOut.hdfs.rollCount = 0
collector.sinks.HadoopOut.hdfs.rollInterval = 600
collector.sinks.HadoopOut.hdfs.maxOpenFiles = 4096
collector.sinks.HadoopOut.hdfs.timeZone = Asia/Saigon
collector.sinks.HadoopOut.hdfs.useLocalTimeStamp = true
collector.sinks.HadoopOut.hdfs.threadsPoolSize = 50
collector.sinks.HadoopOut.hdfs.batchSize = 10000
The directories /channel/flume/collector/checkpoint and /channel/flume/collector/data are empty and owned by the user flume.
But I am getting a strange exception:
2015-05-08 18:31:34,290 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Channel closed [channel=fileChannel]. Due to java.io.EOFException: null
at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:340)
at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.RandomAccessFile.readInt(RandomAccessFile.java:827)
at java.io.RandomAccessFile.readLong(RandomAccessFile.java:860)
at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:80)
at org.apache.flume.channel.file.Log.replay(Log.java:426)
at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:290)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
I need an expert to help me fix this. Thanks very much.
Answer 1
I ran into a similar error related to the Flume channel. The problem went away once I deleted/moved the data and checkpoint directories.
In your case:
/channel/flume/collector/checkpoint, /channel/flume/collector/data
Make sure the directory /channel/flume/collector/ is clean and empty. Re-running the Flume job should recreate the checkpoint and data directories.
Rather than deleting the directories outright, it is always safe to move them somewhere else and keep them for future reference, as shown in the sketch below. I have done this successfully on both CDH 5.4 (Flume 1.5) and CDH 5.5 (Flume 1.6). Most exceptions about a closed channel should be fixed this way.
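For concreteness, here is a minimal sketch of that cleanup as shell commands. It assumes a packaged CDH install where the agent runs as the flume-ng-agent service; on a Cloudera Manager deployment, stop and start the agent from CM instead, and substitute your own channel paths.

# Stop the agent so nothing holds the file channel open (service name is an assumption)
sudo service flume-ng-agent stop
# Move (rather than delete) the channel directories so the old files stay available for inspection
mv /channel/flume/collector/checkpoint /channel/flume/collector/checkpoint.bak
mv /channel/flume/collector/data /channel/flume/collector/data.bak
# On restart, Flume recreates empty checkpoint and data directories
sudo service flume-ng-agent start

Be aware that any events still buffered in the file channel are lost when you set its directories aside, so only do this when the channel is already unrecoverable.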
I believe apache.org is still working on this issue; see: https://issues.apache.org/jira/browse/FLUME-2282
Hope this helps!