RabbitMQ 消费端如何配置才能获得高性能——解读 SimpleMessageListenerContainer
SimpleMessageListenerContainer 是 RabbitMQ 的侦听容器,消费端的一些配置都在这里面(spring-rabbit 通过 xml 配置 listener-container 就能完成,具体配置声明在 spring-rabbit-{version}.xsd 中)。其中影响消费性能的配置项,见下面源码中的注释:
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
implements ApplicationEventPublisherAware {
// Default values (constants)
private static final long DEFAULT_START_CONSUMER_MIN_INTERVAL = 10000;
private static final long DEFAULT_STOP_CONSUMER_MIN_INTERVAL = 60000;
private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;
private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10;
public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
public static final int DEFAULT_PREFETCH_COUNT = 1;
public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000;
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
// Prefetch count passed to each consumer (see createBlockingQueueConsumer()); defaults to 1
private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT;
// Minimum interval between starting new consumers; defaults to 10 s (10000 ms)
private volatile long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL;
// Minimum interval between stopping idle consumers; defaults to 60 s (60000 ms)
private volatile long stopConsumerMinInterval = DEFAULT_STOP_CONSUMER_MIN_INTERVAL;
// Number of consecutive successful receives by a consumer that triggers considerAddingAConsumer(); defaults to 10
private volatile int consecutiveActiveTrigger = DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER;
// Number of consecutive idle receives by a consumer that triggers considerStoppingAConsumer(this.consumer); defaults to 10
private volatile int consecutiveIdleTrigger = DEFAULT_CONSECUTIVE_IDLE_TRIGGER;
// Transaction size (see setTxSize(int)); defaults to 1
private volatile int txSize = 1;
// Executor that runs the AsyncMessageProcessingConsumer tasks; every newly added consumer is submitted to it
private volatile Executor taskExecutor = new SimpleAsyncTaskExecutor();
// Number of concurrent consumers, 1 by default. Running several consumers side by side
// can greatly increase consumption capacity.
private volatile int concurrentConsumers = 1;
// Upper limit on the number of concurrent consumers; null until setMaxConcurrentConsumers(int) is called
private volatile Integer maxConcurrentConsumers;
// Exclusive flag, false by default. When true, only a single consumer in the container may use
// the queue, and the concurrency must be 1 (enforced by the concurrency setters).
private volatile boolean exclusive;
// Timestamp (ms) of the most recently started consumer
private volatile long lastConsumerStarted;
// Timestamp (ms) of the most recently stopped consumer
private volatile long lastConsumerStopped;
// Timeout for a consumer fetching a message from the queue; defaults to 1 s
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
// How long doShutdown() waits for workers to finish, in milliseconds; defaults to 5 s
private volatile long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
// Back-off (time between going to sleep and waking up again) applied when a consumer hits an
// exception; by default a fixed 5 s interval with unlimited attempts
private BackOff recoveryBackOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
// Map entry value, when false, signals the consumer to terminate
private Map<BlockingQueueConsumer, Boolean> consumers;
// Monitor guarding every access to the 'consumers' map
private final Object consumersMonitor = new Object();
private PlatformTransactionManager transactionManager;
private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute();
private volatile Advice[] adviceChain = new Advice[0];
// Counts live consumers; doShutdown() waits on it and getActiveConsumerCount() reports its count
private final ActiveObjectCounter<BlockingQueueConsumer> cancellationLock = new ActiveObjectCounter<BlockingQueueConsumer>();
private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
private volatile boolean defaultRequeueRejected = true;
private final Map<String, Object> consumerArgs = new HashMap<String, Object>();
private volatile RabbitAdmin rabbitAdmin;
// Whether missing queues are fatal at startup; may be overridden from the global properties
// bean by checkMissingQueuesFatal() unless missingQueuesFatalSet is true
private volatile boolean missingQueuesFatal = true;
private volatile boolean missingQueuesFatalSet;
// When true, each consumer thread calls redeclareElementsIfNecessary() before starting
private volatile boolean autoDeclare = true;
private volatile ConsumerTagStrategy consumerTagStrategy;
private volatile ApplicationEventPublisher applicationEventPublisher;
/**
 * Indirection through which this container invokes the listener, so that the
 * call can be routed through {@link #proxy} instead of going straight to the superclass.
 */
public interface ContainerDelegate {
void invokeListener(Channel channel, Message message) throws Exception;
}
// Default delegate: forwards directly to the superclass implementation of invokeListener.
private final ContainerDelegate delegate = new ContainerDelegate() {
@Override
public void invokeListener(Channel channel, Message message) throws Exception {
SimpleMessageListenerContainer.super.invokeListener(channel, message);
}
};
// Target used by invokeListener(); starts out as the plain delegate.
// NOTE(review): presumably replaced by initializeProxy() (not shown in this excerpt) - confirm.
private ContainerDelegate proxy = delegate;
// Optional overrides applied to each new BlockingQueueConsumer in createBlockingQueueConsumer();
// null means "leave the consumer's own default".
private Integer declarationRetries;
private Long failedDeclarationRetryInterval;
private Long retryDeclarationInterval;
// Logger used when a consumer fails because the queue is "in exclusive use" (see AsyncMessageProcessingConsumer.run()).
private ConditionalExceptionLogger exclusiveConsumerExceptionLogger = new DefaultExclusiveConsumerLogger();
/**
 * Creates a container without a connection factory; one has to be supplied
 * through the setter before the container is used.
 */
public SimpleMessageListenerContainer() {
}

/**
 * Creates a container that uses the given connection factory.
 * @param connectionFactory the connection factory handed to {@code setConnectionFactory}.
 */
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
    setConnectionFactory(connectionFactory);
}
/**
 * Sets the advice chain. The container stores its own copy of the array, so
 * later modifications of the caller's array do not affect the container.
 * @param adviceChain the advice chain to copy; must not be {@code null}.
 */
public void setAdviceChain(Advice[] adviceChain) {
    this.adviceChain = adviceChain.clone();
}
/**
 * Specify the interval between consumer recovery attempts, in <b>milliseconds</b>.
 * The default is 5000 ms, that is, 5 seconds. The value is wrapped in a
 * {@link FixedBackOff} with an unlimited number of attempts.
 * @param recoveryInterval the fixed interval between recovery attempts, in milliseconds.
 */
public void setRecoveryInterval(long recoveryInterval) {
    BackOff fixedInterval = new FixedBackOff(recoveryInterval, FixedBackOff.UNLIMITED_ATTEMPTS);
    this.recoveryBackOff = fixedInterval;
}
/**
 * Specify the {@link BackOff} that determines the interval between recovery attempts.
 * When nothing is set, a fixed back-off of 5000 ms (5 seconds) is used.
 * @param recoveryBackOff the back-off to use; must not be {@code null}.
 */
public void setRecoveryBackOff(BackOff recoveryBackOff) {
    // Reject null up front so that consumers never receive a null back-off.
    Assert.notNull(recoveryBackOff, "'recoveryBackOff' must not be null.");
    this.recoveryBackOff = recoveryBackOff;
}
/**
 * Specify the number of concurrent consumers to create. Default is 1.
 * <p>
 * Raising the number of concurrent consumers is recommended in order to scale the consumption
 * of messages coming in from a queue (several consumers process messages faster than one).
 * However, note that any ordering guarantees are lost once multiple consumers are registered:
 * messages are ordered in the queue, but multiple consumers cannot guarantee in-order
 * processing. If ordering must be preserved, use a single consumer. In general, stick with
 * 1 consumer for low-volume queues. Cannot be more than {@link #maxConcurrentConsumers} (if set).
 * <p>
 * When the container is active and already has consumers, the change is applied immediately:
 * surplus consumers are cancelled and flagged for termination, missing ones are created and started.
 * @param concurrentConsumers the minimum number of consumers to create.
 * @see #setMaxConcurrentConsumers(int)
 */
public void setConcurrentConsumers(final int concurrentConsumers) {
    Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
    Assert.isTrue(!this.exclusive || concurrentConsumers == 1,
            "When the consumer is exclusive, the concurrency must be 1");
    // Read the volatile field once so the null check and the comparison use the same value.
    final Integer max = this.maxConcurrentConsumers;
    if (max != null) {
        Assert.isTrue(concurrentConsumers <= max,
                "'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'");
    }
    synchronized (this.consumersMonitor) {
        if (logger.isDebugEnabled()) {
            logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers);
        }
        // Positive delta: too many consumers are running; negative: more are needed.
        int delta = this.concurrentConsumers - concurrentConsumers;
        this.concurrentConsumers = concurrentConsumers;
        if (isActive() && this.consumers != null) {
            if (delta > 0) {
                Iterator<Entry<BlockingQueueConsumer, Boolean>> entryIterator = this.consumers.entrySet()
                        .iterator();
                while (entryIterator.hasNext() && delta > 0) {
                    Entry<BlockingQueueConsumer, Boolean> entry = entryIterator.next();
                    if (entry.getValue()) {
                        entry.getKey().basicCancel();
                        // Flag the consumer for termination through the entry itself rather than
                        // calling put() on the map being iterated (same approach as queuesChanged()).
                        entry.setValue(false);
                        delta--;
                    }
                }
            }
            else {
                addAndStartConsumers(-delta);
            }
        }
    }
}
/**
 * Sets an upper limit to the number of concurrent consumers; defaults to 'concurrentConsumers'.
 * Consumers will be added on demand. Cannot be less than {@link #concurrentConsumers}.
 * @param maxConcurrentConsumers the maximum number of consumers; must be 1 when the
 * container is exclusive.
 */
public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
    boolean coversMinimum = maxConcurrentConsumers >= this.concurrentConsumers;
    Assert.isTrue(coversMinimum,
            "'maxConcurrentConsumers' value must be at least 'concurrentConsumers'");
    boolean compatibleWithExclusive = !this.exclusive || maxConcurrentConsumers == 1;
    Assert.isTrue(compatibleWithExclusive,
            "When the consumer is exclusive, the concurrency must be 1");
    this.maxConcurrentConsumers = maxConcurrentConsumers;
}
// ……(此处略过部分代码)……
/**
 * If {@link #maxConcurrentConsumers} is greater than {@link #concurrentConsumers}, and
 * {@link #maxConcurrentConsumers} has not been reached, specifies the number of
 * consecutive cycles when a single consumer was active, in order to consider
 * starting a new consumer. If the consumer goes idle for one cycle, the counter is reset.
 * This is impacted by the {@link #txSize}.
 * Default is 10 consecutive messages.
 * @param consecutiveActiveTrigger The number of consecutive receives to trigger a new consumer;
 * must be positive.
 */
public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger) {
    boolean positive = consecutiveActiveTrigger > 0;
    Assert.isTrue(positive, "'consecutiveActiveTrigger' must be > 0");
    this.consecutiveActiveTrigger = consecutiveActiveTrigger;
}
/**
 * If {@link #maxConcurrentConsumers} is greater than {@link #concurrentConsumers}, and
 * the number of consumers exceeds {@link #concurrentConsumers}, specifies the
 * number of consecutive receive attempts that return no data; after which we consider
 * stopping the current consumer. The idle time is effectively
 * {@link #receiveTimeout} * {@link #txSize} * this value because the consumer thread waits for
 * a message for up to {@link #receiveTimeout} up to {@link #txSize} times.
 * Default is 10 consecutive idles.
 * @param consecutiveIdleTrigger The number of consecutive timeouts to trigger stopping a consumer;
 * must be positive.
 */
public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger) {
    boolean positive = consecutiveIdleTrigger > 0;
    Assert.isTrue(positive, "'consecutiveIdleTrigger' must be > 0");
    this.consecutiveIdleTrigger = consecutiveIdleTrigger;
}
// ……(此处略过部分代码)……
/**
 * Tells the broker how many messages to send to each consumer in a single request.
 * Often this can be set quite high to improve throughput. It should be greater than or
 * equal to {@link #setTxSize(int) the transaction size}; when it is smaller, the
 * transaction size is used as the effective prefetch when consumers are created.
 * @param prefetchCount the prefetch count.
 */
public void setPrefetchCount(int prefetchCount) {
    this.prefetchCount = prefetchCount;
}
/**
 * Tells the container how many messages to process in a single transaction (if the channel
 * is transactional). For best results it should be less than or equal to
 * {@link #setPrefetchCount(int) the prefetch count}. Also affects how often acks are sent
 * when using {@link AcknowledgeMode#AUTO} - one ack per txSize. Default is 1.
 * @param txSize the transaction size; must be positive.
 */
public void setTxSize(int txSize) {
    boolean positive = txSize > 0;
    Assert.isTrue(positive, "'txSize' must be > 0");
    this.txSize = txSize;
} // (some code omitted here)
/**
 * Prepares the container: resolves the "missing queues fatal" setting, initializes the
 * listener proxy and, when a transaction manager is configured, forces the channel to be
 * transacted.
 * @throws Exception Any Exception.
 */
@Override
protected void doInitialize() throws Exception {
    checkMissingQueuesFatal();
    if (!isExposeListenerChannel() && this.transactionManager != null) {
        logger.warn("exposeListenerChannel=false is ignored when using a TransactionManager");
    }
    initializeProxy();
    // A transaction manager only makes sense on a transacted channel, so coerce the flag.
    if (this.transactionManager != null && !isChannelTransacted()) {
        logger.debug("The 'channelTransacted' is coerced to 'true', when 'transactionManager' is provided");
        setChannelTransacted(true);
    }
}
/**
 * Reports the current count held by the cancellation lock, exposed as a gauge metric.
 * @return the number of consumers currently tracked by the cancellation lock.
 */
@ManagedMetric(metricType = MetricType.GAUGE)
public int getActiveConsumerCount() {
    return this.cancellationLock.getCount();
}
/**
 * Re-initializes this container's Rabbit message consumers, if not initialized already. Then
 * submits each consumer to this container's task executor.
 * <p>
 * Before starting, a {@code ListenerContainerAware} listener may veto the configured queues,
 * and a {@link RabbitAdmin} is looked up in the application context (or created when
 * auto-declaration is enabled).
 * @throws Exception Any Exception.
 */
@Override
protected void doStart() throws Exception {
    if (getMessageListener() instanceof ListenerContainerAware) {
        Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
        if (expectedQueueNames != null) {
            String[] queueNames = getQueueNames();
            Assert.state(expectedQueueNames.size() == queueNames.length,
                    "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                            + Arrays.asList(queueNames));
            // Every configured queue must be expected; an empty configuration never matches.
            boolean allExpected = queueNames.length > 0;
            for (String queueName : queueNames) {
                if (!expectedQueueNames.contains(queueName)) {
                    allExpected = false;
                    break;
                }
            }
            Assert.state(allExpected, "Listener expects us to be listening on '" + expectedQueueNames
                    + "'; our queues: " + Arrays.asList(queueNames));
        }
    }
    super.doStart();
    // Prefer an admin from the application context; fall back to a private one when auto-declaring.
    if (this.rabbitAdmin == null && getApplicationContext() != null) {
        Map<String, RabbitAdmin> admins = getApplicationContext().getBeansOfType(RabbitAdmin.class);
        if (!admins.isEmpty()) {
            this.rabbitAdmin = admins.values().iterator().next();
        }
    }
    if (this.rabbitAdmin == null && this.autoDeclare) {
        RabbitAdmin admin = new RabbitAdmin(getConnectionFactory());
        admin.setApplicationContext(getApplicationContext());
        this.rabbitAdmin = admin;
    }
    synchronized (this.consumersMonitor) {
        int created = initializeConsumers(); // initialize the consumers
        if (this.consumers == null) {
            if (logger.isInfoEnabled()) {
                logger.info("Consumers were initialized and then cleared " +
                        "(presumably the container was stopped concurrently)");
            }
            return;
        }
        if (created <= 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Consumers are already running");
            }
            return;
        }
        // Launch every consumer first, then check each one for a fatal startup failure.
        Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
        for (BlockingQueueConsumer consumer : this.consumers.keySet()) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            this.taskExecutor.execute(processor);
        }
        for (AsyncMessageProcessingConsumer processor : processors) {
            FatalListenerStartupException failure = processor.getStartupException();
            if (failure != null) {
                throw new AmqpIllegalStateException("Fatal exception on listener startup", failure);
            }
        }
    }
}
/**
 * Stops the container by shutting it down first and then delegating to the superclass.
 */
@Override
protected void doStop() {
    shutdown();
    super.doStop();
}
/**
 * Shuts down all consumers: cancels each one, waits up to {@code shutdownTimeout}
 * milliseconds for the workers to finish, and finally discards the consumer map.
 * Does nothing when the container is not running.
 */
@Override
protected void doShutdown() {
    if (!isRunning()) {
        return;
    }
    try {
        synchronized (this.consumersMonitor) {
            if (this.consumers != null) {
                for (BlockingQueueConsumer running : this.consumers.keySet()) {
                    running.basicCancel();
                }
            }
        }
        logger.info("Waiting for workers to finish.");
        boolean drained = this.cancellationLock.await(this.shutdownTimeout, TimeUnit.MILLISECONDS);
        logger.info(drained ? "Successfully waited for workers to finish." : "Workers not finished.");
    }
    catch (InterruptedException e) {
        // Preserve the interrupt status for callers and carry on with the shutdown.
        Thread.currentThread().interrupt();
        logger.warn("Interrupted waiting for workers. Continuing with shutdown.");
    }
    synchronized (this.consumersMonitor) {
        this.consumers = null;
    }
}
/**
 * Tells whether the given consumer is still alive.
 * @param consumer the consumer to look up.
 * @return {@code true} only when the consumer is registered with a {@code true} flag
 * and the container itself is active.
 */
private boolean isActive(BlockingQueueConsumer consumer) {
    boolean flaggedActive;
    synchronized (this.consumersMonitor) {
        // A missing map, a missing entry and a false flag all mean "not active".
        flaggedActive = this.consumers != null && Boolean.TRUE.equals(this.consumers.get(consumer));
    }
    return flaggedActive && isActive();
}
/**
 * Initializes the consumers; the initial number equals the configured number of
 * concurrent consumers. Nothing is created when the consumer map already exists.
 * @return the number of consumers created by this call (0 when already initialized).
 */
protected int initializeConsumers() {
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            return 0;
        }
        this.cancellationLock.reset();
        this.consumers = new HashMap<BlockingQueueConsumer, Boolean>(this.concurrentConsumers);
        int created = 0;
        while (created < this.concurrentConsumers) {
            this.consumers.put(createBlockingQueueConsumer(), true);
            created++;
        }
        return created;
    }
}
/**
 * Unless {@code missingQueuesFatalSet} is true, reads the {@code smlc.missing.queues.fatal}
 * entry of the {@code spring.amqp.global.properties} bean, if present, and applies it
 * to {@code missingQueuesFatal}. A missing bean is not an error.
 */
private void checkMissingQueuesFatal() {
    if (this.missingQueuesFatalSet) {
        return;
    }
    try {
        ApplicationContext context = getApplicationContext();
        if (context == null) {
            return;
        }
        Properties globalProperties = context.getBean("spring.amqp.global.properties", Properties.class);
        String configured = globalProperties.getProperty("smlc.missing.queues.fatal");
        if (StringUtils.hasText(configured)) {
            this.missingQueuesFatal = Boolean.parseBoolean(configured);
        }
    }
    catch (BeansException be) {
        // The global properties bean is optional; keep the current setting.
        if (logger.isDebugEnabled()) {
            logger.debug("No global properties bean");
        }
    }
}
/**
 * Creates {@code delta} additional consumers, registers them as active and submits each
 * one to the task executor. A consumer that fails to start is stopped, released from the
 * cancellation lock and removed again. Does nothing when the consumer map is absent.
 * @param delta the number of consumers to add.
 */
protected void addAndStartConsumers(int delta) {
    synchronized (this.consumersMonitor) {
        if (this.consumers == null) {
            return;
        }
        for (int added = 0; added < delta; added++) {
            BlockingQueueConsumer consumer = createBlockingQueueConsumer();
            this.consumers.put(consumer, true);
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            if (logger.isDebugEnabled()) {
                logger.debug("Starting a new consumer: " + consumer);
            }
            this.taskExecutor.execute(processor);
            try {
                FatalListenerStartupException failure = processor.getStartupException();
                if (failure != null) {
                    this.consumers.remove(consumer);
                    // Caught by the generic handler below, which finishes the cleanup.
                    throw new AmqpIllegalStateException("Fatal exception on listener startup", failure);
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                consumer.stop();
                logger.error("Error starting new consumer", e);
                this.cancellationLock.release(consumer);
                this.consumers.remove(consumer);
            }
        }
    }
}
/**
 * Adds one consumer when a maximum is configured, the current number of consumers is below
 * it, and more than {@code startConsumerMinInterval} milliseconds have passed since the
 * last consumer was started.
 */
private void considerAddingAConsumer() {
    synchronized (this.consumersMonitor) {
        if (this.consumers == null || this.maxConcurrentConsumers == null
                || this.consumers.size() >= this.maxConcurrentConsumers) {
            return;
        }
        long now = System.currentTimeMillis();
        if (now > this.lastConsumerStarted + this.startConsumerMinInterval) {
            addAndStartConsumers(1);
            this.lastConsumerStarted = now;
        }
    }
}
/**
 * Cancels the given consumer and flags it for termination when there are more consumers
 * than {@code concurrentConsumers} and more than {@code stopConsumerMinInterval}
 * milliseconds have passed since the last consumer was stopped.
 * @param consumer the idle consumer that may be stopped.
 */
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
    synchronized (this.consumersMonitor) {
        if (this.consumers == null || this.consumers.size() <= this.concurrentConsumers) {
            return;
        }
        long now = System.currentTimeMillis();
        if (now > this.lastConsumerStopped + this.stopConsumerMinInterval) {
            consumer.basicCancel();
            this.consumers.put(consumer, false);
            if (logger.isDebugEnabled()) {
                logger.debug("Idle consumer terminating: " + consumer);
            }
            this.lastConsumerStopped = now;
        }
    }
}
/**
 * Reacts to a change of the queue set: every consumer currently flagged active is cancelled
 * and flagged for termination, then the same number of new consumers is created and started.
 */
private void queuesChanged() {
    synchronized (this.consumersMonitor) {
        if (this.consumers == null) {
            return;
        }
        int cancelled = 0;
        for (Entry<BlockingQueueConsumer, Boolean> entry : this.consumers.entrySet()) {
            if (!entry.getValue()) {
                continue;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Queues changed; stopping consumer: " + entry.getKey());
            }
            entry.getKey().basicCancel();
            entry.setValue(false);
            cancelled++;
        }
        addAndStartConsumers(cancelled);
    }
}
/**
 * A channel counts as locally transacted only when the superclass says so and no
 * transaction manager is configured.
 */
@Override
protected boolean isChannelLocallyTransacted(Channel channel) {
    if (!super.isChannelLocallyTransacted(channel)) {
        return false;
    }
    return this.transactionManager == null;
}
/**
 * Builds a new {@link BlockingQueueConsumer} for the required queues from the container's
 * current settings and gives it a fresh back-off execution. Optional retry and tag settings
 * are applied only when they have been configured.
 * @return the newly created consumer (not yet started).
 */
protected BlockingQueueConsumer createBlockingQueueConsumer() {
    String[] queues = getRequiredQueueNames();
    // There's no point prefetching less than the tx size, otherwise the consumer will stall because the broker
    // didn't get an ack for delivered messages
    int effectivePrefetch = Math.max(this.prefetchCount, this.txSize);
    BlockingQueueConsumer created = new BlockingQueueConsumer(getConnectionFactory(),
            this.messagePropertiesConverter, this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(),
            effectivePrefetch, this.defaultRequeueRejected, this.consumerArgs, this.exclusive, queues);
    if (this.declarationRetries != null) {
        created.setDeclarationRetries(this.declarationRetries);
    }
    if (this.failedDeclarationRetryInterval != null) {
        created.setFailedDeclarationRetryInterval(this.failedDeclarationRetryInterval);
    }
    if (this.retryDeclarationInterval != null) {
        created.setRetryDeclarationInterval(this.retryDeclarationInterval);
    }
    if (this.consumerTagStrategy != null) {
        created.setTagStrategy(this.consumerTagStrategy);
    }
    created.setBackOffExecution(this.recoveryBackOff.start());
    return created;
}
/**
 * Replaces a failed consumer: stops and unregisters the old one, creates a new consumer
 * that inherits the old back-off execution, registers it as active and submits it to the
 * task executor. Does nothing when the consumer map is absent.
 * @param consumer the consumer to replace.
 */
private void restart(BlockingQueueConsumer consumer) {
    synchronized (this.consumersMonitor) {
        if (this.consumers == null) {
            return;
        }
        BlockingQueueConsumer replacement;
        try {
            // Need to recycle the channel in this consumer
            consumer.stop();
            // Ensure consumer counts are correct (another is going
            // to start because of the exception, but
            // we haven't counted down yet)
            this.cancellationLock.release(consumer);
            this.consumers.remove(consumer);
            replacement = createBlockingQueueConsumer();
            replacement.setBackOffExecution(consumer.getBackOffExecution());
            this.consumers.put(replacement, true);
        }
        catch (RuntimeException e) {
            logger.warn("Consumer failed irretrievably on restart. " + e.getClass() + ": " + e.getMessage());
            // Re-throw and have it logged properly by the caller.
            throw e;
        }
        this.taskExecutor.execute(new AsyncMessageProcessingConsumer(replacement));
    }
}
/**
 * Task that drives a single {@link BlockingQueueConsumer}: it starts the consumer, keeps
 * receiving and dispatching messages while the consumer is active (or still has deliveries),
 * optionally asks the container to grow or shrink the consumer pool, and finally either
 * cleans the consumer up or asks the container to restart it.
 * NOTE(review): the container calls getStartupException() on this class, but that method is
 * not part of this excerpt.
 */
private class AsyncMessageProcessingConsumer implements Runnable {
// The consumer driven by this task.
private final BlockingQueueConsumer consumer;
// Counted down once startup has succeeded or failed (see run()).
private final CountDownLatch start;
// Recorded when startup fails fatally; stays null otherwise.
private volatile FatalListenerStartupException startupException;
private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
this.consumer = consumer;
this.start = new CountDownLatch(1);
}
/**
 * Runs the consumer until it is deactivated, aborted or fails. Fatal failures mark the
 * run as aborted, which stops the whole container; other failures lead to a restart of the
 * consumer as long as it is still flagged active.
 */
@Override
public void run() {
// True when a failure is fatal and the container must be stopped afterwards.
boolean aborted = false;
// Counters feeding the decisions to add or stop a consumer.
int consecutiveIdles = 0;
int consecutiveMessages = 0;
try {
// Startup phase: declare elements if configured, start the consumer, release the latch.
try {
if (SimpleMessageListenerContainer.this.autoDeclare) {
SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
}
this.consumer.start();
this.start.countDown();
}
catch (QueuesNotAvailableException e) {
if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
// Fatal: handled by the outer catch, which records it as the startup exception.
throw e;
}
else {
this.start.countDown();
handleStartupFailure(this.consumer.getBackOffExecution());
throw e;
}
}
catch (FatalListenerStartupException ex) {
throw ex;
}
catch (Throwable t) {//NOSONAR
this.start.countDown();
handleStartupFailure(this.consumer.getBackOffExecution());
throw t;
}
if (SimpleMessageListenerContainer.this.transactionManager != null) {
/*
* Register the consumer's channel so it will be used by the transaction manager
* if it's an instance of RabbitTransactionManager.
*/
ConsumerChannelRegistry.registerConsumerChannel(consumer.getChannel(), getConnectionFactory());
}
// Main loop: keep going while the consumer is active or still holds undelivered messages.
while (isActive(this.consumer) || this.consumer.hasDelivery()) {
try {
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
// Pool resizing is only considered when a maximum has been configured.
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
if (receivedOk) {
if (isActive(this.consumer)) {
consecutiveIdles = 0;
// NOTE(review): post-increment combined with '>' means this fires on the
// (trigger + 2)-th consecutive success - confirm this is intended.
if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
considerAddingAConsumer();
consecutiveMessages = 0;
}
}
}
else {
consecutiveMessages = 0;
// NOTE(review): same post-increment/'>' pattern as the active trigger above.
if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
considerStoppingAConsumer(this.consumer);
consecutiveIdles = 0;
}
}
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
/*
* These will normally be wrapped by an LEFE if thrown by the
* listener, but we will also honor it if thrown by an
* error handler.
*/
}
}
}
catch (InterruptedException e) {
logger.debug("Consumer thread interrupted, processing stopped.");
Thread.currentThread().interrupt();
aborted = true;
publishEvent("Consumer thread interrupted, processing stopped", true, e);
}
catch (QueuesNotAvailableException ex) {
if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
logger.error("Consumer received fatal exception on startup", ex);
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
// The event is fatal only when the run was aborted above.
publishEvent("Consumer queue(s) not available", aborted, ex);
}
catch (FatalListenerStartupException ex) {
logger.error("Consumer received fatal exception on startup", ex);
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishEvent("Consumer received fatal exception on startup", true, ex);
}
catch (FatalListenerExecutionException ex) {
logger.error("Consumer received fatal exception during processing", ex);
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishEvent("Consumer received fatal exception during processing", true, ex);
}
catch (ShutdownSignalException e) {
if (RabbitUtils.isNormalShutdown(e)) {
if (logger.isDebugEnabled()) {
logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage());
}
}
else {
this.logConsumerException(e);
}
}
catch (AmqpIOException e) {
// A queue that is "in exclusive use" gets its own logger and a non-fatal event.
if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
&& e.getCause().getCause().getMessage().contains("in exclusive use")) {
exclusiveConsumerExceptionLogger.log(logger, "Exclusive consumer failure", e.getCause().getCause());
publishEvent("Consumer raised exception, attempting restart", false, e);
}
else {
this.logConsumerException(e);
}
}
catch (Error e) {//NOSONAR
// ok to catch Error - we're aborting so will stop
logger.error("Consumer thread error, thread abort.", e);
aborted = true;
}
catch (Throwable t) {//NOSONAR
// by now, it must be an exception
if (isActive()) {
this.logConsumerException(t);
}
}
finally {
if (SimpleMessageListenerContainer.this.transactionManager != null) {
ConsumerChannelRegistry.unRegisterConsumerChannel();
}
}
// In all cases count down to allow container to progress beyond startup
start.countDown();
// Either clean up (consumer deactivated or fatal failure) or restart the consumer.
if (!isActive(consumer) || aborted) {
logger.debug("Cancelling " + this.consumer);
try {
this.consumer.stop();
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
synchronized (consumersMonitor) {
if (SimpleMessageListenerContainer.this.consumers != null) {
SimpleMessageListenerContainer.this.consumers.remove(this.consumer);
}
}
}
catch (AmqpException e) {
logger.info("Could not cancel message consumer", e);
}
if (aborted) {
logger.error("Stopping container from aborted consumer");
stop();
}
}
else {
logger.info("Restarting " + this.consumer);
restart(this.consumer);
}
}
/**
 * Logs a consumer failure at WARN level and publishes a non-fatal failure event.
 * The full stack trace is logged when debug logging is enabled or the throwable is neither
 * an AmqpConnectException nor a ConsumerCancelledException; otherwise only a summary is logged.
 * @param t the failure raised by the consumer.
 */
private void logConsumerException(Throwable t) {
if (logger.isDebugEnabled()
|| !(t instanceof AmqpConnectException || t instanceof ConsumerCancelledException)) {
logger.warn(
"Consumer raised exception, processing can restart if the connection factory supports it",
t);
}
else {
logger.warn("Consumer raised exception, processing can restart if the connection factory supports it. "
+ "Exception summary: " + t);
}
publishEvent("Consumer raised exception, attempting restart", false, t);
}
/**
 * Publishes a ListenerContainerConsumerFailedEvent when an event publisher is set.
 * @param reason the reason text carried by the event.
 * @param fatal whether the failure is fatal.
 * @param t the throwable that caused the failure.
 */
private void publishEvent(String reason, boolean fatal, Throwable t) {
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new ListenerContainerConsumerFailedEvent(
SimpleMessageListenerContainer.this, reason, t, fatal));
}
}
}
/**
 * Invokes the listener through the container's proxy delegate rather than calling the
 * superclass implementation directly.
 * @param channel the channel the message was received on.
 * @param message the message to hand to the listener.
 * @throws Exception any exception raised by the delegate.
 */
@Override
protected void invokeListener(Channel channel, Message message) throws Exception {
    this.proxy.invokeListener(channel, message);
}
} 
