Hadoop学习总结

hadoop架构

hadoop1.0与2.0、3.0架构的变化。

image.png

image.png

Hadoop由三个模块组成:

  • 分布式存储HDFS
  • 分布式计算MapReduce
  • 资源调度引擎Yarn

主从中的各个角色:

  • HDFS模块主从架构:

    • namenode:主节点,主要负责集群的管理以及元数据信息管理
    • datanode:从节点,主要负责存储用户数据
    • secondaryNameNode:辅助namenode管理元数据信息,以及元数据信息的冷备份
  • Yarn模块:

    • ResourceManager:主节点,主要负责资源分配
    • NodeManager:从节点,主要负责执行任务

一个三结点的hadoop集群各个角色线程线程运行情况。

image.png

HDFS

HDFS是Hadoop的分布式文件系统,在HDFS中文件以block块为单位存储,在集群中block会保存多份副本,在新版本的hadoop支持机架感知,如果设置block会被均匀的分配存储到不同的机架上,最大限度避免物理故障对数据造成影响。

分块存储、副本

分块存储

  • 保存文件到HDFS时,会先默认按128M的单位对文件进行切分成一个个block块
  • 数据以block块的形式存在HDFS文件系统中
    • 在hadoop1当中,文件的block块默认大小是64M
    • hadoop2当中,文件的block块大小默认是128M,block块的大小可以通过hdfs-site.xml当中的配置文件进行指定
<property>
    <name>dfs.blocksize</name>
    <value>块大小 以字节为单位</value><!-- 只写数值就可以 -->
</property>
  • block块元数据(*.meta),每个block块对应一个元数据信息

image.png

副本存储

  • 为了保证block块的安全性,在hadoop2当中,采用文件默认保存三个副本,我们可以更改副本数以提高数据的安全性
  • 在hdfs-site.xml当中修改以下配置属性,即可更改文件的副本数
    <property>
          <name>dfs.replication</name>
          <value>3</value>
    </property>

HDFS架构

image.png

HDFS集群包括,NameNode和DataNode以及Secondary Namenode。

  • NameNode负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。
  • DataNode 负责管理用户的文件数据块,每一个数据块都可以在多个datanode上存储多个副本。
  • Secondary NameNode用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据的快照。最主要作用是辅助namenode管理元数据信息

image.png

HDFS shell命令

使用命令的方式,两种:

hdfs dfs -help ls
hadoop fs -help ls 

HDFS的shell命令与Linux命令很相似,可以通过帮助查看后使用,通过程序操作是也提供了与shell命令相似的方法。

输入 hadoop fs 或 hdfs dfs查看帮助文档。

[hadoop@node01 /app/hadoop-3.2.2]$ hdfs dfs
Usage: hadoop fs [generic options]
	[-appendToFile <localsrc> ... <dst>]
	[-cat [-ignoreCrc] <src> ...]
	[-checksum <src> ...]
	[-chgrp [-R] GROUP PATH...]
	[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
	[-chown [-R] [OWNER][:[GROUP]] PATH...]
	[-copyFromLocal [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>]
	[-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] [-e] <path> ...]
	[-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>]
	[-createSnapshot <snapshotDir> [<snapshotName>]]
	[-deleteSnapshot <snapshotDir> <snapshotName>]
	[-df [-h] [<path> ...]]
	[-du [-s] [-h] [-v] [-x] <path> ...]
	[-expunge [-immediate]]
	[-find <path> ... <expression> ...]
	[-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-getfacl [-R] <path>]
	[-getfattr [-R] {-n name | -d} [-e en] <path>]
	[-getmerge [-nl] [-skip-empty-file] <src> <localdst>]
	[-head <file>]
	[-help [cmd ...]]
	[-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [<path> ...]]
	[-mkdir [-p] <path> ...]
	[-moveFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
	[-moveToLocal <src> <localdst>]
	[-mv <src> ... <dst>]
	[-put [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>]
	[-renameSnapshot <snapshotDir> <oldName> <newName>]
	[-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...]
	[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
	[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
	[-setfattr {-n name [-v value] | -x name} <path>]
	[-setrep [-R] [-w] <rep> <path> ...]
	[-stat [format] <path> ...]
	[-tail [-f] [-s <sleep interval>] <file>]
	[-test -[defswrz] <path>]
	[-text [-ignoreCrc] <src> ...]
	[-touch [-a] [-m] [-t TIMESTAMP ] [-c] <path> ...]
	[-touchz <path> ...]
	[-truncate [-w] <length> <path> ...]
	[-usage [cmd ...]]

Generic options supported are:
-conf <configuration file>        specify an application configuration file
-D <property=value>               define a value for a given property
-fs <file:///|hdfs://namenode:port> specify default filesystem URL to use, overrides 'fs.defaultFS' property from configurations.
-jt <local|resourcemanager:port>  specify a ResourceManager
-files <file1,...>                specify a comma-separated list of files to be copied to the map reduce cluster
-libjars <jar1,...>               specify a comma-separated list of jar files to be included in the classpath
-archives <archive1,...>          specify a comma-separated list of archives to be unarchived on the compute machines

The general command line syntax is:
command [genericOptions] [commandOptions]

HDFS优缺点

优点:

  1. 高容错性

    1. 数据自动保存多个副本。它通过增加副本的形式,提高容错性。
    2. 某一个副本丢失以后,它可以自动恢复,这是由 HDFS 内部机制自动实现
  2. 适合批处理

    1. 把数据位置暴露给计算框架,通过移动计算而不是移动数据,提高效率
  3. 适合大数据处理

    1. 数据规模:能够处理数据规模达到 GB、TB、甚至PB级别的数据。
    2. 文件规模:能够处理百万规模以上的文件数量,数量相当之大。
    3. 节点规模:能够处理10K节点的规模。
  4. 流式数据访问

    1. 一次写入,多次读取
    2. 不能随机修改,只能追加。
    3. 它能保证数据的一致性。
  5. 可构建在廉价机器上

    1. 它通过多副本机制,提高可靠性。
    2. 它提供了容错和恢复机制。比如某一个副本丢失,可以通过其它副本来恢复。

缺点:

  1. 不适合低延时数据访问;

    1. 比如毫秒级的来存储、读取数据,这是不行的,它做不到。
    2. 它适合高吞吐率的场景,就是在某一时间内写入大量的数据。
  2. 无法高效的对大量小文件进行存储

    1. 存储大量小文件的话,它会占用 NameNode大量的内存来存储文件、目录和块信息。这样是不可取的,因为NameNode的内存总是有限的。
    2. 小文件存储的寻道时间会超过读取时间,它违反了HDFS的设计目标。
  3. 并发写入、文件随机修改

    1. 一个文件只能有一个写,不允许多个线程同时写(租约机制)。
    2. 仅支持数据 append(追加),不支持文件的随机修改。

Java操作HDFS

操作HDFS的核心接口是FileSystem,通过这个类可以连接一个hdfs服务,然后进行操作。

    // 在hdfs中创建一个目录
    @Test
    public void hdfsTest() throws Exception {
        // configuration
        Configuration conf = new Configuration();

        //conf.set("fs.defaultFS","hdfs://node01:8020");

        // get filesystem
        FileSystem fs = FileSystem.get(conf);

        fs.mkdirs(new Path("/lqc/ideaTest"));

        fs.close();

    }

    //指定目录所属用户
    @Test
    public void mkDirOnHDFS2() throws IOException, URISyntaxException, InterruptedException {
        //配置项
        Configuration configuration = new Configuration();

        //获得文件系统
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), configuration, "liuqichun");

        //调用方法创建目录
        boolean mkdirs = fileSystem.mkdirs(new Path("/lqc/dir2"));

        //释放资源
        fileSystem.close();
    }

    
    //创建目录时,指定目录权限
    @Test
    public void mkDirOnHDFS3() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");

        FileSystem fileSystem = FileSystem.get(configuration);
        FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.READ, FsAction.READ);
        boolean isMkdirs = fileSystem.mkdirs(new Path("hdfs://node01:8020/lqc/dir3"), fsPermission);

        if (isMkdirs) {
            System.out.println("目录创建成功");
        }

        fileSystem.close();
    }

    /**
     * 说明:将文件hello.txt上传到/lqc/dir1
     * 
     * 如果路径/lqc/dir1不存在,那么结果是:
     * 在hdfs上先创建/kaikeba目录,然后将upload.txt上传到/lqc,并将文件upload.txt重命名为dir1
     * 
     * 如果路径/lqc/dir1存在,那么将hello.txt上传到此路径中去
     */
    @Test
    public void uploadFile2HDFS() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");
        FileSystem fs = FileSystem.get(configuration);

        Path uploadPath = new Path("/lqc/dir1");

        boolean dir1IsExist = fs.exists(uploadPath);
        if(!dir1IsExist){
            boolean mkdirs = fs.mkdirs(uploadPath);
            if(mkdirs){
                System.out.println(uploadPath + " 已经创建");
            }
        }

        fs.copyFromLocalFile(new Path("D:\\IDEA_WorkSpace\\hadoopstudy\\hello.txt"), uploadPath);//hdfs路径
        fs.close();
    }

    // 文件下载, 有校验机制crc
    @Test
    public void downloadFileFromHDFS() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");
        FileSystem fileSystem = FileSystem.get(configuration);


        fileSystem.copyToLocalFile(new Path("hdfs://node01:8020/lqc/dir1/hello.txt"),
                new Path("file:///D:\\IDEA_WorkSpace\\hadoopstudy\\downloads\\hello.txt"));

        //删除文件
        //fileSystem.delete()
        //重命名文件
        //fileSystem.rename()
        fileSystem.close();
    }

HDFS datanode工作机制

image.png

datanode工作机制

1、datanode是存放数据的节点,数据以block形式存放,每个block对应一个元信息文件。

  • block中存储的是数据
  • 元信息(.meta)中存储的是block的长度,数据校验,时间戳等信息

2、datanode启动后会向namenode注册,然后周期性(6小时)的向namenode汇报存储的信息

3、定期与namenode进行心跳检测

4、集群环境中可以动态添加和退出datanode

数据完整性

1、当客户端向hdfs写数据时,使用数据校验(CRC)确保网络传输过程中数据不出问题

2、datanode读取到数据就进行校验checksum,如果checksum不一样说明block已经损坏

3、datanode对其上创建的文件进行周期性的校验(验证checksum)

HDFS读写流程

上次文件流程

image.png

文件上传流程如下:

  • 创建文件:
    ①HDFS client向HDFS写入数据,先调用DistributedFileSystem.create()
    ②RPC调用namenode的create(),会在HDFS目录树中指定的路径,添加新文件;并将操作记录在edits.log中
    namenode.create()方法执行完后,返回一个FSDataOutputStream,它是DFSOutputStream的包装类

  • 建立数据流管道pipeline
    ③client调用DFSOutputStream.write()写数据(先写第一个块的数据,暂时叫blk1)
    ④DFSOutputStream通过RPC调用namenode的addBlock,向namenode申请一个空的数据块block
    ⑤addBlock返回LocatedBlock对象;此对象中包含了当前blk要存储在哪三个datanode的信息,比如dn1、dn2、dn3
    ⑥客户端,根据位置信息,建立数据流管道(图中蓝色线条)

  • 向数据流管道写当前块的数据
    ⑦写数据时,先将数据写入一个检验块chunk中,写满512字节后,对此chunk计算校验和checksum值(4字节)
    ⑧然后将chunk及对应校验和写入packet中,一个packet是64KB
    ⑨随着源源不断的带校验和的chunk写入packet,当packet写满后,将packet写入dataqueue数据队列中
    ⑩packet从队列中取出,沿pipeline发送到dn1,再从dn1发送到dn2,再从dn2发送到dn3
    ⑪同时,此packet会保存一份到一个确认队列ack queue中
    ⑫packet到达最后一个datanode即dn3后,做校验,将校验结果逆着pipeline方向回传到客户端,具体是校验结果从dn3传到dn2,dn2也会做校验,校验结果再
    传到dn1,dn1也做校验;结果再传回客户端
    ⑬客户端根据校验结果,如果“成功”,则将将保存在ack queue中的packet删除;如果失败,则将packet取出,重新放回到data queue末尾,等待再次沿pipeline发送
    ⑭如此,将block中的一个数据一个个packet发送出去;当此block发送完毕,
    即dn1、dn2、dn3都接受了blk1的完整的副本,那么三个dn分别RPC调用namenode的blockReceivedAndDeleted(),
    namenode会更新内存中block与datanode的对应关系(比如dn1上多了一个blk1副本)

  • 关闭dn1、dn2、dn3构建的pipeline;且文件还有下一个块时,再从④开始;直到文件全部数据写完
    ⑮最终,调用DFSOutputStream的close()
    ⑯客户端调用namenode的complete(),告知namenode文件传输完成

HDFS上次文件源码分析图:

HDFS 上传文件源码分析

读取文件流程

image.png

1、client端读取HDFS文件,client调用文件系统对象DistributedFileSystem的open方法

2、返回FSDataInputStream对象(对DFSInputStream的包装)

3、构造DFSInputStream对象时,调用namenode的getBlockLocations方法,获得file的开始若干block(如blk1, blk2, blk3, blk4)的存储datanode(以下简称dn)列表;针对每个block的dn列表,会根据网络拓扑做排序,离client近的排在前;

4、调用DFSInputStream的read方法,先读取blk1的数据,与client最近的datanode建立连接,读取数据

5、读取完后,关闭与dn建立的流

6、读取下一个block,如blk2的数据(重复步骤4、5、6)

7、这一批block读取完后,再读取下一批block的数据(重复3、4、5、6、7)

8、完成文件数据读取后,调用FSDataInputStream的close方法

namenode与secondaryName解析

  • NameNode主要负责集群当中的元数据信息管理,而且元数据信息需要经常随机访问,因为元数据信息必须高效的检索
    • 元数据信息保存在哪里能够==快速检索==呢?
    • 如何保证元数据的持久安全呢?
  • 为了保证元数据信息的快速检索,那么我们就必须将元数据存放在内存当中,因为在内存当中元数据信息能够最快速的检索,那么随着元数据信息的增多(每个block块大概占用150字节的元数据信息),内存的消耗也会越来越多。
  • 如果所有的元数据信息都存放内存,服务器断电,内存当中所有数据都消失,为了保证元数据的安全持久,元数据信息必须做可靠的持久化
  • 在hadoop当中为了持久化存储元数据信息,将所有的元数据信息保存在了FSImage文件当中,那么FSImage随着时间推移,必然越来越膨胀,FSImage的操作变得越来越难,为了解决元数据信息的增删改,hadoop当中还引入了元数据操作日志edits文件,edits文件记录了客户端操作元数据的信息,随着时间的推移,edits信息也会越来越大,为了解决edits文件膨胀的问题,hadoop当中引入了secondaryNamenode来专门做fsimage与edits文件的合并

image.png

namenode工作机制

(1)第一次启动namenode格式化后,创建fsimage和edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求
(3)namenode记录操作日志,更新滚动日志。
(4)namenode在内存中对数据进行增删改查

Secondary NameNode工作

(1)Secondary NameNode询问namenode是否需要checkpoint。直接带回namenode是否检查结果。
(2)Secondary NameNode请求执行checkpoint。
(3)namenode滚动正在写的edits日志
(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件fsimage.chkpoint
(7) 拷贝fsimage.chkpoint到namenode
(8)namenode将fsimage.chkpoint重新命名成fsimage

属性解释
dfs.namenode.checkpoint.period3600秒(即1小时)The number of seconds between two periodic checkpoints.
dfs.namenode.checkpoint.txns1000000The Secondary NameNode or CheckpointNode will create a checkpoint of the namespace every 'dfs.namenode.checkpoint.txns' transactions, regardless of whether 'dfs.namenode.checkpoint.period' has expired.
dfs.namenode.checkpoint.check.period60秒(1分钟)The SecondaryNameNode and CheckpointNode will poll the NameNode every 'dfs.namenode.checkpoint.check.period' seconds to query the number of uncheckpointed transactions.

FSImage与edits详解

  • 所有的元数据信息都保存在了FsImage与Eidts文件当中,这两个文件就记录了所有的数据的元数据信息,元数据信息的保存目录配置在了hdfs-site.xml当中
<!-- fsimage目录 -->
<property>
  <name>dfs.namenode.name.dir</name>
  <value>file:///kkb/install/hadoop-2.6.0-cdh5.14.2/hadoopDatas/namenodeDatas</value>
</property>
<!-- edit文件目录 -->
<property>
   <name>dfs.namenode.edits.dir</name>
   <value>file:///kkb/install/hadoop-2.6.0-cdh5.14.2/hadoopDatas/dfs/nn/edits</value>
</property>
  • 客户端对hdfs进行写文件时会首先被记录在edits文件中

    edits修改时元数据也会更新。

    每次hdfs更新时edits先更新后,客户端才会看到最新信息。

    fsimage:是namenode中关于元数据的镜像,一般称为检查点。

    一般开始时对namenode的操作都放在edits中,为什么不放在fsimage中呢?

    因为fsimage是namenode的完整的镜像,内容很大,如果每次都加载到内存的话生成树状拓扑结构,这是非常耗内存和CPU。

    fsimage内容包含了namenode管理下的所有datanode中文件及文件block及block所在的datanode的元数据信息。随着edits内容增大,就需要在一定时间点和fsimage合并。

FSimage文件当中的文件信息查看

cd  /kkb/install/hadoop-2.6.0-cdh5.14.2/hadoopDatas/namenodeDatas/current
hdfs oiv    #查看帮助信息
hdfs oiv -i fsimage_0000000000000000864 -p XML -o /home/hadoop/fsimage1.xml

edits当中的文件信息查看

cd /kkb/install/hadoop-2.6.0-cdh5.14.2/hadoopDatas/dfs/nn/edits/current
hdfs oev     #查看帮助信息
hdfs oev -i edits_0000000000000000865-0000000000000000866 -o /home/hadoop/myedit.xml -p XML

mapreduce

MapReduce编程模型

  • MapReduce是采用一种分而治之的思想设计出来的分布式计算框架

  • 那什么是分而治之呢?

    • 比如一复杂、计算量大、耗时长的的任务,暂且称为“大任务”;
    • 此时使用单台服务器无法计算或较短时间内计算出结果时,可将此大任务切分成一个个小的任务,小任务分别在不同的服务器上并行的执行;
    • 最终再汇总每个小任务的结果
  • MapReduce由两个阶段组成:

    • Map阶段(切分成一个个小的任务)
    • Reduce阶段(汇总小任务的结果)

image.png

image.png

mapreduce编程步骤

1. Map阶段2个步骤

  • 第一步:设置InputFormat类,将数据切分成key,value对;此kv对作为第二步的输入

  • 第二步:自定义map逻辑,处理我们第一步的传过来的kv对数据,然后转换成新的key,value对,并输出

2. shuffle阶段4个步骤

  • 第三步:对上一步输出的key,value对进行分区。(相同key的kv对属于同一分区)

  • 第四步:对每个分区的数据按照key进行排序

  • 第五步:对分区中的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)

  • 第六步:对排序后的kv对数据进行分组;分组的过程中,key相同的kv对为一组;将同一组的kv对的所有value放到一个集合当中(每组数据调用一次reduce方法)

3. reduce阶段2个步骤

  • 第七步:对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的key,value对进行处理,转换成新的key,value对进行输出

  • 第八步:设置将输出的key,value对数据保存到文件中

hadoop当中常用的数据类型

hadoop没有使用Java自带的数据类型,而是自己封装了一套数据类型,这些数据类型更利于持久化合网络传输。

Java类型Hadoop Writable类型
BooleanBooleanWritable
ByteByteWritable
IntIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable
byte[]BytesWritable

mapreduce main程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 这个类作为mr程序的入口类,这里面写main方法
 */
public class WordCount extends Configured implements Tool {
    /**
     * 实现Tool接口之后,需要实现一个run方法,
     * 这个run方法用于组装我们的程序的逻辑,其实就是组装八个步骤
     *
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        /***
         * 第一步:读取文件,解析成key,value对,k1   v1
         * 第二步:自定义map逻辑,接受k1   v1  转换成为新的k2   v2输出
         * 第三步:分区。相同key的数据发送到同一个reduce里面去,key合并,value形成一个集合
         * 第四步:排序   对key2进行排序。字典顺序排序
         * 第五步:规约  combiner过程  调优步骤 可选
         * 第六步:分组
         * 第七步:自定义reduce逻辑接受k2   v2  转换成为新的k3   v3输出
         * 第八步:输出k3  v3 进行保存
         */

        //获取Job对象,组装我们的八个步骤,每一个步骤都是一个class类
        Configuration conf = super.getConf();

        Job job = Job.getInstance(conf, WordCount.class.getSimpleName());

        //判断输出路径,是否存在,如果存在,则删除
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(new Path(args[1]))) {
            fileSystem.delete(new Path(args[1]), true);
        }

        //实际工作当中,程序运行完成之后一般都是打包到集群上面去运行,打成一个jar包
        //如果要打包到集群上面去运行,必须添加以下设置
        job.setJarByClass(WordCount.class);

        //第一步:读取文件,解析成key,value对,k1:行偏移量  v1:一行文本内容
        job.setInputFormatClass(TextInputFormat.class);
        //指定我们去哪一个路径读取文件
//        TextInputFormat.addInputPath(job,new Path("file:///C:\\Users\\admin\\Desktop\\wordCount_input\\数据"));
        TextInputFormat.addInputPath(job, new Path(args[0]));

        //第二步:自定义map逻辑,接受k1   v1  转换成为新的k2   v2输出
        job.setMapperClass(MyMapper.class);
        //设置map阶段输出的key,value的类型,其实就是k2  v2的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //第三步到六步:分区,排序,规约,分组都省略

        //第七步:自定义reduce逻辑
        job.setReducerClass(MyReducer.class);
        //设置key3  value3的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //第八步:输出k3  v3 进行保存
        job.setOutputFormatClass(TextOutputFormat.class);
        //一定要注意,输出路径是需要不存在的,如果存在就报错
//        TextInputFormat.addInputPath(job,new Path("file:///C:\\Users\\admin\\Desktop\\wordCount_output"));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setNumReduceTasks(3);

        //提交job任务
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    /*
    作为程序的入口类
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        //提交run方法之后,得到一个程序的退出状态码
        int run = ToolRunner.run(configuration, new WordCount(), args);
        //根据我们 程序的退出状态码,退出整个进程
        System.exit(run);
    }
}

map task数量及切片机制

map个数

mapreduce的InputFormat

除了mapreduce提供的这些输输入类,还可以自定义输入类。

实现步骤:

  1. 继承FileInputFormat
  2. 实现方法,核心是返回RecordReader对象,因此还需要实现一个RecordReader类
类名主要作用
TextInputFormat读取文本文件
CombineFileInputFormat在MR当中用于合并小文件,将多个小文件合并之后只需要启动一个mapTask进行运行
SequenceFileInputFormat处理SequenceFile这种格式的数据
KeyValueTextInputFormat通过手动指定分隔符,将每一条数据解析成为key,value对类型
NLineInputFormat指定数据的行数作为一个切片
FixedLengthInputFormat文件的每个record是固定的长度;用于读取固定宽度的二进制记录

实现Mapper方法

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    /**
     * @param key
     * @param value   小文件的全部内容
     * @param context 上下文,用来写出OUTKEY,OUTVALUE
     */
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        //文件名
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().getName();
        context.write(new Text(name), value);
    }
}

实现reducer

实现reducer时,注意输入输出,reducer的输入是mapper的输出,reducer的输出是OutputFormat的输入。

在reducer中主要进行的操作是实现累加,累加这个过程可以加入一些处理逻辑。

实现reducer示例:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reduce求和计算,combine的时候也可以用此类
 */
public class AnalysisReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

mapreduce 分区

分区需要继承Partitioner接口,实现 getPartition 方法。

  • 在mapreduce执行当中,有一个默认的步骤就是partition分区;
    • 分区主要的作用就是默认将key相同的kv对数据发送到同一个分区中;
    • 在mapreduce当中有一个抽象类叫做Partitioner,默认使用的实现类是HashPartitioner,我们可以通过HashPartitioner的源码,查看到分区的逻辑如下
  • MR编程的第三步就是分区;这一步中决定了map生成的每个kv对,被分配到哪个分区里

为job设置分区器:

job.setPartitionerClass(MyPartitioner.class);

mapreduce 的排序

mapreduce在map计算和分区处理后会对分区中的内容进行排序,默认对key进行排序(按字典序)。

  • 对于MapTask

    • 它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序
    • 并将这些有序数据溢写到磁盘上
    • 而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
  • 对于ReduceTask,它从每个执行完成的MapTask上远程拷贝相应的数据文件

    • 如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。
    • 如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;
    • 如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。
    • 当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
自定义排序

自定义排序需要单独创建map和reduce输入输出时key使用的JavaBean对象,这个对象需要实现序列化和比较接口。

JavaBean示例:

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

//bean要能够可序列化且可比较,所以需要实现接口WritableComparable
public class MyBean implements WritableComparable<FlowSortBean> {
    int a;
    int b;

    @Override
    public int compareTo(FlowSortBean o) {
        // 实现比较逻辑,返回正数、0、负数
        return a-o.a
    }

    //序列化,注意:写入写出顺序一定要相同
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(a);
        out.writeInt(b);
    }

    //反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        this.a = in.readInt();
        this.b = in.readInt();
    }
}

mapreduce分组

实现分组需要实现 GroupingComparator 类。

关键类 GroupingComparator

  • 决定了mapreduce当中reduce端哪些数据作为一组,每一组调用一次reduce的逻辑
  • 默认是key相同的kv对,作为同一组;每组调用一次reduce方法
  • 通过实现GroupingComparator接口,实现自定义的分组逻辑

步骤:

1、继承WritableComparator
2、重写compare()方法

@Override
public int compare(WritableComparable a, WritableComparable b) {
        // 比较的业务逻辑
        return result;
}

3、创建一个构造器,将比较对象的类传给父类

protected OrderGroupingComparator() {
        super(OrderBean.class, true);
}

计数器与累加器

  • 计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器还可辅助诊断系统故障。如果需要将日志信息传输到map 或reduce 任务, 更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获取计数器值比输出日志更方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。

  • hadoop内置计数器列表

MapReduce任务计数器org.apache.hadoop.mapreduce.TaskCounter
文件系统计数器org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat计数器org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat计数器org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
作业计数器org.apache.hadoop.mapreduce.JobCounter

计数器使用:

定义计数器(Counter),通过context上下文对象可以获取我们的计数器,进行记录。

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, FlowSortBean, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //自定义我们的计数器,这里实现了统计map输入数据的条数
        Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
        counter.increment(1L);
	//...
    }
}

map task 工作机制

mapreduce完整流程

  • (1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。

  • (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

  • (3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

  • (4)Spill阶段:即“溢写”,当环形缓冲区满80%后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

  • 溢写阶段详情:

    • 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

    • 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

    • 步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括,在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

    • (5)合并阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

    • 当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

    • 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

    • 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

reduce工作机制

reduce简图.png

  • (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

  • (2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

  • (3)Sort阶段:当所有map task的分区数据全部拷贝完,按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

  • (4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

设置ReduceTask并行度(个数)

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

Hadoop数据处理实践

以下这个例子是对日志中的信息登录信息(手机号)进行统计,最后将不同的手机号分类写到文件。

项目依赖 pox.xml

    <properties>
        <hadoop.version>3.2.2</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <!--<scope>test</scope>-->
        </dependency>

        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--   <verbal>true</verbal>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

第一步:实现Mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Description: 截取日志中的手机号
 *
 * @author: liuqichun
 */
public class AnalysisMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if(value==null) return;
        String[] splits = value.toString().split(" ");
        if (splits.length < 8) return;

        String info = splits[7];
        if(info != null && !info.isEmpty() && info.contains("【用户登录】")){
            int lastIndex = info.lastIndexOf(":");
            String phone = info.substring(lastIndex + 1);
            if(!phone.isEmpty()){
                context.write(new Text(phone) ,new IntWritable(1));
            }
        }
    }
}

第二步:实现reducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reduce求和计算,combine的时候也可以用此类
 * @author: liuqichun
 */
public class AnalysisReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 求和
        for (IntWritable v : values) {
            sum += v.get();
        }
        // 写出
        context.write(key, new IntWritable(sum));
    }
}

第三步:实现分区

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 根据手机号前缀分区,把不同手机号开头结果的放到不同文件
 *
 * 10~13:A文件
 * 14~17:B文件
 * 18~19:C文件
 *
 * @author: liuqichun
 */
public class AnalysisPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        String prefix = text.toString().substring(0, 2);
        int val = Integer.parseInt(prefix);
        // 返回分区的索引即可实现分区
        if(val < 15){
            return 0;
        }else if(val<18)
            return 1;
        else {
            return 2;
        }
    }

}

第四步:实现main程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Description: 登录日志分析程序,日志进行分布式处理,统计每个手机号登录登录次数
 *
 * @author: liuqichun
 */
public class AnalysisMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new AnalysisMain(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws Exception {

        FileSystem fileSystem = FileSystem.get(super.getConf());

        // 作业job
        Job job = Job.getInstance(super.getConf(), "AnalysisUserLogin");
        job.setJarByClass(AnalysisMain.class);

        // 输入数据
        job.setInputFormatClass(TextInputFormat.class);

        // 读取并过滤文件
        RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(new Path("hdfs://node01:8020/project-log/userlogin"), false);
        StringBuilder sb = new StringBuilder(2048);
        while (fileIterator.hasNext()){
            LocatedFileStatus file = fileIterator.next();
            String path = file.getPath().toString();
            if(path.contains("part-r-")){
                sb.append(path).append(",");
            }
        }

        TextInputFormat.addInputPaths(job,sb.substring(0,sb.length()-1));

        // map计算
        job.setMapperClass(AnalysisMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // combine, map task中进行reduce
        job.setCombinerClass(AnalysisReducer.class);

        // 分区
        job.setPartitionerClass(AnalysisPartitioner.class);
        // 设置reduce作业数,每个task输出一个文件
        job.setNumReduceTasks(3);

        // 排序按照默认

        //reduce
        job.setReducerClass(AnalysisReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 输出数据
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outputPath = new Path("hdfs://node01:8020/project-log/userlogin_output");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath,true);
        }

        TextOutputFormat.setOutputPath(job, outputPath);

        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }
}

第五步:打包提交集群运行

1、在idea中将项目打包为jar

2、运行

# 格式: hadoop jar <jar包> <mapreduce main程序> [args...]
hadoop jar original-hadoop-study-1.0-SNAPSHOT.jar com.elltor.userlogincount.analysis.AnalysisMain

3、执行结果

原实日志数据为多个日志文件,部分内容如下:

[30mSMPE-ADMIN- 2021-04-09 15:35:54 [http-nio-8001-exec-4] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15935508617
[30mSMPE-ADMIN- 2021-04-09 15:35:54 [http-nio-8001-exec-4] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15936533317
[30mSMPE-ADMIN- 2021-04-13 14:47:52 [http-nio-8000-exec-5] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15090382096
[30mSMPE-ADMIN- 2021-04-14 15:04:28 [http-nio-8000-exec-1] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:13555555555
[30mSMPE-ADMIN- 2021-04-14 15:04:28 [http-nio-8000-exec-1] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:13555555555
[30mSMPE-ADMIN- 2021-04-17 10:58:20 [http-nio-8000-exec-6] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:18888888888
[30mSMPE-ADMIN- 2021-04-17 10:58:20 [http-nio-8000-exec-6] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:18888888888
[30mSMPE-ADMIN- 2021-04-17 17:16:47 [http-nio-8000-exec-10] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:18888888888
[30mSMPE-ADMIN- 2021-04-20 16:59:19 [http-nio-8000-exec-8] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15090382096
[30mSMPE-ADMIN- 2021-04-20 16:59:19 [http-nio-8000-exec-8] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15090382096
[30mSMPE-ADMIN- 2021-04-22 17:21:18 [http-nio-8000-exec-3] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15500000000
[30mSMPE-ADMIN- 2021-04-22 17:21:18 [http-nio-8000-exec-3] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15500000000
[30mSMPE-ADMIN- 2021-04-24 10:27:11 [http-nio-8000-exec-6] INFO m.m.s.c.AuthorizationController - 【用户登录】用户手机号已注册。手机号:15143308338

处理后的结果(部分):

image.png

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×