当前位置:首页 > 未命名 > 正文内容

RabbitMQ消费端如何配置性能高--解读SimpleMessageListennerContainer

淙嶙7年前 (2018-06-22)未命名1382

SimpleMessageListenerContainer 是RabbitMQ的侦听容器。消费端的一些配置都在这里面(spring-rabbit通过xml的配置listener-container就能完成具体配置声明在spring-rabbit-{version}.xsd中),其中影响

public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
      implements ApplicationEventPublisherAware {
   //常量值
   private static final long DEFAULT_START_CONSUMER_MIN_INTERVAL = 10000;
   private static final long DEFAULT_STOP_CONSUMER_MIN_INTERVAL = 60000;
   private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;
   private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10;
   public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
   public static final int DEFAULT_PREFETCH_COUNT = 1;
   public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000;
   public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
   private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT;
   //启动消费者最小时间间隔,默认10s
   private volatile long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL;
   //关闭空闲的消费者最小时间间隔 默认60s
   private volatile long stopConsumerMinInterval = DEFAULT_STOP_CONSUMER_MIN_INTERVAL;
   //消费者连续成功消费几条消息触发新增消费者(调用considerAddingAConsumer()),默认10条
   private volatile int consecutiveActiveTrigger = DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER;
   //消费者连续空闲几次触发关闭当前消费者(调用considerStoppingAConsumer(this.consumer)),默认10
   private volatile int consecutiveIdleTrigger = DEFAULT_CONSECUTIVE_IDLE_TRIGGER;
   private volatile int txSize = 1;
   //AsyncMessageProcessingConsumer的线程池,新增消费者创建相应的线程
   private volatile Executor taskExecutor = new SimpleAsyncTaskExecutor();
   //并发的消费者数量,默认值1,把我们的消费者变成多个一起运行,增加这个数量可以大大增加消费能力。
   private volatile int concurrentConsumers = 1;
   //最大并发消费者数量,
   private volatile Integer maxConcurrentConsumers;
   //排它性 默认false,如果设置为true,只允许容器中的单个消费者使用队列,并要求并发数为1
   private volatile boolean exclusive;
   //最近一次开启的消费者时间戳
   private volatile long lastConsumerStarted;
   //最近一次关闭的消费者时间戳
   private volatile long lastConsumerStopped;
   //消费者从队列获取消息的超时时间,默认1s
   private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
   //
   private volatile long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
   //消费者出现异常时,从休眠到唤醒需要的时间
   private BackOff recoveryBackOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);

   // Map entry value, when false, signals the consumer to terminate
   private Map<BlockingQueueConsumer, Boolean> consumers;

   private final Object consumersMonitor = new Object();

   private PlatformTransactionManager transactionManager;

   private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute();

   private volatile Advice[] adviceChain = new Advice[0];

   private final ActiveObjectCounter<BlockingQueueConsumer> cancellationLock = new ActiveObjectCounter<BlockingQueueConsumer>();

   private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();

   private volatile boolean defaultRequeueRejected = true;

   private final Map<String, Object> consumerArgs = new HashMap<String, Object>();

   private volatile RabbitAdmin rabbitAdmin;
   
   private volatile boolean missingQueuesFatal = true;

   private volatile boolean missingQueuesFatalSet;

   private volatile boolean autoDeclare = true;

   private volatile ConsumerTagStrategy consumerTagStrategy;

   private volatile ApplicationEventPublisher applicationEventPublisher;

   public interface ContainerDelegate {
      void invokeListener(Channel channel, Message message) throws Exception;
   }

   private final ContainerDelegate delegate = new ContainerDelegate() {
      @Override
      public void invokeListener(Channel channel, Message message) throws Exception {
         SimpleMessageListenerContainer.super.invokeListener(channel, message);
      }
   };

   private ContainerDelegate proxy = delegate;

   private Integer declarationRetries;

   private Long failedDeclarationRetryInterval;

   private Long retryDeclarationInterval;

   private ConditionalExceptionLogger exclusiveConsumerExceptionLogger = new DefaultExclusiveConsumerLogger();

   public SimpleMessageListenerContainer() {
   }

   public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
      this.setConnectionFactory(connectionFactory);
   }

   public void setAdviceChain(Advice[] adviceChain) {
      this.adviceChain = Arrays.copyOf(adviceChain, adviceChain.length);
   }

   /**设置尝试恢复消费者的时间间隔,默认5s
    * Specify the interval between recovery attempts, in <b>milliseconds</b> The default is 5000 ms, that is, 5 seconds
    */
   public void setRecoveryInterval(long recoveryInterval) {
      this.recoveryBackOff = new FixedBackOff(recoveryInterval, FixedBackOff.UNLIMITED_ATTEMPTS);
   }

   /**
    * Specify the {@link BackOff} for interval between recovery attempts.
    * The default is 5000 ms, that is, 5 seconds.
    */
   public void setRecoveryBackOff(BackOff recoveryBackOff) {
      Assert.notNull(recoveryBackOff, "'recoveryBackOff' must not be null.");
      this.recoveryBackOff = recoveryBackOff;
   }


   /** 设置创建多少个并发的消费者,默认创建1个。
    * Specify the number of concurrent consumers to create. Default is 1.
    * 推荐增加并发消费者的数量,从而增加消费能力(即多个消费者比一个消费者的处理消息的能力肯定快)。然而,值得注意的是,
    * 一旦注册了多个消费者,消息的的顺序性将无法得到保证(原本MQ中消息是有序的,多个消费者是没办法保证消息按顺序处理的)。
    * 要是需要保持顺序性,就只能用一个消费者。
    * Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in
    * from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In
    * general, stick with 1 consumer for low-volume queues. Cannot be more than {@link #maxConcurrentConsumers} (if set).
    * @param concurrentConsumers the minimum number of consumers to create.
    * @see #setMaxConcurrentConsumers(int)
    */
   public void setConcurrentConsumers(final int concurrentConsumers) {
      Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
      Assert.isTrue(!this.exclusive || concurrentConsumers == 1,
            "When the consumer is exclusive, the concurrency must be 1");
      if (this.maxConcurrentConsumers != null) {
         Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers,
               "'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'");
      }
      synchronized(consumersMonitor) {
         if (logger.isDebugEnabled()) {
            logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers);
         }
         int delta = this.concurrentConsumers - concurrentConsumers;
         this.concurrentConsumers = concurrentConsumers;
         if (isActive() && this.consumers != null) {
            if (delta > 0) {
               Iterator<Entry<BlockingQueueConsumer, Boolean>> entryIterator = consumers.entrySet()
                     .iterator();
               while (entryIterator.hasNext() && delta > 0) {
                  Entry<BlockingQueueConsumer, Boolean> entry = entryIterator.next();
                  if (entry.getValue()) {
                     BlockingQueueConsumer consumer = entry.getKey();
                     consumer.basicCancel();
                     this.consumers.put(consumer, false);
                     delta--;
                  }
               }

            }
            else {
               addAndStartConsumers(-delta);
            }
         }
      }
   }

   /**消费者并发数量上限
    * Sets an upper limit to the number of consumers; defaults to 'concurrentConsumers'. Consumers
    * will be added on demand. Cannot be less than {@link #concurrentConsumers}.
    */
   public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
      Assert.isTrue(maxConcurrentConsumers >= this.concurrentConsumers,
            "'maxConcurrentConsumers' value must be at least 'concurrentConsumers'");
      Assert.isTrue(!this.exclusive || maxConcurrentConsumers == 1,
            "When the consumer is exclusive, the concurrency must be 1");
      this.maxConcurrentConsumers = maxConcurrentConsumers;
   }
    此处略过部分代码
   /** 当消费者的最大并发数大于初始并发数量,并且当前并发数量还没有到达最大并发数时,消费者成功连续消费多少条后可以创建新的消费者,默认10条。
    * If {@link #maxConcurrentConsumers} is greater then {@link #concurrentConsumers}, and
    * {@link #maxConcurrentConsumers} has not been reached, specifies the number of
    * consecutive cycles when a single consumer was active, in order to consider
    * starting a new consumer. If the consumer goes idle for one cycle, the counter is reset.
    * This is impacted by the {@link #txSize}.
    * Default is 10 consecutive messages.
    * @param consecutiveActiveTrigger The number of consecutive receives to trigger a new consumer.
    */
   public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger) {
      Assert.isTrue(consecutiveActiveTrigger > 0, "'consecutiveActiveTrigger' must be > 0");
      this.consecutiveActiveTrigger = consecutiveActiveTrigger;
   }

   /**关闭消费者,当消费者的最大并发数大于初始并发数量,并且当前的并发消费者数量大于初始的设定的并发数量时,消费者没有获取到消息处理结果的次数到达consecutiveIdleTrigger时,
    * 会去关闭当前消费者。
    * If {@link #maxConcurrentConsumers} is greater then {@link #concurrentConsumers}, and
    * the number of consumers exceeds {@link #concurrentConsumers}, specifies the
    * number of consecutive receive attempts that return no data; after which we consider
    * stopping a consumer. The idle time is effectively
    * {@link #receiveTimeout} * {@link #txSize} * this value because the consumer thread waits for
    * a message for up to {@link #receiveTimeout} up to {@link #txSize} times.
    * Default is 10 consecutive idles.
    * @param consecutiveIdleTrigger The number of consecutive timeouts to trigger stopping a consumer.
    */
   public final void setConsecutiveIdleTrigger(int consecutiveIdleTrigger) {
      Assert.isTrue(consecutiveIdleTrigger > 0, "'consecutiveIdleTrigger' must be > 0");
      this.consecutiveIdleTrigger = consecutiveIdleTrigger;
   }
   略过部分代码

   /** 设置单次请求中发给每个消费者的消息数量,通常这个值设置的高一些能提高吞吐量。这个值应该大于或等于一次事物处理的消息数
    * Tells the broker how many messages to send to each consumer in a single request. Often this can be set quite high
    * to improve throughput. It should be greater than or equal to {@link #setTxSize(int) the transaction size}.
    * @param prefetchCount the prefetch count
    */
   public void setPrefetchCount(int prefetchCount) {
      this.prefetchCount = prefetchCount;
   }

   /**设置容器在一次事物中处理多少条消息(通道需为事物行的)。为了得到最优结果,这个值应该小于等于prefetch count。。。。
    * Tells the container how many messages to process in a single transaction (if the channel is transactional). For
    * best results it should be less than or equal to {@link #setPrefetchCount(int) the prefetch count}. Also affects
    * how often acks are sent when using {@link AcknowledgeMode#AUTO} - one ack per txSize. Default is 1.
    * @param txSize the transaction size
    */
   public void setTxSize(int txSize) {
      Assert.isTrue(txSize > 0, "'txSize' must be > 0");
      this.txSize = txSize;
   }   //略过部分代码

   /** 创建设定的并发数量的消费者,
    * Creates the specified number of concurrent consumers, in the form of a Rabbit Channel plus associated
    * MessageConsumer.
    * @throws Exception Any Exception.
    */
   @Override
   protected void doInitialize() throws Exception {
      checkMissingQueuesFatal();
      if (!this.isExposeListenerChannel() && this.transactionManager != null) {
         logger.warn("exposeListenerChannel=false is ignored when using a TransactionManager");
      }
      initializeProxy();
      if (this.transactionManager != null) {
         if (!isChannelTransacted()) {
            logger.debug("The 'channelTransacted' is coerced to 'true', when 'transactionManager' is provided");
            setChannelTransacted(true);
         }

      }
   }

   @ManagedMetric(metricType = MetricType.GAUGE)
   public int getActiveConsumerCount() {
      return cancellationLock.getCount();
   }

   /**
    * Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer
    * to this container's task executor.
    * @throws Exception Any Exception.
    */
   @Override
   protected void doStart() throws Exception {
      if (getMessageListener() instanceof ListenerContainerAware) {
         Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
         if (expectedQueueNames != null) {
            String[] queueNames = getQueueNames();
            Assert.state(expectedQueueNames.size() == queueNames.length,
                  "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                  + Arrays.asList(queueNames));
            boolean found = false;
            for (String queueName : queueNames) {
               if (expectedQueueNames.contains(queueName)) {
                  found = true;
               }
               else {
                  found = false;
                  break;
               }
            }
            Assert.state(found, "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                  + Arrays.asList(queueNames));
         }
      }
      super.doStart();
      if (this.rabbitAdmin == null && this.getApplicationContext() != null) {
         Map<String, RabbitAdmin> admins = this.getApplicationContext().getBeansOfType(RabbitAdmin.class);
         if (!admins.isEmpty()) {
            this.rabbitAdmin = admins.values().iterator().next();
         }
      }
      if (this.rabbitAdmin == null && this.autoDeclare) {
         RabbitAdmin rabbitAdmin = new RabbitAdmin(this.getConnectionFactory());
         rabbitAdmin.setApplicationContext(this.getApplicationContext());
         this.rabbitAdmin = rabbitAdmin;
      }
      synchronized (this.consumersMonitor) {
         int newConsumers = initializeConsumers();//初始化消费者
         if (this.consumers == null) {
            if (logger.isInfoEnabled()) {
               logger.info("Consumers were initialized and then cleared " +
                     "(presumably the container was stopped concurrently)");
            }
            return;
         }
         if (newConsumers <= 0) {
            if (logger.isInfoEnabled()) {
               logger.info("Consumers are already running");
            }
            return;
         }
         Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
         for (BlockingQueueConsumer consumer : this.consumers.keySet()) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            this.taskExecutor.execute(processor);
         }
         for (AsyncMessageProcessingConsumer processor : processors) {
            FatalListenerStartupException startupException = processor.getStartupException();
            if (startupException != null) {
               throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
            }
         }
      }
   }

   @Override
   protected void doStop() {
      shutdown();
      super.doStop();
   }
   //关闭全部消费者
   @Override
   protected void doShutdown() {

      if (!this.isRunning()) {
         return;
      }

      try {
         synchronized (consumersMonitor) {
            if (this.consumers != null) {
               for (BlockingQueueConsumer consumer : this.consumers.keySet()) {
                  consumer.basicCancel();
               }
            }
         }
         logger.info("Waiting for workers to finish.");
         boolean finished = cancellationLock.await(shutdownTimeout, TimeUnit.MILLISECONDS);
         if (finished) {
            logger.info("Successfully waited for workers to finish.");
         }
         else {
            logger.info("Workers not finished.");
         }
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.warn("Interrupted waiting for workers.  Continuing with shutdown.");
      }

      synchronized (this.consumersMonitor) {
         this.consumers = null;
      }

   }
    //判断消费者是否存活
   private boolean isActive(BlockingQueueConsumer consumer) {
      Boolean consumerActive;
      synchronized(consumersMonitor) {
         if (this.consumers != null) {
            Boolean active = this.consumers.get(consumer);
            consumerActive = active != null && active;
         }
         else {
            consumerActive = false;
         }
      }
      return consumerActive && this.isActive();
   }
   //初始化消费者 初始数量为设定的并发消费者数量
   protected int initializeConsumers() {
      int count = 0;
      synchronized (this.consumersMonitor) {
         if (this.consumers == null) {
            cancellationLock.reset();
            this.consumers = new HashMap<BlockingQueueConsumer, Boolean>(this.concurrentConsumers);
            for (int i = 0; i < this.concurrentConsumers; i++) {
               BlockingQueueConsumer consumer = createBlockingQueueConsumer();
               this.consumers.put(consumer, true);
               count++;
            }
         }
      }
      return count;
   }

   private void checkMissingQueuesFatal() {
      if (!this.missingQueuesFatalSet) {
         try {
            ApplicationContext applicationContext = getApplicationContext();
            if (applicationContext != null) {
               Properties properties = applicationContext.getBean("spring.amqp.global.properties", Properties.class);
               String missingQueuesFatal = properties.getProperty("smlc.missing.queues.fatal");
               if (StringUtils.hasText(missingQueuesFatal)) {
                  this.missingQueuesFatal = Boolean.parseBoolean(missingQueuesFatal);
               }
            }
         }
         catch (BeansException be) {
            if (logger.isDebugEnabled()) {
               logger.debug("No global properties bean");
            }
         }
      }
   }

   protected void addAndStartConsumers(int delta) {
      synchronized (this.consumersMonitor) {
         if (this.consumers != null) {
            for (int i = 0; i < delta; i++) {
               BlockingQueueConsumer consumer = createBlockingQueueConsumer();
               this.consumers.put(consumer, true);
               AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
               if (logger.isDebugEnabled()) {
                  logger.debug("Starting a new consumer: " + consumer);
               }
               this.taskExecutor.execute(processor);
               try {
                  FatalListenerStartupException startupException = processor.getStartupException();
                  if (startupException != null) {
                     this.consumers.remove(consumer);
                     throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
                  }
               }
               catch (InterruptedException ie) {
                  Thread.currentThread().interrupt();
               }
               catch (Exception e) {
                  consumer.stop();
                  logger.error("Error starting new consumer", e);
                  this.cancellationLock.release(consumer);
                  this.consumers.remove(consumer);
               }
            }
         }
      }
   }

   private void considerAddingAConsumer() {
      synchronized(consumersMonitor) {
         if (this.consumers != null
               && this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) {
            long now = System.currentTimeMillis();
            if (this.lastConsumerStarted + startConsumerMinInterval < now) {
               this.addAndStartConsumers(1);
               this.lastConsumerStarted = now;
            }
         }
      }
   }

   private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
      synchronized (consumersMonitor) {
         if (this.consumers != null && this.consumers.size() > concurrentConsumers) {
            long now = System.currentTimeMillis();
            if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
               consumer.basicCancel();
               this.consumers.put(consumer, false);
               if (logger.isDebugEnabled()) {
                  logger.debug("Idle consumer terminating: " + consumer);
               }
               this.lastConsumerStopped = now;
            }
         }
      }
   }

   private void queuesChanged() {
      synchronized (consumersMonitor) {
         if (this.consumers != null) {
            int count = 0;
            for (Entry<BlockingQueueConsumer, Boolean> consumer : this.consumers.entrySet()) {
               if (consumer.getValue()) {
                  if (logger.isDebugEnabled()) {
                     logger.debug("Queues changed; stopping consumer: " + consumer.getKey());
                  }
                  consumer.getKey().basicCancel();
                  consumer.setValue(false);
                  count++;
               }
            }
            this.addAndStartConsumers(count);
         }
      }
   }

   @Override
   protected boolean isChannelLocallyTransacted(Channel channel) {
      return super.isChannelLocallyTransacted(channel) && this.transactionManager == null;
   }

   protected BlockingQueueConsumer createBlockingQueueConsumer() {
      BlockingQueueConsumer consumer;
      String[] queues = getRequiredQueueNames();
      // There's no point prefetching less than the tx size, otherwise the consumer will stall because the broker
      // didn't get an ack for delivered messages
      int actualPrefetchCount = prefetchCount > txSize ? prefetchCount : txSize;
      consumer = new BlockingQueueConsumer(getConnectionFactory(), this.messagePropertiesConverter, cancellationLock,
            getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount, this.defaultRequeueRejected,
            this.consumerArgs, this.exclusive, queues);
      if (this.declarationRetries != null) {
         consumer.setDeclarationRetries(this.declarationRetries);
      }
      if (this.failedDeclarationRetryInterval != null) {
         consumer.setFailedDeclarationRetryInterval(this.failedDeclarationRetryInterval);
      }
      if (this.retryDeclarationInterval != null) {
         consumer.setRetryDeclarationInterval(this.retryDeclarationInterval);
      }
      if (this.consumerTagStrategy != null) {
         consumer.setTagStrategy(this.consumerTagStrategy);
      }
      consumer.setBackOffExecution(this.recoveryBackOff.start());
      return consumer;
   }

   private void restart(BlockingQueueConsumer consumer) {
      synchronized (this.consumersMonitor) {
         if (this.consumers != null) {
            try {
               // Need to recycle the channel in this consumer
               consumer.stop();
               // Ensure consumer counts are correct (another is going
               // to start because of the exception, but
               // we haven't counted down yet)
               this.cancellationLock.release(consumer);
               this.consumers.remove(consumer);
               BlockingQueueConsumer newConsumer = createBlockingQueueConsumer();
               newConsumer.setBackOffExecution(consumer.getBackOffExecution());
               consumer = newConsumer;
               this.consumers.put(consumer, true);
            }
            catch (RuntimeException e) {
               logger.warn("Consumer failed irretrievably on restart. " + e.getClass() + ": " + e.getMessage());
               // Re-throw and have it logged properly by the caller.
               throw e;
            }
            this.taskExecutor.execute(new AsyncMessageProcessingConsumer(consumer));
         }
      }
   }

   private class AsyncMessageProcessingConsumer implements Runnable {

      private final BlockingQueueConsumer consumer;

      private final CountDownLatch start;

      private volatile FatalListenerStartupException startupException;

      private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
         this.consumer = consumer;
         this.start = new CountDownLatch(1);
      }


      @Override
      public void run() {

         boolean aborted = false;

         int consecutiveIdles = 0;

         int consecutiveMessages = 0;

         try {

            try {
               if (SimpleMessageListenerContainer.this.autoDeclare) {
                  SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
               }
               this.consumer.start();
               this.start.countDown();
            }
            catch (QueuesNotAvailableException e) {
               if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
                  throw e;
               }
               else {
                  this.start.countDown();
                  handleStartupFailure(this.consumer.getBackOffExecution());
                  throw e;
               }
            }
            catch (FatalListenerStartupException ex) {
               throw ex;
            }
            catch (Throwable t) {//NOSONAR
               this.start.countDown();
               handleStartupFailure(this.consumer.getBackOffExecution());
               throw t;
            }

            if (SimpleMessageListenerContainer.this.transactionManager != null) {
               /*
                * Register the consumer's channel so it will be used by the transaction manager
                * if it's an instance of RabbitTransactionManager.
                */
               ConsumerChannelRegistry.registerConsumerChannel(consumer.getChannel(), getConnectionFactory());
            }

            while (isActive(this.consumer) || this.consumer.hasDelivery()) {
               try {
                  boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
                  if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
                     if (receivedOk) {
                        if (isActive(this.consumer)) {
                           consecutiveIdles = 0;
                           if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
                              considerAddingAConsumer();
                              consecutiveMessages = 0;
                           }
                        }
                     }
                     else {
                        consecutiveMessages = 0;
                        if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
                           considerStoppingAConsumer(this.consumer);
                           consecutiveIdles = 0;
                        }
                     }
                  }
               }
               catch (ListenerExecutionFailedException ex) {
                  // Continue to process, otherwise re-throw
                  if (ex.getCause() instanceof NoSuchMethodException) {
                     throw new FatalListenerExecutionException("Invalid listener", ex);
                  }
               }
               catch (AmqpRejectAndDontRequeueException rejectEx) {
                  /*
                   *  These will normally be wrapped by an LEFE if thrown by the
                   *  listener, but we will also honor it if thrown by an
                   *  error handler.
                   */
               }
            }

         }
         catch (InterruptedException e) {
            logger.debug("Consumer thread interrupted, processing stopped.");
            Thread.currentThread().interrupt();
            aborted = true;
            publishEvent("Consumer thread interrupted, processing stopped", true, e);
         }
         catch (QueuesNotAvailableException ex) {
            if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
               logger.error("Consumer received fatal exception on startup", ex);
               this.startupException = ex;
               // Fatal, but no point re-throwing, so just abort.
               aborted = true;
            }
            publishEvent("Consumer queue(s) not available", aborted, ex);
         }
         catch (FatalListenerStartupException ex) {
            logger.error("Consumer received fatal exception on startup", ex);
            this.startupException = ex;
            // Fatal, but no point re-throwing, so just abort.
            aborted = true;
            publishEvent("Consumer received fatal exception on startup", true, ex);
         }
         catch (FatalListenerExecutionException ex) {
            logger.error("Consumer received fatal exception during processing", ex);
            // Fatal, but no point re-throwing, so just abort.
            aborted = true;
            publishEvent("Consumer received fatal exception during processing", true, ex);
         }
         catch (ShutdownSignalException e) {
            if (RabbitUtils.isNormalShutdown(e)) {
               if (logger.isDebugEnabled()) {
                  logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage());
               }
            }
            else {
               this.logConsumerException(e);
            }
         }
         catch (AmqpIOException e) {
            if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
                  && e.getCause().getCause().getMessage().contains("in exclusive use")) {
               exclusiveConsumerExceptionLogger.log(logger, "Exclusive consumer failure", e.getCause().getCause());
               publishEvent("Consumer raised exception, attempting restart", false, e);
            }
            else {
               this.logConsumerException(e);
            }
         }
         catch (Error e) {//NOSONAR
            // ok to catch Error - we're aborting so will stop
            logger.error("Consumer thread error, thread abort.", e);
            aborted = true;
         }
         catch (Throwable t) {//NOSONAR
            // by now, it must be an exception
            if (isActive()) {
               this.logConsumerException(t);
            }
         }
         finally {
            if (SimpleMessageListenerContainer.this.transactionManager != null) {
               ConsumerChannelRegistry.unRegisterConsumerChannel();
            }
         }

         // In all cases count down to allow container to progress beyond startup
         start.countDown();

         if (!isActive(consumer) || aborted) {
            logger.debug("Cancelling " + this.consumer);
            try {
               this.consumer.stop();
               SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
               synchronized (consumersMonitor) {
                  if (SimpleMessageListenerContainer.this.consumers != null) {
                     SimpleMessageListenerContainer.this.consumers.remove(this.consumer);
                  }
               }
            }
            catch (AmqpException e) {
               logger.info("Could not cancel message consumer", e);
            }
            if (aborted) {
               logger.error("Stopping container from aborted consumer");
               stop();
            }
         }
         else {
            logger.info("Restarting " + this.consumer);
            restart(this.consumer);
         }

      }

      private void logConsumerException(Throwable t) {
         if (logger.isDebugEnabled()
               || !(t instanceof AmqpConnectException  || t instanceof ConsumerCancelledException)) {
            logger.warn(
                  "Consumer raised exception, processing can restart if the connection factory supports it",
                  t);
         }
         else {
            logger.warn("Consumer raised exception, processing can restart if the connection factory supports it. "
                  + "Exception summary: " + t);
         }
         publishEvent("Consumer raised exception, attempting restart", false, t);
      }

      private void publishEvent(String reason, boolean fatal, Throwable t) {
         if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new ListenerContainerConsumerFailedEvent(
                  SimpleMessageListenerContainer.this, reason, t, fatal));
         }
      }

   }

   @Override
   protected void invokeListener(Channel channel, Message message) throws Exception {
      proxy.invokeListener(channel, message);
   }
}

相关文章

idea 打开文件所在位置

idea 打开文件所在位置

工欲善其事,必先利其器。idea默认自带打开选中文件的资源管理器位置,右键文件,选择Show in Exlporer;如果希望将它显示在工具栏中,则需要完成一些设置: 1首先保证你的工具栏是...

大数相减

大数相减

描述 两个长度超出常规整形变量上限的大数相减,请避免使用各语言内置大数处理库,如 Java.math.BigInteger 等。 输入 有 N 行测试数据,每一行有两个代表整数的字符串 a 和 b,...

MyBatis-Generator快速入门 <一>(mysql数据源)

MyBatis-Generator快速入门 <一>(mysql数据源)

按官方文档所说,快速上手Mybatis Generator (缩写MBG),只需完成如下几部即可: 1. 花一分钟,写一个配置文件。这个配置文件必须具备以下五个xml标签。 a.一个<jdbcC...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。