SpringCloud Dead-Letter主題處理

2023-11-28 16:31 更新

因?yàn)槟鸁o(wú)法預(yù)期用戶將如何處置死信,所以該框架沒(méi)有提供任何標(biāo)準(zhǔn)機(jī)制來(lái)處理它們。如果死信的原因是暫時(shí)的,則您可能希望將消息路由回原始主題。但是,如果問(wèn)題是永久性問(wèn)題,則可能導(dǎo)致無(wú)限循環(huán)。本主題中的示例Spring Boot應(yīng)用程序是如何將這些消息路由回原始主題的示例,但是在嘗試了三遍之后,將其移至停車場(chǎng)主題。該應(yīng)用程序是另一個(gè)從死信主題讀取的spring-cloud-stream應(yīng)用程序。5秒鐘未收到任何消息時(shí),它將終止。

這些示例假定原始目的地為so8400out,而使用者組為so8400。

有兩種策略可供考慮:

  • 考慮僅在主應(yīng)用程序未運(yùn)行時(shí)才運(yùn)行重新路由。否則,瞬態(tài)錯(cuò)誤的重試會(huì)很快用完。
  • 或者,使用兩階段方法:使用此應(yīng)用程序?qū)⒙酚傻降谌齻€(gè)主題,將另一個(gè)應(yīng)用程序從那里路由回到主主題。

以下代碼清單顯示了示例應(yīng)用程序:

application.properties。 

spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries

應(yīng)用。 

@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, terminating");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)