我们在前面曾经写过一个教程《R语言实现并行》,在其中我们测试了下几个基础的功能函数。今天给给大家带来另一个建立在基础包以上整合的并行R包BiocParallel。首先看下包的安装:
BiocManager::install("BiocParallel")
##辅助包
BiocManager::install("ShortRead")
BiocManager::install("RNAseqData.HNRNPC.bam.chr14")
BiocManager::install("GenomicAlignments")
BiocManager::install("Rsamtools")
我们通过实例看下具体的函数:
##查看系统存在的并行环境
registered()
Windows:
Linux:
##查看任务情况
show(x)
根据上面的信息我们可以看到在linux和mac中MulticoreParam是特有的。另外两个在所有平台都是存在的。
接下来我们通过实例看下在这个包中的核心函数:
##将输入的参数赋值并进行并行计算
fun <- function(v) {
message("working") ## 10 tasks
sqrt(v)
}
bplapply(1:10, fun)
##向量化数据结果函数
result <- bpvec(1:10, fun)
##两个数据进行顺序并列运行
fun <- function(greet, who) {
paste(Sys.getpid(), greet, who)## Sys.getpid()获取任务ID
}
greet <- c("morning", "night")
who <- c("sun", "moon")
param <- bpparam()
original <- bpworkers(param)
bpworkers(param) <- 2 ##设置并行的进程数
result <- bpmapply(fun, greet, who, BPPARAM = param)
##ITER主函数提供数据块,各进程函数独立设置运算,另外还可以设置合并结果的函数REDUCE,可以通过init设置REDUCE 的初始值。
require(Rsamtools)
require(RNAseqData.HNRNPC.bam.chr14)
require(GenomicAlignments)
require(ShortRead)
library(GenomicRanges)
library(IRanges)
sp <- SolexaPath(system.file('extdata', package = 'ShortRead'))
fl <- file.path(analysisPath(sp), "s_1_sequence.txt")
## Create an iterator that returns data chunks the size of 'n'.
fastqIterator <- function(fqs) {
done <- FALSE
if (!isOpen(fqs))
open(fqs)
function() {
if (done)
return(NULL)
yld <- yield(fqs)
if (length(yld) == 0L) {
close(fqs)
done <<- TRUE
NULL
} else yld
}
}
## The process function summarizes the number of times each sequence occurs.
summary <- function(reads, ...) {
ShortRead::tables(reads, n = 0)$distribution
}
## Create a param.
bpparam <- SnowParam(workers = 2)
## Initialize the streamer and iterator.
fqs <- FastqStreamer(fl, n = 100)
ITER <- fastqIterator(fqs)
bpiterate(ITER, summary, BPPARAM = bpparam)
## Results from the workers are combined on the fly when REDUCE is used.
## Collapsing the data in this way can substantially reduce memory
## requirements.
fqs <- FastqStreamer(fl, n = 100)
ITER <- fastqIterator(fqs)
bpiterate(ITER, summary, REDUCE = rbind, all = TRUE, BPPARAM = bpparam)
##根据数据子集进行数据的并行计算
require(Rsamtools)
require(GenomicAlignments)
fl <- system.file("extdata", "ex1.bam", package="Rsamtools")
param <- ScanBamParam(what = c("flag", "mapq"))
gal <- readGAlignments(fl, param=param)
## Report the mean map quality by range cutoff:
cutoff <- rep(0, length(gal))
cutoff[start(gal) > 1000 & start(gal) < 1500] <- 1
cutoff[start(gal) > 1500] <- 2
bpaggregate(as.data.frame(mcols(gal)$mapq), list(cutoff = cutoff), mean)
此外本包还提供了计算环境的基础配置的获取功能,不进行一一的展开了。大家可以直接在包中查看。当然结合batchtools包可以实现更加灵活的并行计算。大家感兴趣可以深入研究。