Как внедрить многопоточность для выполнения множества однотипных тяжелых операций в Java-сервисе?

Ответ

Стандартный и эффективный подход — использование ExecutorService с пулом потоков фиксированного размера.

Пример реализации:

import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;

public class HeavyOperationService {
    private final ExecutorService executor = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors() // Оптимальный размер пула
    );

    public List<Result> processBatch(List<Task> tasks) throws InterruptedException, ExecutionException {
        List<Callable<Result>> callables = new ArrayList<>();
        for (Task task : tasks) {
            callables.add(() -> performHeavyOperation(task)); // Лямбда для Callable
        }

        // invokeAll() блокирует, пока все задачи не завершатся
        List<Future<Result>> futures = executor.invokeAll(callables);

        List<Result> results = new ArrayList<>();
        for (Future<Result> future : futures) {
            results.add(future.get()); // Получаем результат каждой задачи
        }
        return results;
    }

    private Result performHeavyOperation(Task task) {
        // Тяжелая операция
        return new Result();
    }

    public void shutdown() {
        executor.shutdown(); // Плавное завершение
    }
}

Ключевые моменты и альтернативы:

  • Выбор пула: newFixedThreadPool подходит для CPU-интенсивных задач. Для I/O-интенсивных (например, сетевые запросы) можно использовать newCachedThreadPool.
  • Обработка результатов: Future.get() блокирует поток. Для неблокирующей обработки используйте CompletableFuture.
  • Продвинутый вариант с CompletableFuture:

    List<CompletableFuture<Result>> futures = tasks.stream()
        .map(task -> CompletableFuture.supplyAsync(() -> performHeavyOperation(task), executor))
        .collect(Collectors.toList());
    
    // Ожидаем завершения всех и собираем результаты
    List<Result> results = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()))
        .join();
  • Важно: Всегда завершайте ExecutorService (shutdown() или shutdownNow()), чтобы освободить ресурсы JVM.

Ответ 18+ 🔞

А, ну ты про то, как кучу тяжёлых задач по-быстрому разгрести, да? Ну, классика жанра, блядь — берёшь ExecutorService с пулом потоков, и впердоливаешь туда всё, что шевелится.

Вот, смотри, как это обычно выглядит, чтоб не изобретать велосипед с квадратными колёсами:

import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;

public class HeavyOperationService {
    // Держим пул потоков. Размер? Ну, по ядрам процессора — классика, хуле.
    private final ExecutorService executor = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors()
    );

    public List<Result> processBatch(List<Task> tasks) throws InterruptedException, ExecutionException {
        List<Callable<Result>> callables = new ArrayList<>();
        for (Task task : tasks) {
            // Заворачиваем каждую задачу в Callable, чтоб потом результат выковыривать
            callables.add(() -> performHeavyOperation(task));
        }

        // invokeAll() — наш бро. Кидает все задачи в пул и ждёт, пока все не отстреляются.
        List<Future<Result>> futures = executor.invokeAll(callables);

        List<Result> results = new ArrayList<>();
        for (Future<Result> future : futures) {
            results.add(future.get()); // Тут вытаскиваем результат. Блокирует, да, но мы же и так ждём всех.
        }
        return results;
    }

    private Result performHeavyOperation(Task task) {
        // А вот тут уже твоя тяжёлая магия, ебать её в сраку.
        return new Result();
    }

    public void shutdown() {
        executor.shutdown(); // Не забудь прибраться, а то потоки как сироты останутся.
    }
}

А теперь, блядь, нюансы, на которых можно обжечься:

  • Какой пул выбрать? newFixedThreadPool — это для задач, где процессор пашет, а не спит. Если же твои операции больше тормозят на вводе-выводе (сеть, диск), то newCachedThreadPool может быть шустрее, он потоков наклепает, сколько попросишь. Но осторожно, а то овердохуища потоков создаст.
  • Future.get() блокирует. Это как стоять в очереди за пивом — пока все не получат, не уйдёшь. Если хочешь асинхронности и грации, то добро пожаловать в CompletableFuture, ёпта. С ним можно результаты обрабатывать по мере готовности.

Вот, глянь, как это выглядит на CompletableFuture, чтоб вообще охуенно было:

List<CompletableFuture<Result>> futures = tasks.stream()
    .map(task -> CompletableFuture.supplyAsync(() -> performHeavyOperation(task), executor))
    .collect(Collectors.toList());

// Ждём, когда ВСЕ эти будущие результаты станут настоящими
List<Result> results = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
    .thenApply(v -> futures.stream()
        .map(CompletableFuture::join) // join тут почти как get, но без checked exceptions
        .collect(Collectors.toList()))
    .join(); // Да, тут тоже join, чтоб дождаться окончательного списка

И главное, чувак, не будь мудаком — завершай ExecutorService! Вызови shutdown() после работы, а то потоки так и будут висеть, как маньяки в подворотне, ждущие новых жертв. JVM-то ресурсы не бесконечные, в рот меня чих-пых!