Hadoop 学习总结

Hadoop [字体···] [宽度···]


hadoop 架构

hadoop1.0 与 2.0、3.0 架构的变化。

image.png

image.png

Hadoop 由三个模块组成:

  • 分布式存储 HDFS
  • 分布式计算 MapReduce
  • 资源调度引擎 Yarn

主从中的各个角色:

  • HDFS 模块主从架构:

    • namenode:主节点,主要负责集群的管理以及元数据信息管理
    • datanode:从节点,主要负责存储用户数据
    • secondaryNameNode:辅助 namenode 管理元数据信息,以及元数据信息的冷备份
  • Yarn 模块:

    • ResourceManager:主节点,主要负责资源分配
    • NodeManager:从节点,主要负责执行任务

一个三结点的 hadoop 集群各个角色线程线程运行情况。

image.png

HDFS

HDFS 是 Hadoop 的分布式文件系统,在 HDFS 中文件以 block 块为单位存储,在集群中 block 会保存多份副本,在新版本的 hadoop 支持机架感知,如果设置 block 会被均匀的分配存储到不同的机架上,最大限度避免物理故障对数据造成影响。

分块存储、副本

分块存储

  • 保存文件到 HDFS 时,会先默认按 128M 的单位对文件进行切分成一个个block 块
  • 数据以 block 块的形式存在 HDFS 文件系统中
    • 在 hadoop1 当中,文件的 block 块默认大小是 64M
    • hadoop2 当中,文件的 block 块大小默认是 128M,block 块的大小可以通过 hdfs-site.xml 当中的配置文件进行指定
<property>
    <name>dfs.blocksize</name>
    <value>块大小 以字节为单位</value><!-- 只写数值就可以 -->
</property>
  • block 块元数据(*.meta),每个 block 块对应一个元数据信息

image.png

副本存储

  • 为了保证 block 块的安全性,在 hadoop2 当中,采用文件默认保存三个副本,我们可以更改副本数以提高数据的安全性
  • 在 hdfs-site.xml 当中修改以下配置属性,即可更改文件的副本数
    <property>
          <name>dfs.replication</name>
          <value>3</value>
    </property>

HDFS 架构

image.png

HDFS 集群包括,NameNode 和 DataNode 以及 Secondary Namenode。

  • NameNode 负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。
  • DataNode 负责管理用户的文件数据块,每一个数据块都可以在多个 datanode 上存储多个副本。
  • Secondary NameNode 用来监控 HDFS 状态的辅助后台程序,每隔一段时间获取 HDFS 元数据的快照。最主要作用是辅助 namenode 管理元数据信息

image.png

HDFS shell 命令

使用命令的方式,两种:

hdfs dfs -help ls
hadoop fs -help ls

HDFS 的 shell 命令与 Linux 命令很相似,可以通过帮助查看后使用,通过程序操作是也提供了与 shell 命令相似的方法。

输入 hadoop fs 或 hdfs dfs 查看帮助文档。

[hadoop@node01 /app/hadoop-3.2.2]$ hdfs dfs
Usage: hadoop fs [generic options]
	[-appendToFile <localsrc> ... <dst>]
	[-cat [-ignoreCrc] <src> ...]
	[-checksum <src> ...]
	[-chgrp [-R] GROUP PATH...]
	[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
	[-chown [-R] [OWNER][:[GROUP]] PATH...]
	[-copyFromLocal [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>]
	[-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] [-e] <path> ...]
	[-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>]
	[-createSnapshot <snapshotDir> [<snapshotName>]]
	[-deleteSnapshot <snapshotDir> <snapshotName>]
	[-df [-h] [<path> ...]]
	[-du [-s] [-h] [-v] [-x] <path> ...]
	[-expunge [-immediate]]
	[-find <path> ... <expression> ...]
	[-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-getfacl [-R] <path>]
	[-getfattr [-R] {-n name | -d} [-e en] <path>]
	[-getmerge [-nl] [-skip-empty-file] <src> <localdst>]
	[-head <file>]
	[-help [cmd ...]]
	[-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [<path> ...]]
	[-mkdir [-p] <path> ...]
	[-moveFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
	[-moveToLocal <src> <localdst>]
	[-mv <src> ... <dst>]
	[-put [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>]
	[-renameSnapshot <snapshotDir> <oldName> <newName>]
	[-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...]
	[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
	[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
	[-setfattr {-n name [-v value] | -x name} <path>]
	[-setrep [-R] [-w] <rep> <path> ...]
	[-stat [format] <path> ...]
	[-tail [-f] [-s <sleep interval>] <file>]
	[-test -[defswrz] <path>]
	[-text [-ignoreCrc] <src> ...]
	[-touch [-a] [-m] [-t TIMESTAMP ] [-c] <path> ...]
	[-touchz <path> ...]
	[-truncate [-w] <length> <path> ...]
	[-usage [cmd ...]]

Generic options supported are:
-conf <configuration file>        specify an application configuration file
-D <property=value>               define a value for a given property
-fs <file:///|hdfs://namenode:port> specify default filesystem URL to use, overrides 'fs.defaultFS' property from configurations.
-jt <local|resourcemanager:port>  specify a ResourceManager
-files <file1,...>                specify a comma-separated list of files to be copied to the map reduce cluster
-libjars <jar1,...>               specify a comma-separated list of jar files to be included in the classpath
-archives <archive1,...>          specify a comma-separated list of archives to be unarchived on the compute machines

The general command line syntax is:
command [genericOptions] [commandOptions]

HDFS 优缺点

优点:

  1. 高容错性

    1. 数据自动保存多个副本。它通过增加副本的形式,提高容错性。
    2. 某一个副本丢失以后,它可以自动恢复,这是由 HDFS 内部机制自动实现
  2. 适合批处理

    1. 把数据位置暴露给计算框架,通过移动计算而不是移动数据,提高效率
  3. 适合大数据处理

    1. 数据规模:能够处理数据规模达到 GB、TB、甚至 PB 级别的数据。
    2. 文件规模:能够处理百万规模以上的文件数量,数量相当之大。
    3. 节点规模:能够处理 10K 节点的规模。
  4. 流式数据访问

    1. 一次写入,多次读取
    2. 不能随机修改,只能追加。
    3. 它能保证数据的一致性。
  5. 可构建在廉价机器上

    1. 它通过多副本机制,提高可靠性。
    2. 它提供了容错和恢复机制。比如某一个副本丢失,可以通过其它副本来恢复。

缺点:

  1. 不适合低延时数据访问;

    1. 比如毫秒级的来存储、读取数据,这是不行的,它做不到。
    2. 它适合高吞吐率的场景,就是在某一时间内写入大量的数据。
  2. 无法高效的对大量小文件进行存储

    1. 存储大量小文件的话,它会占用 NameNode 大量的内存来存储文件、目录和块信息。这样是不可取的,因为 NameNode 的内存总是有限的。
    2. 小文件存储的寻道时间会超过读取时间,它违反了 HDFS 的设计目标。
  3. 并发写入、文件随机修改

    1. 一个文件只能有一个写,不允许多个线程同时写(租约机制)。
    2. 仅支持数据 append(追加),不支持文件的随机修改。

Java 操作 HDFS

操作 HDFS 的核心接口是 FileSystem,通过这个类可以连接一个 hdfs 服务,然后进行操作。

    // 在hdfs中创建一个目录
    @Test
    public void hdfsTest() throws Exception {
        // configuration
        Configuration conf = new Configuration();

        //conf.set("fs.defaultFS","hdfs://node01:8020");

        // get filesystem
        FileSystem fs = FileSystem.get(conf);

        fs.mkdirs(new Path("/lqc/ideaTest"));

        fs.close();

    }

    //指定目录所属用户
    @Test
    public void mkDirOnHDFS2() throws IOException, URISyntaxException, InterruptedException {
        //配置项
        Configuration configuration = new Configuration();

        //获得文件系统
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), configuration, "liuqichun");

        //调用方法创建目录
        boolean mkdirs = fileSystem.mkdirs(new Path("/lqc/dir2"));

        //释放资源
        fileSystem.close();
    }


    //创建目录时,指定目录权限
    @Test
    public void mkDirOnHDFS3() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");

        FileSystem fileSystem = FileSystem.get(configuration);
        FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.READ, FsAction.READ);
        boolean isMkdirs = fileSystem.mkdirs(new Path("hdfs://node01:8020/lqc/dir3"), fsPermission);

        if (isMkdirs) {
            System.out.println("目录创建成功");
        }

        fileSystem.close();
    }

    /**
     * 说明:将文件hello.txt上传到/lqc/dir1
     *
     * 如果路径/lqc/dir1不存在,那么结果是:
     * 在hdfs上先创建/kaikeba目录,然后将upload.txt上传到/lqc,并将文件upload.txt重命名为dir1
     *
     * 如果路径/lqc/dir1存在,那么将hello.txt上传到此路径中去
     */
    @Test
    public void uploadFile2HDFS() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");
        FileSystem fs = FileSystem.get(configuration);

        Path uploadPath = new Path("/lqc/dir1");

        boolean dir1IsExist = fs.exists(uploadPath);
        if(!dir1IsExist){
            boolean mkdirs = fs.mkdirs(uploadPath);
            if(mkdirs){
                System.out.println(uploadPath + " 已经创建");
            }
        }

        fs.copyFromLocalFile(new Path("D:\\IDEA_WorkSpace\\hadoopstudy\\hello.txt"), uploadPath);//hdfs路径
        fs.close();
    }

    // 文件下载, 有校验机制crc
    @Test
    public void downloadFileFromHDFS() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");
        FileSystem fileSystem = FileSystem.get(configuration);


        fileSystem.copyToLocalFile(new Path("hdfs://node01:8020/lqc/dir1/hello.txt"),
                new Path("file:///D:\\IDEA_WorkSpace\\hadoopstudy\\downloads\\hello.txt"));

        //删除文件
        //fileSystem.delete()
        //重命名文件
        //fileSystem.rename()
        fileSystem.close();
    }

HDFS datanode 工作机制

image.png

datanode 工作机制

1、datanode 是存放数据的节点,数据以 block 形式存放,每个 block 对应一个元信息文件。

  • block 中存储的是数据
  • 元信息(.meta)中存储的是 block 的长度,数据校验,时间戳等信息

2、datanode 启动后会向 namenode 注册,然后周期性(6 小时)的向 namenode 汇报存储的信息

3、定期与 namenode 进行心跳检测

4、集群环境中可以动态添加和退出 datanode

数据完整性

1、当客户端向 hdfs 写数据时,使用数据校验(CRC)确保网络传输过程中数据不出问题

2、datanode 读取到数据就进行校验 checksum,如果 checksum 不一样说明 block 已经损坏

3、datanode 对其上创建的文件进行周期性的校验(验证 checksum)

HDFS 读写流程

上次文件流程

image.png

文件上传流程如下:

  • 创建文件: ①HDFS client 向 HDFS 写入数据,先调用 DistributedFileSystem.create() ②RPC 调用 namenode 的 create(),会在 HDFS 目录树中指定的路径,添加新文件;并将操作记录在 edits.log 中 namenode.create()方法执行完后,返回一个 FSDataOutputStream,它是 DFSOutputStream 的包装类

  • 建立数据流管道 pipeline ③client 调用 DFSOutputStream.write()写数据(先写第一个块的数据,暂时叫 blk1) ④DFSOutputStream 通过 RPC 调用 namenode 的 addBlock,向 namenode 申请一个空的数据块 block ⑤addBlock 返回 LocatedBlock 对象;此对象中包含了当前 blk 要存储在哪三个 datanode 的信息,比如 dn1、dn2、dn3 ⑥ 客户端,根据位置信息,建立数据流管道(图中蓝色线条)

  • 向数据流管道写当前块的数据 ⑦ 写数据时,先将数据写入一个检验块 chunk 中,写满 512 字节后,对此 chunk 计算校验和 checksum 值(4 字节) ⑧ 然后将 chunk 及对应校验和写入 packet 中,一个 packet 是 64KB ⑨ 随着源源不断的带校验和的 chunk 写入 packet,当 packet 写满后,将 packet 写入 dataqueue 数据队列中 ⑩packet 从队列中取出,沿 pipeline 发送到 dn1,再从 dn1 发送到 dn2,再从 dn2 发送到 dn3 ⑪ 同时,此 packet 会保存一份到一个确认队列 ack queue 中 ⑫packet 到达最后一个 datanode 即 dn3 后,做校验,将校验结果逆着 pipeline 方向回传到客户端,具体是校验结果从 dn3 传到 dn2,dn2 也会做校验,校验结果再 传到 dn1,dn1 也做校验;结果再传回客户端 ⑬ 客户端根据校验结果,如果“成功”,则将将保存在 ack queue 中的 packet 删除;如果失败,则将 packet 取出,重新放回到 data queue 末尾,等待再次沿 pipeline 发送 ⑭ 如此,将 block 中的一个数据一个个 packet 发送出去;当此 block 发送完毕, 即 dn1、dn2、dn3 都接受了 blk1 的完整的副本,那么三个 dn 分别 RPC 调用 namenode 的 blockReceivedAndDeleted(), namenode 会更新内存中 block 与 datanode 的对应关系(比如 dn1 上多了一个 blk1 副本)

  • 关闭 dn1、dn2、dn3 构建的 pipeline;且文件还有下一个块时,再从 ④ 开始;直到文件全部数据写完 ⑮ 最终,调用 DFSOutputStream 的 close() ⑯ 客户端调用 namenode 的 complete(),告知 namenode 文件传输完成

HDFS 上次文件源码分析图:

HDFS 上传文件源码分析

读取文件流程

image.png

1、client 端读取 HDFS 文件,client 调用文件系统对象 DistributedFileSystem 的 open 方法

2、返回 FSDataInputStream 对象(对 DFSInputStream 的包装)

3、构造 DFSInputStream 对象时,调用 namenode 的 getBlockLocations 方法,获得 file 的开始若干 block(如 blk1, blk2, blk3, blk4)的存储 datanode(以下简称 dn)列表;针对每个 block 的 dn 列表,会根据网络拓扑做排序,离 client 近的排在前;

4、调用 DFSInputStream 的 read 方法,先读取 blk1 的数据,与 client 最近的 datanode 建立连接,读取数据

5、读取完后,关闭与 dn 建立的流

6、读取下一个 block,如 blk2 的数据(重复步骤 4、5、6)

7、这一批 block 读取完后,再读取下一批 block 的数据(重复 3、4、5、6、7)

8、完成文件数据读取后,调用 FSDataInputStream 的 close 方法

namenode 与 secondaryName 解析

  • NameNode 主要负责集群当中的元数据信息管理,而且元数据信息需要经常随机访问,因为元数据信息必须高效的检索
    • 元数据信息保存在哪里能够==快速检索==呢?
    • 如何保证元数据的持久安全呢?
  • 为了保证元数据信息的快速检索,那么我们就必须将元数据存放在内存当中,因为在内存当中元数据信息能够最快速的检索,那么随着元数据信息的增多(每个 block 块大概占用 150 字节的元数据信息),内存的消耗也会越来越多。
  • 如果所有的元数据信息都存放内存,服务器断电,内存当中所有数据都消失,为了保证元数据的安全持久,元数据信息必须做可靠的持久化
  • 在 hadoop 当中为了持久化存储元数据信息,将所有的元数据信息保存在了 FSImage 文件当中,那么 FSImage 随着时间推移,必然越来越膨胀,FSImage 的操作变得越来越难,为了解决元数据信息的增删改,hadoop 当中还引入了元数据操作日志 edits 文件,edits 文件记录了客户端操作元数据的信息,随着时间的推移,edits 信息也会越来越大,为了解决 edits 文件膨胀的问题,hadoop 当中引入了 secondaryNamenode 来专门做 fsimage 与 edits 文件的合并

image.png

namenode 工作机制

(1)第一次启动 namenode 格式化后,创建 fsimage 和 edits 文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。 (2)客户端对元数据进行增删改的请求 (3)namenode 记录操作日志,更新滚动日志。 (4)namenode 在内存中对数据进行增删改查

Secondary NameNode 工作

(1)Secondary NameNode 询问 namenode 是否需要 checkpoint。直接带回 namenode 是否检查结果。 (2)Secondary NameNode 请求执行 checkpoint。 (3)namenode 滚动正在写的 edits 日志 (4)将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode (5)Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。 (6)生成新的镜像文件 fsimage.chkpoint (7) 拷贝 fsimage.chkpoint 到 namenode (8)namenode 将 fsimage.chkpoint 重新命名成 fsimage

属性解释
dfs.namenode.checkpoint.period3600 秒(即 1 小时)The number of seconds between two periodic checkpoints.
dfs.namenode.checkpoint.txns1000000The Secondary NameNode or CheckpointNode will create a checkpoint of the namespace every ‘dfs.namenode.checkpoint.txns’ transactions, regardless of whether ‘dfs.namenode.checkpoint.period’ has expired.
dfs.namenode.checkpoint.check.period60 秒(1 分钟)The SecondaryNameNode and CheckpointNode will poll the NameNode every ‘dfs.namenode.checkpoint.check.period’ seconds to query the number of uncheckpointed transactions.

FSImage 与 edits 详解

  • 所有的元数据信息都保存在了 FsImage 与 Eidts 文件当中,这两个文件就记录了所有的数据的元数据信息,元数据信息的保存目录配置在了 hdfs-site.xml 当中
<!-- fsimage目录 -->
<property>
  <name>dfs.namenode.name.dir</name>
  <value>file:///kkb/install/hadoop-2.6.0-cdh5.14.2/hadoopDatas/namenodeDatas</value>
</property>
<!-- edit文件目录 -->
<property>
   <name>dfs.namenode.edits.dir</name>
   <value>file:///kkb/install/hadoop-2.6.0-cdh5.14.2/hadoopDatas/dfs/nn/edits</value>
</property>
  • 客户端对 hdfs 进行写文件时会首先被记录在 edits 文件中

    edits 修改时元数据也会更新。

    每次 hdfs 更新时 edits 先更新后,客户端才会看到最新信息。

    fsimage:是 namenode 中关于元数据的镜像,一般称为检查点。

    一般开始时对 namenode 的操作都放在 edits 中,为什么不放在 fsimage 中呢?

    因为 fsimage 是 namenode 的完整的镜像,内容很大,如果每次都加载到内存的话生成树状拓扑结构,这是非常耗内存和 CPU。

    fsimage 内容包含了 namenode 管理下的所有 datanode 中文件及文件 block 及 block 所在的 datanode 的元数据信息。随着 edits 内容增大,就需要在一定时间点和 fsimage 合并。

FSimage 文件当中的文件信息查看

cd  /kkb/install/hadoop-2.6.0-cdh5.14.2/hadoopDatas/namenodeDatas/current
hdfs oiv    #查看帮助信息
hdfs oiv -i fsimage_0000000000000000864 -p XML -o /home/hadoop/fsimage1.xml

edits 当中的文件信息查看

cd /kkb/install/hadoop-2.6.0-cdh5.14.2/hadoopDatas/dfs/nn/edits/current
hdfs oev     #查看帮助信息
hdfs oev -i edits_0000000000000000865-0000000000000000866 -o /home/hadoop/myedit.xml -p XML

mapreduce

MapReduce 编程模型

  • MapReduce 是采用一种分而治之的思想设计出来的分布式计算框架

  • 那什么是分而治之呢?

    • 比如一复杂、计算量大、耗时长的的任务,暂且称为“大任务”;
    • 此时使用单台服务器无法计算或较短时间内计算出结果时,可将此大任务切分成一个个小的任务,小任务分别在不同的服务器上并行的执行;
    • 最终再汇总每个小任务的结果
  • MapReduce 由两个阶段组成:

    • Map 阶段(切分成一个个小的任务)
    • Reduce 阶段(汇总小任务的结果)

image.png

image.png

mapreduce 编程步骤

1. Map 阶段 2 个步骤

  • 第一步:设置 InputFormat 类,将数据切分成 key,value 对;此 kv 对作为第二步的输入

  • 第二步:自定义 map 逻辑,处理我们第一步的传过来的 kv 对数据,然后转换成新的 key,value 对,并输出

2. shuffle 阶段 4 个步骤

  • 第三步:对上一步输出的 key,value 对进行分区。(相同 key 的 kv 对属于同一分区)

  • 第四步:对每个分区的数据按照 key 进行排序

  • 第五步:对分区中的数据进行规约(combine 操作),降低数据的网络拷贝(可选步骤)

  • 第六步:对排序后的 kv 对数据进行分组;分组的过程中,key 相同的 kv 对为一组;将同一组的 kv 对的所有 value 放到一个集合当中(每组数据调用一次 reduce 方法)

3. reduce 阶段 2 个步骤

  • 第七步:对多个 map 的任务进行合并,排序,写 reduce 函数自己的逻辑,对输入的 key,value 对进行处理,转换成新的 key,value 对进行输出

  • 第八步:设置将输出的 key,value 对数据保存到文件中

hadoop 当中常用的数据类型

hadoop 没有使用 Java 自带的数据类型,而是自己封装了一套数据类型,这些数据类型更利于持久化合网络传输。

Java 类型Hadoop Writable 类型
BooleanBooleanWritable
ByteByteWritable
IntIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable
byte[]BytesWritable

mapreduce main 程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 这个类作为mr程序的入口类,这里面写main方法
 */
public class WordCount extends Configured implements Tool {
    /**
     * 实现Tool接口之后,需要实现一个run方法,
     * 这个run方法用于组装我们的程序的逻辑,其实就是组装八个步骤
     *
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        /***
         * 第一步:读取文件,解析成key,value对,k1   v1
         * 第二步:自定义map逻辑,接受k1   v1  转换成为新的k2   v2输出
         * 第三步:分区。相同key的数据发送到同一个reduce里面去,key合并,value形成一个集合
         * 第四步:排序   对key2进行排序。字典顺序排序
         * 第五步:规约  combiner过程  调优步骤 可选
         * 第六步:分组
         * 第七步:自定义reduce逻辑接受k2   v2  转换成为新的k3   v3输出
         * 第八步:输出k3  v3 进行保存
         */

        //获取Job对象,组装我们的八个步骤,每一个步骤都是一个class类
        Configuration conf = super.getConf();

        Job job = Job.getInstance(conf, WordCount.class.getSimpleName());

        //判断输出路径,是否存在,如果存在,则删除
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(new Path(args[1]))) {
            fileSystem.delete(new Path(args[1]), true);
        }

        //实际工作当中,程序运行完成之后一般都是打包到集群上面去运行,打成一个jar包
        //如果要打包到集群上面去运行,必须添加以下设置
        job.setJarByClass(WordCount.class);

        //第一步:读取文件,解析成key,value对,k1:行偏移量  v1:一行文本内容
        job.setInputFormatClass(TextInputFormat.class);
        //指定我们去哪一个路径读取文件
//        TextInputFormat.addInputPath(job,new Path("file:///C:\\Users\\admin\\Desktop\\wordCount_input\\数据"));
        TextInputFormat.addInputPath(job, new Path(args[0]));

        //第二步:自定义map逻辑,接受k1   v1  转换成为新的k2   v2输出
        job.setMapperClass(MyMapper.class);
        //设置map阶段输出的key,value的类型,其实就是k2  v2的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //第三步到六步:分区,排序,规约,分组都省略

        //第七步:自定义reduce逻辑
        job.setReducerClass(MyReducer.class);
        //设置key3  value3的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //第八步:输出k3  v3 进行保存
        job.setOutputFormatClass(TextOutputFormat.class);
        //一定要注意,输出路径是需要不存在的,如果存在就报错
//        TextInputFormat.addInputPath(job,new Path("file:///C:\\Users\\admin\\Desktop\\wordCount_output"));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setNumReduceTasks(3);

        //提交job任务
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    /*
    作为程序的入口类
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        //提交run方法之后,得到一个程序的退出状态码
        int run = ToolRunner.run(configuration, new WordCount(), args);
        //根据我们 程序的退出状态码,退出整个进程
        System.exit(run);
    }
}

map task 数量及切片机制

map个数

mapreduce 的 InputFormat

除了 mapreduce 提供的这些输输入类,还可以自定义输入类。

实现步骤:

  1. 继承 FileInputFormat
  2. 实现方法,核心是返回 RecordReader 对象,因此还需要实现一个 RecordReader 类
类名主要作用
TextInputFormat读取文本文件
CombineFileInputFormat在 MR 当中用于合并小文件,将多个小文件合并之后只需要启动一个 mapTask 进行运行
SequenceFileInputFormat处理 SequenceFile 这种格式的数据
KeyValueTextInputFormat通过手动指定分隔符,将每一条数据解析成为 key,value 对类型
NLineInputFormat指定数据的行数作为一个切片
FixedLengthInputFormat文件的每个 record 是固定的长度;用于读取固定宽度的二进制记录

实现 Mapper 方法

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    /**
     * @param key
     * @param value   小文件的全部内容
     * @param context 上下文,用来写出OUTKEY,OUTVALUE
     */
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        //文件名
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().getName();
        context.write(new Text(name), value);
    }
}

实现 reducer

实现 reducer 时,注意输入输出,reducer 的输入是 mapper 的输出,reducer 的输出是 OutputFormat 的输入。

在 reducer 中主要进行的操作是实现累加,累加这个过程可以加入一些处理逻辑。

实现 reducer 示例:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reduce求和计算,combine的时候也可以用此类
 */
public class AnalysisReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

mapreduce 分区

分区需要继承 Partitioner 接口,实现 getPartition 方法。

  • 在 mapreduce 执行当中,有一个默认的步骤就是 partition 分区;
    • 分区主要的作用就是默认将 key 相同的 kv 对数据发送到同一个分区中;
    • 在 mapreduce 当中有一个抽象类叫做 Partitioner,默认使用的实现类是 HashPartitioner,我们可以通过 HashPartitioner 的源码,查看到分区的逻辑如下
  • MR 编程的第三步就是分区;这一步中决定了 map 生成的每个 kv 对,被分配到哪个分区里

为 job 设置分区器:

job.setPartitionerClass(MyPartitioner.class);

mapreduce 的排序

mapreduce 在 map 计算和分区处理后会对分区中的内容进行排序,默认对 key 进行排序(按字典序)。

  • 对于 MapTask

    • 它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序
    • 并将这些有序数据溢写到磁盘上
    • 而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
  • 对于 ReduceTask,它从每个执行完成的 MapTask 上远程拷贝相应的数据文件

    • 如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。
    • 如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;
    • 如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。
    • 当所有数据拷贝完毕后,ReduceTask 统一对内存和磁盘上的所有数据进行一次归并排序。
自定义排序

自定义排序需要单独创建 map 和 reduce 输入输出时 key 使用的 JavaBean 对象,这个对象需要实现序列化和比较接口。

JavaBean 示例:

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

//bean要能够可序列化且可比较,所以需要实现接口WritableComparable
public class MyBean implements WritableComparable<FlowSortBean> {
    int a;
    int b;

    @Override
    public int compareTo(FlowSortBean o) {
        // 实现比较逻辑,返回正数、0、负数
        return a-o.a
    }

    //序列化,注意:写入写出顺序一定要相同
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(a);
        out.writeInt(b);
    }

    //反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        this.a = in.readInt();
        this.b = in.readInt();
    }
}

mapreduce 分组

实现分组需要实现 GroupingComparator 类。

关键类 GroupingComparator

  • 决定了 mapreduce 当中 reduce 端哪些数据作为一组,每一组调用一次 reduce 的逻辑
  • 默认是 key 相同的 kv 对,作为同一组;每组调用一次 reduce 方法
  • 通过实现 GroupingComparator 接口,实现自定义的分组逻辑

步骤:

1、继承 WritableComparator 2、重写 compare()方法

@Override
public int compare(WritableComparable a, WritableComparable b) {
        // 比较的业务逻辑
        return result;
}

3、创建一个构造器,将比较对象的类传给父类

protected OrderGroupingComparator() {
        super(OrderBean.class, true);
}

计数器与累加器

  • 计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器还可辅助诊断系统故障。如果需要将日志信息传输到 map 或 reduce 任务, 更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获取计数器值比输出日志更方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。

  • hadoop 内置计数器列表

MapReduce 任务计数器org.apache.hadoop.mapreduce.TaskCounter
文件系统计数器org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat 计数器org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat 计数器org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
作业计数器org.apache.hadoop.mapreduce.JobCounter

计数器使用:

定义计数器(Counter),通过 context 上下文对象可以获取我们的计数器,进行记录。

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, FlowSortBean, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //自定义我们的计数器,这里实现了统计map输入数据的条数
        Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
        counter.increment(1L);
	//...
    }
}

map task 工作机制

mapreduce完整流程

  • (1)Read 阶段:MapTask 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。

  • (2)Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并产生一系列新的 key/value。

  • (3)Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用 OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。

  • (4)Spill 阶段:即“溢写”,当环形缓冲区满 80%后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

  • 溢写阶段详情:

    • 步骤 1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号 Partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。

    • 步骤 2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

    • 步骤 3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括,在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。

    • (5)合并阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

    • 当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件 output/file.out 中,同时生成相应的索引文件 output/file.out.index。

    • 在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 io.sort.factor(默认 10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

    • 让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

reduce 工作机制

reduce简图.png

  • (1)Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

  • (2)Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

  • (3)Sort 阶段:当所有 map task 的分区数据全部拷贝完,按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。

  • (4)Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。

设置 ReduceTask 并行度(个数)

ReduceTask 的并行度同样影响整个 Job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不同,ReduceTask 数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

Hadoop 数据处理实践

以下这个例子是对日志中的信息登录信息(手机号)进行统计,最后将不同的手机号分类写到文件。

项目依赖 pox.xml

    <properties>
        <hadoop.version>3.2.2</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <!--<scope>test</scope>-->
        </dependency>

        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--   <verbal>true</verbal>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

第一步:实现 Mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Description: 截取日志中的手机号
 *
 * @author: liuqichun
 */
public class AnalysisMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if(value==null) return;
        String[] splits = value.toString().split(" ");
        if (splits.length < 8) return;

        String info = splits[7];
        if(info != null && !info.isEmpty() && info.contains("【用户登录】")){
            int lastIndex = info.lastIndexOf(":");
            String phone = info.substring(lastIndex + 1);
            if(!phone.isEmpty()){
                context.write(new Text(phone) ,new IntWritable(1));
            }
        }
    }
}

第二步:实现 reducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reduce求和计算,combine的时候也可以用此类
 * @author: liuqichun
 */
public class AnalysisReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 求和
        for (IntWritable v : values) {
            sum += v.get();
        }
        // 写出
        context.write(key, new IntWritable(sum));
    }
}

第三步:实现分区

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 根据手机号前缀分区,把不同手机号开头结果的放到不同文件
 *
 * 10~13:A文件
 * 14~17:B文件
 * 18~19:C文件
 *
 * @author: liuqichun
 */
public class AnalysisPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        String prefix = text.toString().substring(0, 2);
        int val = Integer.parseInt(prefix);
        // 返回分区的索引即可实现分区
        if(val < 15){
            return 0;
        }else if(val<18)
            return 1;
        else {
            return 2;
        }
    }

}

第四步:实现 main 程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Description: 登录日志分析程序,日志进行分布式处理,统计每个手机号登录登录次数
 *
 * @author: liuqichun
 */
public class AnalysisMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new AnalysisMain(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws Exception {

        FileSystem fileSystem = FileSystem.get(super.getConf());

        // 作业job
        Job job = Job.getInstance(super.getConf(), "AnalysisUserLogin");
        job.setJarByClass(AnalysisMain.class);

        // 输入数据
        job.setInputFormatClass(TextInputFormat.class);

        // 读取并过滤文件
        RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(new Path("hdfs://node01:8020/project-log/userlogin"), false);
        StringBuilder sb = new StringBuilder(2048);
        while (fileIterator.hasNext()){
            LocatedFileStatus file = fileIterator.next();
            String path = file.getPath().toString();
            if(path.contains("part-r-")){
                sb.append(path).append(",");
            }
        }

        TextInputFormat.addInputPaths(job,sb.substring(0,sb.length()-1));

        // map计算
        job.setMapperClass(AnalysisMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // combine, map task中进行reduce
        job.setCombinerClass(AnalysisReducer.class);

        // 分区
        job.setPartitionerClass(AnalysisPartitioner.class);
        // 设置reduce作业数,每个task输出一个文件
        job.setNumReduceTasks(3);

        // 排序按照默认

        //reduce
        job.setReducerClass(AnalysisReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 输出数据
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outputPath = new Path("hdfs://node01:8020/project-log/userlogin_output");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath,true);
        }

        TextOutputFormat.setOutputPath(job, outputPath);

        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }
}

第五步:打包提交集群运行

1、在 idea 中将项目打包为 jar

2、运行

# 格式: hadoop jar <jar包> <mapreduce main程序> [args...]
hadoop jar original-hadoop-study-1.0-SNAPSHOT.jar com.elltor.userlogincount.analysis.AnalysisMain

3、执行结果

原实日志数据为多个日志文件,部分内容如下:

[30mSMPE-ADMIN- 2021-04-09 15:35:54 [http-nio-8001-exec-4] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15935508617
[30mSMPE-ADMIN- 2021-04-09 15:35:54 [http-nio-8001-exec-4] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15936533317
[30mSMPE-ADMIN- 2021-04-13 14:47:52 [http-nio-8000-exec-5] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15090382096
[30mSMPE-ADMIN- 2021-04-14 15:04:28 [http-nio-8000-exec-1] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:13555555555
[30mSMPE-ADMIN- 2021-04-14 15:04:28 [http-nio-8000-exec-1] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:13555555555
[30mSMPE-ADMIN- 2021-04-17 10:58:20 [http-nio-8000-exec-6] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:18888888888
[30mSMPE-ADMIN- 2021-04-17 10:58:20 [http-nio-8000-exec-6] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:18888888888
[30mSMPE-ADMIN- 2021-04-17 17:16:47 [http-nio-8000-exec-10] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:18888888888
[30mSMPE-ADMIN- 2021-04-20 16:59:19 [http-nio-8000-exec-8] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15090382096
[30mSMPE-ADMIN- 2021-04-20 16:59:19 [http-nio-8000-exec-8] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15090382096
[30mSMPE-ADMIN- 2021-04-22 17:21:18 [http-nio-8000-exec-3] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15500000000
[30mSMPE-ADMIN- 2021-04-22 17:21:18 [http-nio-8000-exec-3] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15500000000
[30mSMPE-ADMIN- 2021-04-24 10:27:11 [http-nio-8000-exec-6] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15143308338

处理后的结果(部分):

image.png

Top↑