前言

在 Kotlin 的协程(Coroutine) 中,虽然协程本身提供了一种简化并发编程的方式,但并不天然地解决所有的并发安全问题。当多个协程对共享状态进行读写操作时,仍可能出现并发安全问题。所以我们在使用协程时需要注意并发安全,避免产生相关的问题从而导致一些难以排查的问题。

协程中的并发安全问题1、共享状态竞争

多个协程同时访问和修改共享的可变状态时,可能会导致状态不一致。我们来看一个官网的代码示例:

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同一动作的次数
    val time = measureTimeMillis {
        coroutineScope { // 协程的作用域
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}
//sampleStart
var counter = 0
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

在上述代码中,counter++不是线程安全的操作,因为它包含三个步骤(读取、计算、写入),多个协程可能会交错执行这些步骤,导致最终值错误。

2、数据不可见性

某个协程对共享变量的更新,可能不会立即被其他协程看见,尤其是在多线程环境下,协程可能运行在不同的线程上。

3、死锁与资源竞争

协程的挂起与恢复可能导致锁资源竞争。如果协程挂起期间未正确释放锁,可能造成死锁。这与我们在使用线程时是一样的,对锁的使用不当、设计不合理就会导致死锁问题,而死锁问题往往很难排查。

解决并发安全问题的方式1、使用线程安全的数据结构

对于简单的场景,可以直接使用线程安全的类,如:

对上述代码改造如下:

//sampleStart
var counter = AtomicInteger(0)
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}

2、使用 Mutex(互斥锁)

kotlinx.coroutines.sync.Mutex提供了协程友好的锁,用于保护共享状态。

对上述代码改造如下:

//sampleStart
var counter = 0
val mutex = Mutex()
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

3、使用 Channel

Channel是协程的线程安全队列,可以用来发送和接收数据,避免直接访问共享变量。

4、使用 StateFlow 或 SharedFlow

StateFlow是协程的状态容器,天然支持并发安全,用于维护和订阅状态的变化。

对上述代码改造如下:

//sampleStart
val counterFlow = MutableStateFlow(0)
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counterFlow.update { currentValue -> currentValue + 1 }  // StateFlow 线程安全
        }
    }
    println("Counter = $counterFlow")
}

5、使用 withContext 和单线程调度器

将共享资源的操作限制在单一线程中,避免多线程并发问题。

对上述代码改造如下:

val singleThreadContext = newSingleThreadContext("SingleThread")
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同一动作的次数
    val time = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch(singleThreadContext) {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
//sampleStart
var counter = 0
@Test
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

优劣对比及适用场景方案优点缺点适用场景

线程安全数据结构

简单高效,直接使用

适用范围有限,仅支持固定结构

简单计数器等

Mutex

灵活性高,适合保护复杂临界区

可能导致性能下降,存在锁竞争

多协程并发修改同一数据

Channel

线程安全,易实现生产者-消费者模型

实现复杂度较高,可能影响性能

消息队列、事件传递

StateFlow

简化状态管理,天然线程安全

性能有限,高频操作不适用

UI 状态同步

单线程调度器

逻辑简单,完全避免并发问题

性能受限,单线程可能成为瓶颈

严格顺序依赖的场景

总结与推荐

优先级排序:

如果操作简单,优先使用线程安全的数据结构,如AtomicInteger。如果需要保护复杂的共享状态,优先考虑Mutex。在需要流式状态管理时,推荐使用StateFlow。如果是生产者-消费者模型,使用Channel。对于顺序执行或线程受限的场景,使用单线程调度器。

建议:

在设计时尽量减少对共享状态的依赖,尽可能让任务无状态化。根据需求权衡性能和并发安全,选择最合适的解决方案。StateFlow 和 SharedFlow 为什么是天然线程安全的1、什么是 StateFlow 和 SharedFlow?

两者都是 Kotlin 协程的FlowAPI 的扩展,基于kotlinx.coroutines实现。

2、天然线程安全性

StateFlow和SharedFlow的实现基于协程库内部的线程安全机制:

示例代码说明线程安全性:

//sampleStart
val counterFlow = MutableStateFlow(0)
fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {

并发安全实现的几种方式_并发实现_

counterFlow.update { currentValue -> currentValue + 1 } // 线程安全的自增 } } println("Counter = $counterFlow") }

上述代码中,尽管多个协程同时修改counterFlow.value,由于StateFlow使用了CAS(Compare-And-Swap) 操作和内部同步机制,状态始终保持一致性。

3、为什么是天然线程安全的?

基于原子操作:

StateFlow和SharedFlow的核心操作(如值更新)是基于CAS,确保状态更新是不可分割的原子操作,避免了数据竞争。

内部状态保护:

这些类内部使用了Volatile修饰符或同步块,确保共享变量在多个线程之间的可见性。

背后的调度器模型:

协程本身由Dispatchers驱动,而这些调度器提供了线程池或单线程的安全性保障。

并发控制:

对于复杂场景(如订阅者的注册与事件分发),通过锁(如Mutex)和等待通知机制来确保安全。

Mutex 详解1、什么是 Mutex?

Mutex是协程库提供的一个协程友好的互斥锁,它允许多个协程安全地访问共享资源。

不同于传统的线程锁(如ReentrantLock),Mutex针对协程进行了优化,支持挂起操作而非阻塞线程。

2、Mutex 的核心特性

挂起而非阻塞:

如果一个协程尝试获取已被持有的锁,它将被挂起,而非阻塞整个线程。这使得其他协程或任务可以继续运行。

公平性支持:

Mutex默认是公平锁,按照请求的顺序授予锁,从而避免线程饥饿。

重入支持:

协程可以重复获取同一个锁(类似 Java 的ReentrantLock)。

3、使用 Mutex 的场景

保护共享资源:

在多协程并发访问共享变量时,避免状态不一致。

实现协程之间的临界区控制:

限制某段代码在某一时刻只能被一个协程执行。

4、Mutex 的核心方法lock() 和 unlock() 手动获取和释放锁

val mutex = Mutex()
suspend fun criticalSection() {
    mutex.lock() // 获取锁
    try {
        // 临界区代码
    } finally {
        mutex.unlock() // 确保释放锁
    }
}

withLock() 推荐的方法,用于自动管理锁的获取和释放。

val mutex = Mutex()
suspend fun criticalSection() {
    mutex.withLock {
        // 临界区代码
    }
}

5、Mutex 可能导致死锁的场景锁重入的死锁

虽然Mutex支持协程的重入,但在多个协程嵌套调用同一资源时,可能会导致死锁。例如:

val mutex = Mutex()
suspend fun operationA() {
    mutex.withLock {
        operationB() // 递归调用会尝试重新获取同一把锁
    }
}
suspend fun operationB() {
    mutex.withLock {
        // 临界区
    }
}

多个锁交叉获取的死锁

如果多个协程以不同的顺序获取多个锁,可能会形成死锁。

val lock1 = Mutex()
val lock2 = Mutex()
suspend fun operation1() {
    lock1.withLock {
        delay(50) // 模拟操作
        lock2.withLock { /* 临界区 */ }
    }
}
suspend fun operation2() {
    lock2.withLock {
        delay(50) // 模拟操作
        lock1.withLock { /* 临界区 */ }
    }
}

在这种情况下,operation1和operation2都会等待对方释放锁,导致死锁。

6、如何避免死锁?

Mutex本身无法完全避免死锁问题,但通过合理的使用方式,可以极大地减少发生死锁的风险。合理设计协程的并发逻辑,并避免复杂的锁依赖关系,是避免死锁的根本解决方法

保持锁获取的顺序一致

多个锁的获取顺序应保持一致,以避免交叉锁定。

suspend fun operation1() {
    lock1.withLock {
        lock2.withLock { /* 临界区 */ }
    }
}
suspend fun operation2() {
    lock1.withLock {
        lock2.withLock { /* 临界区 */ }
    }
}

减少锁的粒度

尽可能减少锁的范围,只在绝对必要的临界区内持有锁,从而减少锁竞争和死锁的可能性。

suspend fun operation() {
    // 执行非临界区代码
    lock.withLock {
        // 仅锁住临界区
    }
}

使用超时

使用超时来避免协程无限期等待锁,可以通过withTimeout来实现:

import kotlinx.coroutines.withTimeout
val mutex = Mutex()
suspend fun safeLock() {
    try {
        withTimeout(1000) { // 1秒超时
            mutex.withLock {
                // 临界区代码
            }
        }
    } catch (e: TimeoutCancellationException) {
        println("获取锁超时,避免了死锁")
    }
}

避免嵌套锁定

尽量避免在withLock内部再次调用另一个withLock,改为拆分逻辑,减少锁的嵌套。

使用 tryLock(非阻塞锁)

Mutex提供了tryLock方法,可以尝试获取锁而不会阻塞:

if (mutex.tryLock()) {
    try {
        // 临界区代码
    } finally {
        mutex.unlock()
    }
} else {
    println("锁已被占用")
}

使用更高层次的并发控制工具

如果业务逻辑允许,可以使用更高级的并发工具,比如StateFlow、Channel或atomic变量,避免显式使用锁。

7、Mutex 的优缺点优点缺点

协程友好,挂起而非阻塞线程

存在锁竞争时会降低并发性能

提供线程间的临界区保护

可能导致死锁(如果不正确释放锁)

支持公平性,防止线程饥饿

实现稍微复杂,尤其是在高并发场景

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。