(给ImportNew加星标,提高Java技能)
本文一起来看看其中最受 Java 开发者关注的一项新特性:Loom 项目的两个新特性之一的 ”虚拟线程[1](Virtual Thread)“(另外一个新特性是 ”结构化并发[2](Structured Concurrency)“,当前是预览状态),它被称之为 Java 版的 ”协程“,它到底是什么?有什么神奇之处吗?
Thread thread = Thread.ofVirtual().start(() -> System.out.println("Hello"));
thread.join(); // 等待虚拟线程终止
Thread.startVirtualThread(task) 可以快捷地创建并启动虚拟线程,它与 Thread.ofVirtual().start(task) 是等价的。
Thread.Builder builder = Thread.ofVirtual().name("MyThread"); // 虚拟线程的名称是 MyThread
Runnable task = () -> System.out.println("Running thread");
Thread t = builder.start(task);
System.out.println("Thread t name: " + t.getName()); // 控制台打印:Thread t name: MyThread
t.join();
下面的示例代码创建了 2 个虚拟线程,名称分别是 worker-0 和 worker-1(这个是由 name() 中的两个参数 prefix 和 start 指定的):
Thread.Builder builder = Thread.ofVirtual().name("worker-", 0);
Runnable task = () -> System.out.println("Thread ID: " + Thread.currentThread().threadId());
// 虚拟线程 1,名称为 worker-0
Thread t1 = builder.start(task);
t1.join();
System.out.println(t1.getName() + " terminated");
// 虚拟线程 2,名称为 worker-1
Thread t2 = builder.start(task);
t2.join();
System.out.println(t2.getName() + " terminated");
以上示例代码运行结果,在控制台中打印内容如下:
Thread ID: 21
worker-0 terminated
Thread ID: 24
worker-1 terminated
// Java 21 中 ExecutorService 接口继承了 AutoCloseable 接口,
// 所以可以使用 try-with-resources 语法使 Executor 在最后被自动地 close()
try (ExecutorService myExecutor = Executors.newVirtualThreadPerTaskExecutor()) {
// 每次 submit() 调用向 Executor 提交任务时都会创建和启动一个新的虚拟线程
Future<?> future = myExecutor.submit(() -> System.out.println("Running thread"));
future.get(); // 等待线程任务执行完成
System.out.println("Task completed");
} catch (ExecutionException | InterruptedException ignore) {}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class EchoServer {
public static void main(String[] args) {
try (ServerSocket serverSocket = new ServerSocket(8080)) {
while (true) {
try {
// 接受传入的客户端连接
Socket clientSocket = serverSocket.accept();
// 启动服务线程,处理这个客户端连接传输的数据并回显。可以通过虚拟线程同时服务多个客户端,每个客户端连接一个线程。
Thread.ofVirtual().start(() -> {
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()))) {
String inputLine;
while ((inputLine = in.readLine()) != null) {
System.out.println(inputLine);
out.println(inputLine);
}
} catch (IOException ignore) {}
});
} catch (Throwable unknown) {
break;
}
}
} catch (IOException e) {
System.err.println("Exception caught when trying to listen on port 8080 or listening for a connection: " + e.getMessage());
System.exit(1);
}
}
}
EchoClient 为回显客户端程序,它连接到本地的服务器并发送在命令行输入的文本消息:
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class EchoClient {
public static void main(String[] args) {
try (Socket echoSocket = new Socket("127.0.0.1", 8080);
PrintWriter out = new PrintWriter(echoSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(echoSocket.getInputStream()))) {
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
String userInput;
while ((userInput = stdIn.readLine()) != null) {
out.println(userInput);
System.out.println("echo: " + in.readLine());
if (userInput.equals("bye")) {
break;
}
}
} catch (Exception e) {
System.err.println("Couldn't get I/O for the connection to 127.0.0.1:8080: " + e.getMessage());
System.exit(1);
}
}
}
long startMills = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(256);
List<CompletableFuture<Void>> futures = new ArrayList<>();
IntStream.range(0, 10000).forEach(i -> {
// 如果 runAsync 不指定 Executor,则会使用默认的线程池(除非系统不支持并行,否则会使用一个通用的 ForkJoinPool.commonPool 线程池)
CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}, executor);
futures.add(f);
});
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
executor.shutdown();
System.out.println("【线程池】任务执行时间:" + (System.currentTimeMillis() - startMills) / 1000 + " 秒!");
以上示例代码运行结果,在控制台中打印内容如下:
【线程池】任务执行时间:40 秒!
在有虚拟线程后,其实改动非常少,只需要将平台线程池的 executor 替换为虚拟线程的 executor 即可:
long startMills = System.currentTimeMillis();
List<CompletableFuture<Void>> futures = new ArrayList<>();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 10000).forEach(i -> {
// 如果 runAsync 不指定 Executor,则会使用默认的线程池(除非系统不支持并行,否则会使用一个通用的 ForkJoinPool.commonPool 线程池)
CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}, executor);
futures.add(f);
});
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
System.out.println("【虚拟线程】任务执行时间:" + (System.currentTimeMillis() - startMills) / 1000 + " 秒!");
以上示例代码运行结果,在控制台中打印内容如下:
【虚拟线程】任务执行时间:1 秒!
同时,也可以看到在这个示例代码的场景下,虚拟线程相比平台线程池的方案在性能上提升了约 40 倍!
平台线程由操作系统来调度并决定何时运行,但是虚拟线程是由 Java 运行时来调度并决定何时运行的。当 Java 运行时调度虚拟线程时,它在平台线程上分配或挂载虚拟线程,然后操作系统像往常一样调度该平台线程,这个平台线程称为载体(Carrier)。运行一些代码后,虚拟线程可以从它的载体卸载,这通常发生在虚拟线程执行阻塞 I/O 操作时。虚拟线程从它的载体上卸载后,载体是空闲的,这意味着 Java 运行时调度器可以在其上挂载不同的虚拟线程。
在阻塞操作期间,当虚拟线程被固定到它的载体上时,它不能被卸载。虚拟线程在以下情况下会被固定(pinning):
虚拟线程在 synchronized 同步块或方法中运行代码;
虚拟线程运行本地方法(native method)或外部函数(foreign function)。
虚拟线程仍然是线程,调试器可以像平台线程那样对它们进行步进。Java Flight Recorder (JFR) 和 jcmd 工具具有额外的特性功能可以帮助观察应用程序中的虚拟线程。
jfr print --events jdk.VirtualThreadStart,jdk.VirtualThreadEnd,jdk.VirtualThreadPinned,jdk.VirtualThreadSubmitFailed recording.jfr
jcmd <PID> Thread.dump_to_file -format=text <file>
jcmd <PID> Thread.dump_to_file -format=json <file>
CompletableFuture.supplyAsync(info::getUrl, pool)
.thenCompose(url -> getBodyAsync(url, HttpResponse.BodyHandlers.ofString()))
.thenApply(info::findImage)
.thenCompose(url -> getBodyAsync(url, HttpResponse.BodyHandlers.ofByteArray()))
.thenApply(info::setImageData)
.thenAccept(this::process)
.exceptionally(ignore -> null);
但是下面这种以同步风格编写并使用简单阻塞 I/O 的代码却将受益匪浅:
try {
String page = getBody(info.getUrl(), HttpResponse.BodyHandlers.ofString());
String imageUrl = info.findImage(page);
byte[] data = getBody(imageUrl, HttpResponse.BodyHandlers.ofByteArray());
info.setImageData(data);
process(info);
} catch (Exception ignore) {}
这样的代码也更容易在调试器中进行调试,在分析器中进行概要分析,或者使用线程转储进行观察。为了观察虚拟线程,使用 jcmd 命令创建一个线程转储:
jcmd <pid> Thread.dump_to_file -format=json <file>
以这种风格编写的堆栈越多,虚拟线程的性能和可观察性就越好。用其他风格编写的程序或框架,如果没有为每个任务指定一个线程,就不应该期望从虚拟线程中获得显著的好处。避免将同步、阻塞代码与异步框架混在一起。
关于虚拟线程,最难内化的是,虽然它们具有与平台线程相同的行为,但它们不应该表示相同的程序概念。
平台线程是稀缺的,因此是一种宝贵的资源。需要管理宝贵的资源,管理平台线程的最常用方法是使用线程池。接下来需要回答的问题是,池中应该有多少线程?
但是虚拟线程非常多,因此每个线程不应该代表一些共享的、池化的资源,而应该代表一个任务。线程从托管资源转变为应用程序域对象。我们应该有多少个虚拟线程的问题变得很明显,就像我们应该使用多少个字符串在内存中存储一组用户名的问题一样:虚拟线程的数量总是等于应用程序中并发任务的数量。
将 n 个平台线程转换为 n 个虚拟线程不会产生什么好处;相反,需要转换的是任务。
为了将每个应用程序任务表示为一个线程,不要像下面的例子那样使用共享线程池执行器:
Future<ResultA> f1 = sharedThreadPoolExecutor.submit(task1);
Future<ResultB> f2 = sharedThreadPoolExecutor.submit(task2);
// ... 使用 f1、f2
相反地,应该使用虚拟线程执行器,如下例所示:
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { // 注意这里实际上并没有将虚拟线程进行池化
Future<ResultA> f1 = executor.submit(task1);
Future<ResultB> f2 = executor.submit(task2);
// ... 使用 f1、f2
}
代码仍然使用 ExecutorService,但是从 Executors.newVirtualThreadPerTaskExecutor() 返回的那个没有使用线程池。相反,它为每个提交的任务创建一个新的虚拟线程。
此外,ExecutorService 本身是轻量级的,我们可以创建一个新的,就像处理任何简单的对象一样。这允许我们依赖于新添加的ExecutorService.close() 方法和 try-with-resources 语句。在 try 块结束时隐式调用的 close 方法将自动等待提交给ExecutorService 的所有任务(即由 ExecutorService 生成的所有虚拟线程)终止。
对于 fanout 场景,这是一个特别有用的模式,在这种场景中,我们希望并发地向不同的服务执行多个传出调用,如下面的示例所示:
void handle(Request request, Response response) {
var url1 = ...
var url2 = ...
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var future1 = executor.submit(() -> fetchURL(url1));
var future2 = executor.submit(() -> fetchURL(url2));
response.send(future1.get() + future2.get());
} catch (ExecutionException | InterruptedException e) {
response.fail(e);
}
}
String fetchURL(URL url) throws IOException {
try (var in = url.openStream()) {
return new String(in.readAllBytes(), StandardCharsets.UTF_8);
}
}
ExecutorService es = Executors.newFixedThreadPool(10); // 固定线程池的核心及最大线程数量为 10
...
Result foo() {
try {
var fut = es.submit(() -> callLimitedService());
return f.get();
} catch (...){ ...}
}
此示例确保对有限的服务最多有 10 个并发请求。
但是限制并发性只是线程池操作的副作用。池被设计为共享稀缺资源,而虚拟线程并不稀缺,因此永远不应该被池化!
在使用虚拟线程时,如果希望限制访问某些服务的并发性,则应该使用专门为此目的设计的构造:Semaphore 类。如下示例:
Semaphore sem = new Semaphore(10); // 初始化一个信号量,拥有 10 个许可
...
Result foo() {
sem.acquire(); // 申请许可,如果当前没有许可了,则阻塞直至其他线程 release 以释放许可
try {
return callLimitedService(); // 只有申请并获得了许可的线程,才能进入此处执行业务逻辑,从而控制了并发性
} finally {
sem.release(); // 释放许可,以供其他线程使用
}
}
简单地用信号量阻塞一些虚拟线程可能看起来与将任务提交到一个固定线程池有很大的不同,但事实上并非如此。将任务提交到线程池会将它们排队等待以供稍后执行,但是信号量内部(或任何其他类似的阻塞同步构造)会创建一个阻塞在它上面的线程队列,这些线程被阻塞在其上,与等待池化的平台线程来执行它们的任务队列相对应。因为虚拟线程即是任务,所以其结果结构是等价的:
static final ThreadLocal<SimpleDateFormat> cachedFormatter = ThreadLocal.withInitial(SimpleDateFormat::new);
void foo() {
...
cachedFormatter.get().format(...);
...
}
这种类型的缓存仅在线程(因此在线程局部缓存的昂贵对象)被多个任务共享和重复使用时才有帮助,就像在平台线程池中的池化线程时的情况一样。在线程池中运行时,许多任务可能会调用 foo,但由于池中只包含一些线程,该对象只会被实例化几次 - 每个池线程一次 - 然后被缓存和重复使用。
然而,虚拟线程从不被池化,也不会被不相关的任务重复使用。因为每个任务都有自己的虚拟线程,来自不同任务的每次对 foo 的调用都会触发新的 SimpleDateFormat 实例的实例化。而且,由于可能有大量虚拟线程同时运行,昂贵的对象可能会消耗大量内存。这与线程局部缓存的预期成果完全相反。
没有单一的通用替代方案,但在 SimpleDateFormat 的情况下,我们应该将其替换为 DateTimeFormatter。DateTimeFormatter 是不可变的,因此可以由所有线程共享单个实例:
static final DateTimeFormatter formatter = DateTimeFormatter….;
void foo() {
...
formatter.format(...);
...
}
或者,我们可以使用系统属性 jdk.tracePinnedThreads,在线程被固定时发出堆栈跟踪。使用选项 -Djdk.tracePinnedThreads=full 时,当线程被固定时会打印完整的堆栈跟踪,突出显示本机帧和持有监视器的帧。使用选项 -Djdk.tracePinnedThreads=short 时,输出将限制为仅包括有问题的帧。
如果这些机制检测到 pinning 在某些地方既长时间存在又频繁发生,那么在那些特定地方使用 ReentrantLock 替代 synchronized(再次强调,**不需要替代用于保护短时操作或不频繁操作的 synchronized**)。以下是一个长时间存在且频繁使用同步块的示例:
synchronized(lockObj) {
frequentIO();
}
我们可以将其替换为:
lock.lock();
try {
frequentIO();
} finally {
lock.unlock();
}
转自:calvinit,
链接:cnblogs.com/calvinit/p/17730501.html
- EOF -
推荐阅读 点击标题可跳转看完本文有收获?请转发分享给更多人
关注「ImportNew」,提升Java技能
点赞和在看就是最大的支持❤️