Как внедрить многопоточность для выполнения множества однотипных тяжелых операций в Java-сервисе?

«Как внедрить многопоточность для выполнения множества однотипных тяжелых операций в Java-сервисе?» — вопрос из категории Java Core, который задают на 10% собеседований Java Разработчик. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Стандартный и эффективный подход — использование ExecutorService с пулом потоков фиксированного размера.

Пример реализации:

import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;

public class HeavyOperationService {
    private final ExecutorService executor = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors() // Оптимальный размер пула
    );

    public List<Result> processBatch(List<Task> tasks) throws InterruptedException, ExecutionException {
        List<Callable<Result>> callables = new ArrayList<>();
        for (Task task : tasks) {
            callables.add(() -> performHeavyOperation(task)); // Лямбда для Callable
        }

        // invokeAll() блокирует, пока все задачи не завершатся
        List<Future<Result>> futures = executor.invokeAll(callables);

        List<Result> results = new ArrayList<>();
        for (Future<Result> future : futures) {
            results.add(future.get()); // Получаем результат каждой задачи
        }
        return results;
    }

    private Result performHeavyOperation(Task task) {
        // Тяжелая операция
        return new Result();
    }

    public void shutdown() {
        executor.shutdown(); // Плавное завершение
    }
}

Ключевые моменты и альтернативы:

  • Выбор пула: newFixedThreadPool подходит для CPU-интенсивных задач. Для I/O-интенсивных (например, сетевые запросы) можно использовать newCachedThreadPool.
  • Обработка результатов: Future.get() блокирует поток. Для неблокирующей обработки используйте CompletableFuture.
  • Продвинутый вариант с CompletableFuture:

    List<CompletableFuture<Result>> futures = tasks.stream()
        .map(task -> CompletableFuture.supplyAsync(() -> performHeavyOperation(task), executor))
        .collect(Collectors.toList());
    
    // Ожидаем завершения всех и собираем результаты
    List<Result> results = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()))
        .join();
  • Важно: Всегда завершайте ExecutorService (shutdown() или shutdownNow()), чтобы освободить ресурсы JVM.