admin管理员组文章数量:1026222
Spring integration: 6.2.0
I have noticed that even if I set a sendFailureChannel
on a Kafka.outboundChannelAdapter
an avro SerializationException
is still rethrown and ends up in the error output whether a errorChannel
header is set or not.
Is it the intended behaviour ? I would have first expected that the framework would not propagate the exception if a sendFailureChannel
is set, or alternatively to take into account the errorChannel
.
More precisely, in the example above, the KAFKA_SEND_FAILURE_CHANNEL's serviceActivator is called, but the CUSTOM_ERROR_CHANNEL one never. (KAFKA_OUTBOUND_CHANNEL
is a direct channel)
public static final String KAFKA_OUTBOUND_CHANNEL = "kafkaOutboundChannel";
public static final String KAFKA_SEND_FAILURE_CHANNEL = "kafkaSendFailureChannel";
public static final String CUSTOM_ERROR_CHANNEL = "customErrorChanne";
@Bean
IntegrationFlow kafkaProducerFlow(KafkaTemplate<? extends SpecificRecord, ? extends SpecificRecord> kafkaTemplate) {
return IntegrationFlow.from(KAFKA_OUTBOUND_CHANNEL)
.log(log.getName(),
m -> "Sending following message to topic '" + m.getHeaders().get(KafkaHeaders.TOPIC) + "':\n"
+ "key:\n" + GeneralUtils.convertToJson(m.getHeaders().get(KafkaHeaders.KEY)) + "\n"
+ "value:\n" + GeneralUtils.convertToJson(m.getPayload()))
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.sendFailureChannel(KAFKA_SEND_FAILURE_CHANNEL)
.get();
}
@ServiceActivator(inputChannel = KAFKA_SEND_FAILURE_CHANNEL)
void logKafkaSendFailure(KafkaSendFailureException exception) {
log.error("SendFailure:", exception);
}
@ServiceActivator(inputChannel = CUSTOM_ERROR_CHANNEL)
void logOtherError(Exception exception) {
log.error("Failed to send message to Kafka.", exception);
}
Spring integration: 6.2.0
I have noticed that even if I set a sendFailureChannel
on a Kafka.outboundChannelAdapter
an avro SerializationException
is still rethrown and ends up in the error output whether a errorChannel
header is set or not.
Is it the intended behaviour ? I would have first expected that the framework would not propagate the exception if a sendFailureChannel
is set, or alternatively to take into account the errorChannel
.
More precisely, in the example above, the KAFKA_SEND_FAILURE_CHANNEL's serviceActivator is called, but the CUSTOM_ERROR_CHANNEL one never. (KAFKA_OUTBOUND_CHANNEL
is a direct channel)
public static final String KAFKA_OUTBOUND_CHANNEL = "kafkaOutboundChannel";
public static final String KAFKA_SEND_FAILURE_CHANNEL = "kafkaSendFailureChannel";
public static final String CUSTOM_ERROR_CHANNEL = "customErrorChanne";
@Bean
IntegrationFlow kafkaProducerFlow(KafkaTemplate<? extends SpecificRecord, ? extends SpecificRecord> kafkaTemplate) {
return IntegrationFlow.from(KAFKA_OUTBOUND_CHANNEL)
.log(log.getName(),
m -> "Sending following message to topic '" + m.getHeaders().get(KafkaHeaders.TOPIC) + "':\n"
+ "key:\n" + GeneralUtils.convertToJson(m.getHeaders().get(KafkaHeaders.KEY)) + "\n"
+ "value:\n" + GeneralUtils.convertToJson(m.getPayload()))
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.sendFailureChannel(KAFKA_SEND_FAILURE_CHANNEL)
.get();
}
@ServiceActivator(inputChannel = KAFKA_SEND_FAILURE_CHANNEL)
void logKafkaSendFailure(KafkaSendFailureException exception) {
log.error("SendFailure:", exception);
}
@ServiceActivator(inputChannel = CUSTOM_ERROR_CHANNEL)
void logOtherError(Exception exception) {
log.error("Failed to send message to Kafka.", exception);
}
Share
Improve this question
asked Nov 18, 2024 at 15:35
Maxime DutautMaxime Dutaut
1326 bronze badges
1 Answer
Reset to default 0That's correct.
The logic there is like this:
catch (RuntimeException rtex) {
sendFailure(message, producerRecord, getSendFailureChannel(), rtex);
throw rtex;
}
So, indeed such an error is sent to the sendFailureChannel
and re-thrown back to the caller.
The header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL)
is not involved here because process is fully direct. The errorChannel
is only considered when we deal with async channels.
We may consider to change such a logic, but only for tomorrow's 6.4.0
release.
What I'm trying to say, that I am agreed with you that it is not supposed to be like that: if we provide sendFailureChannel
, then no reason to re-throw such an exception.
Feel free to raise a GH issue and we will address it shortly!
Spring integration: 6.2.0
I have noticed that even if I set a sendFailureChannel
on a Kafka.outboundChannelAdapter
an avro SerializationException
is still rethrown and ends up in the error output whether a errorChannel
header is set or not.
Is it the intended behaviour ? I would have first expected that the framework would not propagate the exception if a sendFailureChannel
is set, or alternatively to take into account the errorChannel
.
More precisely, in the example above, the KAFKA_SEND_FAILURE_CHANNEL's serviceActivator is called, but the CUSTOM_ERROR_CHANNEL one never. (KAFKA_OUTBOUND_CHANNEL
is a direct channel)
public static final String KAFKA_OUTBOUND_CHANNEL = "kafkaOutboundChannel";
public static final String KAFKA_SEND_FAILURE_CHANNEL = "kafkaSendFailureChannel";
public static final String CUSTOM_ERROR_CHANNEL = "customErrorChanne";
@Bean
IntegrationFlow kafkaProducerFlow(KafkaTemplate<? extends SpecificRecord, ? extends SpecificRecord> kafkaTemplate) {
return IntegrationFlow.from(KAFKA_OUTBOUND_CHANNEL)
.log(log.getName(),
m -> "Sending following message to topic '" + m.getHeaders().get(KafkaHeaders.TOPIC) + "':\n"
+ "key:\n" + GeneralUtils.convertToJson(m.getHeaders().get(KafkaHeaders.KEY)) + "\n"
+ "value:\n" + GeneralUtils.convertToJson(m.getPayload()))
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.sendFailureChannel(KAFKA_SEND_FAILURE_CHANNEL)
.get();
}
@ServiceActivator(inputChannel = KAFKA_SEND_FAILURE_CHANNEL)
void logKafkaSendFailure(KafkaSendFailureException exception) {
log.error("SendFailure:", exception);
}
@ServiceActivator(inputChannel = CUSTOM_ERROR_CHANNEL)
void logOtherError(Exception exception) {
log.error("Failed to send message to Kafka.", exception);
}
Spring integration: 6.2.0
I have noticed that even if I set a sendFailureChannel
on a Kafka.outboundChannelAdapter
an avro SerializationException
is still rethrown and ends up in the error output whether a errorChannel
header is set or not.
Is it the intended behaviour ? I would have first expected that the framework would not propagate the exception if a sendFailureChannel
is set, or alternatively to take into account the errorChannel
.
More precisely, in the example above, the KAFKA_SEND_FAILURE_CHANNEL's serviceActivator is called, but the CUSTOM_ERROR_CHANNEL one never. (KAFKA_OUTBOUND_CHANNEL
is a direct channel)
public static final String KAFKA_OUTBOUND_CHANNEL = "kafkaOutboundChannel";
public static final String KAFKA_SEND_FAILURE_CHANNEL = "kafkaSendFailureChannel";
public static final String CUSTOM_ERROR_CHANNEL = "customErrorChanne";
@Bean
IntegrationFlow kafkaProducerFlow(KafkaTemplate<? extends SpecificRecord, ? extends SpecificRecord> kafkaTemplate) {
return IntegrationFlow.from(KAFKA_OUTBOUND_CHANNEL)
.log(log.getName(),
m -> "Sending following message to topic '" + m.getHeaders().get(KafkaHeaders.TOPIC) + "':\n"
+ "key:\n" + GeneralUtils.convertToJson(m.getHeaders().get(KafkaHeaders.KEY)) + "\n"
+ "value:\n" + GeneralUtils.convertToJson(m.getPayload()))
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.sendFailureChannel(KAFKA_SEND_FAILURE_CHANNEL)
.get();
}
@ServiceActivator(inputChannel = KAFKA_SEND_FAILURE_CHANNEL)
void logKafkaSendFailure(KafkaSendFailureException exception) {
log.error("SendFailure:", exception);
}
@ServiceActivator(inputChannel = CUSTOM_ERROR_CHANNEL)
void logOtherError(Exception exception) {
log.error("Failed to send message to Kafka.", exception);
}
Share
Improve this question
asked Nov 18, 2024 at 15:35
Maxime DutautMaxime Dutaut
1326 bronze badges
1 Answer
Reset to default 0That's correct.
The logic there is like this:
catch (RuntimeException rtex) {
sendFailure(message, producerRecord, getSendFailureChannel(), rtex);
throw rtex;
}
So, indeed such an error is sent to the sendFailureChannel
and re-thrown back to the caller.
The header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL)
is not involved here because process is fully direct. The errorChannel
is only considered when we deal with async channels.
We may consider to change such a logic, but only for tomorrow's 6.4.0
release.
What I'm trying to say, that I am agreed with you that it is not supposed to be like that: if we provide sendFailureChannel
, then no reason to re-throw such an exception.
Feel free to raise a GH issue and we will address it shortly!
本文标签:
版权声明:本文标题:java - Spring Integration - Kafka OutboundChannelAdapter propagates exception even if sendFailureChannel set - Stack Overflow 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://it.en369.cn/questions/1745611135a2159009.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论