SpringCloud 使用動(dòng)態(tài)綁定的目的地

2023-11-27 11:51 更新

除了使用@EnableBinding定義的通道外,Spring Cloud Stream還允許應(yīng)用程序?qū)⑾l(fā)送到動(dòng)態(tài)綁定的目的地。例如,當(dāng)需要在運(yùn)行時(shí)確定目標(biāo)目的地時(shí),這很有用。應(yīng)用程序可以通過(guò)使用@EnableBinding注釋自動(dòng)注冊(cè)的BinderAwareChannelResolver bean來(lái)實(shí)現(xiàn)。

“ spring.cloud.stream.dynamicDestinations”屬性可用于將動(dòng)態(tài)目標(biāo)名稱限制為已知集合(白名單)。如果未設(shè)置此屬性,則可以動(dòng)態(tài)綁定任何目標(biāo)。

BinderAwareChannelResolver可以直接使用,如以下使用路徑變量來(lái)確定目標(biāo)通道的REST控制器示例所示:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @PathVariable("target") target,
           @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, target, contentType);
    }

    private void sendMessage(String body, String target, Object contentType) {
        resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }
}

現(xiàn)在考慮當(dāng)我們?cè)谀J(rèn)端口(8080)上啟動(dòng)應(yīng)用程序并使用CURL發(fā)出以下請(qǐng)求時(shí)會(huì)發(fā)生什么:

curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers

curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders

在經(jīng)紀(jì)人中創(chuàng)建目的地“客戶”和“訂單”(交換為Rabbit或在主題為“ Kafka”中),名稱為“客戶”和“訂單”,數(shù)據(jù)為發(fā)布到適當(dāng)?shù)哪康牡亍?/font>

BinderAwareChannelResolver是通用的Spring Integration DestinationResolver,并且可以注入到其他組件中,例如,在路由器中使用基于傳入的target字段的SpEL表達(dá)式的路由器JSON消息。以下示例包含一個(gè)讀取SpEL表達(dá)式的路由器:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;


    @RequestMapping(path = "/", method = POST, consumes = "application/json")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, contentType);
    }

    private void sendMessage(Object body, Object contentType) {
        routerChannel().send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }

    @Bean(name = "routerChannel")
    public MessageChannel routerChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "routerChannel")
    public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router =
            new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
        router.setDefaultOutputChannelName("default-output");
        router.setChannelResolver(resolver);
        return router;
    }
}

路由器接收器應(yīng)用程序使用此技術(shù)的按需創(chuàng)建的目的地。

如果預(yù)先知道通道名稱,則可以像其他任何目的地一樣配置生產(chǎn)者屬性。或者,如果您注冊(cè)NewBindingCallback<> bean,則會(huì)在創(chuàng)建綁定之前調(diào)用它。回調(diào)采用綁定程序使用的擴(kuò)展生產(chǎn)者屬性的通用類型。它有一種方法:

void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

下面的示例顯示如何使用RabbitMQ活頁(yè)夾:

@Bean
public NewBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
如果需要支持具有多個(gè)活頁(yè)夾類型的動(dòng)態(tài)目標(biāo),請(qǐng)對(duì)通用類型使用Object,并根據(jù)需要強(qiáng)制轉(zhuǎn)換extended參數(shù)。
以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)