Hadoop3.3.0安装(HDFS+MapReduce+HBase安装及操作、CentOS8版本)
虚拟机安装步骤一:创建虚拟机[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pwhdWKiX-1618662543294)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210320090517877.png)][外链图片转存失败,源站可能有防盗链机制,建议将图片保存下
虚拟机安装
- 步骤一:创建虚拟机
- 步骤二:启动虚拟机
此处的分区也可以使用自动分配,选择自动则可直接跳过分区步骤
输入密码登录即可。
配置前置条件
- 网卡ip相关配置
进入之后,按i
键进行插入
按ESC
键进行退出编辑模式
再输入:wq
写入并保存。
- 主机名配置
i键插入,修改为hadoop100即可。ESC键、之后:wq即可
- 主机名称映射修改
i键插入,修改之后即可。ESC键、之后:wq即可
reboot指令重启。
之后进行相关测试
- 查看ip
- ping www.baidu.com 或自己主机ip(此处配置的应该是192.168.122.2)
- 主机名称查看
远程连接Xshell
下载安装略过。。
新建一个连接
完成之后点击确定即可
成功页面
顺带一提,把Xftp也安装一下。
模板虚拟机
- 在Xshell中测试网络是否通畅。
- 安装epel-release
此为红帽系列的操作系统提供的额外软件包,提供了大量的rpm包。
- 关闭防火墙
systemctl stop firewalld.service
systemctl disable firewalld.service
- 配置新建账号也具有root功能
i
键进行插入
ESC
退出编辑模式
:wq
保存并退出
此处由于修改系统配置,需要进行强制修改,即加上!
因此上述需要输入:wq!
才能保存并退出。
以后执行命令无权限时,直接sudo xxx即可(不需要再次输入密码确认)
- 新建文件夹
cd /opt # 进入opt目录
ll # 查看目录下的文件
# 直接mkdir package会出错
sudo mkdir package
sudo mkdir program
ll # 查看此时目录下的文件
# 将package下和program下的文件夹授权给reflect
sudo chown reflect:reflect package/ program/
- 卸载自带的JDK
(CentOS8版本反正我安装了N多次,不见得有自带的JDK)
但是如果是CentOS7可能会有吧。直接进行删除即可。
注意:n后面的是数字1,不是l。最好直接复制下面这条命令即可。
rpm -qa | grep -i java | xargs -n1 rpm -e --nodeps
删除完成之后,再次使用上述命令查看为空,即证明卸载完成。
如果删除了JDK,建议reboot重启,以便于清理卸载的残余。
如果未进行删除操作,则不需要。
克隆虚拟机
先关闭虚拟机,才可以进行克隆。
在hadoop100上右键
同理:从Hadoop102克隆出Hadoop103和Hadoop104
- 进行配置的更改
开机Hadoop102
修改三个地方:IP、主机名、(主机名映射不需要改变)
修改完保存(i键插入、ESC退出编辑模式、:wq保存退出)
之后reboot即可。
JDK安装
在Hadoop102上进行安装。
想简单安装的话可参考我的另一篇文章Linux中的软件安装——JDK
此处采取的是另一种安装方式。
- 利用Xftp将jdk的压缩包上传到Linux中的/opt/package中
- 解压jdk的压缩包
进入package目录下
注意:-C是大写的C,小写的c是通过不了的。
tar -zxvf jdk-8u281-linux-x64.tar.gz -C /opt/program/
可以进行查看是否正确解压
- 配置环境变量
由于/etc/profile其中的配置文件中的shell脚本会自动扫描/etc/profile.d目录下的shell文件。因此,为了不变更/etc/profile文件,直接在/etc/profile.d目录下进行配置环境变量。
创建一个my_env.sh
sudo vim my_env.sh
#JAVA_HOME
export JAVA_HOME=/opt/program/jdk1.8.0_281
export PATH=$PATH:$JAVA_HOME/bin
配置完成之后,需要刷新配置文件。
source /etc/profile
- jdk安装测试三部曲:分别输入java、javac、java -version,看其是否有正确结果输出
三者都能正常输出才代表成功。
Hadoop安装
- 利用Xftp将Hadoop的压缩包上传到Linux中的/opt/package中
在上面的操作中,已经顺便上传了。
- 解压Hadoop的压缩包
注意:-C是大写的C,小写的c是通过不了的。
tar -zxvf hadoop-3.3.0.tar.gz -C /opt/program/
可以进行查看是否正确解压。
- 配置环境变量
sudo vim my_env.sh
#HADOOP_HOME
export HADOOP_HOME=/opt/program/hadoop-3.3.0
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
刷新环境变量
source /etc/profile
- 测试
输入hadoop,出现如下情况,则代表成功。
分发拷贝
使用scp命令进行拷贝
语法:scp -r $pdir/$fname $user@host:$pdir/$fname
命令 递归 需拷贝的文件路径/名称 目的地
# 将jdk从hadoop102拷贝到hadoop103
scp -r jdk1.8.0_281/ root@hadoop103:/opt/program/
# 从hadoop103拉取hadoop102的hadoop文件
scp -r root@hadoop102:/opt/program/hadoop-3.3.0 ./
# 在hadoop03进行操作将hadoop102的jdk和hadoop复制到hadoop104
scp -r root@hadoop102:/opt/program/* root@hadoop104:/opt/program/
使用rsync命令进行拷贝
语法:rsync -av $pdir/$fname $user@host:$pdir/$fname
命令 归档拷贝+显示复制过程 需同步的文件 目的地
# 将hadoop102的hadoop同步到hadoop103
rsync -av hadoop-3.3.0/ root@hadoop103:/opt/program/hadoop-3.3.0/
使用xsync命令进行拷贝
直接在用户root或自己的用户目录下新建一个bin目录
mkdir bin
cd bin
vim xsync
#!/bin/bash
#1.判断参数个数
if [ $# -lt 1 ]
then
echo Not Enougn Arguement!
exit;
fi
#2.遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
echo ========== $host ==========
#3.遍历所有目录,依次发送
for file in $@
do
#4.判断文件是否存在
if [ -e $file ]
then
#5.获取父目录
pdir=$(cd -P $(dirname $file); pwd)
#6.获取当前文件的名称
fname=$(basename $file)
ssh $host "mkdir -p $pdir"
rsync -av $pdir/$fname $host:$pdir
else
echo $file does not exists!
fi
done
done
# 授予该xsync执行权限
chmod 777 xsync
# 使用此命令进行测试
xsync bin/
# 之后再把配置文件也分发
xsync /etc/profile.d/my_env.sh
# 如果是在root用户下可以直接执行成功
# 如果不是,则需要改为如下命令
sudo ./bin/xsync /etc/profile.d/my_env.sh
# 分发完成之后,记得刷新环境变量
source /etc/profile
# 之后可以进行jdk以及hadoop的测试
ssh无密码登录
# 在hadoop102上先执行此操作
ssh-keygen -t rsa
# 之后直接进行三次回车操作即可
# 可以在当前用户的.ssh(ls -al命令查看隐藏文件)文件夹下查看
# 将hadoop102的公钥复制到hadoop103,hadoop104
ssh-copy-id hadoop103
ssh-copy-id hadoop104
# 如果想要对自己也使用ssh,则对hadoop102也需要执行
ssh-copy-id hadoop102
# 之后对hadoop103和hadoop104进行相同操作
集群配置
# 进入其中查看相关四大配置文件
cd /opt/program/hadoop-3.3.0/etc/hadoop
hadoop102 | hadoop103 | hadoop104 | |
---|---|---|---|
HDFS | NameNode DataNode | DataNode | SecondaryNameNode DataNodeSECONDAR |
YARN | NodeManager | ResourceManager NodeManager | NodeManager |
分别在core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml配置
core-site.xml
<configuration>
<!-- 指定NameNode的地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop102:8020</value>
</property>
<!-- 指定hadoop数据的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/program/hadoop-3.3.0/data</value>
</property>
<!-- 配置HDFS网页登录使用的静态用户为root -->
<property>
<name>hadoop.http.staticuser.user</name>
<value>root</value>
</property>
</configuration>
hdfs-site.xml
<configuration>
<!-- nn web端访问地址 -->
<property>
<name>dfs.namenode.http-address</name>
<value>hadoop102:9870</value>
</property>
<!-- 2nn web端访问地址 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop104:9868</value>
</property>
</configuration>
yarn-site.xml
<configuration>
<!-- 指定MR走shuffle -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 指定ResourceManager的地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop103</value>
</property>
<!-- 3.2以上版本则不需要配置此处 -->
<!-- 环境变量的继承 -->
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>
mapred-site.xml
<configuration>
<!-- 指定MapReduce程序运行在Yarn上 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
将hadoop102上的配置分发至其它hadoop
xsync hadoop/
cd /opt/program/hadoop-3.3.0/etc/hadoop
# 找到其中的works文件
vim works
# 清空其中,并输入这些(注意不能有空格)
hadoop102
hadoop103
hadoop104
# 完成之后xsync分发此文件
xysnc works
启动集群
如果是第一次启动需要在hadoop102格式化NameNode结点
注意:格式化 NameNode,会产生新的集群id,导致 NameNode和 DataNode的集群id不一致,集群找不到已往数据。如果集群在运行过程中报错,需要重新格式化 NameNode的话,一定要先停止namenode和 datanode进程,并且要删除所有机器的data和logs目录,然后再进行格式化
# 102上格式化NameNode
hdfs namenode -format
# 初始化成功可使用ll查看多出了一个data和logs目录
ll
# 执行下面的启动命令时,可能会出错
# 若出错,则删除data和logs目录
# 然后在my_env.sh配置此些环境变量,分发之后分别刷新环境变量
source etc/profile
# 重新启动即可
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
# 102上启动HDFS
sbin/start-dfs.sh
# 在三台机器分别使用jps查看启动的进程
jps
# 在浏览器中输入hadoop102:9870进行查看
# (若未在windows的hosts文件中进行配置192.168.122.102 hadoop102
# 则使用192.168.122.102:9870进行查看)
hadoop102:9870
# 如访问一直拒绝,检查防火墙是否关闭
systemctl stop firewalld.service
systemctl disable firewalld.service
# 在103(配置了ResourceManager的节点)启动YARN
sbin/start-yarn.sh
# 在三台机器分别使用jps查看启动的进程
jps
# 在hadoop102上的hadoop-3.3.0目录下新建一个文件夹
hadoop fs -mkdir /wcinput
# 上传文件
hadoop fs -put wcinput/word.txt /wcinput
# 再次使用hadoop配合yarn演示单词统计功能
# 出错,找不到主类
错误: 找不到或无法加载主类 org.apache.hadoop.mapreduce.v2.app.MRAppMaster
# 在yarn-site.xml中加上这些
# 先执行hadoop classpath
# 再将输出结果替换value中的hadoop classpath
<!--Yarn出错找不到主类的解决办法-->>
<property>
<name>yarn.application.classpath</name>
<value>hadoop classpath</value>
</property>
配置历史服务器
在mapred-site.xml文件中添加
<!-- 历史服务器端地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop102:10020</value>
</property>
<!-- 历史服务器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop102:19888</value>
</property>
修改完毕之后分发此文件
xsync mapred-site.xml
重启yarn(若处于运行状态先关闭yarn即可)
sbin/stop-yarn.sh
sbin/start-yarn.sh
在hadoop102启动历史服务器
bin/mapred --daemon start historyserver
使用jps查看历史服务器是否启动
使用http://hadoop102:19888/jobhistory查看JobHistory
开启日志聚集功能
在yarn-site.xml中添加
<!-- 开启日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 设置日志聚集服务器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://hadoop102:19888/jobhistory/logs</value>
</property>
<!-- 设置日志保留时间为7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
修改完毕之后分发此文件
xsync yarn-site.xml
关闭历史服务器
bin/mapred --daemon stop historyserver
重启yarn(若处于运行状态先关闭yarn即可)
sbin/stop-yarn.sh
sbin/start-yarn.sh
相关脚本
直接在用户root或自己的用户目录下的bin目录新建myhaoop.sh
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit;
fi
case $1 in
"start")
echo "========== 启动 Hadoop集群 =========="
echo "========== 启动 hdfs =========="
ssh hadoop102 "/opt/program/hadoop-3.3.0/sbin/start-dfs.sh"
echo "========== 启动 yarn =========="
ssh hadoop103 "/opt/program/hadoop-3.3.0/sbin/start-yarn.sh"
echo "========== 启动 historyserver =========="
ssh hadoop102 "/opt/program/hadoop-3.3.0/bin/mapred --daemon start historyserver"
;;
"stop")
echo "========== 关闭 Hadoop集群 =========="
echo "========== 关闭 historyserver =========="
ssh hadoop102 "/opt/program/hadoop-3.3.0/bin/mapred --daemon stop historyserver"
echo "========== 关闭 yarn =========="
ssh hadoop103 "/opt/program/hadoop-3.3.0/sbin/stop-yarn.sh"
echo "========== 关闭 hdfs =========="
ssh hadoop102 "/opt/program/hadoop-3.3.0/sbin/stop-dfs.sh"
;;
*)
echo "Input Args Error..."
;;
esac
# 授予权限
chmod 777 myhadoop.sh
使用示例
myhadoop.sh stop
myhadoop.sh start
直接在用户root或自己的用户目录下的bin目录新建jpsall
#!/bin/bash
for host in hadoop102 hadoop103 hadoop104
do
echo ========== $host ==========
ssh $host jps
done
# 授予权限
chmod 777 jpsall
HDFS
基本语法
hadoop fs xxx命令 或 hdfs dfs xxx命令
Shell相关操作
创建文件夹
hdfs dfs -mkdir /sanguo
上传
-moveFromLocal:从本地剪切粘贴到HDFS
hdfs dfs -moveFromLocal shuguo.txt /sanguo
-copyFromLocal:从本地文件系统拷贝文件到HDFS路径中
hdfs dfs -copyFromLocal weiguo.txt /sanguo
-put:等同于copyFromLocal,生产环境更常用
hdfs dfs -put wuguo.txt /sanguo
-appendToFile:追加一个文件到已存在的文件末尾
hdfs dfs -appendToFile liubei.txt /sanguo/shuguo.txt
追加前:
追加后:
下载
-copyToLocal:从HDFS拷贝到本地
hdfs dfs -copyToLocal /sanguo/shuguo.txt ./
下载前:
下载后:
-get:等同于-copyToLocal,生产环境更常用
hdfs dfs -get /sanguo/shuguo.txt ./shuguo2.txt
HDFS直接操作
-ls:显示目录信息
hdfs dfs -ls /sanguo
-cat:显示文件内容
hdfs dfs -cat /sanguo/shuguo.txt
-chgrp、-chmod、-chown:修改文件所属权限
hdfs dfs -chown root:root /sanguo/shuguo.txt
未修改权限前:
修改权限后:
-mkdir:创建文件夹
hdfs dfs -mkdir /jinguo
-cp:从HDFS的一个路径拷贝到HDFS的另一个路径
hdfs dfs -cp /sanguo/shuguo.txt /jinguo
-mv:在HDFS目录中移动文件
hdfs dfs -mv /sanguo/weiguo.txt /jinguo
hdfs dfs -mv /sanguo/wuguo.txt /jinguo
-tail:显示一个文件的末尾1kb的数据
hdfs dfs -tail /jinguo/shuguo.txt
-rm:删除文件或文件夹
hdfs dfs -rm /sanguo/shuguo.txt
-rm -r:递归删除目录及目录里面内容
hdfs dfs -rm -r /sanguo
-du:统计文件夹大小信息
hdfs dfs -du -s -h /jinguo
hdfs dfs -du -h /jinguo
-setrep:设置HDFS中文件的副本数量
hdfs dfs -setrep 10 /jinguo/shuguo.txt
Windows上环境依赖配置
GitHub找到对应版本的相关依赖
进行环境变量的配置
双击相关依赖中的winutils.exe
-
若一闪而过则代表Windows的相关类库已经安装
-
若出现缺少dll文件,则使用对应的软件安装即可
启动IDEA进行连接
新建项目
pom.xml中添加依赖
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
新建日志配置文件log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
API相关操作
创建HdfsClient.java
1、创建目录
package com.gezq.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* @author: gezq0714
* @email: ge051799qi@163.com
* @date: 2021/5/1 - 0:10
* @description: 客户端代码常用步骤:
* 1、获取一个客户端对象
* 2、执行相关的操作命令
* 3、关闭资源
* HDFS、Zookeeper
*/
public class HdfsClient {
private FileSystem fileSystem;
// Test执行之前将会执行的方法
@Before
public void init() throws URISyntaxException, IOException, InterruptedException {
// 连接的集群nn地址
URI uri = new URI("hdfs://hadoop102:8020");
// 创建一个配置文件
Configuration configuration = new Configuration();
// 指定用户
String user = "root";
// 获取到了客户端对象
fileSystem = FileSystem.get(uri, configuration, user);
}
// Test执行之后将会执行的方法
@After
public void close() throws IOException {
// 关闭资源
fileSystem.close();
}
// 创建目录
@Test
public void testMkdir() throws IOException {
// 创建一个文件夹
fileSystem.mkdirs(new Path("/xiyouji/huaguoshan"));
}
}
2、上传文件
// 上传
@Test
public void testPut() throws IOException {
// 参数含义:是否删除源数据、是否允许覆盖、源数据路径、目的地路径
fileSystem.copyFromLocalFile(true, true, new Path("D:\\sunwukong.txt"), new Path("hdfs://hadoop102/xiyouji/huaguoshan"));
}
3、配置文件优先级
在resource下创建一个hafs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
此时上传文件,看其副本数量
在init方法中对进行配置参数的设置
configuration.set("dfs.replication", "2");
再次上传文件,看其副本数量
优先级顺序:代码中定义 > 配置文件中定义 > 默认hdfs-site.xml > 默认hdfs-default.xml
4、文件下载
// 下载
@Test
public void testGet() throws IOException {
// 参数含义:是否删除源文件、源文件路径HDFS、目标地址路径Windows中、是否开启本地校验
fileSystem.copyToLocalFile(true, new Path("hdfs://hadoop102/xiyouji/huaguoshan"), new Path("D:\\"), true);
}
5、文件删除
未删除前:非空目录、文件、空目录
// 删除
@Test
public void testRm() throws IOException {
// 参数含义:删除的路径、是否递归(若为false,则非空目录不能删除)
// 删除文件
fileSystem.delete(new Path("hdfs://hadoop102/test.txt"), true);
// 删除空目录
fileSystem.delete(new Path("hdfs://hadoop102/xiyouji"), false);
// 删除非空目录
fileSystem.delete(new Path("hdfs://hadoop102/jinguo"), true);
}
删除之后:
6、文件更名和移动
未更名前
// 文件更名和移动
@Test
public void testMv() throws IOException {
// 参数含义:源文件路径、目标文件路径
// 对文件名称的修改
fileSystem.rename(new Path("hdfs://hadoop102/test/test.txt"), new Path("hdfs://hadoop102/test/ss.txt"));
}
更名之后
// 文件的移动和更名
fileSystem.rename(new Path("hdfs://hadoop102/test/ss.txt"), new Path("hdfs://hadoop102/cls.txt"));
文件移动并更名之后:
// 目录更名
fileSystem.rename(new Path("hdfs://hadoop102/test"), new Path("hdfs://hadoop102/retest"));
目录更名之后
7、获取文件详细信息
// 获取文件详细信息
@Test
public void testFileDetails() throws IOException {
// 获取到的所有文件信息
RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("hdfs://hadoop102/"), true);
// 遍历文件
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
// 输出文件或目录相关信息
System.out.println("*****" + fileStatus.getPath() + "*****");
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getOwner());
System.out.println(fileStatus.getGroup());
System.out.println(fileStatus.getLen());
System.out.println(fileStatus.getModificationTime());
System.out.println(fileStatus.getReplication());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPath().getName());
// 获取存储在对应块上的信息
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
System.out.println(Arrays.toString(blockLocations));
}
}
8、判断是文件夹还是文件
// 判断是文件还是文件夹
@Test
public void testFile() throws IOException {
FileStatus[] fileStatuses = fileSystem.listStatus(new Path("hdfs://hadoop102/"));
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isFile()) {
System.out.println("文件:" + fileStatus.getPath().getName());
} else {
System.out.println("目录:" + fileStatus.getPath().getName());
}
}
}
两个实验操作
读取的文件中内容
第一个读取操作
package com.gezq.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
/**
* @author: gezq0714
* @email: ge051799qi@163.com
* @date: 2021/5/1 - 19:00
* @description: 实现HDFS的读取
*/
public class MyFSDataInputStream extends FSDataInputStream {
public MyFSDataInputStream(InputStream in) {
super(in);
}
// 定义全局使用的变量
private static FileSystem fileSystem;
private static BufferedReader bufferedReader;
public static void initHdfs() {
try {
// 连接的集群地址
URI uri = new URI("hdfs://hadoop102:8020");
// 创建一个配置文件
Configuration configuration = new Configuration();
// 指定用户
String user = "root";
// 获取客户端对象
fileSystem = FileSystem.get(uri, configuration, user);
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void closeHdfs() {
try {
// 关闭资源
bufferedReader.close();
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void testRead() {
try {
Path path = new Path("hdfs://hadoop102/retest/test.txt");
// 打开Hadoop中的文件进行读取
FSDataInputStream fsDataInputStream = fileSystem.open(path);
// 使用字符缓冲流进行读取
bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream));
// 逐行读取文件内容
String content;
while ((content = bufferedReader.readLine()) != null) {
System.out.println(content);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 初始化
initHdfs();
// 调用方法
testRead();
// 关闭资源
closeHdfs();
}
}
第二个读取操作
package com.gezq.hdfs;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
/**
* @author: gezq0714
* @email: ge051799qi@163.com
* @date: 2021/5/1 - 20:31
* @description: ...
*/
public class HdfsCache {
// 使用URL和FsUrlStreamHandlerFactory输出HDFS中指定文件的文本到终端中
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
// 打开HDFS中的相关文件进行操作
public static void testRead(String filename) {
try {
InputStream inputStream = new URL("hdfs", "hadoop102", 8020, filename).openStream();
IOUtils.copyBytes(inputStream, System.out, 4096, false);
IOUtils.closeStream(inputStream);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String filename = "/retest/test.txt";
HdfsCache.testRead(filename);
}
}
MapReduce
单文件WordCount测试
1、自定义编写WordCount程序
package com.gezq.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author: gezq0714
* @email: ge051799qi@163.com
* @date: 2021/5/2 - 9:00
* @description: ...
* 注意点:Mapper要是org.apache.hadoop.mapreduce中的
* Text要是org.apache.hadoop.io中的
*/
/*
KEYIN,map阶段输入key的类型,LongWritable
VALUEIN,map阶段输入value的类型,Text
KEYOUT,map阶段输出的key的类型,Text
VALUEOUT,map阶段输出的value的类型,IntWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outK = new Text();
private IntWritable outV = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1、获取一行
String line = value.toString();
// 2、分割
String[] words = line.split(" ");
// 3、循环写出
for (String word : words) {
// 封装outK
outK.set(word);
// 写出
context.write(outK, outV);
}
}
}
package com.gezq.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author: gezq0714
* @email: ge051799qi@163.com
* @date: 2021/5/2 - 9:00
* @description: ...
* 注意点:Reducer要是org.apache.hadoop.mapreduce中的
* Text要是org.apache.hadoop.io中的
*/
/*
KEYIN,reduce阶段输入key的类型,Text
VALUEIN,reduce阶段输入value的类型,IntWritable
KEYOUT,reduce阶段输出的key的类型,Text
VALUEOUT,reduce阶段输出的value的类型,IntWritable
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 累加操作
for (IntWritable value : values) {
sum += value.get();
}
IntWritable outV = new IntWritable();
outV.set(sum);
// 写出操作
context.write(key, outV);
}
}
package com.gezq.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author: gezq0714
* @email: ge051799qi@163.com
* @date: 2021/5/2 - 9:00
* @description: ...
*/
public class WordCountDriver {
public static void main(String[] args) {
try {
// 1、获取obj
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2、设置jar包路径
job.setJarByClass(WordCountDriver.class);
// 3、关联mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4、设置map输出的k v类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5、设置最终输出的k v类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6、设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7、提交obj
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
2、使用Maven执行package打包
得到的jar包重命名,上传到hadoop102中
测试的/wcinput/word.txt文件
ss ss
cls cls
ban ban
bobo
yangge
3、执行自定义WordCount的jar包程序
hadoop jar MyWordCount.jar com.gezq.mapreduce.wordcount.WordCountDriver /wcinput /wcoutput
此时可以观察到Yarn中有任务正在执行
4、查看正在执行的结果
运行完成之后多出一个wcoutput目录,进行查看
多文件WordCount案例
1、创建案例前置条件
file1、file2、file3中的内容如下(与实验中要求内容相同)
2、执行WordCount的jar包程序
hadoop jar MyWordCount.jar com.gezq.mapreduce.wordcount.WordCountDriver /input /output
观察到Yarn有新任务的调度
3、查看正在执行的结果
运行完成之后多出一个output目录,进行查看
Zookeeper安装
准备工作
上传安装包
解压安装包
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz -C /opt/program/
前置工作
进入Zookeeper的安装目录,新建文件夹data和logs
mkdir data
mkdir logs
进入data目录中,新建一个myid文件,写入id为1
cd data
echo '1'>myid
# 可使用cat myid查看
配置文件
进入Zookeeper安装目录下的conf文件夹
将zoo_sample.cfg复制一份重命名为zoo.cfg
cd conf/
cp zoo_sample.cfg zoo.cfg
先使用pwd查看当前所处位置,复制当前位置
再编辑zoo.cfg
pwdvim zoo.cfg
dataDir=/opt/program/apache-zookeeper-3.6.3-bin/data
dataLogDir=/opt/program/apache-zookeeper-3.6.3-bin/logs
server.1=hadoop102:2888:3888
server.2=hadoop103:2888:3888
server.3=hadoop104:2888:3888
配置环境变量
vim /etc/profile.d/my_env.sh
#ZOOKEEPER_HOME
export ZOOKEEPER_HOME=/opt/program/apache-zookeeper-3.6.3-bin
export PATH=$PATH:$ZOOKEEPER_HOME/bin
分发操作
将配置文件分发给hadoop103和hadoop104
# 进入/etc/profile.d目录下,再执行
xsync my_env.sh
# 分发Zookeeper的压缩包
cd /opt/package
xsync apache-zookeeper-3.6.3-bin.tar.gz
# 分发Zookeeper解压后的文件
cd /opt/program/
xsync apache-zookeeper-3.6.3-bin/
需要修改hadoop103和hadoop104中的data目录中的myid
hadoop103修改为2,hadoop104修改为3
注意:上面修改环境变量时,未刷新。因此分发之后需要在三台机器上分别执行
source /etc/profile
启动集群
在三个节点分别执行如下命令(可在任何位置执行,因为配置了环境变量)
zkServer.sh start
使用如下命令查看ZooKeeper的状态
zkServer.sh status
将看到一个leader和两个follower,代表ZooKeeper启动成功
# 停止Zookeeper
zkServer.sh stop
# 重启Zookeeper
zkServer.sh restart
自定义zk脚本
在用户的家目录的bin目录下创建一个myzk.sh
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit;
fi
case $1 in
"start")
echo "========== 启动 Zookeeper集群 =========="
echo "========== 启动 hadoop102中的Zookeeper =========="
ssh hadoop102 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" start
echo "========== 启动 hadoop103中的Zookeeper =========="
ssh hadoop103 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" start
echo "========== 启动 hadoop104中的Zookeeper =========="
ssh hadoop104 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" start
;;
"status")
echo "========== 查看 Zookeeper集群 =========="
echo "========== 查看 hadoop102中的Zookeeper =========="
ssh hadoop102 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" status
echo "========== 查看 hadoop103中的Zookeeper =========="
ssh hadoop103 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" status
echo "========== 查看 hadoop104中的Zookeeper =========="
ssh hadoop104 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" status
;;
"restart")
echo "========== 重新启动 Zookeeper集群 =========="
echo "========== 重新启动 hadoop102中的Zookeeper =========="
ssh hadoop102 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" restart
echo "========== 重新启动 hadoop103中的Zookeeper =========="
ssh hadoop103 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" restart
echo "========== 重新启动 hadoop104中的Zookeeper =========="
ssh hadoop104 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" restart
;;
"stop")
echo "========== 停止 Zookeeper集群 =========="
echo "========== 停止 hadoop102中的Zookeeper =========="
ssh hadoop102 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" stop
echo "========== 停止 hadoop103中的Zookeeper =========="
ssh hadoop103 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" stop
echo "========== 停止 hadoop104中的Zookeeper =========="
ssh hadoop104 "/opt/program/apache-zookeeper-3.6.3-bin/bin/zkServer.sh" stop
;;
*)
echo "Input Args Error..."
;;
esac
授予权限
chmod 777 myzk.sh
此时即可通过此shell脚本完成集群的开启、查看、停止和重启
HBase安装
准备工作
上传安装包
解压安装包
tar -zxvf hbase-1.2.6-bin.tar.gz -C /opt/program/
配置文件
进入hbase的conf目录中,修改hbase-env.sh
cd /opt/program/hbase-1.2.6/conf
cp hbase-env.sh hbase-env.sh.bk
vim hbase-env.sh
# 配置java所在路径
export JAVA_HOME=/opt/program/jdk1.8.0_281
# 关闭其自带的zk,使用自己安装的Zookeeper
export HBASE_MANAGES_ZK=false
修改hbase-site.xml
cp hbase-site.xml hbase-site.xml.bk
vim hbase-site.xml
<!-- 指定HBase在hdfs上的集群逻辑名称 -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop102:8020/hbase</value>
</property>
<!-- 指定HBase集群是分布式集群 -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- 指定Zookeeper集群 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop102,hadoop103,hadoop104</value>
</property>
<!-- Zookeeper元数据的存储目录,需要与zoo.cfg中配置一致 -->
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/program/apache-zookeeper-3.6.3-bin/data</value>
</property>
<!-- HBase的Master结点对应端口 -->
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<!-- HBase的Master Web页面访问端口,默认为16010 -->
<property>
<name>hbase.master.info.port</name>
<value>16010</value>
</property>
<!-- 指定HBase RegionServer Web页面访问端口,默认为16030 -->
<property>
<name>hbase.regionserver.info.port</name>
<value>16030</value>
</property>
<!-- 解决启动HMaster无法初始化WAL的问题 -->
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
修改
vim regionservers
hadoop102
hadoop103
hadoop104
建立软连接
将Hadoop的相关配置文件建立软连接到HBase中
ln -s /opt/program/hadoop-3.3.0/etc/hadoop/core-site.xml /opt/program/hbase-1.2.6/conf/core-site.xml
ln -s /opt/program/hadoop-3.3.0/etc/hadoop/hdfs-site.xml /opt/program/hbase-1.2.6/conf/hdfs-site.xml
配置环境变量
vim /etc/profile.d/my_env.sh
#HBASE_HOME
export HBASE_HOME=/opt/program/hbase-1.2.6
export PATH=$PATH:$HBASE_HOME/bin
分发操作
将配置文件分发给hadoop103和hadoop104
xsync /etc/profile.d/my_env.sh
# 分发HBase的压缩包
xsync /opt/package/hbase-1.2.6-bin.tar.gz
# 分发HBase解压后的文件
xsync /opt/program/hbase-1.2.6/
注意:上面修改环境变量时,未刷新。因此分发之后需要在三台机器上分别执行
source /etc/profile
启动测试
确保Hadoop和Zookeeper均已启动,之后再启动HBase
# 自带集群启动
start-hbase.sh
# 自带集群关闭
stop-hbase.sh
访问hadoop102:16010,出现如下页面
HBase操作
Shell命令行
使用hbase shell进入命令行操作
hbase shell
注意:Backspace是向后删除,Delete是向前删除,与Windows中操作相反。
a)操作
# 创建student表
create 'student','score'
# 向其中插入数据
put 'student','zhangsan','score:English','69'
put 'student','zhangsan','score:Math','86'
put 'student','zhangsan','score:Computer','77'
put 'student','lisi','score:English','55'
put 'student','lisi','score:Math','100'
put 'student','lisi','score:Computer','88'
# scan指令浏览表信息
scan 'student'
b)操作
# 查询zhangsan的Computer成绩信息
get 'student','zhangsan','score:Computer'
c)操作
# 修改lisi的Math成绩为95
put 'student','lisi','score:Math','95'
# scan指令浏览表中数据的不同版本信息
scan 'student',{RAW => true,VERSION => 10}
API编程
添加数据
package com.gezq.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
/**
* @author: gezq0714
* @email: ge051799qi@163.com
* @date: 2021/5/3 - 16:41
* @description: ...
*/
public class HbaseClient {
private Admin admin;
private Connection connection;
@Before
public void init() {
try {
Configuration configuration = HBaseConfiguration.create();
// 配置其所在集群
configuration.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
// 获取管理员对象
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
@After
public void close() {
try {
if (admin != null) admin.close();
if (connection != null) connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 添加数据
@Test
public void testPut() {
// 要插入的数据
String tableName = "student";
String rowKey = "scofield";
String columnFamily = "score";
String column1 = "English";
String column2 = "Math";
String column3 = "Computer";
String value1 = "45";
String value2 = "89";
String value3 = "100";
try {
// 获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
// 创建Put对象
Put info = new Put(Bytes.toBytes(rowKey));
// 为Put对象赋值,多次赋值即可
info.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column1), Bytes.toBytes(value1));
info.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column2), Bytes.toBytes(value2));
info.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column3), Bytes.toBytes(value3));
// 插入数据
table.put(info);
// 关闭连接
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
执行上述方法,运行成功
再Hbase Shell中再次查看,与上面的对比,多出三行数据。
获取数据
// 获取数据
@Test
public void testGet() {
// 要查看数据的相关信息
String tableName = "student";
String rowKey = "scofield";
String columnFamily = "score";
String column = "English";
try {
// 获取表对象
Table table = connection.getTable(TableName.valueOf(tableName));
// 创建Get对象
Get info = new Get(Bytes.toBytes(rowKey));
// 指定获取的列族和列
info.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
// 获取数据
Result result = table.get(info);
// 解析数据
for (Cell cell : result.rawCells()) {
System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
+ Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
+ Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
+ Bytes.toString(CellUtil.cloneValue(cell)));
}
// 关闭连接
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
更多推荐
所有评论(0)