深入剖析HSF调用异常:从活锁到死锁的解决之道

探讨HSF调用异常,分析死锁与活锁现象,并分享解决经验。

原文标题:我的程序突然罢工了|深入探究HSF调用异常,从死锁到活锁的全面分析与解决

原文作者:阿里云开发者

冷月清谈:

本文详细记录了作者在处理高层服务框架(HSF)调用异常的过程中,探索和解决活锁问题的经历。文章首先介绍了在应用程序突然罢工后,通过回溯日志和源码进行的排查,经历了从死锁的怀疑到发现活锁的整个过程。通过深入分析线程池的使用,发现了CompletableFuture和parallelStream共享同一个ForkJoin池导致的并发问题。最终,作者提出了针对ForkJoin池的使用建议,强调避免复杂操作,以减少类似问题的发生。

怜星夜思:

1、你曾经遇到过类似的并发问题吗?是如何解决的?
2、ForkJoin池与线程池的使用,有什么关键区别吗?
3、遇到活锁时,你会采取什么样的调试策略?

原文内容

阿里妹导读


本文详细记录了作者在处理HSF调用异常问题的过程中,从初步怀疑死锁到最终发现并解决活锁问题的全过程。

一、背景

前两天应用线上机器突然罢工了,HSF调用程序的某个接口一直处于运行中状态,持续了20分钟(超时时间为60分钟),正常的响应时间在2分钟以内,但是奇怪的是业务逻辑也没有再运行,非常诡异,层层排查从怀疑ForkJoin池使用不当导致程序出现活锁,再到复现问题时结果与现象相悖直接头晕爆炸,最后终于从源码层面上面找到最终答案。



二、分析过程


2.1 初步分析

首先查看日志,排查业务逻辑是否运行,从日志上面来看,打印出了获取到锁的日志,业务逻辑处于运行状态。

前一段时间其他项目发生过死锁问题,导致业务逻辑无法正常运行,这个时候第一想法是出现了死锁,自信的下载Arthas,输入thread -b,No most blocking thread found!





当场直接愣住,为什么没有死锁呢?没有死锁线程,我的程序在干什么呢?

图片


2.2 深入排查

本着程序绝大多数时间比人可靠的观点,继续深入排查。

既然是HSF线程池在Wait,那么就看看HSF在Wait什么,一步一步排查,Arthas一顿操作,找到了wait的HSF线程(以下截图的当时线程的快照),在wait CompletableFuture的结果返回。





这个时候Arthas再去查看业务的线程在做什么,居然锁在了parallelStream.collect的操作,查看代码collect操作只是一个普通的并发操作序列化对象信息。





突然灵光一现,CompletableFuture 和 parallelStream 使用的是一个公共线程池ForkJoin池,是不是出现了此线程池出现了问题呢?

Arthas 查看ForkJoin池在做什么,发现所有的线程都在等待一个锁,而这个锁的持有者是正在wait collect的业务线程。





好了,大功告成,CompletableFuture 和 parallelStream使用一个线程池并发的问题,把其中一个并发去了就死锁就解除了。


2.3 问题复现

线程池线程设置为1,此时提交一个任务A,任务A的方法是给线程池提交一个任务B,然后获取任务B的返回值,程序运行后,会发现检测不到死锁,但是程序无法正常工作,此时便处于活锁状态。

package com.example.learn.thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LockTest {
public static ThreadPoolExecutor handleExecutor = new ThreadPoolExecutor(1, 1,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());

public static void main(String args) throws InterruptedException, ExecutionException {
Callable<String> taskA = new Callable<String>(){
@Override
public String call() {
try {
System.out.println(“taskA run”);

Future<String> taskB = handleExecutor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(“taskB run”);
return “taskB”;
}
});

return taskB.get();
} catch (Exception e) {
e.printStackTrace();
}
return “taskA”;
}
};

Future<?> submit = handleExecutor.submit(taskA);
submit.get();
System.out.println(“finish”);
}
}

三、山重水复疑无路


3.1 复现问题,结果与现象相悖

解决完死锁问题后,长舒一口气,但是突然脑子里面蹦出来几个问题,始终让我觉得问题没这么简单:

  • 代码版本很久没变更了,为什么这次出问题了?

  • CompletableFuture 和 parallelStream 这种java自带的用法,如果并发有问题的话,所有的程序都会有这个隐藏问题。而且使用两个用法的地方穿插在无数类里面,尤其是在多人开发的情况下,如果我负责写入口方法,想要用parallelStream做并发操作,其他人提供的实现也正好用了parallelStream,那不就凉了吗?而且方法如果很复杂,涉及到几十个类的话,这种问题怎么避免呢?

不管了,直接把业务逻辑给简化一下,然后写一个程序运行看看是否会出现问题吧。 

逻辑如下:





提交4个任务,Fork-join池改成2个(为什么是2个?因为1个的话CompletableFuture不会使用Fork-join的公共池),理论上来讲,Fork-join池都会被全局锁给锁住,此时获取到锁的线程用parallelStream应该获取不到Fork-join池的线程来做操作,从而导致活锁。

代码如下:

package com.example.learn.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

public class LockThread {
private static final List<String> nameList = new ArrayList<>();

public static ThreadPoolExecutor handleExecutor = new ThreadPoolExecutor(1, 1,
5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());

static {
System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “2”);
nameList.add(“1”);
nameList.add(“2”);
nameList.add(“3”);
nameList.add(“4”);
}

private static final Lock LOCK = new ReentrantLock();

public static void main(String args) throws InterruptedException, ExecutionException {
List<CompletableFuture<Void>> futures = nameList.stream()
.map(name -> CompletableFuture.runAsync(() -> {
processJob(name);
}))
.collect(Collectors.toList());

try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
} catch (CompletionException e) {
System.out.println(“catch Error”);
}

System.out.println(“finish”);
}

/**

  • 做一些复杂操作
  • @param name
    */
    private static void processJob(String name) {
    try {
    //其他操作
    System.out.println(“submit job:” + name + “Thread:” + Thread.currentThread().getName() + “,count:” +
    handleExecutor.getQueue().size());
    handleExecutor.submit(() -> publishMessage(name));
    } catch (Exception e) {

}

}

private static void publishMessage(String name) {
try {
boolean acquired = false;
while (!acquired) {
try {
acquired = LOCK.tryLock(3, TimeUnit.SECONDS);
if (!acquired) {
Thread.sleep(1000);
}
} catch (Exception e) {

}
}

// 复杂操作
Thread.sleep(2000);

List<String> jobList = findByName(name);
List<String> resultList = jobList.parallelStream()
.map(s -> {
try {
Thread.sleep(10);
} catch (Exception e) {

}
System.out.println(“Thread:” + Thread.currentThread().getName() + “,job:” + s);
return s + “complete”;
})
.collect(Collectors.toList());
System.out.println(“Thread:” + Thread.currentThread().getName()+ “,result:” + resultList);
} catch (Exception e) {

} finally {
System.out.println(“unlock”);
LOCK.unlock();
}
}

private static List<String> findByName(String name) {
List<String> result = new ArrayList<>();
for (int i = 0; i < 5; i++) {
result.add(name + “-” + i);
}
return result;
}
}

最后的结果出乎意料,线程没有出现锁,任务都顺利完成了。

任务顺序完成的时候,我的头直接爆炸了,原来的分析都是错的吗?Arthas抓住的活锁难道是假的吗?只是正好看的瞬间在wait吗?


3.2 查看监控,简化问题,找出蛛丝马迹

感谢monitor监控会有机器采样,重新观察当时的栈快照的详情,发现与Arthas看到的现象一致,而且观察wait的对象,block的时间等等,最终确定还是CompletableFuture 和 parallelStream 出现冲突,导致程序活锁。

写一个简单的程序,先占满Fork-join池,在用parallelStream 看看能不能完成,最终发现可以完成,但是也发现一些蛛丝马迹,parallelStream 只有一个线程在做事情,而且是当前线程,并不是Fork-Join池线程。

package com.example.learn.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamTest {
   static {
       System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “2”);
   }
   public static void main(String args) {
       List<String> jobList = findByName(“1”);
       boolean always = true;

       new Thread(() -> {
           List<String> resultList = jobList.parallelStream()
               .map(s -> {
                   while(always) {
                       try {
                           Thread.sleep(1000);
                       } catch (Exception e) {

                       }
                       //System.out.println(“Thread:” + Thread.currentThread().getName() + “, job:” + s);
                   }

                   return s + “complete”;
               })
               .collect(Collectors.toList());
           System.out.println(resultList);
       }).start();

       List<String> jobList2 = findByNameTwo(“1”);
       new Thread(() ->{
           while (true) {
               try {
                   jobList2.parallelStream()
                       .map(s -> {
                           try {
                               Thread.sleep(20);
                           } catch (Exception e) {

                           }
                           System.out.println(“Thread:” + Thread.currentThread().getName() + “, 外部循环:” + s);
                           return s + “complete”;
                       })
                       .collect(Collectors.toList());
                   Thread.sleep(10000);
               } catch (Exception e) {

               }
           }
       }).start();
   }

   private static List<String> findByName(String name) {
       List<String> result = new ArrayList<>();
       for (int i = 0; i < 5; i++) {
           result.add(name + “-” + i);
       }
       return result;
   }

   private static List<String> findByNameTwo(String name) {
       List<String> result = new ArrayList<>();
       for (int i = 0; i < 1000; i++) {
           result.add(name + “-” + i);
       }
       return result;
   }
}

这个时候崩溃了,如果parallelStream的用法可以保证Fork-join池就算满了,也能用当前线程执行,为什么我的业务线程还会被锁住呢?


3.3 搜索文档,询问其他人有没有遇到过类似的问题

网上都搜索不到类似问题,询问其他人也没有遇到过类似的问题,难道真的要用出最后一招了吗?看源码,debug源码,看看parallelStream到底是怎么运行的?

四、柳岸花明又一村


4.1 找源码解析的文档和模型抽象图

截取几张重要的图片









参考以下文档:

https://www.cnblogs.com/FraserYu/p/14439497.html

https://www.cnblogs.com/ciel717/p/16444880.html


4.2 文档只是引路人,还需要安安心心debug代码

看了很多文章讲述Fork-join池的原理,但是都没有解开我心中的问题,到底什么时候会用Fork-join的线程池呢?什么时候用本线程呢?到底会不会出现活锁呢?

初步怀疑,当前线程提交任务的时候,如果发现Fork-join线程有问题就不提交了,自己去执行?

但是经不起推敲,怎么发现线程有问题的呢?

经过Debug发现,parallelStream执行后,确实调用了Fork-join中的fork操作,然后将任务放到frok-join的队列中。









向出现死锁的线程排队队列中提交任务,然后还能完成?难道当前线程可以获取到队列的任务吗?

顺着代码排查,发现wait任务完成之前有一个奇怪的方法(“帮忙”?)





最后发现,当前线程在“帮忙”的时候,能把队列中的任务都给处理完成,直到本任务结束。





但是如果是这样的话,那么为什么活锁出现的现场,当前线程没有“帮忙”把所有任务完成呢?


4.3 真相大白

头晕眼花的时候,突然发现“帮忙”的时候拿取的是一个队列,Fork-join池最少有两个队列,为啥只帮我处理一个队列呢?检查别的地方没发现有遍历全部队列的地方,难道说当时是因为有一个任务分配给其他的死锁的队列里面了吗?

向Fork-Join池中提交任务时源码再探究

发现用当前线程提交的任务都只会分配到一个队列里面,而且“帮忙”的时候也只会帮忙这一个队列。









哎,不对,那我这个parallelStream只能用两个线程吗?通过别的文档发现,Fork-join线程池的模型与线程池存在区别,而且有一个窃取算法,可以窃取任务到本队列。





按照结果和现象来推论当时为什么会出现活锁,线程池有2个情况下,Work线程提交了两个普通任务,Hsf线程提交了两个死锁任务,但是很不巧,负责Hsf线程处理的fork-1的线程stole了一个普通任务,而且过程中hsf线程提交了两个死锁任务,导致fork-1处于无法工作的状态,这个时候fork-1队列中的普通任务无法完成,fork-2拉到死锁任务,然后Fork-join线程全部死锁。

第一步:





第二步:





第三步:





第四步:





第五步





上述分析很有道理,但是需要需要代码证明,Stole的时候并不会提前上一个全局锁,不然fork-1线程Stole的时候,直接执行的话或者一定此任务优化的话,Stole来的3+4就能算完,不会出现活锁问题。

scan是Stole的核心代码





最后发现窃取的任务和普通的任务一样,都是向队列做一个push操作,并没有上全局锁,而且fork线程做stole操作的任务,一定会放在自己的队列中。









自此真相大白,如果是上面的这种情况,发生的概率非常低,而且复现的难度也比较高,所以线上运行了很久才出现了这一次问题。



五、总结

对于ForkJoin池的理解不够,本次问题排查一波三折,期间无数次各种怀疑,最终终于真相大白,支撑排查下来的理念就是程序绝大多数时间比人可靠的观点,并且80%的问题都可以解决。

后续的建议和修改方案是对于用到ForkJoin池相关的操作如CompletableFuture 和 parallelStream等不要做任何复杂的操作,不要调用其他类的方法,只做一些无锁的基础操作,如果需要调用其他类的方法需要使用自定义线程池。

学习一个新知识的时候,搜索文档是必要的而且有用的,但是大部分的文档都是宏观层面,并不会深入探究细节,此时需要自己深入debug代码结合文档一起学习。

理论为实践提供指导和支持,实践则是理论得到验证和应用的手段。

通过HPA实现容器应用的水平弹性伸缩


本方案使用应用型负载均衡和容器服务 Kubernetes 版智能分配网络流量,提高应用的高可用性和吞吐量,使用HPA内置组件进行弹性伸缩,提升资源利用率,缩减资源成本。    


点击阅读原文查看详情。

我记得ForkJoin池提供的工作窃取机制非常独特,但我觉得用太多了反而可能导致不必要的复杂性,尤其是在处理长任务时。

首先检查现场的线程状态,通过日志输出和线程转储找出被锁的情况,确认是哪部分逻辑导致的活锁。

我会用一些工具,比如使用Java的jstack命令来抓取线程堆栈,找出任务的执行序列,通常会帮助我明确问题所在。

我是直接重写了那些有问题的方法,使用了更细粒度的锁,成效不错。发现有时候直接简单的改动就能解决隐蔽问题~

ForkJoin池适合于大量短任务的并行,而线程池则更适合需要保持长时间运行的任务。这两者的粒度和调度策略有所不同。

有啊,我当时是通过调整线程池的大小,优化任务划分来缓解了这个问题,然后也学习了如何使用context来避免潜在的锁竞争。

会将问题代码单独拆分出来,试着复现活锁现象,并逐步排查代码逻辑,往往这样能更快找到问题。

我之前在一个项目中遇到过资源竞争的问题,锁的使用不当导致了性能问题。最后我转向了使用红票和信号量的结合,效果明显!

ForkJoin池更注重的是任务的执行效率,通过任务窃取机制提高资源使用率,而普通线程池经常设定固定线程数。