Programing

스레드 풀과 함께 MDC를 사용하는 방법

c10106 2022. 5. 19. 22:32
반응형

스레드 풀과 함께 MDC를 사용하는 방법

우리의 소프트웨어에서 우리는 광범위하게 MDC를 사용하여 웹 요청에 대한 세션 ID와 사용자 이름과 같은 것들을 추적한다.이것은 원래 실에서 달리는 동안 잘 작동한다.

하지만, 뒤에서 처리해야 할 일들이 많다.이를 위해 우리는java.concurrent.ThreadPoolExecutor그리고java.util.Timer클래스와 일부 자체 롤링 비동기 실행 서비스.이 모든 서비스는 자체 스레드 풀을 관리한다.

이러한 환경에서 MDC를 사용하는 것에 대해 Logback의 설명서는 다음과 같이 말하고 있다.

매핑된 진단 컨텍스트의 복사본이 시작 스레드에서 작업자 스레드에 의해 항상 상속될 수 있는 것은 아니다.java.util.concurrent가 이에 해당한다.실행자는 스레드 관리에 사용된다.예를 들어 newCachedThreadPool 방법은 ThreadPoolExecutor를 생성하며 다른 스레드 풀링 코드와 마찬가지로 복잡한 스레드 생성 논리를 가지고 있다.

이 경우 작업을 실행자에게 제출하기 전에 원래(마스터) 스레드에서 MDC.getCopyOfContextMap()을 호출하는 것이 좋다.태스크가 실행되면 첫 번째 작업으로 MDC.setContextMapValues()를 호출하여 원래 MDC 값의 저장된 복사본을 새 실행자 관리 스레드와 연결해야 한다.

이것은 괜찮겠지만, 그 전화들을 추가하는 것은 매우 쉬운 일이며, 너무 늦기 전까지는 문제를 인식할 수 있는 쉬운 방법이 없다.Log4j의 유일한 부호는 로그에서 MDC 정보가 누락되고, Logback을 사용하면 오래된 MDC 정보가 생성된다는 것이다(트레드 풀의 스레드가 실행된 첫 번째 작업에서 해당 MDC를 상속받으므로).둘 다 생산 시스템의 심각한 문제들이다.

나는 우리의 상황을 어떤 식으로든 특별하게 보지 않지만, 나는 웹에서 이 문제에 대해 많은 것을 찾을 수 없었다.보아하니 이것은 많은 사람들이 부딪치는 일이 아니니 피할 방도가 있을 것이다.우리가 여기서 뭘 잘못하고 있는 거지?

그래, 나도 이 문제에 부딪혔어.몇 가지 해결책이 있지만(설명된 바와 같이 수동으로 설정하는 것과 같이), 이상적으로는 다음과 같은 해결책이 필요하다.

  • MDC를 일관성 있게 설정.
  • MDC가 잘못되었지만 사용자가 모르는 경우 암묵적인 버그를 방지함
  • 스레드 풀 사용 방법(예: 하위 클래스) 변경 최소화Callable와 함께MyCallable도처에, 또는 유사한 추악함.

여기 이 세 가지 니즈를 충족시키는 해결책이 있다.코드는 스스로 설명해야 한다.

(부기사항으로 이 실행자를 만들어 과바에게 먹일 수 있다.MoreExecutors.listeningDecorator(), 과바를 사용한다면ListanableFuture.)

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

우리는 비슷한 문제에 부딪혔다.새 스레드를 시작/중지하기 전에 필요한 MDC 호출을 실행하기 위해 ThreadPoolExecutor를 확장하고 실행 전/후에 MDC 호출을 재정의할 수 있다.

IMHO의 최선의 해결책은 다음과 같다.

  • 사용하다ThreadPoolTaskExecutor
  • 당신 자신의 것을 실행하다TaskDecorator
  • 사용:executor.setTaskDecorator(new LoggingTaskDecorator());

장식가는 다음과 같이 보일 수 있다.

private final class LoggingTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable task) {
        // web thread
        Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
        return () -> {
            // work thread
            try {
                // TODO: is this thread safe?
                MDC.setContextMap(webThreadContext);
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }

}

고정 스레드 풀 및 실행자를 사용하여 다음 작업을 수행하십시오.

ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

나사산 부분:

executor.submit(() -> {
    MDC.setContextMap(mdcContextMap);
    // my stuff
});

을 사용하여 태스크를 실행하는 스프링 프레임워크 관련 환경에서 이 문제가 발생하는 경우@AsyncTaskDecorator 접근방식을 사용하여 작업을 꾸밀 수 있는 주석.

이 작업을 수행하는 방법에 대한 샘플:

나는 이 문제에 직면했고 위의 기사는 내가 이 문제를 다루는데 도움을 주었고 그래서 나는 이 문제를 여기서 공유하고 있다.

이전에 게시된 솔루션과 유사하게 다음 작업을 수행할 수 있는Runnable그리고Callable인수를 작성할 때 인수를 마무리하기 위해 덮어쓸 수 있다(허용된 솔루션 참조)RunnableFuture.

참고: 결과적으로,executorServicesubmit메소드가 아닌 메소드를 불러야 한다.execute방법의

를 위해ScheduledThreadPoolExecutor대신 메소드를 덮어쓰게 될 것이다.

여기서 기존의 답변과 유사한 또 다른 변화는 구현하는 것이다.ExecutorService그리고 대표자를 그쪽으로 넘겨주도록 허락한다.그런 다음 제네릭을 사용하면 통계를 얻고자 할 경우(다른 수정 방법을 사용하지 않는 한) 실제 대리인을 노출시킬 수 있다.

참조 코드:

public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {

    private final D delegate;

    public MDCExecutorService(D delegate) {
        this.delegate = delegate;
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(wrap(task), result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(wrapCollection(tasks));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(wrap(command));
    }

    public D getDelegate() {
        return delegate;
    }

    /* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
    /concurrent/MDCWrappers.java */

    private static Runnable wrap(final Runnable runnable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Callable<T> wrap(final Callable<T> callable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return (t) -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                consumer.accept(t);
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
        Collection<Callable<T>> wrapped = new ArrayList<>();
        for (Callable<T> task : tasks) {
            wrapped.add(wrap(task));
        }
        return wrapped;
    }
}

다음 접근 방식을 사용하여 이 문제를 해결할 수 있었다.

주 스레드(Application.java, 내 응용 프로그램의 진입점)

static public Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

실행자가 호출하는 클래스의 실행 방법

MDC.setContextMap(Application.mdcContextMap);

참조URL: https://stackoverflow.com/questions/6073019/how-to-use-mdc-with-thread-pools

반응형