Path of learning Hadoop

HDFS 架构详解

1. HDFS 核心组件与架构

HDFS(Hadoop Distributed File System) 采用主从(Master/Slave)架构,核心组件包括:

  • NameNode:主节点(Master),负责元数据管理和全局协调。
  • DataNode:从节点(Slave),负责实际数据存储。
  • Secondary NameNode:辅助节点,帮助 NameNode 管理元数据。
  • FSImage & Edits:元数据持久化文件。

HDFS Architecture

2. 核心角色详解

2.1 NameNode

  • 核心职责
    • 元数据存储:管理文件系统的命名空间(目录树结构)、文件到数据块的映射、数据块到 DataNode 的映射。
    • 客户端请求处理:响应客户端读写请求,协调 DataNode 执行操作。
    • DataNode 监控:通过心跳机制(默认3秒)检测 DataNode 存活状态。
    • 副本策略管理:确保数据块副本数符合配置(默认3副本)。
  • 关键特点
    • 单点故障(SPOF):在非高可用(HA)模式下,NameNode 宕机会导致集群不可用。
    • 内存密集型:所有元数据存储在内存中,限制文件系统规模。

2.2 DataNode

  • 核心职责
    • 数据块存储:实际存储文件数据块(默认大小128MB或256MB)。
    • 数据块读写:执行客户端或 NameNode 发起的读写操作。
    • 定期汇报
      • 心跳信号(默认3秒):向 NameNode 报告存活状态。
      • 块报告(默认6小时):汇报本地存储的数据块列表。
  • 关键特点
    • 无状态设计:DataNode 不感知文件整体结构,仅管理数据块。
    • 自动负载均衡:NameNode 根据存储情况分配新块写入位置。

2.3 FSImage 与 Edits

  • FSImage
    • 定义:元数据的完整快照文件,保存文件系统命名空间和块映射的检查点。
    • 生成时机:在 NameNode 启动或 Secondary NameNode 触发合并时生成。
  • Edits(Edit Logs)
    • 定义:记录所有对元数据的修改操作(如创建/删除文件、修改副本数)。
    • 写入机制:客户端操作先写入 Edits,再更新内存元数据。
  • 协作关系
    • 启动时 NameNode 加载 FSImage 并重放 Edits 以恢复最新状态。
    • Secondary NameNode 定期合并 FSImageEdits,生成新快照,防止 Edits 文件过大。

2.4 Secondary NameNode

  • 核心职责
    • Checkpoint 合并:定期从 NameNode 获取 FSImageEdits,合并后传回 NameNode。
    • 元数据备份:非实时备份,不能替代 NameNode,主要用于恢复辅助。
  • 误解澄清
    • 不是热备节点:无法直接接管 NameNode 工作(HA 模式需配置 Standby NameNode)。

3. 组件交互关系

3.1 NameNode 与 DataNode

  1. 注册与心跳
    • DataNode 启动时向 NameNode 注册。
    • 周期性发送心跳,NameNode 返回指令(如删除块、复制块)。
  2. 块报告
    • DataNode 定期发送块列表,NameNode 验证元数据一致性。

3.2 FSImage 与 Edits 协作流程

  1. 客户端发起元数据操作(如创建文件)。
  2. 操作记录追加到 Edits 文件。
  3. NameNode 更新内存中的元数据。
  4. Checkpoint 触发条件
    • 时间间隔(默认1小时)或 Edits 文件大小达到阈值(默认64MB)。
  5. Secondary NameNode 下载 FSImage 和 Edits,合并后上传新 FSImage。

4. 高可用与容错机制

4.1 NameNode HA(Hadoop 2.x+)

  • Active/Standby NameNode
    • 通过 ZooKeeper 实现故障自动转移。
    • 使用 JournalNodes 集群共享 Edits 日志,确保状态同步。

4.2 DataNode 容错

  • 副本机制:数据块默认3副本,分布在不同机架。
  • 损坏检测
    • 客户端读取时校验 Checksum。
    • DataNode 定期扫描本地块校验和。

5. 数据读写流程示例

5.1 写入流程

  1. 客户端向 NameNode 请求上传文件。
  2. NameNode 返回可写入的 DataNode 列表(如A、B、C)。
  3. 客户端直接向 DataNode A 写入数据块,A 转发给 B,B 转发给 C。
  4. 确认写入成功后,NameNode 更新元数据。

5.2 读取流程

  1. 客户端向 NameNode 请求文件位置。
  2. NameNode 返回包含该文件块的 DataNode 列表(按网络拓扑排序)。
  3. 客户端直接从最近的 DataNode 读取数据。

YARN架构


内容来自:【Hadoop核心组件系列-YARN工作流程详解】 https://www.bilibili.com/video/BV1Pa4y1t7nf/?share_source=copy_web&vd_source=9212f1767788a4852abd635bbf66ad46

MapReduce

1. 概念

MapReduce 是一种由 Google 提出的分布式计算编程模型,用于处理大规模数据集(TB/PB 级)的并行计算。其核心思想是将计算过程拆分为 MapReduce 两个阶段,通过分而治之的方式实现高效的数据处理。

2. 核心阶段

2.1 Map 阶段

  • 输入:键值对 <k1, v1>(如文件偏移量和文本行)。
  • 处理:用户自定义的 map() 函数将输入转换为中间键值对 <k2, v2>
  • 输出:多个中间键值对列表。

2.2 Shuffle & Sort

  • 功能:将相同 k2 的中间结果聚合到一起,生成 <k2, list(v2)>
  • 优化:数据按 k2 排序,减少网络传输开销。

2.3 Reduce 阶段

  • 输入:经过 Shuffle 后的 <k2, list(v2)>
  • 处理:用户自定义的 reduce() 函数合并相同键的值。
  • 输出:最终结果 <k3, v3>(如统计结果)。

3. 工作流程

  1. 输入分片
    将输入数据划分为多个逻辑分片(Split),每个分片由一个 Map 任务处理。
  2. Map 任务并行执行
    多个节点并行运行 Map 任务,生成中间键值对。
  3. Combiner(可选)
    在 Map 端本地合并中间结果,减少网络传输量(如本地预聚合)。
  4. Partitioner
    决定中间结果分配给哪个 Reduce 任务(默认使用哈希取模)。
  5. Reduce 任务聚合
    Reduce 任务接收并处理分配给自己的数据,输出最终结果。

4. 特点

  • 分布式处理:任务分布在集群节点上并行执行。
  • 容错机制:自动重启失败的任务。
  • 横向扩展(Scalability):可通过增加节点提升处理能力。
  • 数据本地化:优先在存储数据的节点上执行任务,减少网络传输。

5. 优缺点

5.1 优点

  • 简化分布式编程:开发者只需关注业务逻辑。
  • 高容错性:自动处理节点故障。
  • 扩展性强:线性扩展至数千节点。

5.2 缺点

  • 高延迟:不适合实时或交互式查询。
  • 中间数据存磁盘:Shuffle 阶段可能成为性能瓶颈。
  • 不适合迭代计算:如机器学习中的多次迭代。

6. 应用场景

  • 词频统计(WordCount)
  • 网页爬虫日志分析
  • 倒排索引构建(搜索引擎)
  • 数据聚合(如统计用户行为)

7. 编程MapReduce试运行(wordcount)

Mapper类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package org.example.wordcount;  

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class wordcountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
private final Text outkey = new Text();
private final static LongWritable outvalue = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("\\s+");
for (String word : words) {
outkey.set(word);
context.write(outkey, outvalue);
}
}
}

Reducer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package org.example.wordcount;  

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class wordcountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
LongWritable outvalue = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
outvalue.set(count);
context.write(key, outvalue);
}
}

Driver驱动类(提交 Job)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package org.example.wordcount;  

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class wordcountDriver {
public static void main(String[] args) throws Exception {
//创建配置对象
Configuration config = new Configuration();
//构建 Job 示例
Job jb = Job.getInstance(config, wordcountDriver.class.getSimpleName());
//设置 mapreduce程序运行的主类
jb.setJarByClass(wordcountDriver.class);

//设置本次需要的 mapper 类和 reducer 类
jb.setMapperClass(wordcountMapper.class);
jb.setReducerClass(wordcountReducer.class);

//设置 mapper 的【输出】的 key,value 值
jb.setMapOutputKeyClass(Text.class);
jb.setMapOutputValueClass(LongWritable.class);

//设置 reducer 的【输出】key,value 值(最终输出)
jb.setOutputKeyClass(Text.class);
jb.setOutputValueClass(LongWritable.class);

//配置本次作业的输入TextInputFormat和输出路径TextOutputFormat
FileInputFormat.setInputPaths(jb, new Path(args[0]));
FileOutputFormat.setOutputPath(jb, new Path(args[1]));

//提交作业
boolean result = jb.waitForCompletion(true);
System.exit(result ? 0: 1);

}
}

利用 hadoop 提供的工具ToolRunner 来提交 Job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package org.example.wordcount;  

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class wordcountDriver_v2 extends Configured implements Tool {
public static void main(String[] args) throws Exception {
//创建配置对象
Configuration config = new Configuration();
//配置工具类toolrunner
int status = ToolRunner.run(config, new wordcountDriver_v2(), args);
//退出客户端
System.exit(status);
}

@Override
public int run(String[] args) throws Exception {
//构建 Job 示例
Job jb = Job.getInstance(getConf(), wordcountDriver_v2.class.getSimpleName());
//设置 mapreduce程序运行的主类
jb.setJarByClass(wordcountDriver_v2.class);

//设置本次需要的 mapper 类和 reducer 类
jb.setMapperClass(wordcountMapper.class);
jb.setReducerClass(wordcountReducer.class);

//设置 mapper 的【输出】的 key,value 值
jb.setMapOutputKeyClass(Text.class);
jb.setMapOutputValueClass(LongWritable.class);

//设置 reducer 的【输出】key,value 值(最终输出)
jb.setOutputKeyClass(Text.class);
jb.setOutputValueClass(LongWritable.class);

//配置本次作业的输入TextInputFormat和输出路径TextOutputFormat
FileInputFormat.setInputPaths(jb, new Path(args[0]));
FileOutputFormat.setOutputPath(jb, new Path(args[1]));

return jb.waitForCompletion(true) ? 0: 1;
}
}

程序运行

使用 Maven 将MapReduce所属项目打包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>hadoop</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.4.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>org.example.wordcount.wordcountDriver_v2</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>utf-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

集群配置步骤(伪集群)

1. 所有机器安装hadoop并配置 java 环境变量

新建一个my_env文件,所有PATH环境变量都配置到/etc/profile.d/my_env。

2. 所有机器设配置ssh免密登陆(权限修改与配置)

在sudoers给予用户sudo权限且无需输入密码:username ALL=(ALL:ALL) NOPASSWD:ALL。

ssh-keygen -t rsa生成密钥,ssh-copy-id将生成公钥复制到需要免密登录的机器上。

此外在涉及root管理的文件的修改,需要允许远程使用密码root登录(比如使用分发脚本发送my_env),sshd_config里的配置需改成PermitRootLogin yes和PubkeyAuthentication yes。

3. 集群分发脚本xsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/bin/bash

#1. 判断参数个数

if [ $# -lt 1 ]

then

  echo Not Enough Arguement!

  exit;

fi

#2. 遍历集群所有机器

for host in hadoop102 hadoop103 hadoop104

do

  echo ====================  $host  ====================

  #3. 遍历所有目录,逐一发送

  for file in $@

  do

    #4 判断文件是否存在

    if [ -e $file ]

    then

      #5. 获取父目录

      pdir=$(cd -P $(dirname $file); pwd)

      #6. 获取当前文件的名称

      fname=$(basename $file)

      ssh $host "mkdir -p $pdir"

      rsync -av $pdir/$fname $host:$pdir

    else

      echo $file does not exists!

    fi

  done

done

给予脚本执行权限 777,同时将执行路径添加至环境变量 PATH

4. 所有配置文件的修改在Namenode完成(基本属性

core-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<configuration>

<!-- 指定 Namenode 的地址,可以理解为内部通信地址,通讯协议为hdfs-->

<property>

<name>fs.defaultFS</name>

<value>hdfs://localhost:9000</value>

</property>



<!-- 指定 hadoop 数据(hdfs,yarn,mapreduce)的临时存储目录,其他组件如果没有配置目录,临时目录才会生效,不建议放在 tmp,因为 linux 的 tmp 目录每隔一段时间会自动清空!-->

<property>

<name>hadoop.tmp.dir</name>

<value>$your_path</value>

</property>

</configuration>

hdfs-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
<configuration>

<property>

<!--namenode 节点数据(即元数据)的存放位置,可以指定多个目录实现容错,多个目录用逗号分隔-->

<name>dfs.namenode.name.dir</name>

<value>/home/hadoop/namenode/data</value>

</property>

<property>

<!--datanode 节点数据(即数据块)的存放位置-->

<name>dfs.datanode.data.dir</name>

<value>/home/hadoop/datanode/data</value>

</property>



<!--namenode web 端访问地址,用户外部访问-->

<property>

<name>dfs.namenode.http-address</name>

<value>localhost:9870</value>

</property>



<!--secondarynamenode web 端访问地址,用户外部访问-->

<property>

<name>dfs.namenode.secondary.http-address</name>

<value>localhost:9860</value>

</property>



<!-- 副本保存数,一般默认为 3 -->

<property>

<name>dfs.replication</name>

<value>1</value>

</property>


<!-- 访问全部文件的权限,测试环境使用 -->

<property>

<name>dfs.permissions</name>

<value>false</value>

</property>

</configuration>

yarn-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<configuration>

<!-- 指定 mapreduce 使用 shuffle-->

<property>

<name>yarn.nodemanager.aux-services</name>

<value>mapreduce_shuffle</value>

</property>



<!-- 指定 ResourceManager的内部通行地址-->

<property>

<name>yarn.resourcemanager.hostname</name>

<value></value>

</property>



<!-- 环境变量的继承 -->

<property>

<name>yarn.nodemanager.env-whitelist</name>

<value>

JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME

</value>

</property>

</configuration>

mapred-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<configuration>

<!--指定 mapreduce 运行在 yarn 上-->

<property>

<name>mapreduce.framework.name</name>

<value>yarn</value>

</property>

<!-- 环境变量的继承 -->

<property>

<name>yarn.nodemanager.env-whitelist</name>

<value>

JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME

</value>

</property>

</configuration>

补充:配置文件与组件的对应关系

配置文件 影响的组件 核心参数示例
core-site.xml 全局配置(间接影响所有组件) fs.defaultFS(NameNode 地址)
hdfs-site.xml NameNode、DataNode dfs.namenode.name.dirdfs.datanode.data.dir
yarn-site.xml ResourceManager、NodeManager yarn.resourcemanager.hostnameyarn.nodemanager.resource.memory-mb
mapred-site.xml MapReduce(YARN 任务) mapreduce.framework.name

5. 配置 workers 文件(指定从属 nodes

配置所有从属节点的主机名或 IP 地址,每行一个。所有从属节点上的 DataNode 服务和 NodeManager 服务都会被启动。另外如果主机性能强劲,也可以将其加入。

6. 将上述修改的配置文件全部发送给其余nodes

可以使用魔法命令 xsync 来实现一键发送,前提是要完成 ssh 免密登录

7. 在主节点启动hadoop

8. 注意事项

设置用户


需要指定用户,在这里为了方便调试,全部设定成 root,在 hadoop-env.sh末尾添加如下代码:

1
2
3
4
5
export HDFS_NAMENODE_USER=root  
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

JAVA 与hadoop的兼容问题

一般选择 JAVA8,过高过低都不行,否则有可能出现读取不了 hdfs 文件的情况

Web 端上传文件错误(未解决)

错误信息:Couldn’t upload the file xxx. F12发现点击 upload 发送的请求居然是 localhost 的…
只需要在 hdfs-site.xml 中添加配置(个 der):

1
2
3
4
5
6
7
8
9
10
11
<!-- DataNode 的 HTTP 服务地址 -->  
<property>
<name>dfs.datanode.http.address</name>
<value>xxx.xxx.xxx.xxx:9864</value>
</property>

<!--允许使用网页 web 上传文件-->
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>

BUG合集

JAVA路径配置

这是最最最最最折磨人的(在 mac上)。有遇到路径下包含空格的问题,为了解决这个问题决定重装一个,但是hadoop 还是无法识别到。按照网上说的把相关核心文件的 JAVA_HOME 都配置好,但还是报错…

此时我的心情和 DeepSeek的be like:

后面又想,是需要软连接么?嗯,似乎方向对了但没完全对,我也只是简单地将路径前面的部分简单连接了。Again,失败…折腾到凌晨两点最后含泪入眠。

今早起床想试着再尝试一次,于是乎又开始疯狂 Google,此时我终于发现了修正这个问题的解决方案!

截图的原文链接:https://github.com/MarkDana/Compile-Hadoop2.2.0-on-MacOS/blob/master/readme.md
太感谢这篇文章了,这哥们是7年前写的哇!!!他遇到的bug比我还多(salute)

后续

经典白忙活,软连接会返回 read-only system,这意味着重新挂载,那就运行```

1
mount -uw /

结果:mount_apfs: volume could not be mounted: Permission denied。、又查阅了资料,说 mac 在升级到 Big Sur 版本后就关闭了 SIP 重新分区挂载根分区了…

最终

受够了,真的受够了。我还是老老实实装个虚拟机吧,是 mac 的问题不是我的问题😭,折腾两天,当了两天小丑,只能说选择比努力更重要😮‍💨