2015-10-19 23 views

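Camel MDC Logback stale info in thread

We have a high-load Apache Camel application that uses logback/MDC to log information. We are finding that some of the MDC info is stale on threads, as logback's documentation warns can happen. I found this SO question, which addresses the problem: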

How to use MDC with thread pools?

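How should we apply this to our Camel application to avoid the stale info? Is there a simple way to globally replace the default ThreadPoolExecutor with the custom variant suggested in the linked question? I see that you can do this for the pools themselves, but I haven't seen any examples for the executor. Keep in mind that our application is quite large and services a large number of orders every day, so I'd like as little impact on the existing application as possible.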
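For reference, here is a minimal sketch of the stale-context effect described above. It uses a plain `ThreadLocal` as a stand-in for MDC so that it runs without SLF4J on the classpath; the class name `StaleContextDemo` and the value `order-1` are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StaleContextDemo {
    // Stand-in for MDC (an assumption for this sketch): one value per thread.
    static final ThreadLocal<String> ORDER_ID = new ThreadLocal<String>();

    // Runs two tasks on the same pooled thread and returns what the second one sees.
    static String valueSeenBySecondTask() {
        // A single worker guarantees that the second task reuses the first task's thread.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable first = () -> ORDER_ID.set("order-1"); // sets a value, never clears it
            Callable<String> second = () -> ORDER_ID.get(); // sets nothing itself
            pool.submit(first).get();
            return pool.submit(second).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The second task still sees the value left behind by the first one.
        System.out.println("second task sees: " + valueSeenBySecondTask());
    }
}
```

The second task never sets a value, yet it reads the one left behind by the first task, because the pool reuses the worker thread. With MDC the effect is the same: log lines from one order can carry another order's diagnostic data.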

Answer


I figured it out and wanted to post what I did in case it benefits someone else. Note that I'm using JDK 6 / Camel 2.13.2.

  • Camel has a DefaultExecutorServiceManager that uses a DefaultThreadPoolFactory. I extended the default factory into an MdcThreadPoolFactory.

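  • DefaultThreadPoolFactory has methods that create RejectableThreadPoolExecutor and RejectableScheduledThreadPoolExecutor instances. I extended both into Mdc* versions that override execute() to wrap the Runnable and hand the MDC info over between threads (as described in the question linked from my original question).

    MdcThreadPoolExecutor: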

    package com.mypackage.concurrent; 
    
    import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor; 
    import org.slf4j.MDC; 
    
    import java.util.Map; 
    import java.util.concurrent.*; 
    
    /** 
    * A SLF4J MDC-compatible {@link ThreadPoolExecutor}. 
    * <p/> 
    * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate 
    * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a 
    * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} that sets MDC data appropriately before each task. 
    * <p/> 
    * Created by broda20. 
    * Date: 10/29/15 
    */ 
    public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor { 
    
        @SuppressWarnings("unchecked") 
        private Map<String, String> getContextForTask() { 
         // MDC values are strings; older SLF4J versions return a raw Map here 
         return MDC.getCopyOfContextMap(); 
        } 
    
        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
                 BlockingQueue<Runnable> workQueue) { 
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 
        } 
    
        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
                 BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { 
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); 
        } 
    
        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
                 BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { 
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); 
        } 
    
        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
                 BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { 
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); 
        } 
    
        /** 
        * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.) 
        * all delegate to this. 
        */ 
        @Override 
        public void execute(Runnable command) { 
         // capture the submitting thread's MDC now; it is applied later on the worker thread 
         super.execute(wrap(command, getContextForTask())); 
        } 
    
        public static Runnable wrap(final Runnable runnable, final Map<String, String> context) { 
         return new Runnable() { 
          @Override 
          public void run() { 
           // remember the worker thread's own MDC so it can be restored afterwards 
           Map<String, String> previous = MDC.getCopyOfContextMap(); 
           if (context == null) { 
            MDC.clear(); 
           } else { 
            MDC.setContextMap(context); 
           } 
           try { 
            runnable.run(); 
           } finally { 
            if (previous == null) { 
             MDC.clear(); 
            } else { 
             MDC.setContextMap(previous); 
            } 
           } 
          } 
         }; 
        } 
    } 
    

  • I created a bean instance of MdcThreadPoolFactory in my application config; Camel picks it up automatically and uses it in the ExecutorServiceManager.

MdcScheduledThreadPoolExecutor:

package com.mypackage.concurrent; 

import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor; 
import org.slf4j.MDC; 

import java.util.Map; 
import java.util.concurrent.*; 

/** 
* An SLF4J MDC-compatible {@link ScheduledThreadPoolExecutor}. 
* <p/> 
* In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate 
* logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a 
* thread pool. This is a drop-in replacement for {@link ScheduledThreadPoolExecutor} that sets MDC data appropriately before each task. 
* <p/> 
* Created by broda20. 
* Date: 10/29/15 
*/ 
public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor { 

    @SuppressWarnings("unchecked") 
    private Map<String, String> getContextForTask() { 
     // MDC values are strings; older SLF4J versions return a raw Map here 
     return MDC.getCopyOfContextMap(); 
    } 

    public MdcScheduledThreadPoolExecutor(int corePoolSize) { 
     super(corePoolSize); 
    } 

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { 
     super(corePoolSize, threadFactory); 
    } 

    public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { 
     super(corePoolSize, handler); 
    } 

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { 
     super(corePoolSize, threadFactory, handler); 
    } 

    /** 
    * Injects MDC into tasks passed to {@code execute()}. Note that in {@code ScheduledThreadPoolExecutor}, 
    * {@code submit()} and {@code schedule()} do not delegate to {@code execute()}, so tasks submitted 
    * that way are not wrapped by this override. 
    */ 
    @Override 
    public void execute(Runnable command) { 
     // capture the submitting thread's MDC now; it is applied later on the worker thread 
     super.execute(wrap(command, getContextForTask())); 
    } 

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) { 
     return new Runnable() { 
      @Override 
      public void run() { 
       // remember the worker thread's own MDC so it can be restored afterwards 
       Map<String, String> previous = MDC.getCopyOfContextMap(); 
       if (context == null) { 
        MDC.clear(); 
       } else { 
        MDC.setContextMap(context); 
       } 
       try { 
        runnable.run(); 
       } finally { 
        if (previous == null) { 
         MDC.clear(); 
        } else { 
         MDC.setContextMap(previous); 
        } 
       } 
      } 
     }; 
    } 
} 

MdcThreadPoolFactory:

package com.mypackage.concurrent; 

import org.apache.camel.impl.DefaultThreadPoolFactory; 
import org.apache.camel.spi.ThreadPoolProfile; 
import org.apache.camel.util.concurrent.SizedScheduledExecutorService; 
import org.slf4j.MDC; 

import java.util.Map; 
import java.util.concurrent.*; 

public class MdcThreadPoolFactory extends DefaultThreadPoolFactory { 

    @SuppressWarnings("unchecked") 
    private Map<String, String> getContextForTask() { 
     return MDC.getCopyOfContextMap(); 
    } 


    public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean allowCoreThreadTimeOut, 
              RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException { 

      // the core pool size must be 0 or higher 
      if (corePoolSize < 0) { 
       throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize); 
      } 

      // validate max >= core 
      if (maxPoolSize < corePoolSize) { 
       throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize); 
      } 

      BlockingQueue<Runnable> workQueue; 
      if (corePoolSize == 0 && maxQueueSize <= 0) { 
       // use a synchronous queue for direct-handover (no tasks stored on the queue) 
       workQueue = new SynchronousQueue<Runnable>(); 
       // and force 1 as pool size to be able to create the thread pool by the JDK 
       corePoolSize = 1; 
       maxPoolSize = 1; 
      } else if (maxQueueSize <= 0) { 
       // use a synchronous queue for direct-handover (no tasks stored on the queue) 
       workQueue = new SynchronousQueue<Runnable>(); 
      } else { 
       // bounded task queue to store tasks on the queue 
       workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize); 
      } 

      ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue); 
      answer.setThreadFactory(threadFactory); 
      answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut); 
      if (rejectedExecutionHandler == null) { 
       rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); 
      } 
      answer.setRejectedExecutionHandler(rejectedExecutionHandler); 
      return answer; 
     } 

     @Override 
     public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { 
      RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler(); 
      if (rejectedExecutionHandler == null) { 
       rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); 
      } 

      ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler); 
      //JDK7: answer.setRemoveOnCancelPolicy(true); 

      // need to wrap the thread pool in a sized to guard against the problem that the 
      // JDK created thread pool has an unbounded queue (see class javadoc), which mean 
      // we could potentially keep adding tasks, and run out of memory. 
      if (profile.getMaxPoolSize() > 0) { 
       return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize()); 
      } else { 
       return answer; 
      } 
     } 
} 

And finally, the bean instance:

<bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/> 

To get this working on Camel 2.16.3, with the new threads required by org.apache.camel.util.component.AbstractApiProducer.process(Exchange, AsyncCallback), I also had to override java.util.concurrent.ScheduledThreadPoolExecutor.submit(Runnable).


Cool, noted for when I'm able to upgrade our Camel version.


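I subsequently changed this to override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) instead, which is what submit() and execute() delegate to (at least in JDK 8). I think this would make a good contribution to Camel core. If I find the time to look into it, would you be willing to sign the copyright over to Apache (or do whatever licensing work is needed)?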
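A minimal sketch of the schedule()-based variant mentioned in the last comment, for anyone who wants to try it. To keep it runnable without Camel or SLF4J, it extends the plain JDK ScheduledThreadPoolExecutor and uses a `ThreadLocal` as a stand-in for MDC; the names `ContextScheduledExecutor`, `CONTEXT` and `seenByWorker` are made up for illustration. Real code would presumably extend RejectableScheduledThreadPoolExecutor and copy the MDC map as in the answer.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ContextScheduledExecutor extends ScheduledThreadPoolExecutor {
    // Stand-in for MDC (an assumption for this sketch): one value per thread.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<String>();

    public ContextScheduledExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    // In the JDK's ScheduledThreadPoolExecutor, execute(Runnable) and
    // submit(Runnable) both call this method, so wrapping here covers both.
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return super.schedule(wrap(command, CONTEXT.get()), delay, unit);
    }

    // Applies the submitter's context around the task, then restores the worker's own.
    static Runnable wrap(final Runnable task, final String submitterContext) {
        return new Runnable() {
            @Override
            public void run() {
                String previous = CONTEXT.get();
                CONTEXT.set(submitterContext);
                try {
                    task.run();
                } finally {
                    CONTEXT.set(previous);
                }
            }
        };
    }

    // Submits a task from a thread whose context is `value`; returns what the worker saw.
    static String seenByWorker(String value) {
        ContextScheduledExecutor pool = new ContextScheduledExecutor(1);
        final String[] seen = new String[1];
        CONTEXT.set(value);
        try {
            Runnable probe = () -> seen[0] = CONTEXT.get();
            pool.submit(probe).get(); // submit(), not execute()
            return seen[0];
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenByWorker("order-42"));
    }
}
```

This sketch only covers the Runnable overload of schedule(). Callable submissions go through schedule(Callable, long, TimeUnit), and scheduleAtFixedRate() / scheduleWithFixedDelay() do not go through schedule() at all, so those would need their own overrides if they matter for your routes.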