一、概述

根据《深入理解Spark:核心思想与源码分析》一书,结合最新的spark源代码master分支进行源码阅读,对新版本的代码加上自己的一些理解,如有错误,希望指出。

1.块管理器BlockManager的实现

块管理器是Spark存储体系的核心组件,Driver Application和Executor都会创建BlockManager,源代码位置在core/org.apache.spark.storage,部分代码如下。

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训 View Code

上面代码中声明的BlockInfoManager用于管理BlockManager缓存BlockId及对应的BlockInfo,BlockInfoManager提供一些列的同步读写策略。BlockManager由以下部分组成。

1)shuffle客户端shuffleClient;

2)BlockManagerMaster,对存在于所有Executor上的BlockManager进行统一管理;

3)磁盘块管理器DiskBlockManager;

4)内存存储MemoryStore;

5)磁盘存储DiskStore;

BlockManager要生效必须要初始化,初始化代码如下,

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训 View Code

1)BlockTransferService和shuffle客户端shuffleClient的初始化,ShuffleClien默认是BlockTransferService,当有外部的ShuffleService时,则调用外部的ExternalShuffleClient。

2)创建id为本地BlockManagerId,向BlockManagerMaster注册此id,获取从BlockManagerMaster的idFromMaster,如果idFromMaster为空则BlockManagerId为刚才创建的id,否则使用BlockManagerMaster注册到的idFromMaster。

3)ShuffleServerId的创建,当有外部的ShuffleService时,创建新的BlockManagerId作为ShuffleServerId。

4)当有外部的ShuffleService并且当前BlockMaId不是Driver端,则需要向ShuffleClient注册ShuffleServerId

2.Spark存储体系架构

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训

1)1表示Executor的BlockManager与Driver的BlockManager进行消息通信,例如注册BlockManager、更新Block信息、获取Block所在的BlockManager、删除Executor等

2)2表示对BlockManager的读操作如get、doGetLocal等和写操作doPut、puSingle等

3)3表示当MemoryStore的内存不足时,写入DiskStore,而DiskStore实际依赖于DiskBlockManager

4)4表示通过访问远端节点的Executor的BlockManager中的TransportServer提供RPC服务下载或者上传Block

5)5表示远端节点的Executor的BlockManager访问本地Executor的BlockManager中的TransportServer提供的RPC服务下载或者上传Block。

二、shuffle服务与客户端

1.Block的RPC服务

当map任务与reduce任务处于不同的节点时,reduce任务需要从远端节点下载map任务的中间件输出,因此NettyBlockRpcServer提供打开,即下载Block文件的功能;一些情况下,为了容错,需要将Block的数据备份到其他节点上,所以NettyBlockRpcServer还提供了上传Block文件的RPC服务,实现见代码,代码位置:core/org.apache.spark.network.netty。

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训 View Code

 2.构造传输上下文TransportContext

代码位置,common/network-common/org.apache.spark.network

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训 View Code

TransportContext既可以创建Netty服务,也可以创建Netty访问客户端,组成部分如下。

1)TransportConf:主要控制Netty框架提供的shuffle的I/O交互的客户端和服务端线程数量等

2)RpcHandler:负责shuffle的I/O服务端在接收到客户端的RPC请求后,提供打开Block或者上传Block的RPC处理,此处实现为NettyBlockRpcServer

3)是否关闭闲置连接

3.RPC客户端工厂TransportClientFactory

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训 View Code

以下分析TransportClientFactory代码。

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训 View Code

1)clientBootstraps:用于缓存客户端列表

2)connectionPool:用户缓存客户端连接

3)numConnectionsPerPeer:节点之间取数据的连接数,可以使用属性spark.shuffle.io.numConnectionsPerPeer来配置,默认为1

4)SocketChannelClass:客户端channel被创建时使用的类,可以使用属性spark.shuffle.io.mode来配置

5)workerGroup:根据Netty的规范,客户端只有worker组,所以此处创建workerGroup,实际是NioEventLoopGroup

6)pooledAllocator:汇集ByteBuf但对本地线程缓存禁用的分配器。

4.Netty服务器TransportServer

TransportServer提供了Netty实现的服务器端,用于提供RPC服务,如上传、下载等,代码如下。

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训 View Code

TransportServer构造器如下

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训 View Code

init方法对TransportServer进行初始化,通过使用Netty框架的EventLoopGroup、ServerBootstrap等API创建shuffle的I/O交互的服务端,主要代码见清单。

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训 View Code

1)ioMode:NIO或者EPOLL

2) ChannelOption.ALLOCATOR:在Netty 4中实现了一个新的ByteBuf内存池,它是一个纯Java版本的 jemalloc (Facebook也在用)。现在,Netty不会再因为用零填充缓冲区而浪费内存带宽了。不过,由于它不依赖于GC,开发人员需要小心内存泄漏。如果忘记在处理程序中释放缓冲区,那么内存使用率会无限地增长。Netty默认不使用内存池,需要在创建客户端或者服务端的时候进行指定,使用内存池之后,内存的申请和释放必须成对出现,即retain()和release()要成对出现,否则会导致内存泄露。

3)RpcHandler处理接收到的数据逻辑

5.获取远程shuffle文件

NettyBlockTransferService的fetchBlocks方法用于获取远程的shuffle文件,实际是使用NettyBlockTransferService中创建的Netty服务。

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训 View Code

6.上传shuffle文件

NettyBlockTransferService的uploadBlock方法用于上传shuffle文件到远程的Executor,实际也是用NettyBlockTransferService中创建的Netty服务,步骤如下。

Android培训,安卓培训,手机开发培训,移动开发培训,云培训培训 View Code

1)创建Netty服务的客户端,客户端连接的hostname和port正是BlockManager的hostname和port

2)将Block的存储级别StorageLevel和类标签序列化

3)将Block的ByteBuffer转化为数据,便于序列化

4)将appId、execId、blockId、metadata、转化为数组的Block封装为UploadBlock,并将其序列化为字节数组

5)最终调用Netty客户端的sendRpc方法将字节数组上传,回掉函数RpcResponseCallback根据RPC的结果更改上传状态。

http://www.cnblogs.com/ChouYarn/p/7169472.html

网友评论

更多精彩分享

万码学堂联系方式-Java培训机构,青岛Java培训,青岛计算机培训,软件编程培训,seo优化培训,网络推广培训,网络营销培训,SEM培训,网络优化,在线营销培训,Java培训万码学堂联系方式