Ответ
Стандартный и эффективный подход — использование ExecutorService с пулом потоков фиксированного размера.
Пример реализации:
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;
public class HeavyOperationService {
private final ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() // Оптимальный размер пула
);
public List<Result> processBatch(List<Task> tasks) throws InterruptedException, ExecutionException {
List<Callable<Result>> callables = new ArrayList<>();
for (Task task : tasks) {
callables.add(() -> performHeavyOperation(task)); // Лямбда для Callable
}
// invokeAll() блокирует, пока все задачи не завершатся
List<Future<Result>> futures = executor.invokeAll(callables);
List<Result> results = new ArrayList<>();
for (Future<Result> future : futures) {
results.add(future.get()); // Получаем результат каждой задачи
}
return results;
}
private Result performHeavyOperation(Task task) {
// Тяжелая операция
return new Result();
}
public void shutdown() {
executor.shutdown(); // Плавное завершение
}
}
Ключевые моменты и альтернативы:
- Выбор пула:
newFixedThreadPoolподходит для CPU-интенсивных задач. Для I/O-интенсивных (например, сетевые запросы) можно использоватьnewCachedThreadPool. - Обработка результатов:
Future.get()блокирует поток. Для неблокирующей обработки используйтеCompletableFuture. -
Продвинутый вариант с
CompletableFuture:List<CompletableFuture<Result>> futures = tasks.stream() .map(task -> CompletableFuture.supplyAsync(() -> performHeavyOperation(task), executor)) .collect(Collectors.toList()); // Ожидаем завершения всех и собираем результаты List<Result> results = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList())) .join(); - Важно: Всегда завершайте
ExecutorService(shutdown()илиshutdownNow()), чтобы освободить ресурсы JVM.
Ответ 18+ 🔞
А, ну ты про то, как кучу тяжёлых задач по-быстрому разгрести, да? Ну, классика жанра, блядь — берёшь ExecutorService с пулом потоков, и впердоливаешь туда всё, что шевелится.
Вот, смотри, как это обычно выглядит, чтоб не изобретать велосипед с квадратными колёсами:
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;
public class HeavyOperationService {
// Держим пул потоков. Размер? Ну, по ядрам процессора — классика, хуле.
private final ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
public List<Result> processBatch(List<Task> tasks) throws InterruptedException, ExecutionException {
List<Callable<Result>> callables = new ArrayList<>();
for (Task task : tasks) {
// Заворачиваем каждую задачу в Callable, чтоб потом результат выковыривать
callables.add(() -> performHeavyOperation(task));
}
// invokeAll() — наш бро. Кидает все задачи в пул и ждёт, пока все не отстреляются.
List<Future<Result>> futures = executor.invokeAll(callables);
List<Result> results = new ArrayList<>();
for (Future<Result> future : futures) {
results.add(future.get()); // Тут вытаскиваем результат. Блокирует, да, но мы же и так ждём всех.
}
return results;
}
private Result performHeavyOperation(Task task) {
// А вот тут уже твоя тяжёлая магия, ебать её в сраку.
return new Result();
}
public void shutdown() {
executor.shutdown(); // Не забудь прибраться, а то потоки как сироты останутся.
}
}
А теперь, блядь, нюансы, на которых можно обжечься:
- Какой пул выбрать?
newFixedThreadPool— это для задач, где процессор пашет, а не спит. Если же твои операции больше тормозят на вводе-выводе (сеть, диск), тоnewCachedThreadPoolможет быть шустрее, он потоков наклепает, сколько попросишь. Но осторожно, а то овердохуища потоков создаст. Future.get()блокирует. Это как стоять в очереди за пивом — пока все не получат, не уйдёшь. Если хочешь асинхронности и грации, то добро пожаловать вCompletableFuture, ёпта. С ним можно результаты обрабатывать по мере готовности.
Вот, глянь, как это выглядит на CompletableFuture, чтоб вообще охуенно было:
List<CompletableFuture<Result>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> performHeavyOperation(task), executor))
.collect(Collectors.toList());
// Ждём, когда ВСЕ эти будущие результаты станут настоящими
List<Result> results = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join) // join тут почти как get, но без checked exceptions
.collect(Collectors.toList()))
.join(); // Да, тут тоже join, чтоб дождаться окончательного списка
И главное, чувак, не будь мудаком — завершай ExecutorService! Вызови shutdown() после работы, а то потоки так и будут висеть, как маньяки в подворотне, ждущие новых жертв. JVM-то ресурсы не бесконечные, в рот меня чих-пых!