前言
在 Kotlin 的协程(Coroutine) 中,虽然协程本身提供了一种简化并发编程的方式,但并不天然地解决所有的并发安全问题。当多个协程对共享状态进行读写操作时,仍可能出现并发安全问题。所以我们在使用协程时需要注意并发安全,避免产生相关的问题从而导致一些难以排查的问题。
协程中的并发安全问题1、共享状态竞争
多个协程同时访问和修改共享的可变状态时,可能会导致状态不一致。我们来看一个官网的代码示例:
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
在上述代码中,counter++不是线程安全的操作,因为它包含三个步骤(读取、计算、写入),多个协程可能会交错执行这些步骤,导致最终值错误。
2、数据不可见性
某个协程对共享变量的更新,可能不会立即被其他协程看见,尤其是在多线程环境下,协程可能运行在不同的线程上。
3、死锁与资源竞争
协程的挂起与恢复可能导致锁资源竞争。如果协程挂起期间未正确释放锁,可能造成死锁。这与我们在使用线程时是一样的,对锁的使用不当、设计不合理就会导致死锁问题,而死锁问题往往很难排查。
解决并发安全问题的方式1、使用线程安全的数据结构
对于简单的场景,可以直接使用线程安全的类,如:
对上述代码改造如下:
//sampleStart
var counter = AtomicInteger(0)
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
2、使用 Mutex(互斥锁)
kotlinx.coroutines.sync.Mutex提供了协程友好的锁,用于保护共享状态。
对上述代码改造如下:
//sampleStart
var counter = 0
val mutex = Mutex()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
3、使用 Channel
Channel是协程的线程安全队列,可以用来发送和接收数据,避免直接访问共享变量。
4、使用 StateFlow 或 SharedFlow
StateFlow是协程的状态容器,天然支持并发安全,用于维护和订阅状态的变化。
对上述代码改造如下:
//sampleStart
val counterFlow = MutableStateFlow(0)
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counterFlow.update { currentValue -> currentValue + 1 } // StateFlow 线程安全
}
}
println("Counter = $counterFlow")
}
5、使用 withContext 和单线程调度器
将共享资源的操作限制在单一线程中,避免多线程并发问题。
对上述代码改造如下:
val singleThreadContext = newSingleThreadContext("SingleThread")
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope {
repeat(n) {
launch(singleThreadContext) {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
var counter = 0
@Test
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
优劣对比及适用场景方案优点缺点适用场景
线程安全数据结构
简单高效,直接使用
适用范围有限,仅支持固定结构
简单计数器等
Mutex
灵活性高,适合保护复杂临界区
可能导致性能下降,存在锁竞争
多协程并发修改同一数据
Channel
线程安全,易实现生产者-消费者模型
实现复杂度较高,可能影响性能
消息队列、事件传递
StateFlow
简化状态管理,天然线程安全
性能有限,高频操作不适用
UI 状态同步
单线程调度器
逻辑简单,完全避免并发问题
性能受限,单线程可能成为瓶颈
严格顺序依赖的场景
总结与推荐
优先级排序:
如果操作简单,优先使用线程安全的数据结构,如AtomicInteger。如果需要保护复杂的共享状态,优先考虑Mutex。在需要流式状态管理时,推荐使用StateFlow。如果是生产者-消费者模型,使用Channel。对于顺序执行或线程受限的场景,使用单线程调度器。
建议:
在设计时尽量减少对共享状态的依赖,尽可能让任务无状态化。根据需求权衡性能和并发安全,选择最合适的解决方案。StateFlow 和 SharedFlow 为什么是天然线程安全的1、什么是 StateFlow 和 SharedFlow?
两者都是 Kotlin 协程的FlowAPI 的扩展,基于kotlinx.coroutines实现。
2、天然线程安全性
StateFlow和SharedFlow的实现基于协程库内部的线程安全机制:
示例代码说明线程安全性:
//sampleStart
val counterFlow = MutableStateFlow(0)
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counterFlow.update { currentValue -> currentValue + 1 } // 线程安全的自增
}
}
println("Counter = $counterFlow")
}
上述代码中,尽管多个协程同时修改counterFlow.value,由于StateFlow使用了CAS(Compare-And-Swap) 操作和内部同步机制,状态始终保持一致性。
3、为什么是天然线程安全的?
基于原子操作:
StateFlow和SharedFlow的核心操作(如值更新)是基于CAS,确保状态更新是不可分割的原子操作,避免了数据竞争。
内部状态保护:
这些类内部使用了Volatile修饰符或同步块,确保共享变量在多个线程之间的可见性。
背后的调度器模型:
协程本身由Dispatchers驱动,而这些调度器提供了线程池或单线程的安全性保障。
并发控制:
对于复杂场景(如订阅者的注册与事件分发),通过锁(如Mutex)和等待通知机制来确保安全。
Mutex 详解1、什么是 Mutex?
Mutex是协程库提供的一个协程友好的互斥锁,它允许多个协程安全地访问共享资源。
不同于传统的线程锁(如ReentrantLock),Mutex针对协程进行了优化,支持挂起操作而非阻塞线程。
2、Mutex 的核心特性
挂起而非阻塞:
如果一个协程尝试获取已被持有的锁,它将被挂起,而非阻塞整个线程。这使得其他协程或任务可以继续运行。
公平性支持:
Mutex默认是公平锁,按照请求的顺序授予锁,从而避免线程饥饿。
重入支持:
协程可以重复获取同一个锁(类似 Java 的ReentrantLock)。
3、使用 Mutex 的场景
保护共享资源:
在多协程并发访问共享变量时,避免状态不一致。
实现协程之间的临界区控制:
限制某段代码在某一时刻只能被一个协程执行。
4、Mutex 的核心方法lock() 和 unlock() 手动获取和释放锁
val mutex = Mutex()
suspend fun criticalSection() {
mutex.lock() // 获取锁
try {
// 临界区代码
} finally {
mutex.unlock() // 确保释放锁
}
}
withLock() 推荐的方法,用于自动管理锁的获取和释放。
val mutex = Mutex()
suspend fun criticalSection() {
mutex.withLock {
// 临界区代码
}
}
5、Mutex 可能导致死锁的场景锁重入的死锁
虽然Mutex支持协程的重入,但在多个协程嵌套调用同一资源时,可能会导致死锁。例如:
val mutex = Mutex()
suspend fun operationA() {
mutex.withLock {
operationB() // 递归调用会尝试重新获取同一把锁
}
}
suspend fun operationB() {
mutex.withLock {
// 临界区
}
}
多个锁交叉获取的死锁
如果多个协程以不同的顺序获取多个锁,可能会形成死锁。
val lock1 = Mutex()
val lock2 = Mutex()
suspend fun operation1() {
lock1.withLock {
delay(50) // 模拟操作
lock2.withLock { /* 临界区 */ }
}
}
suspend fun operation2() {
lock2.withLock {
delay(50) // 模拟操作
lock1.withLock { /* 临界区 */ }
}
}
在这种情况下,operation1和operation2都会等待对方释放锁,导致死锁。
6、如何避免死锁?
Mutex本身无法完全避免死锁问题,但通过合理的使用方式,可以极大地减少发生死锁的风险。合理设计协程的并发逻辑,并避免复杂的锁依赖关系,是避免死锁的根本解决方法
保持锁获取的顺序一致
多个锁的获取顺序应保持一致,以避免交叉锁定。
suspend fun operation1() {
lock1.withLock {
lock2.withLock { /* 临界区 */ }
}
}
suspend fun operation2() {
lock1.withLock {
lock2.withLock { /* 临界区 */ }
}
}
减少锁的粒度
尽可能减少锁的范围,只在绝对必要的临界区内持有锁,从而减少锁竞争和死锁的可能性。
suspend fun operation() {
// 执行非临界区代码
lock.withLock {
// 仅锁住临界区
}
}
使用超时
使用超时来避免协程无限期等待锁,可以通过withTimeout来实现:
import kotlinx.coroutines.withTimeout
val mutex = Mutex()
suspend fun safeLock() {
try {
withTimeout(1000) { // 1秒超时
mutex.withLock {
// 临界区代码
}
}
} catch (e: TimeoutCancellationException) {
println("获取锁超时,避免了死锁")
}
}
避免嵌套锁定
尽量避免在withLock内部再次调用另一个withLock,改为拆分逻辑,减少锁的嵌套。
使用 tryLock(非阻塞锁)
Mutex提供了tryLock方法,可以尝试获取锁而不会阻塞:
if (mutex.tryLock()) {
try {
// 临界区代码
} finally {
mutex.unlock()
}
} else {
println("锁已被占用")
}
使用更高层次的并发控制工具
如果业务逻辑允许,可以使用更高级的并发工具,比如StateFlow、Channel或atomic变量,避免显式使用锁。
7、Mutex 的优缺点优点缺点
协程友好,挂起而非阻塞线程
存在锁竞争时会降低并发性能
提供线程间的临界区保护
可能导致死锁(如果不正确释放锁)
支持公平性,防止线程饥饿
实现稍微复杂,尤其是在高并发场景