Failed to transform message when switching header-enricher from 1.3 to 2.x in Spring Cloud Dataflow

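I created a Spring Cloud Data Flow stream using header-enricher 1.3 and deployed it, and it works fine. But when I switch to header-enricher 2.x in the same SCDF environment, it throws a "Failed to transform Message" exception.

What is the difference between 1.3 and 2.x that causes this problem?

Here is the stream definition for 1.3: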

app register --name header-enricher_1_3 --type processor --uri http://repo.spring.io/release/org/springframework/cloud/stream/app/header-enricher-processor-rabbit/1.3.1.RELEASE/header-enricher-processor-rabbit-1.3.1.RELEASE.jar 

stream create Header_TEST --definition "trigger --time-unit=MINUTES --initialDelay=0  --fixed-delay=500  --payload={'testWords':'Hello World!'} | header-enricher_1_3  --headers='testWords_header=payload.testWords' --outputType=application/json  | log --expression=headers.testWords_header" --deploy

Everything works fine, and we get "Hello World!" in the log:

2019-04-04T17:15:08.833-04:00 [APP/PROC/WEB/0] [OUT] 2019-04-04 21:15:08.831 INFO 17 --- [ main] o.s.c.s.a.l.s.r.LogSinkRabbitApplication : Started LogSinkRabbitApplication in 12.406 seconds (JVM running for 14.73)
2019-04-04T17:15:10.312-04:00 [CELL/0] [OUT] Container became healthy
2019-04-04T17:15:13.112-04:00 [APP/PROC/WEB/0] [OUT] 2019-04-04 21:15:13.111 INFO 17 --- [w.Header_TEST-9] dataflow-server-stg-Header_TEST-log : Hello World!

Here is the stream definition for 2.x:

app register --name header-enricher_2_1 --type processor --uri  http://repo.spring.io/release/org/springframework/cloud/stream/app/header-enricher-processor-rabbit/2.1.0.RELEASE/header-enricher-processor-rabbit-2.1.0.RELEASE.jar 

stream create Header_TEST --definition "trigger --time-unit=MINUTES --initialDelay=0  --fixed-delay=500  --payload={'testWords':'Hello World!'} | header-enricher_2_1  --headers='testWords_header=payload.testWords' --outputType=application/json  | log --expression=headers.testWords_header" --deploy

I get an exception:
org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'testWords' cannot be found on object of type 'byte[]' - maybe not public or not valid?

Here is the full log:

2019-04-04T17:19:38.425-04:00 [APP/PROC/WEB/0] [OUT] 2019-04-04 21:19:38.423 INFO 16 --- [ main] HeaderEnricherProcessorRabbitApplication : Started HeaderEnricherProcessorRabbitApplication in 12.71 seconds (JVM running for 13.979)
2019-04-04T17:19:38.802-04:00 [CELL/0] [OUT] Container became healthy
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] ... 31 more
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:128)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:371)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:116)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:89)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.PropertyOrFieldReference$AccessorLValue.getValue(PropertyOrFieldReference.java:407)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.PropertyOrFieldReference.access$000(PropertyOrFieldReference.java:51)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:104)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:217)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'testWords' cannot be found on object of type 'byte[]' - maybe not public or not valid?
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] ... 28 more
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.transformer.HeaderEnricher.transform(HeaderEnricher.java:122)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor.processMessage(ExpressionEvaluatingHeaderValueMessageProcessor.java:71)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor.processMessage(ExpressionEvaluatingMessageProcessor.java:105)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:136)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.MessageHandlingException: Expression evaluation failed: payload.testWords; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'testWords' cannot be found on object of type 'byte[]' - maybe not public or not valid?, failedMessage=GenericMessage [payload=byte[28], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=Header_TEST.trigger, amqp_receivedExchange=Header_TEST.trigger, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=Header_TEST.trigger.Header_TEST, amqp_redelivered=false, id=66e0aa1c-1188-5c0b-ed94-cdfea662e096, amqp_consumerTag=amq.ctag-I7THOoYQhfPXXp5pivECXg, contentType=application/json, timestamp=1554412780307}]
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] ... 27 more
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:89)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.transformer.HeaderEnricher.transform(HeaderEnricher.java:131)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.MessagingException: failed to transform message headers; nested exception is org.springframework.messaging.MessageHandlingException: Expression evaluation failed: payload.testWords; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'testWords' cannot be found on object of type 'byte[]' - maybe not public or not valid?, failedMessage=GenericMessage [payload=byte[28], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=Header_TEST.trigger, amqp_receivedExchange=Header_TEST.trigger, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=Header_TEST.trigger.Header_TEST, amqp_redelivered=false, id=66e0aa1c-1188-5c0b-ed94-cdfea662e096, amqp_consumerTag=amq.ctag-I7THOoYQhfPXXp5pivECXg, contentType=application/json, timestamp=1554412780307}], failedMessage=GenericMessage [payload=byte[28], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=Header_TEST.trigger, amqp_receivedExchange=Header_TEST.trigger, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=Header_TEST.trigger.Header_TEST, amqp_redelivered=false, id=66e0aa1c-1188-5c0b-ed94-cdfea662e096, amqp_consumerTag=amq.ctag-I7THOoYQhfPXXp5pivECXg, contentType=application/json, timestamp=1554412780307}]
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at java.lang.Thread.run(Thread.java:748)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:211)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:214)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:60)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:95)
2019-04-04T17:19:43.338-04:00 [APP/PROC/WEB/0] [OUT] 2019-04-04 21:19:43.337 ERROR 16 --- [r.Header_TEST-9] o.s.integration.handler.LoggingHandler : org.springframework.integration.transformer.MessageTransformationException: Failed to transform Message; nested exception is org.springframework.messaging.MessagingException: failed to transform message headers; nested exception is org.springframework.messaging.MessageHandlingException: Expression evaluation failed: payload.testWords; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'testWords' cannot be found on object of type 'byte[]' - maybe not public or not valid?, failedMessage=GenericMessage [payload=byte[28], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=Header_TEST.trigger, amqp_receivedExchange=Header_TEST.trigger, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=Header_TEST.trigger.Header_TEST, amqp_redelivered=false, id=66e0aa1c-1188-5c0b-ed94-cdfea662e096, amqp_consumerTag=amq.ctag-I7THOoYQhfPXXp5pivECXg, contentType=application/json, timestamp=1554412780307}], failedMessage=GenericMessage [payload=byte[28], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=Header_TEST.trigger, amqp_receivedExchange=Header_TEST.trigger, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=Header_TEST.trigger.Header_TEST, amqp_redelivered=false, id=66e0aa1c-1188-5c0b-ed94-cdfea662e096, amqp_consumerTag=amq.ctag-I7THOoYQhfPXXp5pivECXg, contentType=application/json, timestamp=1554412780307}], failedMessage=GenericMessage [payload=byte[28], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=Header_TEST.trigger, amqp_receivedExchange=Header_TEST.trigger, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=Header_TEST.trigger.Header_TEST, amqp_redelivered=false, id=66e0aa1c-1188-5c0b-ed94-cdfea662e096, amqp_consumerTag=amq.ctag-I7THOoYQhfPXXp5pivECXg, contentType=application/json, 
timestamp=1554412780307}]
2019-04-04T17:19:43.345-04:00 [APP/PROC/WEB/0] [OUT] 2019-04-04 21:19:43.345 WARN 16 --- [r.Header_TEST-9] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{"testWords":"Hello World!"}' MessageProperties [headers={contentType=application/json}, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=Header_TEST.trigger, receivedRoutingKey=Header_TEST.trigger, deliveryTag=1, consumerTag=amq.ctag-I7THOoYQhfPXXp5pivECXg, consumerQueue=Header_TEST.trigger.Header_TEST])
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] ... 31 common frames omitted
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:128) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:371) ~[spring-expression-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:116) ~[spring-expression-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:89) ~[spring-expression-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.PropertyOrFieldReference$AccessorLValue.getValue(PropertyOrFieldReference.java:407) ~[spring-expression-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.PropertyOrFieldReference.access$000(PropertyOrFieldReference.java:51) ~[spring-expression-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:104) ~[spring-expression-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:217) ~[spring-expression-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'testWords' cannot be found on object of type 'byte[]' - maybe not public or not valid?
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] ... 28 common frames omitted
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.transformer.HeaderEnricher.transform(HeaderEnricher.java:122) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor.processMessage(ExpressionEvaluatingHeaderValueMessageProcessor.java:71) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor.processMessage(ExpressionEvaluatingMessageProcessor.java:105) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:136) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.MessageHandlingException: Expression evaluation failed: payload.testWords; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'testWords' cannot be found on object of type 'byte[]' - maybe not public or not valid?
2019-04-04T17:19:43.346-04:00 [APP/PROC/WEB/0] [OUT] ... 27 common frames omitted
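
One detail in the log may help narrow this down: the failed message is reported as payload=byte[28], and the rejected body '{"testWords":"Hello World!"}' is exactly 28 bytes long. So the 2.x processor appears to evaluate payload.testWords against the raw JSON bytes rather than against a parsed object. The sketch below is plain Python, not Spring code, and only illustrates that difference; read_field is a hypothetical stand-in for the property lookup, and nothing here is taken from the header-enricher source.

```python
import json

# The message body reported by RejectAndDontRequeueRecoverer in the log.
body_text = '{"testWords":"Hello World!"}'

# What the 2.x processor reports receiving: the raw bytes of that text.
raw_payload = body_text.encode("utf-8")


def read_field(payload, name):
    """Hypothetical stand-in for a property lookup such as payload.testWords."""
    if isinstance(payload, dict):
        return payload[name]
    raise TypeError(
        f"field {name!r} cannot be found on object of type {type(payload).__name__}"
    )


# A lookup on the raw bytes fails, much like the EL1008E error on byte[].
try:
    read_field(raw_payload, "testWords")
    lookup_on_bytes = "ok"
except TypeError as exc:
    lookup_on_bytes = str(exc)

# Once the bytes are parsed as JSON, the same lookup succeeds.
parsed_payload = json.loads(raw_payload)
lookup_on_parsed = read_field(parsed_payload, "testWords")

print(len(raw_payload))
print(lookup_on_bytes)
print(lookup_on_parsed)
```

If that reading is right, the question becomes where the JSON was being parsed in 1.3 before the header expression ran, and why that no longer happens in 2.x.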


We also tried a few configurations in the environment variables, but none of them worked:

  spring.cloud.stream.bindings.output.destination=Header_TEST.trigger
  spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type

This issue has also been raised on GitHub: https://github.com/spring-cloud-stream-app-starters/header-enricher/issues/23
