Hadoop

Hadoop和Hadoop生态系统

Hadoop生态系统

广义上来说,Hadoop通常是指一个更广泛的概念。

  • Zookeeper:是一个开源的分布式应用程序协调服务,基于zookeeper可以实现同步服务,配置维护,命名服务。

  • Flume:一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。

  • Hbase:是一个分布式的、面向列的开源数据库, 利用Hadoop HDFS作为其存储系统。

  • Hive:基于Hadoop的一个数据仓库工具,可以将结构化的数据档映射为一张数据库表,并提供简单的sql 查询功能,可以将sql语句转换为MapReduce任务进行运行。

  • Sqoop:将一个关系型数据库中的数据导进到Hadoop的 HDFS中,也可以将HDFS的数据导进到关系型数据库中。

Hadoop

狭义上说,Hadoop指Apache这款开源框架,它的核心组件有:

  • HDFS(分布式文件系统):解决海量数据存储

  • YARN(作业调度和集群资源管理的框架):解决资源任务调度

  • MAPREDUCE(分布式运算编程框架):解决海量数据计算

Hadoop的特性优点

  • 扩容能力(Scalable):Hadoop是在可用的计算机集群间分配数据并完成计算任务的,这些集群可用方便的扩展到数以千计的节点中。
  • 成本低(Economical):Hadoop通过普通廉价的机器组成服务器集群来分发以及处理数据,以至于成本很低。
  • 高效率(Efficient):通过并发数据,Hadoop可以在节点之间动态并行的移动数据,使得速度非常快。
  • 可靠性(Rellable):能自动维护数据的多份复制,并且在任务失败后能自动地重新部署(redeploy)计算任务。所以Hadoop的按位存储和处理数据的能力值得人们信赖。

Hadoop的运行模式

单机版、伪分布式模式、完全分布式模式

Hadoop集群启动节点

  • namenode:HDFS的守护进程,负责维护整个文件系统,存储着整个文件系统的元数据信息,image+edit log
  • datanode:是具体文件系统的工作节点,当我们需要某个数据,namenode告诉我们去哪里找,就直接和那个DataNode对应的服务器的后台进程进行通信,由DataNode进行数据的检索,然后进行具体的读/写操作
  • secondarynamenode:它不是namenode的冗余守护进程,而是提供周期检查点和清理任务。帮助NN合并image和editslog,减少NN启动时间。
  • resourcemanager:是yarn平台的守护进程,负责所有资源的分配与调度,client的请求由此负责,监控nodemanager
  • nodemanager:是单个节点的资源管理,执行来自resourcemanager的具体任务和命令
  • DFSZKFailoverController:高可用时它负责监控NN的状态,并及时的把状态信息写入ZK。它通过一个独立线程周期性的调用NN上的一个特定接口来获取NN的健康状态。FC也有选择谁作为Active NN的权利,因为最多只有两个节点,目前选择策略还比较简单(先到先得,轮换)。
  • JournalNode:高可用情况下存放namenode的editlog文件

主要配置文件

  • hadoop-env.sh

    • 文件中设置的是Hadoop运行时需要的环境变量。JAVA_HOME是必须设置的,即使我们当前的系统中设置了JAVA_HOME,它也是不认识的,因为Hadoop即使是在本机上执行,它也是把当前的执行环境当成远程服务器。
  • core-site.xml

    • 设置Hadoop的文件系统地址

      1
      2
      3
      4
      <property>
      <name>fs.defaultFS</name>
      <value>hdfs://node-1:9000</value>
      </property>
  • hdfs-site.xml

    • 指定HDFS副本的数量

    • secondary namenode 所在主机的ip和端口

      1
      2
      3
      4
      5
      6
      7
      8
      9
      <property>
      <name>dfs.replication</name>
      <value>2</value>
      </property>

      <property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>node-2:50090</value>
      </property>
  • mapred-site.xml

    • 指定mr运行时框架,这里指定在yarn上,默认是local

      1
      2
      3
      4
      <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
      </property>
  • yarn-site.xml

    • 指定YARN的主角色(ResourceManager)的地址

      1
      2
      3
      4
      <property>
      <name>yarn.resourcemanager.hostname</name>
      <value>node-1</value>
      </property>

重要命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//初始化
hadoop namenode –format

//启动dfs
start-dfs.sh

//启动yarn
start-yarn.sh

//启动任务历史服务器
mr-jobhistory-daemon.sh start historyserver

//一键启动
start-all.sh

启动成功后:

NameNode的web 访问端口:50070.

ResourceManager的web 访问端口:8088

历史服务器 的web 访问端口:19888

选项名称 使用格式 含义
-ls -ls <路径> 查看指定路径的当前目录结构
-lsr -lsr <路径> 递归查看指定路径的目录结构
-du -du <路径> 统计目录下个文件大小
-dus -dus <路径> 汇总统计目录下文件(夹)大小
-count -count [-q] <路径> 统计文件(夹)数量
-mv -mv <源路径> <目的路径> 移动
-cp -cp <源路径> <目的路径> 复制
-rm -rm [-skipTrash] <路径> 删除文件/空白文件夹
-rmr -rmr [-skipTrash] <路径> 递归删除
-put -put <多个linux上的文件> <hdfs路径> 上传文件
-copyFromLocal -copyFromLocal <多个linux上的文件> <hdfs路径> 从本地复制
-moveFromLocal -moveFromLocal <多个linux上的文件> <hdfs路径> 从本地移动
-getmerge -getmerge <源路径> <linux路径> 合并到本地
-cat -cat <hdfs路径> 查看文件内容
-text -text <hdfs路径> 查看文件内容
-copyToLocal -copyToLocal [-ignoreCrc] [-crc] [hdfs源路径] [linux目的路径] 从本地复制
-moveToLocal -moveToLocal [-crc] <hdfs源路径> <linux目的路径> 从本地移动
-mkdir -mkdir <hdfs路径> 创建空白文件夹
-setrep -setrep [-R] [-w] <副本数> <路径> 修改副本数量
-touchz -touchz <文件路径> 创建空白文件
-stat -stat [format] <路径> 显示文件统计信息
-tail -tail [-f] <文件> 查看文件尾部信息
-chmod -chmod [-R] <权限模式> [路径] 修改权限
-chown -chown [-R] [属主][:[属组]] 路径 修改属主
-chgrp -chgrp [-R] 属组名称 路径 修改属组
-help -help [命令选项] 帮助

HDFS

HDFS的组成架构

HDFS架构
  • Client:就是客户端。
    (1)文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行存储;
      (2)与NameNode交互,获取文件的位置信息;
      (3)与DataNode交互,读取或者写入数据;
      (4)Client提供一些命令来管理HDFS,比如启动或者关闭HDFS;
      (5)Client可以通过一些命令来访问HDFS;
  • NameNode:就是Master,它是一个主管、管理者。
    (1)管理HDFS的名称空间;
      (2)管理数据块(Block)映射信息;
      (3)配置副本策略;
      (4)处理客户端读写请求。
  • DataNode:就是Slave。NameNode下达命令,DataNode执行实际的操作。
    (1)存储实际的数据块;
      (2)执行数据块的读/写操作。
  • SecondaryNameNode:并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务。
    (1)辅助NameNode,分担其工作量;
      (2)定期合并Fsimage和Edits,并推送给NameNode;
      (3)在紧急情况下,可辅助恢复NameNode。

HDFS写数据流程

HDFS dfs -put a.txt /

HDFS写数据流程

详细步骤:

  1. 客户端通过Distributed FileSystem模块向namenode请求上传文件,namenode检查目标文件是否已存在,父目录是否存在。
  2. namenode返回是否可以上传。
  3. 客户端请求第一个 block上传到哪几个datanode服务器上。
  4. namenode返回3个datanode节点,分别为dn1、dn2、dn3。
  5. 客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
  6. dn1、dn2、dn3逐级应答客户端。
  7. 客户端开始往dn1上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位(大小为64k),dn1收到一个packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。
  8. 当一个block传输完成之后,客户端再次请求namenode上传第二个block的服务器。(重复执行3-7步)。

HDFS读数据流程

HDFS读数据流程

详细步骤

  1. 客户端通过Distributed FileSystem向namenode请求下载文件,namenode通过查询元数据,找到文件块所在的datanode地址。
  2. 挑选一台datanode(就近原则,然后随机)服务器,请求读取数据。
  3. datanode开始传输数据给客户端(从磁盘里面读取数据输入流,以packet为单位来做校验,大小为64k)。
  4. 客户端以packet为单位接收,先在本地缓存,然后写入目标文件。

SecondaryNameNode的作用

合并NameNode的editslogfsimage

SecondaryNameNode的作用

NameNode与SecondaryNameNode

区别

  (1)NameNode负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。
  (2)SecondaryNameNode主要用于定期合并命名空间镜像和命名空间镜像的编辑日志。

联系

  (1)SecondaryNameNode中保存了一份和namenode一致的镜像文件(fsimage)和编辑日志(edits)。
  (2)在主namenode发生故障时(假设没有及时备份数据),可以从SecondaryNameNode恢复数据。

HDFS的垃圾桶机制

修改core-site.xml

1
2
3
4
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>

注:这个时间以分钟为单位,例如1440=24h=1天。HDFS的垃圾回收的默认配置属性为 0,也就是说,如果你不小心误删除了某样东西,那么这个操作是不可恢复的。

HANameNode工作原理

HANameNode的工作原理

ZKFailoverController主要职责

  1. 健康监测:周期性的向它监控的NN发送健康探测命令,从而来确定某个NameNode是否处于健康状态,如果机器宕机,心跳失败,那么zkfc就会标记它处于一个不健康的状态。
  2. 会话管理:如果NN是健康的,zkfc就会在zookeeper中保持一个打开的会话,如果NameNode同时还是Active状态的,那么zkfc还会在Zookeeper中占有一个类型为短暂类型的znode,当这个NN挂掉时,这个znode将会被删除,然后备用的NN,将会得到这把锁,升级为主NN,同时标记状态为Active。
  3. 当宕机的NN新启动时,它会再次注册zookeper,发现已经有znode锁了,便会自动变为Standby状态,如此往复循环,保证高可靠,需要注意,目前仅仅支持最多配置2个NN。
  4. master选举:如上所述,通过在zookeeper中维持一个短暂类型的znode,来实现抢占式的锁机制,从而判断那个NameNode为Active状态

注:同时出现两个Active状态Onamenode的术语叫脑裂brain split。

防止脑裂的两种方式:

  1. ssh发送kill指令
  2. 调用用户自定义脚本程序

HDFS中block

默认保存3份

老版本默认64m,2.x版本默认128m

HDFS安全模式

文件系统只接受读数据请求,而不接受删除、修改等变更请求。

在NameNode主节点启动时,HDFS首先进入安全模式,集群会开始检查数据块的完整性。DataNode在启动的时候会向namenode汇报可用的block信息,当整个系统达到安全标准时,HDFS自动离开安全模式。

  • 手动进入安全模式

    1
    hdfs dfsadmin -safemode enter
  • 手动离开安全模式

    1
    hdfs dfsadmin -safemode leave

机架感知

hadoop自身是没有机架感知能力的,必须通过人为的设定来达到这个目的。

通过配置一个脚本来进行映射;

通过实现DNSToSwitchMapping接口的resolve()方法来完成网络位置的映射。

1、写一个脚本,然后放到hadoop的core-site.xml配置文件中,用namenode和jobtracker进行调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys

rack = {"hadoop-node-31":"rack1",
"hadoop-node-32":"rack1",
"hadoop-node-33":"rack1",
"hadoop-node-34":"rack1",
"hadoop-node-49":"rack2",
"hadoop-node-50":"rack2",
"hadoop-node-51":"rack2",
"hadoop-node-52":"rack2",
"hadoop-node-53":"rack2",
"hadoop-node-54":"rack2",
"192.168.1.31":"rack1",
"192.168.1.32":"rack1",
"192.168.1.33":"rack1",
"192.168.1.34":"rack1",
"192.168.1.49":"rack2",
"192.168.1.50":"rack2",
"192.168.1.51":"rack2",
"192.168.1.52":"rack2",
"192.168.1.53":"rack2",
"192.168.1.54":"rack2",
}

if __name__=="__main__":
print "/" + rack.get(sys.argv[1],"rack0")

2、将脚本赋予可执行权限chmod +x RackAware.py,并放到bin/目录下。

3、然后打开conf/core-site.html

1
2
3
4
5
6
7
8
9
10
    <property>
<name>topology.script.file.name</name>
<value>/opt/modules/hadoop/hadoop-1.0.3/bin/RackAware.py</value>
<!--机架感知脚本路径-->
</property>
<property>
<name>topology.script.number.args</name>
<value>20</value>
<!--机架服务器数量,由于我写了20个,所以这里写20-->
</property>

4、重启Hadoop集群

HDFS的扩容、缩容

动态扩容

准备

修改新机器系统hostname(通过/etc/sysconfig/network进行修改)

修改hosts文件,将集群所有节点hosts配置进去(集群所有节点保持hosts文件统一)

设置NameNode到DataNode的免密码登录(ssh-copy-id命令实现)

修改主节点slaves文件,添加新增节点的ip信息(集群重启时配合一键启动脚本使用)

在新的机器上上传解压一个新的hadoop安装包,从主节点机器上将hadoop的所有配置文件,scp到新的节点上。

添加datanode
  1. 在namenode所在的机器的/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop目录下创建dfs.hosts文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop

    vim dfs.hosts

    添加如下主机名称(包含新服役的节点)

    node-1
    node-2
    node-3
    node-4

  2. 在namenode机器的hdfs-site.xml配置文件中增加dfs.hosts属性

    1
    2
    3
    cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop

    vim hdfs-site.xml
    1
    2
    3
    4
    <property>
    <name>dfs.hosts</name>
    <value>/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/dfs.hosts</value>
    </property>

    dfs.hosts属性的意义:命名一个文件,其中包含允许连接到namenode的主机列表。必须指定文件的完整路径名。如果该值为空,则允许所有主机。相当于一个白名单,也可以不配置。

  3. 在新的机器上单独启动datanode

    1
    hadoop-daemon.sh start datanode

    刷新页面就可以看到新的节点加入进来了

datanode负载均衡服务

新加入的节点,没有数据块的存储,使得集群整体来看负载还不均衡。因此最后还需要对hdfs负载设置均衡,因为默认的数据传输带宽比较低,可以设置为64M,即

1
hdfs dfsadmin -setBalancerBandwidth 67108864

默认balancer的threshold为10%,即各个节点与集群总的存储使用率相差不超过10%,我们可将其设置为5%。然后启动Balancer,

1
sbin/start-balancer.sh -threshold 5

等待集群自均衡完成即可。

添加nodemanager

在新的机器上单独启动nodemanager:

1
yarn-daemon.sh start nodemanager

在web页面确认是否成功启用

在ResourceManager,通过yarn node -list查看集群情况

动态缩容

添加退役节点

在namenode所在服务器的hadoop配置目录etc/hadoop下创建dfs.hosts.exclude文件,并添加需要退役的主机名称。

注意:该文件当中一定要写真正的主机名或者ip地址都行,不能写node-4

node04.hadoop.com

在namenode机器的hdfs-site.xml配置文件中增加dfs.hosts.exclude属性

1
2
3
cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop

vim hdfs-site.xml
1
2
3
4
<property> 
<name>dfs.hosts.exclude</name>
<value>/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/dfs.hosts.exclude</value>
</property>

dfs.hosts.exclude属性的意义:命名一个文件,其中包含不允许连接到namenode的主机列表。必须指定文件的完整路径名。如果值为空,则不排除任何主机。

刷新集群

在namenode所在的机器执行以下命令,刷新namenode,刷新resourceManager。

1
2
3
hdfs dfsadmin -refreshNodes

yarn rmadmin –refreshNodes

等待退役节点状态为decommissioned(所有块已经复制完成),停止该节点及节点资源管理器。注意:如果副本数是3,服役的节点小于等于3,是不能退役成功的,需要修改副本数后才能退役。

node-4执行以下命令,停止该节点进程

1
2
3
4
5
cd /export/servers/hadoop-2.6.0-cdh5.14.0

sbin/hadoop-daemon.sh stop datanode

sbin/yarn-daemon.sh stop nodemanager

namenode所在节点执行以下命令刷新namenode和resourceManager

1
2
3
hdfs dfsadmin –refreshNodes

yarn rmadmin –refreshNodes

namenode所在节点执行以下命令进行均衡负载

1
2
3
cd /export/servers/hadoop-2.6.0-cdh5.14.0/

sbin/start-balancer.sh

MapReduce

工作流程

MapReduce执行总流程概览

分片、格式化

输入 Map 阶段的数据源,必须经过分片和格式化操作。

分片:指的是将源文件划分为大小相等的小数据块( Hadoop 2.x 中默认 128MB ),也就是分片( split ),Hadoop 会为每一个分片构建一个 Map 任务,并由该任务运行自定义的 map() 函数,从而处理分片里的每一条记录;
格式化:将划分好的分片( split )格式化为键值对<key,value>形式的数据,其中, key 代表偏移量, value 代表每一行内容。

执行MapTask

MapTask工作机制

  1. Read 阶段: MapTask 通过用户编写的 RecordReader ,从输入的 InputSplit 中解析出一个个 key / value 。
  2. Map 阶段:将解析出的 key / value 交给用户编写的 Map ()函数处理,并产生一系列新的 key / value 。
  3. Collect 阶段:在用户编写的 map() 函数中,数据处理完成后,一般会调用 outputCollector.collect() 输出结果,在该函数内部,它会将生成的 key / value 分片(通过调用 partitioner ),并写入一个环形内存缓冲区中(该缓冲区默认大小是 100MB )。
  4. Spill 阶段:即“溢写”,当缓冲区快要溢出时(默认达到缓冲区大小的 80 %),会在本地文件系统创建一个溢出文件,将该缓冲区的数据写入这个文件。

将数据写入本地磁盘前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

写入磁盘之前,线程会根据 ReduceTask 的数量,将数据分区,一个 Reduce 任务对应一个分区的数据。

这样做的目的是为了避免有些 Reduce 任务分配到大量数据,而有些 Reduce 任务分到很少的数据,甚至没有分到数据的尴尬局面。

如果此时设置了 Combiner ,将排序后的结果进行 Combine 操作,这样做的目的是尽可能少地执行数据写入磁盘的操作。

  1. Combine 阶段:当所有数据处理完成以后, MapTask 会对所有临时文件进行一次合并,以确保最终只会生成一个数据文件

合并的过程中会不断地进行排序和 Combine 操作,其目的有两个:一是尽量减少每次写人磁盘的数据量;二是尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。

执行shuffle

1582129765528

  • 每一个Mapper进程都有一个环形的内存缓冲区,用来存储Map的输出数据,这个内存缓冲区的默认大小是100MB,当数据达到阙值0.8,也就是80MB的时候,一个后台的程序就会把数据溢写到磁盘中。在将数据溢写到磁盘的过程中要经过复杂的过程,首先要将数据进行分区排序(按照分区号如0,1,2),分区完以后为了避免Map输出数据的内存溢出,可以将Map的输出数据分为各个小文件再进行分区,这样map的输出数据就会被分为了具有多个小文件的分区已排序过的数据。然后将各个小文件分区数据进行合并成为一个大的文件(将各个小文件中分区号相同的进行合并)。

  • 这个时候Reducer启动了三个分别为0,1,2。0号Reducer会取得0号分区 的数据;1号Reducer会取得1号分区的数据;2号Reducer会取得2号分区的数据。

执行ReduceTask

ReduceTask工作机制

  1. Copy 阶段: Reduce 会从各个 MapTask 上远程复制一片数据(每个 MapTask 传来的数据都是有序的),并针对某一片数据,如果其大小超过一定國值,则写到磁盘上,否则直接放到内存中
  2. Merge 阶段:在远程复制数据的同时, ReduceTask 会启动两个后台线程,分别对内存和磁盘上的文件进行合并,以防止内存使用过多或者磁盘文件过多。
  3. Sort 阶段:用户编写 reduce() 方法输入数据是按 key 进行聚集的一组数据。

为了将 key 相同的数据聚在一起, Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此, ReduceTask 只需对所有数据进行一次归并排序即可。

  1. Reduce 阶段:对排序后的键值对调用 reduce() 方法,键相等的键值对调用一次 reduce()方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到 HDFS 中
  2. Write 阶段: reduce() 函数将计算结果写到 HDFS 上。

合并的过程中会产生许多的中间文件(写入磁盘了),但 MapReduce 会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到 Reduce 函数。

combiner

流程

combiner
  1. Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。

  2. Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟reducer的输入kv类型要对应起来。

  3. Combiner和reducer的区别在于运行的位置:

    Combiner是在每一个maptask所在的节点运行;

    ​ Reducer是接收全局所有Mapper的输出结果。

代码

自定义Combiner:

1
2
3
4
5
6
7
8
9
10
11
public static class MyCombiner extends  Reducer<Text, LongWritable, Text, LongWritable> {
protected void reduce(
Text key, Iterable<LongWritable> values,Context context)throws IOException, InterruptedException {

long count = 0L;
for (LongWritable value : values) {
count += value.get();
}
context.write(key, new LongWritable(count));
};
}

在主类中添加:

1
2
3
Combiner设置
// 设置Map规约Combiner
job.setCombinerClass(MyCombiner.class);

partitioner

在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,比如按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中。负责实现划分数据的类称作Partitioner。

HashPartitioner(默认)

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
//默认使用key的hash值与上int的最大值,避免出现数据溢出 的情况
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

}

key、value分别指的是Mapper任务的输出,numReduceTasks指的是设置的Reducer任务数量,默认值是1。那么任何整数与1相除的余数肯定是0。也就是说getPartition(…)方法的返回值总是0。也就是Mapper任务的输出总是送给一个Reducer任务,最终只能输出到一个文件中。

自定义Partitioner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FivePartitioner extends Partitioner<IntWritable, IntWritable>{
/**
* 我们的需求:按照能否被5除尽去分区
* 1、如果除以5的余数是0, 放在0号分区
* 2、如果除以5的余数不是0, 放在1分区
*/
@Override
public int getPartition(IntWritable key, IntWritable value, int numPartitions) {

int intValue = key.get();

if(intValue % 5 == 0){
return 0;
}else{
return 1;
}
}
}

再在主函数里加入如下两行代码即可:

1
2
job.setPartitionerClass(FivePartitioner.class);
job.setNumReduceTasks(2);//设置为2

序列化和反序列化

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。
自定义bean对象要想序列化传输步骤及注意事项:
  (1)必须实现Writable接口
  (2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
  (3)重写序列化方法
  (4)重写反序列化方法
  (5)注意反序列化的顺序和序列化的顺序完全一致
  (6)要想把结果显示在文件中,需要重写toString(),且用”\t”分开,方便后续用
  (7)如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序

InputSplit

FileInputFormat源码解析(input.getSplits(job))
(1)找到你数据存储的目录。
(2)开始遍历处理(规划切片)目录下的每一个文件。
(3)遍历第一个文件ss.txt。
  a)获取文件大小fs.sizeOf(ss.txt);。
  b)计算切片大小

1
computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

  c)默认情况下,切片大小=blocksize
  d)开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)。
  e)将切片信息写到一个切片规划文件中。
  f)整个切片的核心过程在getSplit()方法中完成。
  g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
  h)注意:block是HDFS上物理上存储的存储的数据,切片是对数据逻辑上的划分。
(4)提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数

一个job的map和reduce的数

map数量

1
splitSize=max{minSize,min{maxSize,blockSize}}

由处理的数据分成的block数量决定default_num = total_size / split_size,即切片个数。

reduce数量

job.setNumReduceTasks(x);x 为reduce的数量。不设置的话默认为 1。

MapReduce中的排序

部分排序、全排序、辅助排序、二次排序、自定义排序

发生的阶段:
map side:发生在spill后partition前。
reduce side:发生在copy后 reduce前。

缓存机制(Distributedcache)

在进行join操作的时候,如果一个表很大,另一个表很小,我们就可以将这个小表进行广播处理,即每个计算节点上都存一份,然后进行map端的连接操作

MapReduce无法提速的场景

数据量很小。
繁杂的小文件。
索引是更好的存取机制的时候。
事务处理。
只有一台机器的时候。

实现 TopN

可以自定义groupingcomparator,对结果进行最大值排序,然后再reduce输出时,控制只输出前n个数。就达到了topn输出的目的。

实现wordcount

定义一个mapper类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//首先要定义四个泛型的类型
//keyin: LongWritable valuein: Text
//keyout: Text valueout:IntWritable

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
//map方法的生命周期: 框架每传一行数据就被调用一次
//key : 这一行的起始点在文件中的偏移量
//value: 这一行的内容
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿到一行数据转换为string
String line = value.toString();
//将这一行切分出各个单词
String[] words = line.split(" ");
//遍历数组,输出<单词,1>
for(String word:words){
context.write(new Text(word), new IntWritable(1));
}
}
}

定义一个reducer类

1
2
3
4
5
6
7
8
9
10
11
12
//生命周期:框架每传递进来一个kv 组,reduce方法被调用一次  
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//定义一个计数器
int count = 0;
//遍历这一组kv的所有v,累加到count中
for(IntWritable value:values){
count += value.get();
}
context.write(key, new IntWritable(count));
}
}

定义一个主类,用来描述job并提交job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class WordCountRunner {
//把业务逻辑相关的信息(哪个是mapper,哪个是reducer,要处理的数据在哪里,输出的结果放哪里……)描述成一个job对象
//把这个描述好的job提交给集群去运行
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job wcjob = Job.getInstance(conf);
//指定我这个job所在的jar包
// wcjob.setJar("/home/hadoop/wordcount.jar");
wcjob.setJarByClass(WordCountRunner.class);

wcjob.setMapperClass(WordCountMapper.class);
wcjob.setReducerClass(WordCountReducer.class);
//设置我们的业务逻辑Mapper类的输出key和value的数据类型
wcjob.setMapOutputKeyClass(Text.class);
wcjob.setMapOutputValueClass(IntWritable.class);
//设置我们的业务逻辑Reducer类的输出key和value的数据类型
wcjob.setOutputKeyClass(Text.class);
wcjob.setOutputValueClass(IntWritable.class);

//指定要处理的数据所在的位置
FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt");
//指定处理完成之后的结果所保存的位置
FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/"));

//向yarn集群提交这个job
boolean res = wcjob.waitForCompletion(true);
System.exit(res?0:1);
}

执行MapReduce常见的问题

  • client对集群中HDFS的操作没有权限

在集群配置文件hdfs-site.xml,然后重启

1
2
3
4
5
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>

  • mapreduce的输出路径已存在,必须先删除掉那个路径

  • 提交集群运行,运行失败

1
job.setJar("/home/hadoop/wordcount.jar");
  • 日志打不出来,报警告信息
1
2
3
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).  
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

需要在项目的src下面新建file名为log4j.properties的文件

yarn

yarn三大组件

ResourceManager负责所有资源的监控、分配和管理;

ApplicationMaster负责每一个具体应用程序的调度和协调;

NodeManager负责每一个节点的维护。

MapReduce程序在yarn上的执行流程

Hadoop jar xxx.jar

MapReduce程序在yarn上的执行流程
  1. 客户端向集群提交一个任务,该任务首先到ResourceManager中的ApplicationManager;
  2. ApplicationManager收到任务之后,会在集群中找一个NodeManager,并在该NodeManager所在DataNode上启动一个AppMaster进程,该进程用于进行任务的划分和任务的监控;
  3. AppMaster启动起来之后,会向ResourceManager中的ApplicationManager注册其信息(目的是与之通信);
  4. AppMaster向ResourceManager下的ResourceScheduler申请计算任务所需的资源;
  5. AppMaster申请到资源之后,会与所有的NodeManager通信要求它们启动计算任务所需的任务(Map和Reduce);
  6. 各个NodeManager启动对应的容器用来执行Map和Reduce任务;
  7. 各个任务会向AppMaster汇报自己的执行进度和执行状况,以便让AppMaster随时掌握各个任务的运行状态,在某个任务出了问题之后重启执行该任务;
  8. 在任务执行完之后,AppMaster向ApplicationManager汇报,以便让ApplicationManager注销并关闭自己,使得资源得以回收;

调度器(scheduler)

FIFO Scheduler

FIFOScheduler

维持一个先入先出队列,按时间顺序执行任务。

Capacity Scheduler

CapacityScheduler

支持多个队列,每个队列可配置一定的资源量,每个队列采用FIFO调度策略.

(2.7.2版本默认)

为了防止同一个用户的作业独占队列中的资源,该调度器会对同一用户提交

的作业所占资源量进行限定:

  • 选择队列:计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比

值,选择一个该比值最小的队列——最闲的。

  • 选择作业:按照作业优先级和提交时间顺序,同时考虑用户资源量限制和内存限

制对队列内任务排序。

Fair Scheduler

FairScheduler

支持多个队列,每个队列内部按照缺额大小分配资源启动任务,同一时间队列中有多个任务执行。队列的并行度大于等于队列的个数。

缺额:每个job理想情况下获得的计算资源与实际获得的计算资源存在的差距。

容错性

MRAppMaster容错性

  一旦运行失败,由YARN的ResourceManager负责重新启动,最多重启次数可由用户设置,默认是2次。一旦超过最高重启次数,则作业运行失败。

Map Task/Reduce容错性

  Task Task周期性向MRAppMaster汇报心跳;一旦Task挂掉,则MRAppMaster将为之重新申请资源,并运行之。最多重新运行次数可由用户设置,默认4次。

数据压缩算法

常用的压缩算法有bzip2、gzip、lzo、snappy,其中lzo、snappy需要操作系统安装native库才可以支持。

一般用Snappy,特点速度快,缺点无法切分(可以回答在链式 MR 中,Reduce 端输出使用 bzip2 压缩,以便后续的 map 任务对数据进行 split)

优化

MapReduce跑得慢的原因

  • 计算机性能
    CPU、内存、磁盘健康、网络
  • I/O 操作优化
    (1)数据倾斜
      (2)map和reduce数设置不合理
      (3)reduce等待过久
      (4)小文件过多
      (5)大量的不可分块的超大文件
      (6)spill次数过多
      (7)merge次数过多等

MapReduce优化方法

数据输入

  • 合并小文件:

    在执行mr任务前将小文件进行合并,大量的小文件会产生大量的map任务,增大map任务装载次数,而任务的装载比较耗时,从而导致mr运行较慢。

    (Hadoop Archive、Sequence file、CombineFileInputFormat)

  • 采用ConbinFileInputFormat来作为输入,解决输入端大量小文件场景。

map阶段

  • 减少spill次数:

    通过调整io.sort.mb及sort.spill.percent参数值,增大触发spill的内存上限,减少spill次数,从而减少磁盘 IO。

  • 减少merge次数:

    通过调整io.sort.factor参数,增大merge的文件数目,减少merge的次数,从而缩短mr处理时间。

  • 在 map 之后先进行combine处理,减少I/O。

reduce阶段

  • 合理设置map和reduce数:

    两个都不能设置太少,也不能设置太多。太少,会导致task等待,延长处理时间;太多,会导致 map、reduce任务间竞争资源,造成处理超时等错误。

  • 设置map、reduce共存:

    调整slowstart.completedmaps参数,使map运行到一定程度后,reduce也开始运行,减少reduce的等待时间。

  • 规避使用reduce:

    因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

  • 合理设置reduce端的buffer:

    默认情况下,数据达到一个阈值的时候,buffer中的数据就会写入磁盘,然后reduce会从磁盘中获得所有的数据。也就是说,buffer和reduce是没有直接关联的,中间多个一个写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得buffer中的一部分数据可以直接输送到reduce,从而减少IO开销:mapred.job.reduce.input.buffer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读buffer中的数据直接拿给reduce使用。这样一来,设置buffer需要内存,读取数据需要内存,reduce计算也要内存,所以要根据作业的运行情况进行调整。

IO传输

  • 采用数据压缩的方式,减少网络IO的时间。安装Snappy和LZOP压缩编码器。
  • 使用SequenceFile二进制文件

数据倾斜问题

  • 提前在map 进行 combine,减少传输的数据量

在 Mapper 加上 combiner 相当于提前进行 reduce,即把一个 Mapper 中的相同 key 进行

了聚合,减少 shuffle 过程中传输的数据量,以及 Reducer 端的计算量。

如果导致数据倾斜的 key 大量分布在不同的 mapper 的时候,这种方法就不是很有效了。

  • 导致数据倾斜的key 大量分布在不同的 mapper

(1)局部聚合加全局聚合。

第一次在 map 阶段对那些导致了数据倾斜的 key 加上 1 到 n 的随机前缀,这样本来相

同的 key 也会被分到多个 Reducer 中进行局部聚合,数量就会大大降低。

第二次 mapreduce,去掉 key 的随机前缀,进行全局聚合。

思想:二次 mr,第一次将 key 随机散列到不同 reducer 进行处理达到负载均衡目的。第

二次再根据去掉 key 的随机前缀,按原 key 进行 reduce 处理。

这个方法进行两次 mapreduce,性能稍差。

2)增加 Reducer,提升并行度

1
JobConf.setNumReduceTasks(int)

3)实现自定义分区

根据数据分布情况,自定义散列函数,将 key 均匀分配到不同 Reducer