协程
协程是一张轻量级的线程,通过Channel实现多个任务之间的通信。
Channel可以看作是一个管道,一端可读,一端可写。
put!
往同一个channel中并发地写入take!
从同一个 channel 并发地取数据Channel的定义方式
# Channel(func::Function, ctype=Any, csize=0, taskref=nothing)
c1 = Channel(32)
c2 = Channel{Float64}(32)
put!(c1, 1)
put!(c2, 2)
take!(c2)
>>2.0
关闭Channel,关闭后就不能再写入
close(c1)
put!(c1, 3)
>>error
但可以读出,fetch
只能读第一个数据,但不会把该数据从Channel中删除,但take!
会读出后删除。
put!
会失败Channel 可以在 for 循环中遍历,此时,循环会一直运行直到 Channel 中有数据,遍历过程中会取遍加入到 Channel 中的所有值。一旦 Channel关闭或者取空了,for 循环就会终止。
c = Channel{Int}(10)
foreach(i->put!(c, i), 1:3)
此时如果我们直接用for
去遍历c
中的内容,则在读取完后会一直等待Channel中有新的数据进来,如果在REPL上运行,则会停在那里。
data = [i for i in c]
如果我们先把Channel关闭,再用for
去遍历,则会读出Channel中的内容,因此此时c
已经关闭,for
不会再等待写入了。
close(c);
data = [i for i in c]
用Channel完成多任务之间的数据交互
新建两个Channel,一个是Int型,一个是Tuple型,对于每个任务,延时一段时间后,开始执行该任务的内容,并将结果放入到results中。
const jobs = Channel{Int}(32)
const results = Channel{Tuple}(32)
function do_work()
for job_id in jobs
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
# typically performed externally.
put!(results, (job_id, exec_time))
end
end
向jobs中放入数据
function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end
写入12个数据
n = 12
@async make_jobs(n)
@async表示把后面的表达式放到Task里,并加入到程序的执行列表中
开四个任务来处理
for i in 1:4 # start 4 tasks to process requests in parallel
@async do_work()
end
取出4个任务的执行结果
@elapsed while n > 0 # print out results
job_id, exec_time = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds")
global n = n - 1
end
>>1 finished in 0.51 seconds
4 finished in 0.62 seconds
3 finished in 0.75 seconds
2 finished in 0.81 seconds
5 finished in 0.55 seconds
8 finished in 0.35 seconds
6 finished in 0.77 seconds
9 finished in 0.38 seconds
12 finished in 0.11 seconds
7 finished in 0.88 seconds
11 finished in 0.25 seconds
10 finished in 0.75 seconds
要用到Threads模块,Threads是Base中的一个模块,程序中默认是using Base
的,因此Threads可以直接使用
在REPL上查看当前的线程数,默认是启动一个线程
使用export JULIA_NUM_THREADS=4
(Linux OSX)或set JULIA_NUM_THREADS=4
(Windows)来设置启动4个线程
当我们要在Jupyter中使用多个线程时,可以在Julia的运行目录中下打开命令行,先设置线程数,再启动Julia,在windows下的操作如下:
这样就可以在Jupyter中使用4个线程进程操作了。
每个线程都有一个id,可以用Threads.threadid()
查看线程的id
Threads.threadid()
>>1
我们查看10次线程的id
a = zeros(10)
Threads.@threads for i = 1:10
a[i] = Threads.threadid()
end
>>1
1
1
2
2
2
3
3
3
4
4
由于我们只启动了4个线程,因此线程id也只到4,虽然我们进行了10次线程的调用。
原子操作
所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何context switch(切换到另一个线程)。
我们先看一个使用多线程但不使用原子操作的例子。
using Base.Threads # 直接使用Threads中的函数
nthreads() # 查看当前线程数
acc = Ref(0)
@threads for i in 1:1000 # 用多线程进行计数
acc[] +=1
end
println(acc[])
>>307
这是因为线程1对acc操作时,先取到它的数据为x,此时很可能线程2也在取它的值,取到的也是x,所以两个线程分别对其加1,结果是x+1,而不是x+2.这就是线程之间没有做同步机制。
如果使用了原子操作,则当前线程在对acc进行操作时,别的线程是不能操作的,只能等acc+1之后再对它进行操作。
acc = Atomic{Int64}(0)
@threads for i in 1:1000
atomic_add!(acc, 1)
end
println(acc[])
>>1000
多进程也叫多核心或者分布式处理,就是用一个CPU的多个核心或者多个CPU进行编程。
Julia 中的分布式编程基于两个基本概念:远程引用(remote references)和远程调用(remote calls)。远程引用是一个对象,任意一个进程可以通过它访问存储在某个特定进程上的对象。远程调用指是某个进程发起的执行函数的请求,该函数会在另一个(也可能是同一个)进程中执行。
远程引用有两种类型:Future 和 RemoteChannel。
一次远程调用会返回一个 Future 作为结果。 远程调用会立即返回;也就是说,执行远程调用的进程接下来会继续执行下一个操作,而远程调用则会在另外的进程中进行。你可以通过对返回的 Future 执行 wait 操作来等待远程调用结束,然后用 fetch 获取结果。
对于 RemoteChannel 而言,它可以被反复写入。 例如,多个进程可以通过引用同一个远程 Channel 来协调相互之间的操作。
每个进程都有一个对应的 id,提供 Julia 交互环境的进程的 id 永远是1。我们把用来执行并行任务的进程称为 “worker”,假如总共只有一个进程,那么进程1就被认为是 worker,否则,除了进程1以外的进程都称作 worker。
REPL上多进程使用方式,在julia的可执行文件的路径下打开命令行,执行julia -p n
,就是启动n个进程的julia
# 指定进程2来生成一个3x4的随机矩阵
r = remotecall(rand, 2, 3, 4)
# 在进程2中计算
s = @spawnat 2 1 .+ fetch(r) # fetch是把r中的数据获取到当前进程中
# 取得s
fetch(s)
也可以让Julia自行指定进程
r = @spawn rand(2,2)
s = @spawn 1 .+ fetch(r)
fetch(s)
要想让代码并行执行,需要对所有进程都可见
function f1(a,b)
a + b
end
fetch(@spawn f1(2,3))
这是因为f1()函数只对进程1可见,别的进程不可见。f1()在进程1中定义的,因为其他进程并不可见,但rand是在Base中,其他进程都可见
为了让f1在所有进程中都可见,可以使用@everywhere宏来定义f1
@everywhere f2(a,b) = a + b
fetch(@spawn f2(2,3))