1、SupR:让 R 语言走向多线程并行计算1概要2R 并行计算概览并行模式:进程 vs 线程SupR:当 R 遇上大数据概要3R 并行计算概览并行模式:进程 vs 线程SupR:当 R 遇上大数据R 中可实现的并行方式4 C/C+/FORTRAN 并行 多进程并行 Hadoop/Spark 并行 GPU 并行C/C+/FORTRAN 并行5 由 C/C+/FORTRAN 代码进行并行,R 直接调用 优势:使用简单(作为用户),性能良好 缺点:编程困难(作为开发者),涉及 R 对象时并非线程安全 例子:OpenBLAS(Pthread/OpenMP)xgboost 包(OpenMP)recosys
2、tem 包(C+11 线程库)RcppParallel 包(Intel TBB)多进程并行6 创建多个 R 进程同时进行计算,使用 Socket 或 MPI 等软件 库进行进程通信 优势:R 中使用范围最广的方式,可直接编写 R 代码并行 缺点:通信成本较高,内存使用量大 例子:parallel 包,mclapply()和 parLapply()函数 snow,Rmpi 和 pbdMPI 等使用 MPI 接口的软件包Hadoop/Spark 并行 从 R 调用 Hadoop 和 Spark 等并行计算平台 优势:基于成熟的并行计算平台,能够处理大 规模的数据 缺点:与 R 之间通信成本非常高,
3、内存占用大 例子:RHIPE 包(Hadoop)SparkR 包(Spark)7GPU 并行 使用 GPU 进行并行计算 优势:处理器数目多,并行效果显著 缺点:硬件要求高,实现的算法较少 例子:gputools 包(CUDA)gpuR 包(OpenCL)8概要9R 并行计算概览并行模式:进程 vs 线程SupR:当 R 遇上大数据两种并行模式多进程并行多线程并行10进程 vs 线程 一个进程可包含多个线程 线程比进程更为轻量级 同一进程中的线程可共享内存资源 线程间的通信更为便捷,开销较小11R 中的并行模式12 R 中大部分已有的并行方式都是基于多进程并行 多线程并行一般依赖于 C/C+/
4、FORTRAN/Java 相较于多进程并行,多线程并行具有诸多优势 然而,在 R 中进行多线程并行非常困难 解释器、内存分配和垃圾回收都不是线程安全的 多个线程同时对 R 对象进行操作时,可能会破坏 R 的运行机 制和内部结构 要实现多线程的 R,就必须更改 R 的底层源代码概要13R 并行计算概览并行模式:进程 vs 线程SupR:当 R 遇上大数据当 R 遇上大数据14 基于种种原因,R 对于真正的大数据支持尚不理想 其受限的并行机制是其中重要的一点 理想的大数据分析平台 单机上进行多线程并行 集群上进行多机分布式计算 良好的交互性 完善的社区支持SupR 诞生15 SupR 是一款同时支
5、持多线程和分布式计算的修改版 R 由普渡大学统计系刘传海教授开发 力图实现基于 R 的大数据分析平台 主要特性 保持 R 的语法和内部数据结构不变 提供多线程并行计算支持 提供类似 Spark 的分布式集群运算 分布式文件系统支持SupR 多线程并行计算16 SupR 在源代码级别对 R 进行了修改 在可能引起线程冲突的部分加入互斥锁,保证程序正常运行 提供完整的线程创建、查询、打断、同步和取消机制主要函数17 new.thread():创建线程 start.thread():开启线程 sync.eval():同步线程执行 wait():令线程睡眠,等待信号 notify():唤醒线程例子简单
6、并行18 将需要并行处理的两个表达式放到两个线程中执行set.seed(123)n=10A=matrix(rnorm(n2),n)B=matrix(rnorm(n2),n)th1=new.thread(C1-A%*%B,1:(n/2)th2=new.thread(C2-A%*%B,(n/2+1):n)start.thread(th1);start.thread(th2)ls()#1 ABC1 C2 nth1 th2例子线程同步 所有具有相同 x 对象的 sync.eval()语句,任何时间只 能有一个被执行x=any objectthreads=vector(list,5)for(i in 1
7、:5)threadsi=new.thread(sync.eval(x,cat(current.thread(),:,as.character(Sys.time(),n,sep=)thread.sleep(2)for(i in 1:5)start.thread(threadsi)#thread_1:2016-05-25 09:21:24#thread_3:2016-05-25 09:21:27#thread_5:2016-05-25 09:21:29#thread_4:2016-05-25 09:21:31#thread_2:2016-05-25 09:21:3319例子线程通信(1)wait(
8、)使线程进入睡眠,直到被 notify()唤醒 主线程1.创建主线程2.打印欢迎信息3.唤醒工人,等待工人工作结束4.打印结束信息sync.m=master barrier sync.w=worker barrier threads=vector(list,3)count=0master=new.thread(sync.eval(sync.m,cat(Thread M:Program startsn)#Notify workers starting to worksync.eval(sync.w,notify(sync.w,all=TRUE)#Wait for workers to finis
9、hwait(sync.m)cat(Thread M:Program endsn)20例子线程通信(2)工人线程1.创建工人线程,等待主线程“宣布开工”2.把 count 加 1 并打印信息,任一时刻只有一个线程在工作3.工作完成后(count 到 10)通知主线程,线程结束work=function()sync.eval(sync.w,wait(sync.w)while(TRUE)sync.eval(sync.w,if(count 10)count-count+1cat(current.thread(),is working,count=,count,n)else break)sync.eva
10、l(sync.m,notify(sync.m)for(i in 1:3)new.thread(work(),start=TRUE)21例子线程通信(3)输出结果start.thread(master)#Thread M:Program starts#thread_3 is working,count=1#thread_4 is working,count=2#thread_2 is working,count=3#thread_3 is working,count=4#thread_4 is working,count=5#thread_2 is working,count=6#thread_3 is working,count=7#thread_4 is working,count=8#thread_2 is working,count=9#thread_3 is working,count=10#Thread M:Program ends22SupR 与分布式计算23预告:刘传海教授将做客统计之都七月份的 COS 沙龙,敬请关注问题?