RABBITMQ_02 - 无法确认偏移错误

RABBITMQ_02 - 无法确认偏移错误

我有一个从 RabbitMQ 读取并将其加载到 Kinesis 流的管道,但出现以下错误:

知道可能是什么原因吗?

谢谢

UNKNOWN java.lang.RuntimeException: com.streamsets.pipeline.api.StageException: RABBITMQ_02 - Failed to acknowledge offset '2945': java.net.SocketException: Connection reset
  at com.streamsets.datacollector.runner.production.ProductionSourceOffsetCommitterOffsetTracker.commitOffset(ProductionSourceOffsetCommitterOffsetTracker.java:80)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.executeRunner(ProductionPipelineRunner.java:857)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runSourceLessBatch(ProductionPipelineRunner.java:823)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:563)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:383)
  at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:512)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:112)
  at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:74)
  at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:756)
  at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:152)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:227)
  at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:223)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:227)
  at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
  at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:223)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: com.streamsets.pipeline.api.StageException: RABBITMQ_02 - Failed to acknowledge offset '2945': java.net.SocketException: Connection reset
  at com.streamsets.pipeline.stage.origin.rabbitmq.RabbitSource.commit(RabbitSource.java:187)
  at com.streamsets.pipeline.configurablestage.DSourceOffsetCommitter.commit(DSourceOffsetCommitter.java:41)
  at com.streamsets.datacollector.runner.production.ProductionSourceOffsetCommitterOffsetTracker.commitOffset(ProductionSourceOffsetCommitterOffsetTracker.java:75) ... 22 more

答案1

发生这种情况可能是因为 RabbitMQ 在处理批处理时超时。检查 RabbitMQ 错误日志。

如果是这种情况,请尝试减少 RabbitMQ 源中的批次大小。

相关内容