ubuntu加入开机启动

以nexus为例

1.建立软链接:

ln -s /opt/nexus-2.10.0-02/bin/nexus /etc/init.d/nexus

注意这里一定要使用绝对路径

2.修改nexus权限:

chmod 755 /etc/init.d/nexus

3.设置nexus为系统服务

update-rc.d nexus defaults

有关JVM内存

Java虚拟机运行时的数据区

常用的内存区域调节参数

  1. -Xms:初始堆大小,默认为物理内存的1/64(<1GB);默认(MinHeapFreeRatio参数可以调整)空余堆内存小于40%时,JVM就会增大堆直到-Xmx的最大限制

  2. -Xmx:最大堆大小,默认(MaxHeapFreeRatio参数可以调整)空余堆内存大于70%时,JVM会减少堆直到 -Xms的最小限制

  3. -Xmn:新生代的内存空间大小,注意:此处的大小是(eden+ 2 survivor space)。与jmap -heap中显示的New gen是不同的。整个堆大小=新生代大小 + 老生代大小 + 永久代大小。
    在保证堆大小不变的情况下,增大新生代后,将会减小老生代大小。此值对系统性能影响较大,Sun官方推荐配置为整个堆的3/8。

  4. -XX:SurvivorRatio:新生代中Eden区域与Survivor区域的容量比值,默认值为8。两个Survivor区与一个Eden区的比值为2:8,一个Survivor区占整个年轻代的1/10。

  5. -Xss:每个线程的堆栈大小。JDK5.0以后每个线程堆栈大小为1M,以前每个线程堆栈大小为256K。应根据应用的线程所需内存大小进行适当调整。在相同物理内存下,减小这个值能生成更多的线程。但是操作系统对一个进程内的线程数还是有限制的,不能无限生成,经验值在3000~5000左右。一般小的应用, 如果栈不是很深, 应该是128k够用的,大的应用建议使用256k。这个选项对性能影响比较大,需要严格的测试。和threadstacksize选项解释很类似,官方文档似乎没有解释,在论坛中有这样一句话:”-Xss is translated in a VM flag named ThreadStackSize”一般设置这个值就可以了。

  6. -XX:PermSize:设置永久代(perm gen)初始值。默认值为物理内存的1/64。

  7. -XX:MaxPermSize:设置持久代最大值。物理内存的1/4。

内存分配方法

  • 堆上分配
  • 栈上分配
  • 堆外分配(DirectByteBuffer或直接使用Unsafe.allocateMemory,但不推荐这种方式)

内存监控方法

  • 系统程序运行时可通过jstat –gcutil来查看堆中各个内存区域的变化以及GC的工作状态
  • 启动时可添加-XX:+PrintGCDetails –Xloggc:输出到日志文件来查看GC的状况
  • jmap –heap可用于查看各个内存空间的大小;

TLAB的解释

堆内的对象数据是各个线程所共享的,所以当在堆内创建新的对象时,就需要进行锁操作。锁操作是比较耗时,因此JVM为每个线在堆上分配了一块“自留地”——TLAB(全称是Thread Local Allocation Buffer),位于堆内存的新生代,也就是Eden区。每个线程在创建新的对象时,会首先尝试在自己的TLAB里进行分配,如果成功就返回,失败了再到共享的Eden区里去申请空间。在线程自己的TLAB区域创建对象失败一般有两个原因:一是对象太大,二是自己的TLAB区剩余空间不够。通常默认的TLAB区域大小是Eden区域的1%,当然也可以手工进行调整,对应的JVM参数是-XX:TLABWasteTargetPercent。

有关JVM-GC介绍

断代法可以GC汇总

一、新生代可用GC

1)串行GC(Serial Copying):client模式下默认GC方式,也可通过-XX:+UseSerialGC来强制指定;默认情况下 eden、s0、s1的大小通过-XX:SurvivorRatio来控制,默认为8,含义
为eden:s0的比例,启动后可通过jmap –heap [pid]来查看。

默认情况下,仅在TLAB或eden上分配,只有两种情况下会在老生代分配:

  1. 需要分配的内存大小超过eden space大小;
  2. 在配置了PretenureSizeThreshold的情况下,对象大小大于此值。

默认情况下,触发Minor GC时:
之前Minor GC晋级到old的平均大小 < 老生代的剩余空间 < eden+from Survivor的使用空间。当HandlePromotionFailure为true,则仅触发minor gc;如为false,则触发full GC。

2)并行GC(ParNew):CMS GC时默认采用,也可采用-XX:+UseParNewGC强制指定;垃圾回收的时候采用多线程的方式。

3)并行回收GC(Parallel Scavenge):server模式下默认的GC方式,也可采用-XX:+UseParallelGC强制指定;eden、s0、s1的大小可通过-XX:SurvivorRatio来控制,但默认情况下
以-XX:InitialSurivivorRatio为准,此值默认为8,代表的为新生代大小 : s0,这点要特别注意。

默认情况下,当TLAB、eden上分配都失败时,判断需要分配的内存大小是否 >= eden space的一半大小,如是就直接在老生代上分配;

默认情况下的垃圾回收规则:

  1. 在回收前PS GC会先检测之前每次PS GC时,晋升到老生代的平均大小是否大于老生代的剩余空间,如大于则直接触发full GC;
  2. 在回收后,也会按照上面的规则进行检测。

默认情况下的新生代对象晋升到老生代的规则:

  1. 经历多次minor gc仍存活的对象,可通过以下参数来控制:AlwaysTenure,默认false,表示只要minor GC时存活,就晋升到老生代;NeverTenure,默认false,表示永不晋升到老生代;上面两个都没设置的情冴下,如UseAdaptiveSizePolicy,启动时以InitialTenuringThreshold值作为存活次数的阈值,在每次ps gc后会动态调整,如不使用UseAdaptiveSizePolicy,则以MaxTenuringThreshold为准。
  2. to space放不下的,直接放入老生代。

在回收后,如UseAdaptiveSizePolicy,PS GC会根据运行状态动态调整eden、to以及TenuringThreshold的大小。如果不希望动态调整可设置-XX:-UseAdaptiveSizePolicy。如希望跟踪每次的变化情况,可在启劢参数上增加: PrintAdaptiveSizePolicy。

二、老生代可用GC

  1. 串行GC(Serial Copying):client方式下默认GC方式,可通过-XX:+UseSerialGC强制指定。

    触发机制汇总:

    1. old gen空间不足;
    2. perm gen空间不足;
    3. minor gc时的悲观策略;
    4. minor GC后在eden上分配内存仍然失败;
    5. 执行heap dump时;
    6. 外部调用System.gc,可通过-XX:+DisableExplicitGC来禁止。
  1. 并行回收GC(Parallel Scavenge): server模式下默认GC方式,可通过-XX:+UseParallelGC强制指定; 并行的线程数为当cpu core<=8 ? cpu core : 3+(cpu core*5)/8或通过-XX:ParallelGCThreads=x来强制指定。如ScavengeBeforeFullGC为true(默认值),则先执行minor GC。

  2. 并行Compacting:可通过-XX:+UseParallelOldGC强制指定。

  3. 并发CMS:可通过-XX:+UseConcMarkSweepGC来强制指定。并发的线程数默认为:( 并行GC线程数+3)/4,也可通过ParallelCMSThreads指定。

触发机制:

  1. 当老生代空间的使用到达一定比率时触发;

    Hotspot V 1.6中默认为65%,可通过PrintCMSInitiationStatistics(此参数在V 1.5中不能用)来查看这个值到底是多少;可通过CMSInitiatingOccupancyFraction来强制指定,默认值并不是赋值在了这个值上,是根据如下公式计算出来的: ((100 - MinHeapFreeRatio) +(double)(CMSTriggerRatio * MinHeapFreeRatio) / 100.0)/ 100.0; 其中,MinHeapFreeRatio默认值: 40 CMSTriggerRatio默认值: 80。

  2. 当perm gen采用CMS收集且空间使用到一定比率时触发;

    perm gen采用CMS收集需设置:-XX:+CMSClassUnloadingEnabled Hotspot V 1.6中默认为65%;可通过CMSInitiatingPermOccupancyFraction来强制指定,同样,它是根据如下公式计算出来的:((100 - MinHeapFreeRatio) +(double)(CMSTriggerPermRatio* MinHeapFreeRatio) / 100.0)/ 100.0; 其中,MinHeapFreeRatio默认值: 40 CMSTriggerPermRatio默认值: 80。

  3. Hotspot根据成本计算决定是否需要执行CMS GC;可通过-XX:+UseCMSInitiatingOccupancyOnly来去掉这个动态执行的策略。

  4. 外部调用了System.gc,且设置了ExplicitGCInvokesConcurrent;需要注意,在hotspot 6中,在这种情况下如应用同时使用了NIO,可能会出现bug。

GC组合

1) 默认的GC组合

2) 可选的GC组合

GC监测

1) jstat –gcutil [pid] [intervel] [count]

2) -verbose:gc // 可以辅助输出一些详细的GC信息;

-XX:+PrintGCDetails // 输出GC详细信息;

-XX:+PrintGCApplicationStoppedTime // 输出GC造成应用暂停的时间

-XX:+PrintGCDateStamps // GC发生的时间信息;

-XX:+PrintHeapAtGC // 在GC前后输出堆中各个区域的大小;

-Xloggc:[file] // 将GC信息输出到单独的文件中,建议都加上,这个消耗不大,而且对查问题和调优有很大的帮助。gc的日志拿下来后可使用GCLogViewer或gchisto进行分析。

3) 图形化的情况下可直接用jvisualvm进行分析。

4) 查看内存的消耗状况

(1)长期消耗,可以直接dump,然后MAT(内存分析工具)查看即可

(2)短期消耗,图形界面情况下,可使用jvisualvm的memory profiler或jprofiler。

JVM-GC调优

系统调优方法

步骤:1、评估现状 2、设定目标 3、尝试调优 4、衡量调优 5、细微调整

设定目标:

  1. 降低Full GC的执行频率?
  2. 降低Full GC的消耗时间?
  3. 降低Full GC所造成的应用停顿时间?
  4. 降低Minor GC执行频率?
  5. 降低Minor GC消耗时间?

例如某系统的GC调优目标:降低Full GC执行频率的同时,尽可能降低minor GC的执行频率、消耗时间以及GC对应用造成的停顿时间。

衡量调优:

  1. 衡量工具

    1. 打印GC日志信息:

      • -XX:+PrintGCDetails
      • –XX:+PrintGCApplicationStoppedTime
      • -Xloggc: {文件名}
      • -XX:+PrintGCTimeStamps
    2. jmap:(由于每个版本jvm的默认值可能会有改变,建议还是用jmap首先观察下目前每个代的内存大小、GC方式)

    3. 运行状况监测工具:jstat、jvisualvm、sar 、gclogviewer

  2. 应收集的信息

    1. minor gc的执行频率;full gc的执行频率,每次GC耗时多少?

    2. 高峰期什么状况?

    3. minor gc回收的效果如何?survivor的消耗状况如何,每次有多少对象会进入老生代?

    4. full gc回收的效果如何?(简单的memory leak判断方法)

    5. 系统的load、cpu消耗、qps or tps、响应时间

      QPS每秒查询率:是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。在因特网上,作为域名服务器的机器性能经常用每秒查询率来衡量。对应fetches/sec,即每秒的响应请求数,也即是最大吞吐能力。
      TPS(Transaction Per Second):每秒钟系统能够处理的交易或事务的数量。

尝试调优:

注意Java RMI的定时GC触发机制,可通过:-XX:+DisableExplicitGC来禁止或通过 -Dsun.rmi.dgc.server.gcInterval=3600000来控制触发的时间。

  1. 降低Full GC执行频率 – 通常瓶颈

    老生代本身占用的内存空间就一直偏高,所以只要稍微放点对象到老生代,就full GC了;

    • 通常原因:系统缓存的东西太多;
      例如:使用oracle 10g驱动时preparedstatement cache太大;

    • 查找办法:现执行Dump然后再进行MAT分析;

  1. Minor GC后总是有对象不断的进入老生代,导致老生代不断的满

    • 通常原因:Survivor太小了
    • 系统表现:系统响应太慢、请求量太大、每次请求分配的内存太多、分配的对象太大…
    • 查找办法:分析两次minor GC之间到底哪些地方分配了内存;

      利用jstat观察Survivor的消耗状况,-XX:PrintHeapAtGC,输出GC前后的详细信息;

      对于系统响应慢可以采用系统优化,不是GC优化的内容;

  2. 老生代的内存占用一直偏高

调优方法:

  1. 扩大老生代的大小(减少新生代的大小或调大heap的大小);

    • 减少新生代的大小注意对minor gc的影响并且同时有可能造成full gc还是严重;
    • 调大heap注意full gc的时间的延长,cpu够强悍嘛,os是32 bit的吗?
  2. 程序优化(去掉一些不必要的缓存)

  3. Minor GC后总是有对象不断的进入老生代。前提:这些进入老生代的对象在full GC时大部分都会被回收

    1. 降低Minor GC的执行频率;
    2. 让对象尽量在Minor GC中就被回收掉:增大Eden区、增大survivor、增大TenuringThreshold;注意这些可能会造成minor gc执行频繁;
    3. 切换成CMS GC:老生代还没有满就回收掉,从而降低Full GC触发的可能性;
    4. 程序优化:提升响应速度、降低每次请求分配的内存、
  4. 降低单次Full GC的执行时间

    • 通常原因:老生代太大了…
    • 调优方法:1)是并行GC吗? 2)升级CPU 3)减小Heap或老生代
  5. 降低Minor GC执行频率

    • 通常原因:每次请求分配的内存多、请求量大
    • 通常办法:1)扩大heap、扩大新生代、扩大eden。注意点:降低每次请求分配的内存;横向增加机器的数量分担请求的数量。
  6. 降低Minor GC执行时间

    • 通常原因:新生代太大了,响应速度太慢了,导致每次Minor GC时存活的对象多
    • 通常办法:1)减小点新生代吧;2)增加CPU的数量、升级CPU的配置;加快系统的响应速度

细微调整:

首先需要了解以下情况:

  1. 当响应速度下降到多少或请求量上涨到多少时,系统会宕掉?

  2. 参数调整后系统多久会执行一次Minor GC,多久会执行一次Full GC,高峰期会如何?

需要计算的量:

  1. 每次请求平均需要分配多少内存?系统的平均响应时间是多少呢?请求量是多少、多常时间执行一次Minor GC、Full GC?

  2. 现有参数下,应该是多久一次Minor GC、Full GC,对比真实状况,做一定的调整;

必杀技:提升响应速度、降低每次请求分配的内存?

系统调优举例

现象:

  1. 系统响应速度大概为100ms;
  2. 当系统QPS增长到40时,机器每隔5秒就执行一次minor gc,
  3. 每隔3分钟就执行一次full gc,并且很快就一直full GC了;
  4. 每次Full gc后旧生代大概会消耗400M,有点多了。

解决方案:解决Full GC次数过多的问题

  1. 降低响应时间或请求次数,这个需要重构,比较麻烦;——这个是终极方法,往往能够顺利的解决问题,因为大部分的问题均是由程序自身造成的。

  2. 减少老生代内存的消耗,比较靠谱;——可以通过分析Dump文件(jmap dump),并利用MAT查找内存消耗的原因,从而发现程序中造成老生代内存消耗的原因。

  3. 减少每次请求的内存的消耗,貌似比较靠谱;——这个是海市蜃楼,没有太好的办法。

  4. 降低GC造成的应用暂停的时间——可以采用CMS GS垃圾回收器。参数设置如下:

    -Xms1536m -Xmx1536m -Xmn700m -XX:SurvivorRatio=7 -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection
    -XX:CMSMaxAbortablePrecleanTime=1000 -XX:+CMSClassUnloadingEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC

  5. 减少每次minor gc晋升到old的对象。可选方法:1) 调大新生代。2)调大Survivor。3)调大TenuringThreshold。

  6. 调大Survivor:当前采用PS GC,Survivor space会被动态调整。由于调整幅度很小,导致了经常有对象直接转移到了老生代;于是禁止Survivor区的动态调整了,-XX:-UseAdaptiveSizePolicy,并计算Survivor Space需要的大小,于是继续观察,并做微调…。最终将Full GC推迟到2小时1次。

JVM-垃圾回收原理

垃圾回收的原理

内存回收的实现方法:

  • 引用计数:不适合复杂对象的引用关系,尤其是循环依赖的场景。
  • 有向图Tracing:适合于复杂对象的引用关系场景,Hotspot采用这种。常用算法:Copying、Mark-Sweep、Mark-Compact。

Hotspot从root set开始扫描有引用的对象并对Reference类型的对象进行特殊处理。
以下是Root Set的列表:

  1. 当前正在执行的线程;
  2. 全局/静态变量;
  3. JVM Handles;
  4. JNI 【 Java Native Interface 】Handles;

另外:minor GC只扫描新生代,当老生代的对象引用了新生代的对象时,会采用如下的处理方式:在给对象赋引用时,会经过一个write barrier的过程,以便检查是否有老生代引用新生代对象的情况,如有则记录到remember set中。并在minor gc时,remember set指向的新生代对象也作为root set。

新生代串行GC(Serial Copying):

新生代串行GC(Serial Copying)完整内存的分配策略:

  1. 首先在TLAB(本地线程分配缓冲区)上尝试分配;
  2. 检查是否需要在新生代上分配,如需要分配的大小小于PretenureSizeThreshold,则在eden区上进行分配,分配成功则返回;分配失败则继续;
  3. 检查是否需要尝试在老生代上分配,如需要,则遍历所有代并检查是否可在该代上分配,如可以则进行分配;如不需要在老生代上尝试分配,则继续;
  4. 根据策略决定执行新生代GC或Full GC,执行full gc时不清除soft Ref;
  5. 如需要分配的大小大于PretenureSizeThreshold,尝试在老生代上分配,否则尝试在新生代上分配;
  6. 尝试扩大堆并分配;
  7. 执行full gc,并清除所有soft Ref,按步骤5继续尝试分配。

新生代串行GC(Serial Copying)完整内存回收策略

  1. 检查to是否为空,不为空返回false;
  2. 检查老生代剩余空间是否大于当前eden+from已用的大小,如大于则返回true,如小于且HandlePromotionFailure为true,则检查剩余空间是否大于之前每次minor gc晋级到老生代的平均大小,如大于返回true,如小于返回false。
  3. 如上面的结果为false,则执行full gc;如上面的结果为true,执行下面的步骤;
  4. 扫描引用关系,将活的对象copy到to space,如对象在minor gc中的存活次数超过tenuring_threshold或分配失败,则往老生代复制,如仍然复制失败,则取决于HandlePromotionFailure,如不需要处理,直接抛出OOM,并退出vm,如需处理,则保持这些新生代对象不动;

新生代可用GC-PS(UseParallelGC, UseParNewGC, UseSerialGC)

完整内存分配策略

  1. 先在TLAB上分配,分配失败则直接在eden上分配;
  2. 当eden上分配失败时,检查需要分配的大小是否 >= eden space的一半,如是,则直接在老生代分配;
  3. 如分配仍然失败,且gc已超过频率,则抛出OOM;
  4. 进入基本分配策略失败的模式;
  5. 执行PS GC,在eden上分配;
  6. 执行非最大压缩的full gc,在eden上分配;
  7. 在旧生代上分配;
  8. 执行最大压缩full gc,在eden上分配;
  9. 在旧生代上分配;
  10. 如还失败,回到2。

最悲惨的情况,分配触发多次PS GC和多次Full GC,直到OOM。

完整内存回收策略

  1. 如gc所执行的时间超过,直接结束;
  2. 先调用invoke_nopolicy

    1. 先检查是不是要尝试scavenge;
      1. to space必须为空,如不为空,则返回false;
      2. 获取之前所有minor gc晋级到old的平均大小,并对比目前eden+from已使用的大小,取更小的一个值,如老生代剩余空间小于此值,则返回false,如大于则返回true;
    2. 如不需要尝试scavenge,则返回false,否则继续;
    3. 多线程扫描活的对象,并基亍copying算法回收,回收时相应的晋升对象到旧生代;
    4. 如UseAdaptiveSizePolicy,那么重新计算to space和tenuringThreshold的值,并调整。
  3. 如invoke_nopolicy返回的是false,或之前所有minor gc晋级到老生代的平均大小 > 旧生代的剩余空间,那么继续下面的步骤,否则结束;

  4. 如UseParallelOldGC,则执行PSParallelCompact,如不是UseParallelOldGC,则执行PSMarkSweep。

老生代并行CMS GC(并发标记清除):

优缺点:

  1. 大部分时候和应用并发进行,因此只会造成很短的暂停时间;
  2. 浮动垃圾,没办法,所以内存空间要稍微大一点;
  3. 内存碎片,-XX:+UseCMSCompactAtFullCollection 来解决;
  4. 争抢CPU,这GC方式就这样;
  5. 多次remark,所以总的gc时间会比并行的长;
  6. 内存分配,free list方式,so性能稍差,对minor GC会有一点影响;
  7. 和应用并发,有可能分配和回收同时,产生竞争,引入了锁,JVM分配优先。

hadoop-rpc使用实例

Hadoop RPC主要对外提供2种接口

  1. 构造一个客户端代理对象,向服务器发送RPC请求

    1
    public static ProtocolProxy getProxy/waitForProxy
  2. 为某个协议实例构造一个服务器对象,用于处理客户端发送请求。

    1
    public static Server RPC.Builder(Configuration).build()

    注意: 在Hadoop以前的版本中,这个接口为

    1
    public static Server getServer()

Hadoop RPC的使用方法可以分为4个步骤

  1. 定义RPC协议。RPC协议是客户端和服务器之间的通信接口,它定义了服务器对外提供的服务接口(方法),Hadoop中所有自定义的RPC接口都需继承 VersionedProtocol 正如Java RMI需要实现 Remote 接口一样
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import org.apache.hadoop.ipc.VersionedProtocol;

    public interface DemoProtocol extends VersionedProtocol {
    //注意接口中必须有一个名为 versionID 的字段,
    //因为客户端调用时,服务器会反射获取这个字段,以保证调用的服务接口版本统一,
    //不然会抛出 java.lang.NoSuchFieldException: versionID 异常
    public static final long versionID = 0;

    String echo(String name);
    }
  1. 实现RPC协议。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import org.apache.hadoop.ipc.ProtocolSignature;

    import java.io.IOException;

    public class DemoProtocolImpl implements DemoProtocol {
    public String echo(String name) {
    return "Hadoop Server echo: " + name;
    }

    public long getProtocolVersion(String s, long l) throws IOException {
    return DemoProtocol.versionID;
    }

    public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException {
    return new ProtocolSignature(DemoProtocol.versionID, null);
    }
    }
  2. 构造RPC Server

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    import java.io.IOException;

    public class RpcServer {

    public static void main(String[] args) throws IOException {
    RPC.Server server = new RPC.Builder(new Configuration()).setProtocol(DemoProtocol.class)
    .setInstance(new DemoProtocolImpl()).setBindAddress("127.0.0.1")
    .setPort(9000).setNumHandlers(5).build();

    server.start();
    }
    }
  3. 构造RPC Client 并发送RPC请求

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    import java.io.IOException;
    import java.net.InetSocketAddress;

    public class RpcClient {

    public static void main(String[] args) throws IOException {
    DemoProtocol proxy = RPC.getProxy(DemoProtocol.class, DemoProtocol.versionID, new InetSocketAddress("127.0.0.1", 9000), new Configuration());

    String result = proxy.echo("123");

    System.out.println(result);
    }
    }

调用结果 成功输出 Hadoop Server echo: 123

代码需要的依赖 Maven

1
2
3
4
5
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.4</version>
</dependency>

Hadoop源代码分析

Hadoop源代码分析之Hadoop RPC

RPC是一种通过网络从远程计算机上请求服务,而不需要了解底层网络通信技术的协议。是分布式中最常见和常用的网络通信协议。
Hadoop RPC具有以下几个特点:

  1. 透明性: RPC最基本的特性, 用户在一台计算机中调用另外一台计算机的子程序时,用户自身不应该感觉到其间涉及跨机器间的通信,而是感觉在执行一个本地调用。

  2. 高性能: Hadoop的各个系统(如HDFS, MapReduce)均采用了Master/Slaver的结构。因此需要Hadoop的RPC Server能够高性能的处理各个Client的请求。

  3. 可控性: Hadoop需要精确的控制进程间通信,比如连接,超时,缓存等通信细节,而Java RMI过于重量级且用户可控之处太少

Hadoop的相关代码都在org.apache.hadoop.ipc中,其中最主要的类有三个:

Server: Hadoop RPC Server的实现,这是一个抽象类,只有一个抽象方法

1
public abstract Writable call(Class<?> protocol,Writable param, long receiveTime) throws IOException;

具体的实现在 RPC.Server中, 其中主要包括5各类:

  1. Call: 用于储存客户端的请求。

  2. Listener: 监听类,用于监听客户端发来的请求,把数据封装成Call对象, 添加到callQueue。

  3. Handler:请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

  4. Responder:响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。

  5. Connection:连接类,真正的客户端请求读取逻辑在这个类中。其中Listener,Handler,Responder都继承了Thread,在服务器启动时同时启动这三个线程,下面看这个三个线程的run方法

Listener的关键代码:其实和一般的NIO服务器ServerChannal写法差不多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void run() {
...
while (running) {
SelectionKey key = null;
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
// 创建Connection对象,并把Connection做为key的attachment以便下面的doRead方法使用
doAccept(key);
else if (key.isReadable())
// 主要调用Connection的readAndProcess方法,读取客户端发送过来的数据进行处理,分为三个步骤
// 1.读取IPC连接魔数和协议版本号并完成版本检查
// 2.进行连接头检查,主要调用了两个函数processHeader():保证服务器实现了IPC接口和获取用户信息
// authorize():保证用户有相关的权限访问远程接口
// 3.调用processData()方法处理数据,主要是新建一个Call对象,读取数据填充这个Call的成员变量,并加入到callQueue队列中
doRead(key);
}
} catch (IOException e) {
}
key = null;
}
}
...
}

Handler才是真正执行客户端发过来的远程调用,其关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void run() {
...
while (running) {
try {
// pop the queue; maybe blocked here,从callQueue队列中取出call对象进行处理
final Call call = callQueue.take();
...
//Subject.doAs()是java的鉴权与授权服务(JAAS)中的方法
value = Subject.doAs(call.connection.user,
new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
return call(call.connection.protocol,call.param, call.timestamp);
/**调用Server的那个抽象方法(当然是调用它子类的实现啦 (RPC.Server.call()))。其实现的关键代码如下:**/
public Writable call(Class<?> protocol, Writable param, long receivedTime)throws IOException {
try {
//这个就是我们常用的java反射调用方法啦
Invocation call = (Invocation)param;
Method method = protocol.getMethod(call.getMethodName(),call.getParameterClasses());
method.setAccessible(true);
Object value = method.invoke(instance, call.getParameters());
...
return new ObjectWritable(method.getReturnType(), value);
} catch (InvocationTargetException e) {
...
}
}
}
);
}
...
//把返回结果储存在这个Call中的response
setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR,
value, errorClass, error);
// 把数据回写给客户端,其中会调用一个Response中一个非常重要的方法processResponse()
// 这个方法只在通道空闲时响应(要处理的Call队列长度为1),忙时不会响应,而是交个Responder
// 进行集中处理和响应
responder.doRespond(call);
}
...
}

Responder关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void run() {
...
while (running) {
try {
waitPending(); // If a channel is being registered, wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);//把数据写到缓存区从而发送给客户端,同样调用了processResponse()方法
}
} catch (IOException e) {
...
}
}
}
}

其中响应请求回写数据最关键的代码 processResponse()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
boolean error = true;
boolean done = false; // 该通道上没有数据要发送
int numElements = 0;
Call call = null;
try {
synchronized (responseQueue) {
// If there are no items for this channel, then we are done
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; // no more data for this channel.
}
call = responseQueue.removeFirst();
...
//异步写尽可能多的数据
int numBytes = channelWrite(channel, call.response);
if (numBytes < 0) {
return true;
}
if (!call.response.hasRemaining()) {
//应答数据已经写完
call.connection.decRpcCount();
if (numElements == 1) { // last call fully processes.
done = true; // no more data for this channel.
} else {
done = false; // more calls pending to be sent.
}
...
} else {
// 为什么会有数据没写完的情况: 在Reactor模式中,业务逻辑被分散的I/O事件所打破, 所以Handler需要适当的机制在所需信息
// 还不全的时候保存上下文,并能在下一次IO事件来的时候继续上次的中断处理
// 应答数据没有写完,插入队列头,等待再次发送
call.connection.responseQueue.addFirst(call);
if (inHandler) {
// 不在Response线程中,在Handler线程中,前面说过当通道空闲时,Handler线程也会调用这个方法往通道中写数据,同样如果
// 数据没写完,就需要交给Responder处理,这是就需要把次没写完的数据标记为Responder感兴趣的事件,等待Responder的
// Selector选择出来并处理
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
// 成员变量peding++,该变量表示现在有多少个线程在进行通道注册
incPending();
try {
// 唤醒在select()方法上等待的Responder线程
writeSelector.wakeup();
//这样才能调用这个注册方法进行注册
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
done = true;
} finally {
decPending();
}
}
...
}
error = false;// everything went off well
}
} finally {
...
}
return done;
}

Hadoop RPC Server 是一个典型的Reactor模式的实现。
Reactor模式主要有两个特点

  1. 通过派发/分离I/O操作事件提高系统的并发性能
  2. 提供粗粒度的并发控制,使用单线程实现,避免了负责的同步处理

//TODO 写Hadoop RPC是如何实现Reactor模式的

Hadoop RPC参数调优

  • Reader线程数目: (ipc.server.read.threadpool.size), 默认是1

  • 每个Handler线程对应的Call数目: (ipc.server.handler.queue.size)指定, 默认100

  • Handler线程数目: JobTracker 和 NameNode 是HDFS中的两个RPC Server,其对应的Handler数目分别有参数(mapred.job.tracker.handler.count)和(dfs.namenode.service.handler.count)指定,默认值都为10,这个参数的配置会极大的影响性能。因为Handler线程处理业务逻辑,而其中有可能牵扯计算密集或I/O密集,线程少,耗时的业务逻辑会让大部分的线程阻塞,而响应快的请求得不到及时的处理,这时Reader收集的请求队列会长期处于满的状态,导致通讯恶化,线程过多,又会导致频繁的切换线程的开销

  • 客户端重试次数: (ipc.client.connect.max.retries)指定,默认为10