一、概述
根据《深入理解Spark:核心思想与源码分析》一书,结合最新的spark源代码master分支进行源码阅读,对新版本的代码加上自己的一些理解,如有错误,希望指出。
1.块管理器BlockManager的实现
块管理器是Spark存储体系的核心组件,Driver Application和Executor都会创建BlockManager,源代码位置在core/org.apache.spark.storage,部分代码如下。
View Code
上面代码中声明的BlockInfoManager用于管理BlockManager缓存BlockId及对应的BlockInfo,BlockInfoManager提供一些列的同步读写策略。BlockManager由以下部分组成。
1)shuffle客户端shuffleClient;
2)BlockManagerMaster,对存在于所有Executor上的BlockManager进行统一管理;
3)磁盘块管理器DiskBlockManager;
4)内存存储MemoryStore;
5)磁盘存储DiskStore;
BlockManager要生效必须要初始化,初始化代码如下,
View Code
1)BlockTransferService和shuffle客户端shuffleClient的初始化,ShuffleClien默认是BlockTransferService,当有外部的ShuffleService时,则调用外部的ExternalShuffleClient。
2)创建id为本地BlockManagerId,向BlockManagerMaster注册此id,获取从BlockManagerMaster的idFromMaster,如果idFromMaster为空则BlockManagerId为刚才创建的id,否则使用BlockManagerMaster注册到的idFromMaster。
3)ShuffleServerId的创建,当有外部的ShuffleService时,创建新的BlockManagerId作为ShuffleServerId。
4)当有外部的ShuffleService并且当前BlockMaId不是Driver端,则需要向ShuffleClient注册ShuffleServerId
2.Spark存储体系架构
1)1表示Executor的BlockManager与Driver的BlockManager进行消息通信,例如注册BlockManager、更新Block信息、获取Block所在的BlockManager、删除Executor等
2)2表示对BlockManager的读操作如get、doGetLocal等和写操作doPut、puSingle等
3)3表示当MemoryStore的内存不足时,写入DiskStore,而DiskStore实际依赖于DiskBlockManager
4)4表示通过访问远端节点的Executor的BlockManager中的TransportServer提供RPC服务下载或者上传Block
5)5表示远端节点的Executor的BlockManager访问本地Executor的BlockManager中的TransportServer提供的RPC服务下载或者上传Block。
二、shuffle服务与客户端
1.Block的RPC服务
当map任务与reduce任务处于不同的节点时,reduce任务需要从远端节点下载map任务的中间件输出,因此NettyBlockRpcServer提供打开,即下载Block文件的功能;一些情况下,为了容错,需要将Block的数据备份到其他节点上,所以NettyBlockRpcServer还提供了上传Block文件的RPC服务,实现见代码,代码位置:core/org.apache.spark.network.netty。
View Code
2.构造传输上下文TransportContext
代码位置,common/network-common/org.apache.spark.network
View Code
TransportContext既可以创建Netty服务,也可以创建Netty访问客户端,组成部分如下。
1)TransportConf:主要控制Netty框架提供的shuffle的I/O交互的客户端和服务端线程数量等
2)RpcHandler:负责shuffle的I/O服务端在接收到客户端的RPC请求后,提供打开Block或者上传Block的RPC处理,此处实现为NettyBlockRpcServer
3)是否关闭闲置连接
3.RPC客户端工厂TransportClientFactory
View Code
以下分析TransportClientFactory代码。
View Code
1)clientBootstraps:用于缓存客户端列表
2)connectionPool:用户缓存客户端连接
3)numConnectionsPerPeer:节点之间取数据的连接数,可以使用属性spark.shuffle.io.numConnectionsPerPeer来配置,默认为1
4)SocketChannelClass:客户端channel被创建时使用的类,可以使用属性spark.shuffle.io.mode来配置
5)workerGroup:根据Netty的规范,客户端只有worker组,所以此处创建workerGroup,实际是NioEventLoopGroup
6)pooledAllocator:汇集ByteBuf但对本地线程缓存禁用的分配器。
4.Netty服务器TransportServer
TransportServer提供了Netty实现的服务器端,用于提供RPC服务,如上传、下载等,代码如下。
View Code
TransportServer构造器如下
View Code
init方法对TransportServer进行初始化,通过使用Netty框架的EventLoopGroup、ServerBootstrap等API创建shuffle的I/O交互的服务端,主要代码见清单。
View Code
1)ioMode:NIO或者EPOLL
2) ChannelOption.ALLOCATOR:在Netty 4中实现了一个新的ByteBuf内存池,它是一个纯Java版本的 jemalloc (Facebook也在用)。现在,Netty不会再因为用零填充缓冲区而浪费内存带宽了。不过,由于它不依赖于GC,开发人员需要小心内存泄漏。如果忘记在处理程序中释放缓冲区,那么内存使用率会无限地增长。Netty默认不使用内存池,需要在创建客户端或者服务端的时候进行指定,使用内存池之后,内存的申请和释放必须成对出现,即retain()和release()要成对出现,否则会导致内存泄露。
3)RpcHandler处理接收到的数据逻辑
5.获取远程shuffle文件
NettyBlockTransferService的fetchBlocks方法用于获取远程的shuffle文件,实际是使用NettyBlockTransferService中创建的Netty服务。
View Code
6.上传shuffle文件
NettyBlockTransferService的uploadBlock方法用于上传shuffle文件到远程的Executor,实际也是用NettyBlockTransferService中创建的Netty服务,步骤如下。
View Code
1)创建Netty服务的客户端,客户端连接的hostname和port正是BlockManager的hostname和port
2)将Block的存储级别StorageLevel和类标签序列化
3)将Block的ByteBuffer转化为数据,便于序列化
4)将appId、execId、blockId、metadata、转化为数组的Block封装为UploadBlock,并将其序列化为字节数组
5)最终调用Netty客户端的sendRpc方法将字节数组上传,回掉函数RpcResponseCallback根据RPC的结果更改上传状态。
http://www.cnblogs.com/ChouYarn/p/7169472.html