W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗值獎勵
PubSubInboundChannelAdapter
是GCP發(fā)布/訂閱的入站通道適配器,它偵聽GCP發(fā)布/訂閱的新消息。它將新消息轉(zhuǎn)換為內(nèi)部Spring Message
,然后將其發(fā)送到綁定的輸出通道。
Google Pub / Sub將消息有效負載視為字節(jié)數(shù)組。因此,默認情況下,入站通道適配器將使用byte[]
作為有效載荷來構(gòu)造Spring Message
。但是,可以通過設置PubSubInboundChannelAdapter
的payloadType
屬性來更改所需的有效負載類型。
PubSubInboundChannelAdapter
將對所需有效負載類型的轉(zhuǎn)換委派給在PubSubTemplate
中配置的PubSubMessageConverter
。
要使用入站通道適配器,必須在用戶應用程序端提供PubSubInboundChannelAdapter
并對其進行配置。
@Bean public MessageChannel pubsubInputChannel() { return new PublishSubscribeChannel(); } @Bean public PubSubInboundChannelAdapter messageChannelAdapter( @Qualifier("pubsubInputChannel") MessageChannel inputChannel, SubscriberFactory subscriberFactory) { PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(subscriberFactory, "subscriptionName"); adapter.setOutputChannel(inputChannel); adapter.setAckMode(AckMode.MANUAL); return adapter; }
在示例中,我們首先指定適配器將向其寫入傳入消息的MessageChannel
。MessageChannel
的實現(xiàn)在這里并不重要。根據(jù)您的用例,您可能需要使用MessageChannel
而非PublishSubscribeChannel
。
然后,我們聲明PubSubInboundChannelAdapter
bean。它需要我們剛創(chuàng)建的通道和一個SubscriberFactory
,該SubscriberFactory
從Google Cloud Java Client for Pub / Sub創(chuàng)建Subscriber
對象。
GCP Pub / Sub的Spring Boot入門程序提供了已配置的SubscriberFactory
。
PubSubInboundChannelAdapter
支持三種確認模式,其中AckMode.AUTO
是默認值。
自動確認(AckMode.AUTO
)
如果適配器將消息發(fā)送到通道,并且未引發(fā)任何異常,則消息將被GCP發(fā)布/訂閱確認。如果在處理郵件時拋出RuntimeException
,則該郵件將被否定。
自動確認確認(AckMode.AUTO_ACK
)
如果適配器將消息發(fā)送到通道,并且未引發(fā)任何異常,則消息將被GCP發(fā)布/訂閱確認。如果在處理消息時拋出RuntimeException
,則消息既不會被確認也不會被拒絕。
當使用訂閱的確認截止時間超時作為重試傳遞回退機制時,此功能很有用。
手動確認(AckMode.MANUAL
)
適配器將BasicAcknowledgeablePubsubMessage
對象附加到Message
標頭。用戶可以使用GcpPubSubHeaders.ORIGINAL_MESSAGE
鍵提取BasicAcknowledgeablePubsubMessage
,并將其用于(n)確認消息。
@Bean @ServiceActivator(inputChannel = "pubsubInputChannel") public MessageHandler messageReceiver() { return message -> { LOGGER.info("Message arrived! Payload: " + new String((byte[]) message.getPayload())); BasicAcknowledgeablePubsubMessage originalMessage = message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class); originalMessage.ack(); }; }
Copyright©2021 w3cschool編程獅|閩ICP備15016281號-3|閩公網(wǎng)安備35020302033924號
違法和不良信息舉報電話:173-0602-2364|舉報郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號
聯(lián)系方式:
更多建議: