前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >11 并行计算

11 并行计算

作者头像
猫叔Rex
发布2020-06-30 14:30:35
1.1K0
发布2020-06-30 14:30:35
举报
文章被收录于专栏:科学计算科学计算

协程

协程是一张轻量级的线程,通过Channel实现多个任务之间的通信。

Channel可以看作是一个管道,一端可读,一端可写。

  • 不同的task可以通过put!往同一个channel中并发地写入
  • 不同的task也可以通过take!从同一个 channel 并发地取数据

Channel的定义方式

代码语言:javascript
复制
# Channel(func::Function, ctype=Any, csize=0, taskref=nothing)
代码语言:javascript
复制
c1 = Channel(32)
c2 = Channel{Float64}(32)
put!(c1, 1)
put!(c2, 2)
take!(c2)
>>2.0

关闭Channel,关闭后就不能再写入

代码语言:javascript
复制
close(c1)
put!(c1, 3)
>>error

但可以读出,fetch只能读第一个数据,但不会把该数据从Channel中删除,但take!会读出后删除。

  • 如果一个 Channel 是空的,读取的 task(即执行 take! 的task)会被阻塞直到有新的数据准备好了。
  • 如果一个 Channel 是满的,那么写入的 task(即执行 put! 的 task)则会被阻塞,直到 Channel 有空余。
  • isready 可以用来检查一个 channel 中是否有已经准备好的元素,而等待一个元素准备好 则用 wait
  • 一个 Channel 一开始处于开启状态,也就是说可以被 take! 读取和 put! 写入。close 会关闭一个 Channel,对于一个已经关闭的 Channel,put! 会失败

Channel 可以在 for 循环中遍历,此时,循环会一直运行直到 Channel 中有数据,遍历过程中会取遍加入到 Channel 中的所有值。一旦 Channel关闭或者取空了,for 循环就会终止。

代码语言:javascript
复制
c = Channel{Int}(10)
foreach(i->put!(c, i), 1:3) 

此时如果我们直接用for去遍历c中的内容,则在读取完后会一直等待Channel中有新的数据进来,如果在REPL上运行,则会停在那里。

代码语言:javascript
复制
data = [i for i in c]

如果我们先把Channel关闭,再用for去遍历,则会读出Channel中的内容,因此此时c已经关闭,for不会再等待写入了。

代码语言:javascript
复制
close(c); 
data = [i for i in c]

用Channel完成多任务之间的数据交互

新建两个Channel,一个是Int型,一个是Tuple型,对于每个任务,延时一段时间后,开始执行该任务的内容,并将结果放入到results中。

代码语言:javascript
复制
const jobs = Channel{Int}(32)
const results = Channel{Tuple}(32)
function do_work()
   for job_id in jobs
       exec_time = rand()
       sleep(exec_time)                # simulates elapsed time doing actual work
                                       # typically performed externally.
       put!(results, (job_id, exec_time))
   end
end

向jobs中放入数据

代码语言:javascript
复制
function make_jobs(n)
   for i in 1:n
       put!(jobs, i)
   end
end

写入12个数据

代码语言:javascript
复制
n = 12
@async make_jobs(n)

@async表示把后面的表达式放到Task里,并加入到程序的执行列表中

开四个任务来处理

代码语言:javascript
复制
for i in 1:4 # start 4 tasks to process requests in parallel
   @async do_work()
end

取出4个任务的执行结果

代码语言:javascript
复制
@elapsed while n > 0 # print out results
   job_id, exec_time = take!(results)
   println("$job_id finished in $(round(exec_time; digits=2)) seconds")
   global n = n - 1
end
>>1 finished in 0.51 seconds
 4 finished in 0.62 seconds
 3 finished in 0.75 seconds
 2 finished in 0.81 seconds
 5 finished in 0.55 seconds
 8 finished in 0.35 seconds
 6 finished in 0.77 seconds
 9 finished in 0.38 seconds
 12 finished in 0.11 seconds
 7 finished in 0.88 seconds
 11 finished in 0.25 seconds
 10 finished in 0.75 seconds

多线程

要用到Threads模块,Threads是Base中的一个模块,程序中默认是using Base的,因此Threads可以直接使用

在REPL上查看当前的线程数,默认是启动一个线程

使用export JULIA_NUM_THREADS=4(Linux OSX)或set JULIA_NUM_THREADS=4(Windows)来设置启动4个线程

当我们要在Jupyter中使用多个线程时,可以在Julia的运行目录中下打开命令行,先设置线程数,再启动Julia,在windows下的操作如下:

这样就可以在Jupyter中使用4个线程进程操作了。

每个线程都有一个id,可以用Threads.threadid()查看线程的id

代码语言:javascript
复制
Threads.threadid()
>>1

我们查看10次线程的id

代码语言:javascript
复制
a = zeros(10)
Threads.@threads for i = 1:10
   a[i] = Threads.threadid()
end
>>1
  1
  1
  2
  2
  2
  3
  3
  3
  4
  4

由于我们只启动了4个线程,因此线程id也只到4,虽然我们进行了10次线程的调用。

原子操作

所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何context switch(切换到另一个线程)。

我们先看一个使用多线程但不使用原子操作的例子。

代码语言:javascript
复制
using Base.Threads  # 直接使用Threads中的函数
nthreads()          # 查看当前线程数
acc = Ref(0)
@threads for i in 1:1000   # 用多线程进行计数
    acc[] +=1
end
println(acc[])
>>307

这是因为线程1对acc操作时,先取到它的数据为x,此时很可能线程2也在取它的值,取到的也是x,所以两个线程分别对其加1,结果是x+1,而不是x+2.这就是线程之间没有做同步机制。

如果使用了原子操作,则当前线程在对acc进行操作时,别的线程是不能操作的,只能等acc+1之后再对它进行操作。

代码语言:javascript
复制
acc = Atomic{Int64}(0)
@threads for i in 1:1000
  atomic_add!(acc, 1)
end
println(acc[])
>>1000

多进程

多进程也叫多核心或者分布式处理,就是用一个CPU的多个核心或者多个CPU进行编程。

Julia 中的分布式编程基于两个基本概念:远程引用(remote references)和远程调用(remote calls)。远程引用是一个对象,任意一个进程可以通过它访问存储在某个特定进程上的对象。远程调用指是某个进程发起的执行函数的请求,该函数会在另一个(也可能是同一个)进程中执行。

远程引用有两种类型:Future 和 RemoteChannel。

一次远程调用会返回一个 Future 作为结果。 远程调用会立即返回;也就是说,执行远程调用的进程接下来会继续执行下一个操作,而远程调用则会在另外的进程中进行。你可以通过对返回的 Future 执行 wait 操作来等待远程调用结束,然后用 fetch 获取结果。

对于 RemoteChannel 而言,它可以被反复写入。 例如,多个进程可以通过引用同一个远程 Channel 来协调相互之间的操作。

每个进程都有一个对应的 id,提供 Julia 交互环境的进程的 id 永远是1。我们把用来执行并行任务的进程称为 “worker”,假如总共只有一个进程,那么进程1就被认为是 worker,否则,除了进程1以外的进程都称作 worker。

REPL上多进程使用方式,在julia的可执行文件的路径下打开命令行,执行julia -p n,就是启动n个进程的julia

代码语言:javascript
复制
# 指定进程2来生成一个3x4的随机矩阵
r = remotecall(rand, 2, 3, 4)
# 在进程2中计算
s = @spawnat 2 1 .+ fetch(r)  # fetch是把r中的数据获取到当前进程中
# 取得s
fetch(s)

也可以让Julia自行指定进程

代码语言:javascript
复制
r = @spawn rand(2,2)
s = @spawn 1 .+ fetch(r)
fetch(s)

要想让代码并行执行,需要对所有进程都可见

代码语言:javascript
复制
function f1(a,b)
    a + b
end
代码语言:javascript
复制
fetch(@spawn f1(2,3))

这是因为f1()函数只对进程1可见,别的进程不可见。f1()在进程1中定义的,因为其他进程并不可见,但rand是在Base中,其他进程都可见

为了让f1在所有进程中都可见,可以使用@everywhere宏来定义f1

代码语言:javascript
复制
@everywhere f2(a,b) = a + b
fetch(@spawn f2(2,3))
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 傅里叶的猫 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 多线程
  • 多进程
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档