Spring Integration: 6.2.0

I have noticed that even when I set a sendFailureChannel on a Kafka.outboundChannelAdapter, an Avro SerializationException is still rethrown and ends up in the error output, whether or not an errorChannel header is set.
Is this the intended behaviour? I would have expected the framework not to propagate the exception when a sendFailureChannel is set, or alternatively to take the errorChannel header into account.

More precisely, in the example below, the service activator on KAFKA_SEND_FAILURE_CHANNEL is called, but the one on CUSTOM_ERROR_CHANNEL never is. (KAFKA_OUTBOUND_CHANNEL is a direct channel.)

public static final String KAFKA_OUTBOUND_CHANNEL = "kafkaOutboundChannel";
public static final String KAFKA_SEND_FAILURE_CHANNEL = "kafkaSendFailureChannel";
public static final String CUSTOM_ERROR_CHANNEL = "customErrorChannel";

@Bean
IntegrationFlow kafkaProducerFlow(KafkaTemplate<? extends SpecificRecord, ? extends SpecificRecord> kafkaTemplate) {
    return IntegrationFlow.from(KAFKA_OUTBOUND_CHANNEL)
            .log(log.getName(),
                    m -> "Sending following message to topic '" + m.getHeaders().get(KafkaHeaders.TOPIC) + "':\n"
                            + "key:\n" + GeneralUtils.convertToJson(m.getHeaders().get(KafkaHeaders.KEY)) + "\n"
                            + "value:\n" + GeneralUtils.convertToJson(m.getPayload()))
            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL))
            .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                    .sendFailureChannel(KAFKA_SEND_FAILURE_CHANNEL))
            .get();
}

@ServiceActivator(inputChannel = KAFKA_SEND_FAILURE_CHANNEL)
void logKafkaSendFailure(KafkaSendFailureException exception) {
    log.error("SendFailure:", exception);
}

@ServiceActivator(inputChannel = CUSTOM_ERROR_CHANNEL)
void logOtherError(Exception exception) {
    log.error("Failed to send message to Kafka.", exception);
}

Asked Nov 18, 2024 at 15:35 by Maxime Dutaut

1 Answer

That's correct.

The logic there looks like this:

    catch (RuntimeException rtex) {
        sendFailure(message, producerRecord, getSendFailureChannel(), rtex);
        throw rtex;
    }

So, indeed, such an error is sent to the sendFailureChannel and then re-thrown to the caller.
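To make the "notify, then rethrow" behaviour concrete, here is a minimal stand-alone sketch in plain Java. It is only a simplified model, not the framework code: `SendFailureModel`, `handle` and `demo` are hypothetical names, the `Consumer` stands in for the sendFailureChannel, and the `IllegalStateException` stands in for the SerializationException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of "notify the failure channel, then rethrow".
// None of these names are Spring Integration APIs; they are hypothetical.
final class SendFailureModel {

    // Stand-in for the outbound adapter handling one message.
    static void handle(String payload, Consumer<RuntimeException> sendFailureChannel) {
        try {
            if (payload.isEmpty()) {
                // Stand-in for a serialization error thrown synchronously during send.
                throw new IllegalStateException("cannot serialize empty payload");
            }
        }
        catch (RuntimeException ex) {
            sendFailureChannel.accept(ex); // the failure channel is notified first...
            throw ex;                      // ...and the caller still sees the exception
        }
    }

    // Runs one failing send and records, in order, who observed the error.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        try {
            handle("", ex -> events.add("sendFailureChannel: " + ex.getMessage()));
        }
        catch (RuntimeException ex) {
            events.add("caller: " + ex.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model both observers record the same error, which matches what the question describes: the send-failure handler runs, and the exception still surfaces in the calling thread.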

The header(MessageHeaders.ERROR_CHANNEL, CUSTOM_ERROR_CHANNEL) is not involved here because the process is fully direct: the exception simply propagates up the call stack to the caller. The errorChannel header is only considered when we deal with async channels.
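The difference between direct and async dispatch can be sketched in plain Java as well. Again this is a simplified model under stated assumptions, not how the framework is implemented: `DispatchModel`, `sendDirect` and `sendAsync` are hypothetical names, and the `Consumer` passed to `sendAsync` stands in for the channel named by the errorChannel header.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified model of direct versus async dispatch; all names are hypothetical.
final class DispatchModel {

    // Stand-in for a handler whose send always fails.
    static void failingHandler(String payload) {
        throw new IllegalStateException("send failed for " + payload);
    }

    // Direct dispatch: the handler runs on the caller's thread,
    // so the exception unwinds straight back to the caller.
    static void sendDirect(String payload) {
        failingHandler(payload);
    }

    // Async dispatch: the handler runs on another thread, so there is no caller
    // to unwind to; the wrapper catches the error and publishes it instead.
    static void sendAsync(String payload, ExecutorService executor,
            Consumer<RuntimeException> errorChannel) {
        executor.execute(() -> {
            try {
                failingHandler(payload);
            }
            catch (RuntimeException ex) {
                errorChannel.accept(ex);
            }
        });
    }

    // Sends one message each way and records who observed the failure.
    static List<String> demo() {
        List<String> events = new CopyOnWriteArrayList<>();
        try {
            sendDirect("a");
        }
        catch (RuntimeException ex) {
            events.add("caller caught: " + ex.getMessage());
        }
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            sendAsync("b", executor, ex -> events.add("errorChannel got: " + ex.getMessage()));
            executor.shutdown();
            // Wait for the async task so its event is recorded before returning.
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        finally {
            executor.shutdownNow();
        }
        return events;
    }
}
```

In the direct case only the caller sees the failure and the error channel is never touched, which corresponds to the CUSTOM_ERROR_CHANNEL activator never being called in the question's flow.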

We may consider changing this logic, but only for tomorrow's 6.4.0 release. What I'm trying to say is that I agree with you that it is not supposed to be like that: if a sendFailureChannel is provided, there is no reason to re-throw such an exception.

Feel free to raise a GH issue and we will address it shortly!
