RabbitMQ 消费端如何配置才能获得更高性能——解读 SimpleMessageListenerContainer
SimpleMessageListenerContainer 是 Spring AMQP(spring-rabbit)提供的消息侦听容器,消费端的大部分配置都集中在这个类里(使用 XML 时,通过 listener-container 元素即可完成配置,可用的属性声明在 spring-rabbit-{version}.xsd 中)。其中影响消费性能的主要参数见下面源码中的注释:
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer implements ApplicationEventPublisherAware { //常量值 private static final long DEFAULT_START_CONSUMER_MIN_INTERVAL = 10000; private static final long DEFAULT_STOP_CONSUMER_MIN_INTERVAL = 60000; private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10; private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10; public static final long DEFAULT_RECEIVE_TIMEOUT = 1000; public static final int DEFAULT_PREFETCH_COUNT = 1; public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000; public static final long DEFAULT_RECOVERY_INTERVAL = 5000; private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT; //启动消费者最小时间间隔,默认10s private volatile long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL; //关闭空闲的消费者最小时间间隔 默认60s private volatile long stopConsumerMinInterval = DEFAULT_STOP_CONSUMER_MIN_INTERVAL; //消费者连续成功消费几条消息触发新增消费者(调用considerAddingAConsumer()),默认10条 private volatile int consecutiveActiveTrigger = DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER; //消费者连续空闲几次触发关闭当前消费者(调用considerStoppingAConsumer(this.consumer)),默认10 private volatile int consecutiveIdleTrigger = DEFAULT_CONSECUTIVE_IDLE_TRIGGER; private volatile int txSize = 1; //AsyncMessageProcessingConsumer的线程池,新增消费者创建相应的线程 private volatile Executor taskExecutor = new SimpleAsyncTaskExecutor(); //并发的消费者数量,默认值1,把我们的消费者变成多个一起运行,增加这个数量可以大大增加消费能力。 private volatile int concurrentConsumers = 1; //最大并发消费者数量, private volatile Integer maxConcurrentConsumers; //排它性 默认false,如果设置为true,只允许容器中的单个消费者使用队列,并要求并发数为1 private volatile boolean exclusive; //最近一次开启的消费者时间戳 private volatile long lastConsumerStarted; //最近一次关闭的消费者时间戳 private volatile long lastConsumerStopped; //消费者从队列获取消息的超时时间,默认1s private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT; // private volatile long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT; //消费者出现异常时,从休眠到唤醒需要的时间 private BackOff recoveryBackOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS); // 
Map entry value, when false, signals the consumer to terminate private Map<BlockingQueueConsumer, Boolean> consumers; private final Object consumersMonitor = new Object(); private PlatformTransactionManager transactionManager; private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute(); private volatile Advice[] adviceChain = new Advice[0]; private final ActiveObjectCounter<BlockingQueueConsumer> cancellationLock = new ActiveObjectCounter<BlockingQueueConsumer>(); private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter(); private volatile boolean defaultRequeueRejected = true; private final Map<String, Object> consumerArgs = new HashMap<String, Object>(); private volatile RabbitAdmin rabbitAdmin; private volatile boolean missingQueuesFatal = true; private volatile boolean missingQueuesFatalSet; private volatile boolean autoDeclare = true; private volatile ConsumerTagStrategy consumerTagStrategy; private volatile ApplicationEventPublisher applicationEventPublisher; public interface ContainerDelegate { void invokeListener(Channel channel, Message message) throws Exception; } private final ContainerDelegate delegate = new ContainerDelegate() { @Override public void invokeListener(Channel channel, Message message) throws Exception { SimpleMessageListenerContainer.super.invokeListener(channel, message); } }; private ContainerDelegate proxy = delegate; private Integer declarationRetries; private Long failedDeclarationRetryInterval; private Long retryDeclarationInterval; private ConditionalExceptionLogger exclusiveConsumerExceptionLogger = new DefaultExclusiveConsumerLogger(); public SimpleMessageListenerContainer() { } public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) { this.setConnectionFactory(connectionFactory); } public void setAdviceChain(Advice[] adviceChain) { this.adviceChain = Arrays.copyOf(adviceChain, adviceChain.length); } /**设置尝试恢复消费者的时间间隔,默认5s * Specify 
the interval between recovery attempts, in <b>milliseconds</b> The default is 5000 ms, that is, 5 seconds */ public void setRecoveryInterval(long recoveryInterval) { this.recoveryBackOff = new FixedBackOff(recoveryInterval, FixedBackOff.UNLIMITED_ATTEMPTS); } /** * Specify the {@link BackOff} for interval between recovery attempts. * The default is 5000 ms, that is, 5 seconds. */ public void setRecoveryBackOff(BackOff recoveryBackOff) { Assert.notNull(recoveryBackOff, "'recoveryBackOff' must not be null."); this.recoveryBackOff = recoveryBackOff; } /** 设置创建多少个并发的消费者,默认创建1个。 * Specify the number of concurrent consumers to create. Default is 1. * 推荐增加并发消费者的数量,从而增加消费能力(即多个消费者比一个消费者的处理消息的能力肯定快)。然而,值得注意的是, * 一旦注册了多个消费者,消息的的顺序性将无法得到保证(原本MQ中消息是有序的,多个消费者是没办法保证消息按顺序处理的)。 * 要是需要保持顺序性,就只能用一个消费者。 * Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in * from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In * general, stick with 1 consumer for low-volume queues. Cannot be more than {@link #maxConcurrentConsumers} (if set). * @param concurrentConsumers the minimum number of consumers to create. 
* @see #setMaxConcurrentConsumers(int) */ public void setConcurrentConsumers(final int concurrentConsumers) { Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)"); Assert.isTrue(!this.exclusive || concurrentConsumers == 1, "When the consumer is exclusive, the concurrency must be 1"); if (this.maxConcurrentConsumers != null) { Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers, "'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'"); } synchronized(consumersMonitor) { if (logger.isDebugEnabled()) { logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers); } int delta = this.concurrentConsumers - concurrentConsumers; this.concurrentConsumers = concurrentConsumers; if (isActive() && this.consumers != null) { if (delta > 0) { Iterator<Entry<BlockingQueueConsumer, Boolean>> entryIterator = consumers.entrySet() .iterator(); while (entryIterator.hasNext() && delta > 0) { Entry<BlockingQueueConsumer, Boolean> entry = entryIterator.next(); if (entry.getValue()) { BlockingQueueConsumer consumer = entry.getKey(); consumer.basicCancel(); this.consumers.put(consumer, false); delta--; } } } else { addAndStartConsumers(-delta); } } } } /**消费者并发数量上限 * Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'. Consumers * will be added on demand. Cannot be less than {@link #concurrentConsumers}. 
*/ public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { Assert.isTrue(maxConcurrentConsumers >= this.concurrentConsumers, "'maxConcurrentConsumers' value must be at least 'concurrentConsumers'"); Assert.isTrue(!this.exclusive || maxConcurrentConsumers == 1, "When the consumer is exclusive, the concurrency must be 1"); this.maxConcurrentConsumers = maxConcurrentConsumers; } 此处略过部分代码 /** 当消费者的最大并发数大于初始并发数量,并且当前并发数量还没有到达最大并发数时,消费者成功连续消费多少条后可以创建新的消费者,默认10条。 * If {@link #maxConcurrentConsumers} is greater then {@link #concurrentConsumers}, and * {@link #maxConcurrentConsumers} has not been reached, specifies the number of * consecutive cycles when a single consumer was active, in order to consider * starting a new consumer. If the consumer goes idle for one cycle, the counter is reset. * This is impacted by the {@link #txSize}. * Default is 10 consecutive messages. * @param consecutiveActiveTrigger The number of consecutive receives to trigger a new consumer. */ public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger) { Assert.isTrue(consecutiveActiveTrigger > 0, "'consecutiveActiveTrigger' must be > 0"); this.consecutiveActiveTrigger = consecutiveActiveTrigger; } /**关闭消费者,当消费者的最大并发数大于初始并发数量,并且当前的并发消费者数量大于初始的设定的并发数量时,消费者没有获取到消息处理结果的次数到达consecutiveIdleTrigger时, * 会去关闭当前消费者。 * If {@link #maxConcurrentConsumers} is greater then {@link #concurrentConsumers}, and * the number of consumers exceeds {@link #concurrentConsumers}, specifies the * number of consecutive receive attempts that return no data; after which we consider * stopping a consumer. The idle time is effectively * {@link #receiveTimeout} * {@link #txSize} * this value because the consumer thread waits for * a message for up to {@link #receiveTimeout} up to {@link #txSize} times. * Default is 10 consecutive idles. * @param consecutiveIdleTrigger The number of consecutive timeouts to trigger stopping a consumer. 
*/ public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger) { Assert.isTrue(consecutiveIdleTrigger > 0, "'consecutiveIdleTrigger' must be > 0"); this.consecutiveIdleTrigger = consecutiveIdleTrigger; } 略过部分代码 /** 设置单次请求中发给每个消费者的消息数量,通常这个值设置的高一些能提高吞吐量。这个值应该大于或等于一次事物处理的消息数 * Tells the broker how many messages to send to each consumer in a single request. Often this can be set quite high * to improve throughput. It should be greater than or equal to {@link #setTxSize(int) the transaction size}. * @param prefetchCount the prefetch count */ public void setPrefetchCount(int prefetchCount) { this.prefetchCount = prefetchCount; } /**设置容器在一次事物中处理多少条消息(通道需为事物行的)。为了得到最优结果,这个值应该小于等于prefetch count。。。。 * Tells the container how many messages to process in a single transaction (if the channel is transactional). For * best results it should be less than or equal to {@link #setPrefetchCount(int) the prefetch count}. Also affects * how often acks are sent when using {@link AcknowledgeMode#AUTO} - one ack per txSize. Default is 1. * @param txSize the transaction size */ public void setTxSize(int txSize) { Assert.isTrue(txSize > 0, "'txSize' must be > 0"); this.txSize = txSize; } //略过部分代码 /** 创建设定的并发数量的消费者, * Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated * MessageConsumer. * @throws Exception Any Exception. 
*/ @Override protected void doInitialize() throws Exception { checkMissingQueuesFatal(); if (!this.isExposeListenerChannel() && this.transactionManager != null) { logger.warn("exposeListenerChannel=false is ignored when using a TransactionManager"); } initializeProxy(); if (this.transactionManager != null) { if (!isChannelTransacted()) { logger.debug("The 'channelTransacted' is coerced to 'true', when 'transactionManager' is provided"); setChannelTransacted(true); } } } @ManagedMetric(metricType = MetricType.GAUGE) public int getActiveConsumerCount() { return cancellationLock.getCount(); } /** * Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer * to this container's task executor. * @throws Exception Any Exception. */ @Override protected void doStart() throws Exception { if (getMessageListener() instanceof ListenerContainerAware) { Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames(); if (expectedQueueNames != null) { String[] queueNames = getQueueNames(); Assert.state(expectedQueueNames.size() == queueNames.length, "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: " + Arrays.asList(queueNames)); boolean found = false; for (String queueName : queueNames) { if (expectedQueueNames.contains(queueName)) { found = true; } else { found = false; break; } } Assert.state(found, "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: " + Arrays.asList(queueNames)); } } super.doStart(); if (this.rabbitAdmin == null && this.getApplicationContext() != null) { Map<String, RabbitAdmin> admins = this.getApplicationContext().getBeansOfType(RabbitAdmin.class); if (!admins.isEmpty()) { this.rabbitAdmin = admins.values().iterator().next(); } } if (this.rabbitAdmin == null && this.autoDeclare) { RabbitAdmin rabbitAdmin = new RabbitAdmin(this.getConnectionFactory()); 
rabbitAdmin.setApplicationContext(this.getApplicationContext()); this.rabbitAdmin = rabbitAdmin; } synchronized (this.consumersMonitor) { int newConsumers = initializeConsumers();//初始化消费者 if (this.consumers == null) { if (logger.isInfoEnabled()) { logger.info("Consumers were initialized and then cleared " + "(presumably the container was stopped concurrently)"); } return; } if (newConsumers <= 0) { if (logger.isInfoEnabled()) { logger.info("Consumers are already running"); } return; } Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>(); for (BlockingQueueConsumer consumer : this.consumers.keySet()) { AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer); processors.add(processor); this.taskExecutor.execute(processor); } for (AsyncMessageProcessingConsumer processor : processors) { FatalListenerStartupException startupException = processor.getStartupException(); if (startupException != null) { throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException); } } } } @Override protected void doStop() { shutdown(); super.doStop(); } //关闭全部消费者 @Override protected void doShutdown() { if (!this.isRunning()) { return; } try { synchronized (consumersMonitor) { if (this.consumers != null) { for (BlockingQueueConsumer consumer : this.consumers.keySet()) { consumer.basicCancel(); } } } logger.info("Waiting for workers to finish."); boolean finished = cancellationLock.await(shutdownTimeout, TimeUnit.MILLISECONDS); if (finished) { logger.info("Successfully waited for workers to finish."); } else { logger.info("Workers not finished."); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted waiting for workers. 
Continuing with shutdown."); } synchronized (this.consumersMonitor) { this.consumers = null; } } //判断消费者是否存活 private boolean isActive(BlockingQueueConsumer consumer) { Boolean consumerActive; synchronized(consumersMonitor) { if (this.consumers != null) { Boolean active = this.consumers.get(consumer); consumerActive = active != null && active; } else { consumerActive = false; } } return consumerActive && this.isActive(); } //初始化消费者 初始数量为设定的并发消费者数量 protected int initializeConsumers() { int count = 0; synchronized (this.consumersMonitor) { if (this.consumers == null) { cancellationLock.reset(); this.consumers = new HashMap<BlockingQueueConsumer, Boolean>(this.concurrentConsumers); for (int i = 0; i < this.concurrentConsumers; i++) { BlockingQueueConsumer consumer = createBlockingQueueConsumer(); this.consumers.put(consumer, true); count++; } } } return count; } private void checkMissingQueuesFatal() { if (!this.missingQueuesFatalSet) { try { ApplicationContext applicationContext = getApplicationContext(); if (applicationContext != null) { Properties properties = applicationContext.getBean("spring.amqp.global.properties", Properties.class); String missingQueuesFatal = properties.getProperty("smlc.missing.queues.fatal"); if (StringUtils.hasText(missingQueuesFatal)) { this.missingQueuesFatal = Boolean.parseBoolean(missingQueuesFatal); } } } catch (BeansException be) { if (logger.isDebugEnabled()) { logger.debug("No global properties bean"); } } } } protected void addAndStartConsumers(int delta) { synchronized (this.consumersMonitor) { if (this.consumers != null) { for (int i = 0; i < delta; i++) { BlockingQueueConsumer consumer = createBlockingQueueConsumer(); this.consumers.put(consumer, true); AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer); if (logger.isDebugEnabled()) { logger.debug("Starting a new consumer: " + consumer); } this.taskExecutor.execute(processor); try { FatalListenerStartupException startupException = 
processor.getStartupException(); if (startupException != null) { this.consumers.remove(consumer); throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } catch (Exception e) { consumer.stop(); logger.error("Error starting new consumer", e); this.cancellationLock.release(consumer); this.consumers.remove(consumer); } } } } } private void considerAddingAConsumer() { synchronized(consumersMonitor) { if (this.consumers != null && this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) { long now = System.currentTimeMillis(); if (this.lastConsumerStarted + startConsumerMinInterval < now) { this.addAndStartConsumers(1); this.lastConsumerStarted = now; } } } } private void considerStoppingAConsumer(BlockingQueueConsumer consumer) { synchronized (consumersMonitor) { if (this.consumers != null && this.consumers.size() > concurrentConsumers) { long now = System.currentTimeMillis(); if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) { consumer.basicCancel(); this.consumers.put(consumer, false); if (logger.isDebugEnabled()) { logger.debug("Idle consumer terminating: " + consumer); } this.lastConsumerStopped = now; } } } } private void queuesChanged() { synchronized (consumersMonitor) { if (this.consumers != null) { int count = 0; for (Entry<BlockingQueueConsumer, Boolean> consumer : this.consumers.entrySet()) { if (consumer.getValue()) { if (logger.isDebugEnabled()) { logger.debug("Queues changed; stopping consumer: " + consumer.getKey()); } consumer.getKey().basicCancel(); consumer.setValue(false); count++; } } this.addAndStartConsumers(count); } } } @Override protected boolean isChannelLocallyTransacted(Channel channel) { return super.isChannelLocallyTransacted(channel) && this.transactionManager == null; } protected BlockingQueueConsumer createBlockingQueueConsumer() { BlockingQueueConsumer consumer; String[] 
queues = getRequiredQueueNames(); // There's no point prefetching less than the tx size, otherwise the consumer will stall because the broker // didn't get an ack for delivered messages int actualPrefetchCount = prefetchCount > txSize ? prefetchCount : txSize; consumer = new BlockingQueueConsumer(getConnectionFactory(), this.messagePropertiesConverter, cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount, this.defaultRequeueRejected, this.consumerArgs, this.exclusive, queues); if (this.declarationRetries != null) { consumer.setDeclarationRetries(this.declarationRetries); } if (this.failedDeclarationRetryInterval != null) { consumer.setFailedDeclarationRetryInterval(this.failedDeclarationRetryInterval); } if (this.retryDeclarationInterval != null) { consumer.setRetryDeclarationInterval(this.retryDeclarationInterval); } if (this.consumerTagStrategy != null) { consumer.setTagStrategy(this.consumerTagStrategy); } consumer.setBackOffExecution(this.recoveryBackOff.start()); return consumer; } private void restart(BlockingQueueConsumer consumer) { synchronized (this.consumersMonitor) { if (this.consumers != null) { try { // Need to recycle the channel in this consumer consumer.stop(); // Ensure consumer counts are correct (another is going // to start because of the exception, but // we haven't counted down yet) this.cancellationLock.release(consumer); this.consumers.remove(consumer); BlockingQueueConsumer newConsumer = createBlockingQueueConsumer(); newConsumer.setBackOffExecution(consumer.getBackOffExecution()); consumer = newConsumer; this.consumers.put(consumer, true); } catch (RuntimeException e) { logger.warn("Consumer failed irretrievably on restart. " + e.getClass() + ": " + e.getMessage()); // Re-throw and have it logged properly by the caller. 
throw e; } this.taskExecutor.execute(new AsyncMessageProcessingConsumer(consumer)); } } } private class AsyncMessageProcessingConsumer implements Runnable { private final BlockingQueueConsumer consumer; private final CountDownLatch start; private volatile FatalListenerStartupException startupException; private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) { this.consumer = consumer; this.start = new CountDownLatch(1); } @Override public void run() { boolean aborted = false; int consecutiveIdles = 0; int consecutiveMessages = 0; try { try { if (SimpleMessageListenerContainer.this.autoDeclare) { SimpleMessageListenerContainer.this.redeclareElementsIfNecessary(); } this.consumer.start(); this.start.countDown(); } catch (QueuesNotAvailableException e) { if (SimpleMessageListenerContainer.this.missingQueuesFatal) { throw e; } else { this.start.countDown(); handleStartupFailure(this.consumer.getBackOffExecution()); throw e; } } catch (FatalListenerStartupException ex) { throw ex; } catch (Throwable t) {//NOSONAR this.start.countDown(); handleStartupFailure(this.consumer.getBackOffExecution()); throw t; } if (SimpleMessageListenerContainer.this.transactionManager != null) { /* * Register the consumer's channel so it will be used by the transaction manager * if it's an instance of RabbitTransactionManager. 
*/ ConsumerChannelRegistry.registerConsumerChannel(consumer.getChannel(), getConnectionFactory()); } while (isActive(this.consumer) || this.consumer.hasDelivery()) { try { boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) { if (receivedOk) { if (isActive(this.consumer)) { consecutiveIdles = 0; if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) { considerAddingAConsumer(); consecutiveMessages = 0; } } } else { consecutiveMessages = 0; if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) { considerStoppingAConsumer(this.consumer); consecutiveIdles = 0; } } } } catch (ListenerExecutionFailedException ex) { // Continue to process, otherwise re-throw if (ex.getCause() instanceof NoSuchMethodException) { throw new FatalListenerExecutionException("Invalid listener", ex); } } catch (AmqpRejectAndDontRequeueException rejectEx) { /* * These will normally be wrapped by an LEFE if thrown by the * listener, but we will also honor it if thrown by an * error handler. */ } } } catch (InterruptedException e) { logger.debug("Consumer thread interrupted, processing stopped."); Thread.currentThread().interrupt(); aborted = true; publishEvent("Consumer thread interrupted, processing stopped", true, e); } catch (QueuesNotAvailableException ex) { if (SimpleMessageListenerContainer.this.missingQueuesFatal) { logger.error("Consumer received fatal exception on startup", ex); this.startupException = ex; // Fatal, but no point re-throwing, so just abort. aborted = true; } publishEvent("Consumer queue(s) not available", aborted, ex); } catch (FatalListenerStartupException ex) { logger.error("Consumer received fatal exception on startup", ex); this.startupException = ex; // Fatal, but no point re-throwing, so just abort. 
aborted = true; publishEvent("Consumer received fatal exception on startup", true, ex); } catch (FatalListenerExecutionException ex) { logger.error("Consumer received fatal exception during processing", ex); // Fatal, but no point re-throwing, so just abort. aborted = true; publishEvent("Consumer received fatal exception during processing", true, ex); } catch (ShutdownSignalException e) { if (RabbitUtils.isNormalShutdown(e)) { if (logger.isDebugEnabled()) { logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage()); } } else { this.logConsumerException(e); } } catch (AmqpIOException e) { if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException && e.getCause().getCause().getMessage().contains("in exclusive use")) { exclusiveConsumerExceptionLogger.log(logger, "Exclusive consumer failure", e.getCause().getCause()); publishEvent("Consumer raised exception, attempting restart", false, e); } else { this.logConsumerException(e); } } catch (Error e) {//NOSONAR // ok to catch Error - we're aborting so will stop logger.error("Consumer thread error, thread abort.", e); aborted = true; } catch (Throwable t) {//NOSONAR // by now, it must be an exception if (isActive()) { this.logConsumerException(t); } } finally { if (SimpleMessageListenerContainer.this.transactionManager != null) { ConsumerChannelRegistry.unRegisterConsumerChannel(); } } // In all cases count down to allow container to progress beyond startup start.countDown(); if (!isActive(consumer) || aborted) { logger.debug("Cancelling " + this.consumer); try { this.consumer.stop(); SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer); synchronized (consumersMonitor) { if (SimpleMessageListenerContainer.this.consumers != null) { SimpleMessageListenerContainer.this.consumers.remove(this.consumer); } } } catch (AmqpException e) { logger.info("Could not cancel message consumer", e); } if (aborted) { logger.error("Stopping 
container from aborted consumer"); stop(); } } else { logger.info("Restarting " + this.consumer); restart(this.consumer); } } private void logConsumerException(Throwable t) { if (logger.isDebugEnabled() || !(t instanceof AmqpConnectException || t instanceof ConsumerCancelledException)) { logger.warn( "Consumer raised exception, processing can restart if the connection factory supports it", t); } else { logger.warn("Consumer raised exception, processing can restart if the connection factory supports it. " + "Exception summary: " + t); } publishEvent("Consumer raised exception, attempting restart", false, t); } private void publishEvent(String reason, boolean fatal, Throwable t) { if (applicationEventPublisher != null) { applicationEventPublisher.publishEvent(new ListenerContainerConsumerFailedEvent( SimpleMessageListenerContainer.this, reason, t, fatal)); } } } @Override protected void invokeListener(Channel channel, Message message) throws Exception { proxy.invokeListener(channel, message); } }