
并发编程是现代系统开发的必修课:在多核时代,只有掌握并发编程,才能充分利用处理器的能力。仓颉作为系统级编程语言,在并发领域提供了丰富而强大的原语支持。与Rust追求极致安全的方式不同,仓颉的并发设计更加务实:提供足够的安全保证,同时保留必要的灵活性。
本文将从基础的线程管理到高级的消息传递模式,全面探讨仓颉并发原语的使用方法、性能特性,以及在实战开发中的最佳实践。

仓颉提供了直观的线程API。最基本的方式是使用spawn函数创建新线程:
import std.threading.*
// Prints a greeting; intended to be run from a spawned thread.
func helloFromThread() {
let greeting = "Hello from spawned thread"
println(greeting)
}
// Demonstrates the basic spawn/join thread lifecycle.
func basicThreading() {
// Create and start the thread; the closure body runs on the new thread.
let thread = spawn {
println("Thread started")
Thread.sleep(Duration.millis(100))
println("Thread finished")
}
// Block the calling thread until the spawned thread completes.
thread.join()
println("Main thread continues")
}这个简单的例子展示了仓颉并发的核心特点:通过闭包捕获代码,通过join()同步执行。但在实际应用中,我们需要更复杂的线程协调机制。
对于需要处理大量任务的场景,频繁地创建和销毁线程十分低效。线程池通过复用线程大大提升性能:
import std.threading.*
// Fixed-size thread pool: tasks are pushed into a channel and consumed by
// long-lived worker threads (thread reuse instead of per-task creation).
class ThreadPool {
private var workers: Array<Worker>
private var sender: Sender<Task>
// threadCount: number of worker threads; also used as the channel capacity.
init(threadCount: Int64) {
this.workers = Array<Worker>()
let (sender, receiver) = Channel<Task>(threadCount)
this.sender = sender
// The single receiver is shared by all workers behind a Mutex, so only
// one worker dequeues at a time.
let sharedReceiver = Arc.new(Mutex.new(receiver))
for (i in 0..threadCount) {
let receiver = sharedReceiver.clone()
let worker = Worker(i, receiver)
workers.append(worker)
}
}
// Enqueue a task for some worker; blocks if the channel is full.
func submit(task: (Int64) -> ()) {
sender.send(Task(task))
}
// Close the sending end so workers observe end-of-stream, then wait for
// every worker thread to exit.
func shutdown() {
drop(sender) // close the sending end
for (worker in workers) {
worker.thread.join()
}
}
}
// A unit of work for the pool; `run` receives the executing worker's id.
struct Task {
var run: (Int64) -> ()
init(run: (Int64) -> ()) {
this.run = run
}
}
// A pool worker: loops dequeuing tasks from the shared receiver until the
// channel is closed, then exits.
class Worker {
var thread: Thread
private var id: Int64
init(id: Int64, receiver: Arc<Mutex<Receiver<Task>>>) {
this.id = id
this.thread = spawn {
loop {
// NOTE(review): the mutex is held across recv(), so a worker blocked
// on an empty queue keeps the lock and serializes all other workers
// — confirm this is intended.
let task = {
let locked = receiver.lock()
defer { locked.unlock() }
locked.recv()
}
match (task) {
case Some(t) => t.run(id)
case None => break // channel closed
}
}
println("Worker ${id} shutdown")
}
}
}
// Stresses the pool with 100 short tasks across 4 workers, then shuts down.
func testThreadPool() {
let pool = ThreadPool(4)
for (i in 0..100) {
pool.submit({ workerId =>
println("Task executing on worker ${workerId}")
Thread.sleep(Duration.millis(10))
})
}
pool.shutdown()
}线程池展示了并发编程的一个重要原则:资源复用。频繁分配资源与立即释放是并发系统的大敌。
互斥锁(Mutex)是最基础的同步原语,用于保护共享资源的互斥访问:
import std.threading.*
// Mutex-protected counter: every access (reads included) takes the lock.
class Counter {
private var value: Int64 = 0
private var lock: Mutex = Mutex()
// Atomically add 1.
func increment() {
lock.lock()
defer { lock.unlock() } // released on every exit path
value += 1
}
// Atomically subtract 1.
func decrement() {
lock.lock()
defer { lock.unlock() }
value -= 1
}
// Read the current value under the lock for a consistent view.
func getValue(): Int64 {
lock.lock()
defer { lock.unlock() }
return value
}
}
// Verifies Counter under contention: 100 threads x 1000 increments each.
func testCounter() {
let counter = Counter()
var threads = Array<Thread>()
// 100 threads increment the counter concurrently
for (i in 0..100) {
let t = spawn {
for (j in 0..1000) {
counter.increment()
}
}
threads.append(t)
}
// wait for every thread to finish
for (t in threads) {
t.join()
}
println("Final count: ${counter.getValue()}") // expected: 100000
}这个例子使用了defer语句确保在任何情况下锁都会被释放,这是避免死锁的关键技巧。
条件变量允许线程在特定条件满足前进入睡眠状态,减少轮询的CPU开销:
import std.threading.*
// Blocking bounded FIFO queue built from one mutex and two condition
// variables (the classic producer/consumer monitor pattern).
class BoundedQueue<T> {
private var items: Array<T>
private var capacity: Int64
private var lock: Mutex
private var notEmpty: CondVar
private var notFull: CondVar
init(capacity: Int64) {
this.items = Array<T>()
this.capacity = capacity
this.lock = Mutex()
this.notEmpty = CondVar()
this.notFull = CondVar()
}
// Blocks while the queue is full, then appends the item.
func enqueue(item: T) {
lock.lock()
defer { lock.unlock() }
// wait for space; the while-loop guards against spurious wakeups
while (items.size >= capacity) {
notFull.wait(lock)
}
items.append(item)
notEmpty.signal() // wake one waiting consumer
}
// Blocks while the queue is empty, then removes and returns the head.
func dequeue(): T {
lock.lock()
defer { lock.unlock() }
// wait for data; the while-loop guards against spurious wakeups
while (items.size == 0) {
notEmpty.wait(lock)
}
let item = items.removeFirst()
notFull.signal() // wake one waiting producer
return item
}
}
// One producer and one consumer exchanging 100 items through the queue.
func testBoundedQueue() {
let queue = BoundedQueue<Int64>(10)
// producer thread
let producer = spawn {
for (i in 0..100) {
queue.enqueue(i)
println("Produced ${i}")
}
}
// consumer thread
let consumer = spawn {
for (i in 0..100) {
let item = queue.dequeue()
println("Consumed ${item}")
}
}
producer.join()
consumer.join()
}条件变量展示了更高效的同步模式。与忙轮询相比,条件变量让线程主动放弃CPU,直到有事件发生。
对于简单的计数和标志,原子操作提供了比锁更好的性能:
import std.sync.atomic.*
// Lock-free counter built on atomic operations.
class AtomicCounter {
private var value: Atomic<Int64> = Atomic<Int64>(0)
// Add 1 atomically; Relaxed suffices for a pure counter because no other
// memory is published together with the increment.
func increment() {
// atomic add, no lock required
value.fetchAdd(1, MemoryOrder.Relaxed)
}
// Try to replace `expected` with `new`; returns true on success.
func compareAndSwap(expected: Int64, new: Int64): Bool {
match (value.compareExchange(expected, new,
MemoryOrder.SeqCst,
MemoryOrder.SeqCst)) {
case Ok(_) => return true
case Err(_) => return false
}
}
// Read the current value.
func get(): Int64 {
return value.load(MemoryOrder.SeqCst)
}
}
// Verifies AtomicCounter under heavy contention: 1000 threads x 1000 adds.
func testAtomicCounter() {
let counter = AtomicCounter()
var threads = Array<Thread>()
// 1000 threads increment concurrently
for (i in 0..1000) {
let t = spawn {
for (j in 0..1000) {
counter.increment()
}
}
threads.append(t)
}
for (t in threads) {
t.join()
}
println("Final count: ${counter.get()}") // expected: 1000000
}Compare-And-Swap(CAS)是构建无锁数据结构的基础:
import std.sync.atomic.*
// Lock-free (Treiber-style) stack using CAS on the head pointer.
// NOTE(review): as written this is vulnerable to the classic ABA problem
// (no version tag / hazard pointers), and linking nodes through Weak means
// a reclaimed node can silently drop the rest of the stack — confirm
// against the actual Cangjie memory model before use.
class LockFreeStack<T> {
private struct Node {
var value: T
var next: ?Weak<Node>
}
private var head: Atomic<?Box<Node>>
init() {
this.head = Atomic<?Box<Node>>(None)
}
// Push by repeatedly trying to CAS the new node onto the head.
func push(value: T) {
let node = Box(Node(value, None))
loop {
let oldHead = head.load(MemoryOrder.Acquire)
node.next = oldHead.flatMap { Weak($0) }
match (head.compareExchange(
oldHead, Some(node),
MemoryOrder.Release,
MemoryOrder.Acquire)) {
case Ok(_) => break
case Err(_) => continue // lost the race; retry
}
}
}
// Pop by CASing the head to its successor; returns None when empty.
func pop(): ?T {
loop {
let oldHead = head.load(MemoryOrder.Acquire)
match (oldHead) {
case None => return None
case Some(node) => {
let newHead = node.next.flatMap { $0.upgrade() }
.flatten()
match (head.compareExchange(
oldHead, newHead,
MemoryOrder.Release,
MemoryOrder.Acquire)) {
case Ok(_) => return Some(node.value)
case Err(_) => continue // lost the race; retry
}
}
}
}
}
}无锁数据结构避免了锁的开销和死锁的风险,但正确实现非常困难,需要深入理解内存序。
通道是Golang风格的并发抽象,鼓励通过消息传递而非共享内存来实现并发:
import std.threading.*
// Demonstrates a bounded channel linking one producer to one consumer.
func basicChannel() {
// create a channel with capacity 10
let (sender, receiver) = Channel<String>(10)
// producer thread
let producer = spawn {
for (i in 0..5) {
let msg = "Message ${i}"
sender.send(msg)
println("Sent: ${msg}")
}
}
// consumer thread
let consumer = spawn {
for (i in 0..5) {
match (receiver.recv()) {
case Some(msg) => println("Received: ${msg}")
case None => break
}
}
}
producer.join()
consumer.join()
}通道的真正威力在于支持复杂的生产-消费模式:
import std.threading.*
// Worker pool driven by a single shared channel.
// NOTE(review): unlike ThreadPool above, the Receiver is shared via Arc
// *without* a Mutex — confirm Receiver supports concurrent recv().
// NOTE(review): there is no shutdown; `sender` is never dropped, so the
// workers never observe None and never exit.
class WorkPool {
private var workers: Int64
private var sender: Sender<Work>
// A unit of work: an id plus an opaque payload.
struct Work {
var id: Int64
var data: String
}
init(workers: Int64) {
this.workers = workers
let (sender, receiver) = Channel<Work>(workers * 2)
this.sender = sender
let sharedReceiver = Arc.new(receiver)
for (i in 0..workers) {
let receiver = sharedReceiver.clone()
spawn {
loop {
match (receiver.recv()) {
case Some(work) => {
println("Worker processing work ${work.id}")
Thread.sleep(Duration.millis(50))
}
case None => break // channel closed
}
}
}
}
}
// Enqueue one item; blocks when the channel is full.
func submit(id: Int64, data: String) {
sender.send(Work(id, data))
}
}
// Submits 20 items; sleeps as a crude stand-in for a real shutdown protocol.
func testWorkPool() {
let pool = WorkPool(4)
for (i in 0..20) {
pool.submit(i, "Data ${i}")
}
Thread.sleep(Duration.secs(2)) // wait for all work to finish
}一个任务分散到多个worker,然后收集结果:
import std.threading.*
// Fan-out: spread `work` across `workers` threads; results arrive on the
// returned output channel.
// NOTE(review): each worker wraps `inReceiver` in its *own* Arc inside the
// closure, so the Arc is not actually shared across workers (and the
// receiver is captured by several closures) — confirm this is valid.
// NOTE(review): workers compute `process(item) * 2`, and process() already
// multiplies by 2, so outputs are item*4 — verify which factor was meant.
func fanOut<T>(work: Array<T>, workers: Int64)
-> (Sender<T>, Receiver<T>)
{
let (inSender, inReceiver) = Channel<T>(work.size)
let (outSender, outReceiver) = Channel<T>(work.size)
// seed the input channel with all work items
for (item in work) {
inSender.send(item)
}
// start the worker threads
for (i in 0..workers) {
spawn {
let sharedReceiver = Arc.new(inReceiver)
loop {
match (sharedReceiver.recv()) {
case Some(item) => {
// transform the item
let result = process(item) * 2
outSender.send(result)
}
case None => break
}
}
}
}
return (outSender, outReceiver)
}
// Doubles the input value (the per-item "work" in the fan-out demo).
func process(item: Int64): Int64 {
let doubled = item + item
return doubled
}
// Runs fanOut over 10 integers and gathers the 10 results.
// NOTE(review): with process() doubling and the worker doubling again,
// each collected result is item*4.
func testFanOut() {
let work = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
let (_, receiver) = fanOut(work, 4)
var results = Array<Int64>()
for (i in 0..10) {
match (receiver.recv()) {
case Some(result) => results.append(result)
case None => break
}
}
println("Results: ${results}")
}数据流经多个处理阶段,每个阶段由独立的goroutine处理:
import std.threading.*
// Three-stage channel pipeline. Each stage spawns a thread that consumes
// from `input`, transforms values, and forwards them on a fresh channel.
// Bug fix: the stages previously returned the *sender* end of their output
// channel, which cannot be fed to the next stage's Receiver parameter (and
// left the receiver unused); they now return the receiver so stages chain.
class Pipeline {
// Stage 1: doubles each value. Returns the receiver for the next stage.
func stage1(input: Receiver<Int64>) -> Receiver<Int64> {
let (sender, receiver) = Channel<Int64>(10)
spawn {
loop {
match (input.recv()) {
case Some(value) => {
// first-stage transform
let result = value * 2
sender.send(result)
}
case None => break // upstream closed
}
}
}
return receiver
}
// Stage 2: adds 10 to each value. Returns the receiver for the next stage.
func stage2(input: Receiver<Int64>) -> Receiver<Int64> {
let (sender, receiver) = Channel<Int64>(10)
spawn {
loop {
match (input.recv()) {
case Some(value) => {
// second-stage transform
let result = value + 10
sender.send(result)
}
case None => break // upstream closed
}
}
}
return receiver
}
// Final stage: prints each value; terminates the chain.
func stage3(input: Receiver<Int64>) {
spawn {
loop {
match (input.recv()) {
case Some(value) => {
// sink: emit the final result
println("Final result: ${value}")
}
case None => break // upstream closed
}
}
}
}
}
// Wires the three stages together and feeds 10 values into the front.
func testPipeline() {
let pipeline = Pipeline()
let (source, receiver) = Channel<Int64>(10)
// assemble the pipeline
let stage1Out = pipeline.stage1(receiver)
let stage2Out = pipeline.stage2(stage1Out)
pipeline.stage3(stage2Out)
// feed data in
for (i in 0..10) {
source.send(i)
}
}高竞争的锁会成为性能瓶颈。可以通过分片(Sharding)来减少竞争:
import std.threading.*
// Counter sharded across several mutex-protected slots to cut contention:
// writers touch one shard; readers sum over all shards.
class ShardedCounter {
private var shards: Array<Mutex<Int64>>
private var shardCount: Int64
init(shardCount: Int64) {
this.shardCount = shardCount
this.shards = Array<Mutex<Int64>>()
for (i in 0..shardCount) {
shards.append(Mutex<Int64>(0))
}
}
// Increment the calling thread's shard.
func increment() {
let shardId = getShardId()
let shard = shards[shardId]
shard.lock()
defer { shard.unlock() }
shard.value += 1
}
// Sum all shards; a point-in-time total, not a linearizable snapshot.
func getValue(): Int64 {
var sum: Int64 = 0
for (shard in shards) {
shard.lock()
// NOTE(review): if defer fires at function exit (Go-style) rather than
// per-iteration, every shard lock is held until return — confirm defer
// scoping, otherwise unlock explicitly at the end of each iteration.
defer { shard.unlock() }
sum += shard.value
}
return sum
}
// Map the current thread to a shard.
// NOTE(review): assumes thread ids distribute evenly mod shardCount —
// verify, or hash the id instead.
private func getShardId(): Int64 {
return Thread.current().id % shardCount
}
}分片策略大大提升了高并发场景的性能。
在NUMA系统上,考虑任务亲和性可以显著提升性能:
import std.threading.*
// Pins spawned tasks to specific cores / NUMA nodes for cache locality.
class NUMAAwareScheduler {
private var cpuTopology: CpuTopology
init() {
this.cpuTopology = CpuTopology()
}
// Spawn `task` and pin the new thread to `coreId` (affinity is set by the
// thread itself as its first action).
func spawnOnCore(coreId: Int64, task: () -> ()) -> Thread {
let thread = spawn {
Thread.current().setAffinity(coreId)
task()
}
return thread
}
// Spawn `task` on some core belonging to `numaNode`.
func spawnOnNuma(numaNode: Int64, task: () -> ()) -> Thread {
let cores = cpuTopology.getCoresOnNuma(numaNode)
let coreId = cores[0] // pick the first core on that node
return spawnOnCore(coreId, task)
}
}import std.threading.*
import std.net.*
// Minimal HTTP server: an acceptor thread hands each connection to a shared
// ThreadPool. Consistency fix: `self` replaced with `this`, the receiver
// keyword used by every other class in this file.
class HttpServer {
private var listener: TcpListener
private var workerPool: ThreadPool
init(addr: String, port: Int64, workers: Int64) {
this.listener = TcpListener.bind(addr, port)
this.workerPool = ThreadPool(workers)
}
// Start the accept loop on its own thread; returns immediately.
func start() {
spawn {
loop {
match (listener.accept()) {
case Some(stream) => {
this.workerPool.submit({ _ =>
this.handleConnection(stream)
})
}
case None => break // listener closed
}
}
}
}
// Read one request from the connection and write the response back.
private func handleConnection(stream: TcpStream) {
let request = stream.readRequest()
let response = this.processRequest(request)
stream.write(response)
}
// Produce the response for a request (fixed demo payload).
private func processRequest(request: HttpRequest): HttpResponse {
// handle the HTTP request
return HttpResponse.ok("Hello, World!")
}
}
// Boots the server on localhost:8080 with 16 workers and lets it run.
func testHttpServer() {
let server = HttpServer("127.0.0.1", 8080, 16)
server.start()
// keep the process alive while the server runs
Thread.sleep(Duration.secs(3600))
}最常见的死锁场景是多线程按不同顺序获取锁:
// Anti-example below (transferMoney): caller-dependent lock order — prone
// to deadlock.
class Account {
var id: Int64 // unique id; gives transferMoneyCorrect a global lock order
var balance: Int64
var lock: Mutex
}
// DELIBERATELY WRONG: thread A transferring a->b and thread B transferring
// b->a each take their first lock, then block forever on the other's.
func transferMoney(from: Account, to: Account, amount: Int64) {
from.lock.lock()
// thread 1 now owns `from`'s lock
Thread.sleep(Duration.millis(1))
to.lock.lock() // ...and waits here for `to`'s lock
from.balance -= amount
to.balance += amount
to.lock.unlock()
from.lock.unlock()
}
// Correct version: always acquire locks in a globally consistent order
// (ascending account id), so no cycle of waiters can form.
func transferMoneyCorrect(from: Account, to: Account, amount: Int64) {
// always lock the lower-id account first
// NOTE(review): if defer fires at the end of the enclosing if/else block
// rather than at function exit, both locks are released *before* the
// balance updates below — confirm defer scoping in Cangjie.
if (from.id < to.id) {
from.lock.lock()
defer { from.lock.unlock() }
to.lock.lock()
defer { to.lock.unlock() }
} else {
to.lock.lock()
defer { to.lock.unlock() }
from.lock.lock()
defer { from.lock.unlock() }
}
from.balance -= amount
to.balance += amount
}使用过弱的内存顺序可能导致看不见应该看见的变化:
}使用过弱的内存顺序可能导致看不见应该看见的变化:
import std.sync.atomic.*
// DELIBERATELY WRONG: Relaxed ordering creates no happens-before edge, so
// the reader may see ready==true while still observing the stale value 0.
class WrongMemoryOrder {
var ready: Atomic<Bool> = Atomic<Bool>(false)
var value: Int64 = 0
func writer() {
value = 42
// wrong: a Relaxed store does not publish the earlier write to `value`
ready.store(true, MemoryOrder.Relaxed)
}
func reader() {
while (!ready.load(MemoryOrder.Relaxed)) {
// busy-wait
}
println("Value: ${value}") // may print 0!
}
}
// Correct version: the Release store pairs with the Acquire load, creating
// a happens-before edge that makes the write to `value` visible.
class CorrectMemoryOrder {
var ready: Atomic<Bool> = Atomic<Bool>(false)
var value: Int64 = 0
func writer() {
value = 42
// Release publishes every prior write to any Acquire reader of `ready`
ready.store(true, MemoryOrder.Release)
}
func reader() {
while (!ready.load(MemoryOrder.Acquire)) {
// busy-wait
}
println("Value: ${value}") // correctly prints 42
}
}首先,优先使用消息传递而非共享内存。通道虽然可能稍慢,但更安全且易于理解。
其次,避免锁嵌套。如果必须持有多个锁,确保总是按相同顺序获取。
第三,理解你的数据结构的并发特性。不是所有操作都需要保护。
第四,使用原子操作处理简单的计数和标志。
第五,充分测试并发代码。使用stress test和race condition检测工具。
// Stress workload intended to be run under a race detector, which flags
// any unsynchronized access it observes.
func detectRaceConditions() {
// build and run under the toolchain's race-detector mode
// (the original note said `cargo build --features race-detector`, which is
// Rust tooling — use the Cangjie toolchain's equivalent option)
let counter = Counter()
var threads = Array<Thread>()
for (i in 0..100) {
let t = spawn {
for (j in 0..1000) {
counter.increment()
}
}
threads.append(t)
}
for (t in threads) {
t.join()
}
// the race detector reports any data race observed above
}通过设置锁的获取顺序日志来检测潜在死锁:
// Records, per thread, the order in which lock ids are acquired and warns
// when a thread takes a lower-id lock while already holding a higher-id
// one (a lock-order inversion that can deadlock against another thread).
class DeadlockDetector {
private var lockOrder: ThreadLocal<Array<Int64>> = ThreadLocal()
// Call just before acquiring lock `lockId`.
func acquireLock(lockId: Int64) {
let order = lockOrder.get() ?? Array()
// flag any already-held lock with a larger id
for (prevId in order) {
if (prevId > lockId) {
println("Warning: Potential deadlock - acquiring ${lockId} after ${prevId}")
}
}
order.append(lockId)
lockOrder.set(order)
}
// Call after releasing the most recently acquired lock.
// NOTE(review): assumes LIFO release order; removeLast() on an empty array
// (unbalanced release) is unchecked — confirm behavior.
func releaseLock() {
var order = lockOrder.get() ?? Array()
order.removeLast()
lockOrder.set(order)
}
}仓颉的并发原语设计体现了实用主义哲学。它既不追求Rust那样的编译时并发安全检查的极致,也不依赖Java那样的GC支撑。而是提供了一套实用的、可理解的、高性能的并发工具集。
掌握这些原语的使用,开发者可以:编写无死锁的并发代码、优化高并发系统的性能、使用消息传递模式构建可维护的系统、识别并解决并发问题。
仓颉在并发领域的设计证明了系统编程语言可以在安全性和灵活性之间找到良好的平衡点。这正是其价值所在。🚀