# hadoop前言
# Hadoop试验集群的部署结构
pastedGraphic.png
# 系统和组建的依赖关系
 # 生产环境的部署结构
pastedGraphic_1.png
# Day1 搭建伪分布实验环境
# 准备软件
# vmare 9.0.2
#
 操作系统 CentOS 6.4
# jdk-6u45-linux-i586.bin
# hadoop-1.1.2.tar.gz
# 开始搭建环境一 (基础环境)
# 在虚拟机上装好 CentOS 6.4
# VM
的网络连接方式选择NAT方式
# 新建hadoop组跟用户(密码:hadoop)
[root@localhost home]# groupadd hadoop
[root@localhost home]# useradd -g hadoop hadoop
[root@localhost home]# passwd hadoop
Changing password for user hadoop.
New password:
BAD PASSWORD: it is based on a dictionary word
BAD PASSWORD: is too simple
Retype new password:
passwd: all authentication tokens updated successfully.
[root@localhost home]#
# 授时服务(时间同步)
[root@localhost home]# crontab –e
pastedGraphic_2.png
[root@localhost home]# crontab -l
0 1 * * * /usr/sbin/ntpdate cn.pool.ntp.org
# jdk-6u45-linux-i586.bin安装(没有x权限,修改权限后执行)
[root@localhost java]# pwd
/usr/local/java
[root@localhost java]# ll
total 130600
-rwxrw-rw-. 1 root root 61927560 Jun  7  2013 hadoop-1.1.2.tar.gz
-rw-r--r--. 1 root root 71799552 Oct 14 14:33 jdk-6u45-linux-i586.bin
[root@localhost java]# chmod u+x jdk-6u45-linux-i586.bin
[root@localhost java]# ll
total 130600
-rwxrw-rw-. 1 root root 61927560 Jun  7  2013 hadoop-1.1.2.tar.gz
-rwxr--r--. 1 root root 71799552 Oct 14 14:33 jdk-6u45-linux-i586.bin
[root@localhost java]# ./jdk-6u45-linux-i586.bin
# 配置环境变量(不在profile里面配置,新建一个java.sh文件,里面配置java的环境变量,profile文件会自动加载这个java.sh文件)

[root@localhost jdk1.6.0_45]# pwd
/usr/local/java/jdk1.6.0_45
[root@localhost jdk1.6.0_45]# vi /etc/profile.d/java.sh
pastedGraphic_3.png
[root@localhost jdk1.6.0_45]#
[root@localhost jdk1.6.0_45]# java
bash: java: command not found
[root@localhost jdk1.6.0_45]# source /etc/profile 使java.sh文件配置生效
[root@localhost jdk1.6.0_45]# java -version
java version "1.6.0_45"
Java(TM) SE Runtime Environment (build 1.6.0_45-b06)
Java HotSpot(TM) Client VM (build 20.45-b01, mixed mode, sharing)
[root@localhost jdk1.6.0_45]# javac -version
javac 1.6.0_45
[root@localhost jdk1.6.0_45]#
# 主机名修改
[root@localhost jdk1.6.0_45]# vi /etc/sysconfig/network
pastedGraphic_4.png
[root@localhost jdk1.6.0_45]# hostname
localhost.localdomain
[root@localhost jdk1.6.0_45]#
在这里需要logout一次,主机名才会生效
# IP配置
[root@localhost Desktop]# vi /etc/sysconfig/network-scripts/ifcfg-eth0
pastedGraphic_5.png
[root@localhost Desktop]# ifconfig
eth1      Link encap:Ethernet  HWaddr 00:50:56:38:E4:31 
          inet addr:192.168.209.100  Bcast:192.168.209.255  Mask:255.255.255.0
          inet6 addr: fe80::250:56ff:fe38:e431/64 Scope:Link
          UP BROADCAST RUNNING MULTICAST  MTU:1500  Metric:1
          RX packets:256 errors:0 dropped:0 overruns:0 frame:0
          TX packets:140 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:1000
          RX bytes:66995 (65.4 KiB)  TX bytes:11119 (10.8 KiB)
          Interrupt:19 Base address:0x2024

lo        Link encap:Local Loopback 
          inet addr:127.0.0.1  Mask:255.0.0.0
          inet6 addr: ::1/128 Scope:Host
          UP LOOPBACK RUNNING  MTU:16436  Metric:1
          RX packets:18 errors:0 dropped:0 overruns:0 frame:0
          TX packets:18 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:0
          RX bytes:1128 (1.1 KiB)  TX bytes:1128 (1.1 KiB)

[root@localhost Desktop]#
# hosts文件修改(能ping通就成功了)
[root@localhost Desktop]# vi /etc/hosts
pastedGraphic_6.png
[root@localhost Desktop]# ping master
PING master (192.168.209.100) 56(84) bytes of data.
64 bytes from master (192.168.209.100): icmp_seq=1 ttl=64 time=0.488 ms
64 bytes from master (192.168.209.100): icmp_seq=2 ttl=64 time=0.083 ms
^C
--- master ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 1374ms
rtt min/avg/max/mdev = 0.083/0.285/0.488/0.203 ms
[root@localhost Desktop]#
# 防火墙关闭
[root@localhost Desktop]# service iptables status
Table: filter
Chain INPUT (policy ACCEPT)
num  target     prot opt source               destination        
1    ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0           state RELATED,ESTABLISHED
2    ACCEPT     icmp --  0.0.0.0/0            0.0.0.0/0          
3    ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0          
4    ACCEPT     tcp  --  0.0.0.0/0            0.0.0.0/0           state NEW tcp dpt:22
5    REJECT     all  --  0.0.0.0/0            0.0.0.0/0           reject-with icmp-host-prohibited

Chain FORWARD (policy ACCEPT)
num  target     prot opt source               destination        
1    REJECT     all  --  0.0.0.0/0            0.0.0.0/0           reject-with icmp-host-prohibited

Chain OUTPUT (policy ACCEPT)
num  target     prot opt source               destination        

[root@localhost Desktop]# service iptables stop
iptables: Flushing firewall rules:                         [  OK  ]
iptables: Setting chains to policy ACCEPT: filter          [  OK  ]
iptables: Unloading modules:                               [  OK  ]
 [root@localhost Desktop]# chkconfig iptables --list
iptables       0:off1:off2:on3:on4:on5:on6:off
[root@localhost Desktop]# chkconfig iptables off
[root@localhost Desktop]# chkconfig iptables --list
iptables       0:off1:off2:off3:off4:off5:off6:off
[root@localhost Desktop]#
[root@localhost Desktop]# service iptables status
iptables: Firewall is not running.
# SSH 无密钥登录(切换到hadoop用户下)
切换到hadoop用户下
[root@localhost ~]# su hadoop
生成公钥跟私钥(会有3次提示,一直回车即可)
[hadoop@localhost root]$ cd
[hadoop@localhost ~]$ pwd
/home/hadoop
[hadoop@localhost ~]$ ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/home/hadoop/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /home/hadoop/.ssh/id_rsa.
Your public key has been saved in /home/hadoop/.ssh/id_rsa.pub.
The key fingerprint is:
33:09:0b:6d:30:f5:07:10:40:0d:be:99:cf:a9:a4:92 hadoop@localhost.localdomain
The key's randomart image is:
+--[ RSA 2048]----+
|   .*=+o.        |
|   . +.. .       |
|    o + . .      |
|     * o o       |
|    + . S        |
|     o . o       |
| .  . +          |
|E  o .           |
| .. .            |
+-----------------+
[hadoop@localhost ~]$
[hadoop@localhost ~]$ ls
[hadoop@localhost ~]$ ll -a
total 36
drwx------. 5 hadoop hadoop 4096 Feb 28 14:19 .
drwxr-xr-x. 3 root   root   4096 Feb 28 13:47 ..
-rw-------. 1 hadoop hadoop   79 Feb 28 14:23 .bash_history
-rw-r--r--. 1 hadoop hadoop   18 Feb 22  2013 .bash_logout
-rw-r--r--. 1 hadoop hadoop  176 Feb 22  2013 .bash_profile
-rw-r--r--. 1 hadoop hadoop  124 Feb 22  2013 .bashrc
drwxr-xr-x. 2 hadoop hadoop 4096 Nov 12  2010 .gnome2
drwxr-xr-x. 4 hadoop hadoop 4096 Feb 28 06:11 .mozilla
drwx------. 2 hadoop hadoop 4096 Feb 28 14:23 .ssh
[hadoop@localhost ~]$ cd .ssh/
[hadoop@localhost .ssh]$ ls
id_rsa  id_rsa.pub
[hadoop@localhost .ssh]$ ll
total 8
-rw-------. 1 hadoop hadoop 1671 Feb 28 14:23 id_rsa
-rw-r--r--. 1 hadoop hadoop  410 Feb 28 14:23 id_rsa.pub
[hadoop@localhost .ssh]$ cp id_rsa.pub authorized_keys
[hadoop@localhost .ssh]$ ll
total 12
-rw-r--r--. 1 hadoop hadoop  410 Feb 28 14:26 authorized_keys
-rw-------. 1 hadoop hadoop 1671 Feb 28 14:23 id_rsa
-rw-r--r--. 1 hadoop hadoop  410 Feb 28 14:23 id_rsa.pub
[hadoop@localhost .ssh]$
ssh
登录
[hadoop@localhost .ssh]$ ssh master
The authenticity of host 'master (192.168.209.100)' can't be established.
RSA key fingerprint is f0:92:0b:08:0d:9b:72:0d:ca:99:30:0a:40:7e:05:ae.
SSH
第一次登录有这个提示,回车就好,然后直接ssh master 不需要密码就成功了
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'master,192.168.209.100' (RSA) to the list of known hosts.
[hadoop@localhost ~]$ exit
logout
Connection to master closed.
[hadoop@localhost .ssh]$ ssh master
Last login: Fri Feb 28 14:27:32 2014 from master
[hadoop@localhost ~]$
退出ssh登录,进行接下来的环境配置
[hadoop@localhost ~]$ exit
Logout
# 检查基础环境
#测试命令
# java
# javac
# ifconfig
# ping master
# ssh master
# jps
# echo $JAVA_HOME
# echo $HADOOP_HOME
# hadoop
# hostname
# 开始搭建环境二 (hadoop环境)
# hadoop-1.1.2.tar.gz的安装(切换成root用户)
[root@localhost hadoop]# pwd
/usr/local/hadoop
[root@localhost hadoop]# ls
hadoop-1.1.2.tar.gz
[root@localhost hadoop]# tar -zxvf hadoop-1.1.2.tar.gz
# hadoop环境变量的配置(直接在java.sh文件中配置了)
[root@localhost hadoop-1.1.2]# vi /etc/profile.d/java.sh
pastedGraphic_7.png
[root@localhost hadoop-1.1.2]# source /etc/profile
[root@localhost hadoop-1.1.2]# hadoop fs -ls /
Found 24 items
dr-xr-xr-x   - root root      12288 2014-02-28 14:21 /lib
drwxr-xr-x   - root root       4096 2014-02-28 06:34 /var
drwxr-xr-x   - root root       3880 2014-02-28 14:06 /dev
drwxr-xr-x   - root root       4096 2014-02-27 23:12 /media
drwxrwxrwx   - root root       4096 2014-02-28 14:37 /tmp
drwxr-xr-x   - root root       4096 2014-02-28 13:33 /data
drwxr-xr-x   - root root       4096 2014-02-28 13:47 /home
dr-xr-xr-x   - root root          0 2014-02-28 13:13 /proc
drwxr-xr-x   - root root       4096 2014-02-28 23:05 /mnt
drwxr-xr-x   - root root          0 2014-02-28 13:13 /sys
drwxr-xr-x   - root root       4096 2011-09-23 19:47 /srv
drwxr-xr-x   - root root          0 2014-02-28 13:13 /selinux
drwxr-xr-x   - root root          0 2014-02-28 13:15 /net
drwx------   - root root       4096 2014-02-28 06:41 /.dbus
dr-xr-xr-x   - root root      12288 2014-02-28 14:22 /sbin
drwx------   - root root      16384 2014-02-28 06:03 /lost+found
dr-xr-xr-x   - root root       4096 2014-02-28 14:22 /bin
drwxr-xr-x   - root root          0 2014-02-28 13:15 /misc
drwxr-xr-x   - root root       4096 2014-02-28 06:35 /opt
dr-xr-x---   - root root       4096 2014-02-28 14:07 /root
dr-xr-xr-x   - root root       1024 2014-02-28 06:36 /boot
drwxr-xr-x   - root root      12288 2014-02-28 14:22 /etc
drwxr-xr-x   - root root       4096 2014-02-28 23:03 /usr
-rw-r--r--   1 root root          0 2014-02-28 13:14 /.autofsck

# 修改hadoop目录的权限
[root@master local]# pwd
/usr/local
[root@master local]# ll
total 44
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 bin
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 etc
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 games
drwxr-xr-x. 3 hadoop hadoop 4096 Feb 28 14:34 hadoop
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 include
drwxr-xr-x. 3 root   root   4096 Feb 28 14:51 java
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 lib
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 libexec
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 sbin
drwxr-xr-x. 5 root   root   4096 Feb 28 06:11 share
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 src
[root@master local]# chown -R hadoop:hadoop hadoop/
[root@master local]# ll
total 44
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 bin
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 etc
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 games
drwxr-xr-x. 3 hadoop hadoop 4096 Feb 28 14:34 hadoop
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 include
drwxr-xr-x. 3 root   root   4096 Feb 28 14:51 java
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 lib
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 libexec
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 sbin
drwxr-xr-x. 5 root   root   4096 Feb 28 06:11 share
drwxr-xr-x. 2 root   root   4096 Sep 23  2011 src
# 切换到hadoop用户
# 修改hadoop-env.sh文件

[hadoop@master conf]$ vi hadoop-env.sh
pastedGraphic_8.png
# 修改core-site.xml文件
[hadoop@master conf]$ vi core-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
                                                                               
<!-- Put site-specific property overrides in this file. -->
                                                                               
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://master:9000</value>
</property>
<property>
<name>fs.checkpoint.dir</name>
<value>/data/hadoop/hdfs/namesecondary</value>
</property>
<property>
<name>fs.checkpoint.period</name>
<value>1800</value>
</property>
<property>
<name>fs.checkpoint.size</name>
<value>33554432</value>
</property>
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>
</configuration>
# 修改hdfs-site.xml文件
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
<name>dfs.name.dir</name>
<value>/data/hadoop/hdfs/name</value>
<description>
</description>
</property>
<property>
<name>dfs.data.dir</name>
<value>/data/hadoop/hdfs/data</value>
<description>
</description>
</property>
<property>
<name>dfs.http.address</name>
<value>master:50070</value>
</property>
<property>
<name>dfs.secondary.http.address</name>
<value>master:50090</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>1073741824</value>
</property>
<property>
<name>dfs.block.size</name>
<value>134217728</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>
# 修改 mapred-site.xml文件
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
                                                                               
<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>master:9001</value>
</property>
<property>
<name>mapred.local.dir</name>
<value>/data/hadoop/mapred/mrlocal</value>
<final>true</final>
</property>
<property>
<name>mapred.system.dir</name>
<value>/data/hadoop/mapred/mrsystem</value>
<final>true</final>
</property>
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>2</value>
<final>true</final>
</property>
<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>1</value>
<final>true</final>
</property>

<property>
<name>io.sort.mb</name>
<value>32</value>
<final>true</final>
</property>

<property>
<name>mapred.child.java.opts</name>
<value>-Xmx64M</value>
</property>

<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
</configuration>
# 开始搭建环境三(启动各个节点)
# 创建data目录,修改权限为hadoop
[root@master /]# mkdir data
[root@master /]# ll
total 98
drwxr-xr-x.   2 root root  4096 Feb 28 13:33 data
 [root@master /]# chown hadoop:hadoop data/
[root@master /]# ll
total 98
drwxr-xr-x.   2 hadoop hadoop  4096 Feb 28 13:33 data
# 启动namenode节点hadoop-daemon.sh start namenode
# 在这之前先格式化namenode节点,否则启动失败,会报错
[hadoop@master name]$ hadoop namenode -format
[hadoop@master name]$ hadoop-daemon.sh start namenode
starting namenode, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-namenode-master.out
[hadoop@master name]$ jps
4234 NameNode
4304 Jps
[hadoop@master name]$
使用这个命令查看启动日志有没有异常,一般的异常信息再后面写了
[hadoop@master name]$ tail -100f  /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-datanode-master.log
# 启动datanode节点hadoop-daemon.sh start datanode
[hadoop@master name]$ hadoop-daemon.sh start datanode
starting datanode, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-datanode-master.out
[hadoop@master name]$ jps
4335 DataNode
4234 NameNode
4375 Jps
[hadoop@master name]$ tail -100f /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-secondarynamenode-master.log

# 启动secnodarynamenode 节点secondarynamenode
[hadoop@master name]$ hadoop-daemon.sh start secondarynamenode
starting secondarynamenode, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-secondarynamenode-master.out
[hadoop@master name]$ jps
4335 DataNode
4234 NameNode
4448 SecondaryNameNode
4486 Jps
[hadoop@master name]$ tail -100f /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-secondarynamenode-master.log
# 启动jobtracker节点 hadoop-daemon.sh start jobtracker
[hadoop@master name]$ hadoop-daemon.sh start jobtracker
starting jobtracker, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-jobtracker-master.out
[hadoop@master name]$ tail -100f /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-jobtracker-master.log
[hadoop@master name]$ jps
4537 JobTracker
4335 DataNode
4612 Jps
4234 NameNode
4448 SecondaryNameNode
[hadoop@master name]$
# 启动tasktracker 节点hadoop-daemon.sh start tasktracker
[hadoop@master name]$ hadoop-daemon.sh start tasktracker
starting tasktracker, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-tasktracker-master.out
[hadoop@master name]$ jps
4537 JobTracker
4689 Jps
4335 DataNode
4234 NameNode
4448 SecondaryNameNode
4652 TaskTracker
[hadoop@master name]$ tail -100f  /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-tasktracker-master.log
# 启动hadoop自带的测试例子,通过的话环境就搭建成功了
# hadoop jar hadoop-examples-1.1.2.jar pi 10 100
[hadoop@master hadoop-1.1.2]$ hadoop jar hadoop-examples-1.1.2.jar pi 10 100
Number of Maps  = 10
Samples per Map = 100
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
14/02/28 16:04:00 INFO mapred.FileInputFormat: Total input paths to process : 10
14/02/28 16:04:01 INFO mapred.JobClient: Running job: job_201402281553_0001
14/02/28 16:04:02 INFO mapred.JobClient:  map 0% reduce 0%
14/02/28 16:04:22 INFO mapred.JobClient:  map 20% reduce 0%
14/02/28 16:04:38 INFO mapred.JobClient:  map 40% reduce 0%
14/02/28 16:04:50 INFO mapred.JobClient:  map 60% reduce 0%
14/02/28 16:04:51 INFO mapred.JobClient:  map 60% reduce 16%
14/02/28 16:05:01 INFO mapred.JobClient:  map 70% reduce 20%
14/02/28 16:05:02 INFO mapred.JobClient:  map 80% reduce 20%
14/02/28 16:05:07 INFO mapred.JobClient:  map 80% reduce 26%
14/02/28 16:05:11 INFO mapred.JobClient:  map 90% reduce 26%
14/02/28 16:05:12 INFO mapred.JobClient:  map 100% reduce 26%
14/02/28 16:05:16 INFO mapred.JobClient:  map 100% reduce 33%
14/02/28 16:05:18 INFO mapred.JobClient:  map 100% reduce 100%
14/02/28 16:05:22 INFO mapred.JobClient: Job complete: job_201402281553_0001
14/02/28 16:05:23 INFO mapred.JobClient: Counters: 30
14/02/28 16:05:23 INFO mapred.JobClient:   Job Counters
14/02/28 16:05:23 INFO mapred.JobClient:     Launched reduce tasks=1
14/02/28 16:05:23 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=130917
14/02/28 16:05:23 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/02/28 16:05:23 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/02/28 16:05:23 INFO mapred.JobClient:     Launched map tasks=10
14/02/28 16:05:23 INFO mapred.JobClient:     Data-local map tasks=10
14/02/28 16:05:23 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=56177
14/02/28 16:05:23 INFO mapred.JobClient:   File Input Format Counters
14/02/28 16:05:23 INFO mapred.JobClient:     Bytes Read=1180
14/02/28 16:05:23 INFO mapred.JobClient:   File Output Format Counters
14/02/28 16:05:23 INFO mapred.JobClient:     Bytes Written=97
14/02/28 16:05:23 INFO mapred.JobClient:   FileSystemCounters
14/02/28 16:05:23 INFO mapred.JobClient:     FILE_BYTES_READ=68
14/02/28 16:05:23 INFO mapred.JobClient:     HDFS_BYTES_READ=2380
14/02/28 16:05:23 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=555675
14/02/28 16:05:23 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=215
14/02/28 16:05:23 INFO mapred.JobClient:   Map-Reduce Framework
14/02/28 16:05:23 INFO mapred.JobClient:     Map output materialized bytes=260
14/02/28 16:05:23 INFO mapred.JobClient:     Map input records=10
14/02/28 16:05:23 INFO mapred.JobClient:     Reduce shuffle bytes=260
14/02/28 16:05:23 INFO mapred.JobClient:     Spilled Records=40
14/02/28 16:05:23 INFO mapred.JobClient:     Map output bytes=180
14/02/28 16:05:23 INFO mapred.JobClient:     Total committed heap usage (bytes)=495345664
14/02/28 16:05:23 INFO mapred.JobClient:     CPU time spent (ms)=10760
14/02/28 16:05:23 INFO mapred.JobClient:     Map input bytes=240
14/02/28 16:05:23 INFO mapred.JobClient:     SPLIT_RAW_BYTES=1200
14/02/28 16:05:23 INFO mapred.JobClient:     Combine input records=0
14/02/28 16:05:23 INFO mapred.JobClient:     Reduce input records=20
14/02/28 16:05:23 INFO mapred.JobClient:     Reduce input groups=20
14/02/28 16:05:23 INFO mapred.JobClient:     Combine output records=0
14/02/28 16:05:23 INFO mapred.JobClient:     Physical memory (bytes) snapshot=760815616
14/02/28 16:05:23 INFO mapred.JobClient:     Reduce output records=0
14/02/28 16:05:23 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=2558492672
14/02/28 16:05:23 INFO mapred.JobClient:     Map output records=20
Job Finished in 83.369 seconds
Estimated value of Pi is 3.14800000000000000000
[hadoop@master hadoop-1.1.2]$
# 搭建环境可能会出现的问题
# 看最后面妳那伊抹微笑搭建hadoop环境出现的问题(仅供参考)
# 到了这一步,你的hadoop伪分布式环境搭建成功了(妳那伊抹微笑在这里祝贺你)
# 总结
今天其实也没什么总结的,按照步骤一步步做就行了,细心,有耐心。
老师的话直接使用root用户搭建环境,这样不会涉及权限问题,饿这里使用的是hadoop用户,会涉及权限问题,有错误才有收获,看饿最后面出现的异常就知道了。
还有就是老师用的是start-all.sh方式启动的hadoop,这个方式对于初学者来说并不好,应该使用hadoop-daemon.sh start *一个个分别启动,然后再使用tail 命令查看日志,一步步的分析,这样才好,等熟练操作以后再使用start-all.sh命令一次启动



















































# Day2 介绍HDFS体系结构及shelljava操作方式
# 复习day01
# hadoop是适合大数据的分布式存储和计算的平台
# hadoop
核心组成由hdfsmapreduce组成
# hdfs
是主从式结构,主节点只有一个,是namenode:从节点有很多个
# 分布式文件系统与HDFS (HDFS体系结构与基本概念)
# Distributed File System
# 数据量越来越多,在一个操作系统管辖的范围存不下了,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,因此迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统 。
#
 是一种允许文件通过网络在多台主机上分享的文件系统,可让多机器上的多用户分享文件和存储空间。
#
 通透性。让实际上是通过网络来访问文件的动作,由程序与用户看来,就像是访问本地的磁盘一般。
#
 容错。即使系统中有某些节点脱机,整体来说系统仍然可以持续运作而不会有数据损失。
#
 分布式文件管理系统很多,hdfs只是其中一种。适用于一次写入多次查询的情况,不支持并发写情况,小文件不合适。

# HDFSshell操作
# 调用文件系统(FS)Shell命令应使用 bin/hadoop fs 的形式。
# 所有的FS shell命令使用URI路径作为参数。
# URI格式是scheme://authority/pathHDFSschemehdfs,对本地文件系统,schemefile。其中schemeauthority参数都是可选的,如果未加指定,就会使用配置中指定的默认scheme
# 例如:/parent/child可以表示成hdfs://namenode:namenodePort/parent/child,或者更简单的/parent/child(假设配置文件是namenode:namenodePort
# 大多数FS Shell命令的行为和对应的Unix Shell命令类似。

# HDFS常用命令
# -help [cmd]//显示命令的帮助信息
# -ls(r) <path>//
显示当前目录下所有文件
# -du(s) <path>//
显示目录中所有文件大小
# -count[-q] <path>//
显示目录中文件数量
# -mv <src> <dst>//
移动多个文件到目标目录
# -cp <src> <dst>//
复制多个文件到目标目录
# -rm(r)//
删除文件()
# -put <localsrc> <dst>//
本地文件复制到hdfs
# -copyFromLocal//
put
# -moveFromLocal//
从本地文件移动到hdfs
# -get [-ignoreCrc] <src> <localdst>//
复制文件到本地,可以忽略crc校验
# -getmerge <src> <localdst>//
将源目录中的所有文件排序合并到一个文件中
# -cat <src>//
在终端显示文件内容
# -text <src>//
在终端显示文件内容
# -copyToLocal [-ignoreCrc] <src> <localdst>//
复制到本地
# -moveToLocal <src> <localdst>
# -mkdir <path>//
创建文件夹
# -touchz <path>//
创建一个空文件
# HDFS shell练习
# hadoop fs -ls /  查看HDFS根目录
# hadoop fs -mkdir /test
 在根目录创建一个目录test
# hadoop fs -mkdir /test1
 在根目录创建一个目录test1
# echo -e 'hadoop second lesson' >test.txt
# hadoop fs -put ./test.txt /test
 或 # hadoop fs -copyFromLocal ./test.txt /test
# cd ..
# hadoop fs -get /test/test.txt .  
 #hadoop fs -getToLocal /test/test.txt .
# hadoop fs -cp /test/test.txt /test1
# hadoop fs -rm /test1/test.txt
# hadoop fs -mv /test/test.txt /test1
# hadoop fs -rmr /test1  
# NameNode详解
# 是整个文件系统的管理节点。它维护着整个文件系统的文件目录树,文件/目录的元信息和每个文件对应的数据块列表。接收用户的操作请求。
(见源码)
# 文件包括:
# fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息。
# edits:操作日志文件。
# fstime:保存最近一次checkpoint的时间
# 以上这些文件是保存在linux的文件系统中。
/data/hadoop/hdfs/name
[hadoop@master name]$ ls
current  image  in_use.lock  previous.checkpoint
[hadoop@master name]$ cd current/
[hadoop@master current]$ ls
edits  fsimage  fstime  VERSION
如果想使用命令hadoop namenode –format命令格式化namenode 发现这个namenode下面的in_use.lock 就不能格式化
More in_use.lock 发现是空的 ----- 表示这个文件存在 就是在使用中
说一下current 进入这个文件夹
有四个文件 有一个VERSION
打开这个
namespace ---- 命令空间是相对独立的名称空间
namespaceID=-xxxx ---- 指的是文件系统的ID
datanode中的块一定要和namenode之间有匹配关系 ----- 如果两边的namespaceID相同的 --- 这样 通过namespaceID相同 就匹配datanode和
多次执行 hadoop -format 多次之后  出错 因为namespaceID被格式化之后改变了 和datanode中的namespaceID对应不上  ----- 所以第一次之后就出错了

# 提示:in_use.lock :如果你想格式化这个namenode的话,hadoop会找这个文件,如果存在的话会提示你Re-format filesystem in /data/hadoop/hdfs/name ? (Y or N) ,以此为凭据知道namenode是否已经格式化过。
# namenodesecondarynamenode的执行过程
pastedGraphic_9.png
# fsimage
很重要,所以被备份了
core-default.xml
<property>
  <name>hadoop.tmp.dir</name>
  <value>/tmp/hadoop-${user.name}</value>
  <description>A base for other temporary directories.</description>
</property>
hdfs-default.xml
<property>
  <name>dfs.name.dir</name>
  <value>${hadoop.tmp.dir}/dfs/name</value>
  <description>Determines where on the local filesystem the DFS name node
      should store the name table(fsimage).  If this is a comma-delimited list
      of directories then the name table is replicated in all of the
      directories, for redundancy. </description>
</property>
# HA的一个解决方案。但不支持热备。配置即可。
(见源码)
#
 执行过程:从NameNode上下载元数据信息(fsimage,edits),然后把二者合并,生成新的# fsimage,在本地保存,并将其推送到NameNode,同时重置NameNodeedits.
默认在安装在NameNode节点上,但这样...不安全!
# Datanode的数据并不是无限存储的,决定与namenodefsimage,因为fsimage是放在内存中的,内存不可能上百T吧!(内存中放不下了,fsimage设计的时候是被放在内存中的)namenode的元数据信息占内存。
namenode加内存
2、尽量传大文件。
3SequenceFile
4
、增加block块的大小(这个看环境,就是看一般上传的文件有多大来设计的)


# java接口及常用api
package com.yting.hadoop.hdfs;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 *
 使用FileSystem
 *
 * eclipse
创建方法快捷键Shift+Alt+M 
 * eclipse
创建局部变量名称快捷键Shift+Alt+L
 *
 */
public class HDFSJavaOperation {
public static final String HDFS_PATH = "hdfs://hadoop0:9000";
public static final String DIR_PATH = "/d1000";
public static final String FILE_PATH = "/d1000/f1000";

public static void main(String[] args) throws Exception {
final FileSystem fileSystem = FileSystem.get(new URI(HDFS_PATH), new Configuration());

//
创建文件夹
//makeDirectory(fileSystem);

//
上次文件
uploadData(fileSystem);

//
下载文件 
//downloadData(fileSystem);

//
删除文件()
//deleteFile(fileSystem);
}

private static void deleteFile(final FileSystem fileSystem)
throws IOException {
fileSystem.delete(new Path(FILE_PATH), true);
}

private static void downloadData(final FileSystem fileSystem)
throws IOException {
final FSDataInputStream in = fileSystem.open(new Path(FILE_PATH));
IOUtils.copyBytes(in, System.out, 1024, true);
}

private static void makeDirectory(final FileSystem fileSystem)
throws IOException {
fileSystem.mkdirs(new Path(DIR_PATH));
}

private static void uploadData(final FileSystem fileSystem)
throws IOException, FileNotFoundException {
final FSDataOutputStream out = fileSystem.create(new Path(FILE_PATH));

final FileInputStream in = new FileInputStream("c:/log.txt");
IOUtils.copyBytes(in, out, 1024, true);
}

# ---------------------------加深拓展----------------------
# RPC
调用
Client发起调用请求,请求调用Server端的对象的方法

# MyRpcServer
package com.yting.hadoop.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;

public class MyRpcServer {
public static String BIND_ADDRESS = "localhost";   // 绑定地址
public static int PORT = 1129;                    // 绑定端口

/** Construct an RPC server. 构造一个RPCServer
     * @param instance the instance whose methods will be called 实例中的方法被客户端调用的实例
     * @param conf the configuration to use 使用的配置
     * @param bindAddress the address to bind on to listen for connection 绑定的地址用于监听链接的到来
     * @param port the port to listen for connections on 端口也是用于监听链接的到来
* @throws Exception 
     */
public static void main(String[] args) throws Exception {
MyInstance myInstance =
 new MyInstanceImpl();
final Server server = RPC.getServer(myInstance, BIND_ADDRESS, PORT, new Configuration());
server.start();
}
}
# MyRpcClient
package com.yting.hadoop.rpc;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyRpcClient {
public static void main(String[] args) throws Exception {
/** Construct a client-side proxy object that implements the named protocol,
  * talking to a server at the named address. */
/*
* Class<? extends VersionedProtocol> protocol,
    * long clientVersion,
    * InetSocketAddress addr,
    * Configuration conf
*/
MyInstance proxy = (MyInstance) RPC.
waitForProxy(MyInstance.class, MyInstance.versionID, new InetSocketAddress(MyRpcServer.BIND_ADDRESS, MyRpcServer.PORT), new Configuration());
String retVal = proxy.hello(
"world");
System.
out.println("客户端调用结果:" + retVal);
RPC.stopProxy(proxy);
}
}
# MyInstance接口
package com.yting.hadoop.rpc;

import org.apache.hadoop.ipc.VersionedProtocol;

public interface MyInstance extends VersionedProtocol {
public static final long versionID = 1234567L;

public abstract String hello(String name);

}
# MyInstanceImpl 实现
package com.yting.hadoop.rpc;

import java.io.IOException;

public class MyInstanceImpl  implements MyInstance{
/* (non-Javadoc)
* @see com.yting.hadoop.rpc.MyInstance#hello(java.lang.String)
*/
@Override
public String hello(String name) {
System.
out.println("我被调用了、、、");
return "hello" + name;
}

@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return MyInstance.versionID;
}
}
# 运行结果
客户端的运行结果
客户端调用结果:helloworld
服务端的运行结果
14/03/02 15:35:42 INFO ipc.Server: Starting SocketReader
14/03/02 15:35:42 INFO ipc.Server: IPC Server Responder: starting
14/03/02 15:35:42 INFO ipc.Server: IPC Server listener on 1129: starting
14/03/02 15:35:42 INFO ipc.Server: IPC Server handler 0 on 1129: starting
我被调用了、、、
# 结论
RPC 实际上就是RPC远程过程调用
被调用的对象位于服务端,并且这个对象必须有接口(
jdk反射要求),实现VersionedProtocolapi要求)
客户端调用的对象中的方法必须位于接口中
4、在本地运行jps看看
pastedGraphic_10.png
由此可以推断出hadoop中启动的5个进程,也就是RPC的服务端
# HDFS的分布式存储架构的源码分析
# HDFS的高可靠
Fsimage备份
Secondarynamenode
# edits文件可以没有么?(必须有)下面是一个例子
F1 start transfer
F1 block1 shuff
F1 end transfer
Edits
文件仅仅记录操作日志(确保事务的正确性)


























# Day3 介绍MapReduce体系结构及各种算法(1)
# MapReduce的介绍
# MapReduceHadoop的分布式计算框架,由两个阶段组成,分别是mapreduce阶段,对于程序员而言,使用过程非常简单,只要覆盖map阶段中的map方法和reduce节点的reduce方法即可
# mapreduce阶段的形参的键值对的形式
# mapreduce的执行流程
pastedGraphic_11.png
瓶颈:磁盘IO
# mapreduce执行原理
pastedGraphic_12.png
1.1
 读取输入文件内容,解析成keyvalue对。对输入文件的每一行,解析成keyvalue对。每一个键值对调用一次map函数。
1.2
 写自己的逻辑,对输入的keyvalue处理,转换成新的keyvalue输出。
1.3
 对输出的keyvalue进行分区。
1.4
 对不同分区的数据,按照key进行排序、分组。相同keyvalue放到一个集合中。
1.5 (
可选)分组后的数据进行归约。(Combine)
2.0 reduce
任务处理
2.1
 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2.2
 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的keyvalue处理,转换成新的keyvalue输出。
2.3
 reduce的输出保存到文件中。
例子:实现WordCountApp
# 第一个统计单词的java程序(hadoop自带的例子源码)
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

@SuppressWarnings("all")
public class WordCount {

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
# 下面运行命令跟输出结果
[hadoop@master hadoop-1.1.2]$ hadoop jar hadoop-yting-wordcounter.jar org.apache.hadoop.examples.WordCount /user/hadoop/20140303/test.txt /user/hadoop/20140303/output001
14/03/03 10:43:51 INFO input.FileInputFormat: Total input paths to process : 1
14/03/03 10:43:52 INFO mapred.JobClient: Running job: job_201403020905_0001
14/03/03 10:43:53 INFO mapred.JobClient:  map 0% reduce 0%
14/03/03 10:44:12 INFO mapred.JobClient:  map 100% reduce 0%
14/03/03 10:44:25 INFO mapred.JobClient:  map 100% reduce 100%
14/03/03 10:44:29 INFO mapred.JobClient: Job complete: job_201403020905_0001
14/03/03 10:44:29 INFO mapred.JobClient: Counters: 29
14/03/03 10:44:29 INFO mapred.JobClient:   Job Counters
14/03/03 10:44:29 INFO mapred.JobClient:     Launched reduce tasks=1
14/03/03 10:44:29 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=19773
14/03/03 10:44:29 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/03/03 10:44:29 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/03/03 10:44:29 INFO mapred.JobClient:     Launched map tasks=1
14/03/03 10:44:29 INFO mapred.JobClient:     Data-local map tasks=1
14/03/03 10:44:29 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=13148
14/03/03 10:44:29 INFO mapred.JobClient:   File Output Format Counters
14/03/03 10:44:29 INFO mapred.JobClient:     Bytes Written=188
14/03/03 10:44:29 INFO mapred.JobClient:   FileSystemCounters
14/03/03 10:44:29 INFO mapred.JobClient:     FILE_BYTES_READ=171
14/03/03 10:44:29 INFO mapred.JobClient:     HDFS_BYTES_READ=310
14/03/03 10:44:29 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=101391
14/03/03 10:44:29 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=188
14/03/03 10:44:29 INFO mapred.JobClient:   File Input Format Counters
14/03/03 10:44:29 INFO mapred.JobClient:     Bytes Read=197
14/03/03 10:44:29 INFO mapred.JobClient:   Map-Reduce Framework
14/03/03 10:44:29 INFO mapred.JobClient:     Map output materialized bytes=163
14/03/03 10:44:29 INFO mapred.JobClient:     Map input records=8
14/03/03 10:44:29 INFO mapred.JobClient:     Reduce shuffle bytes=163
14/03/03 10:44:29 INFO mapred.JobClient:     Spilled Records=56
14/03/03 10:44:29 INFO mapred.JobClient:     Map output bytes=376
14/03/03 10:44:29 INFO mapred.JobClient:     CPU time spent (ms)=4940
14/03/03 10:44:29 INFO mapred.JobClient:     Total committed heap usage (bytes)=63926272
14/03/03 10:44:29 INFO mapred.JobClient:     Combine input records=45
14/03/03 10:44:29 INFO mapred.JobClient:     SPLIT_RAW_BYTES=113
14/03/03 10:44:29 INFO mapred.JobClient:     Reduce input records=28
14/03/03 10:44:29 INFO mapred.JobClient:     Reduce input groups=28
14/03/03 10:44:29 INFO mapred.JobClient:     Combine output records=28
14/03/03 10:44:29 INFO mapred.JobClient:     Physical memory (bytes) snapshot=111722496
14/03/03 10:44:29 INFO mapred.JobClient:     Reduce output records=28
14/03/03 10:44:29 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=468000768
14/03/03 10:44:29 INFO mapred.JobClient:     Map output records=45
[hadoop@master hadoop-1.1.2]$ hadoop fs -ls /user/hadoop/20140303/output001
Found 3 items
-rw-r--r--   1 hadoop supergroup          0 2014-03-03 10:44 /user/hadoop/20140303/output001/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2014-03-03 10:43 /user/hadoop/20140303/output001/_logs
-rw-r--r--   1 hadoop supergroup        188 2014-03-03 10:44 /user/hadoop/20140303/output001/part-r-00000
[hadoop@master hadoop-1.1.2]$ hadoop fs -text /user/hadoop/20140303/output001/part-t-00000
text: File does not exist: /user/hadoop/20140303/output001/part-t-00000
[hadoop@master hadoop-1.1.2]$ hadoop fs -text /user/hadoop/20140303/output001/part-r-00000
a1
again1
and1
changce1
easy1
forever1
give1
hand1
heart2
hold1
i1
is1
it1
love1
me6
meimei1
miss1
see1
show1
smile1
so1
soul1
take3
the2
to4
until1
what1
you6
# 最小的MapReduce(默认设置)
Configuration configuration = new Configuration();
Job job = new Job(configuration, "HelloWorld");
job.setInputFormat(TextInputFormat.class);
job.setMapperClass(IdentityMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
job.setReducerClass(IdentityReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormat(TextOutputFormat.class);
job.waitForCompletion(true);
pastedGraphic_13.png

# 序列化
# Writable
#
 数据流单向的
# LongWritable不能进行加减等操作(没必要,java的基本类型都已经弄了这些功能了)
# JobTrackerTaskTracker
# JobTracker
负责接收用户提交的作业,负责启动、跟踪任务执行。
JobSubmissionProtocolJobClientJobTracker通信的接口。
InterTrackerProtocolTaskTrackerJobTracker通信的接口。
# TaskTracker
负责执行任务
# JobClient
是用户作业与JobTracker交互的主要接口。
负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。
# 执行过程
pastedGraphic_14.png


























# Day4 介绍MapReduce体系结构及各种算法(2)
# combiner编程
    每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
   
    combiner
最基本是实现本地key的归并,combiner具有类似本地的reduce功能。

    如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。
Combiner仅在Map端进行数据归约, Map之间的数据是无法归约的,因此必须使用Reducer
Combiner
的适合场景:求和,最大值,最小值等
Combiner的不适合场景:求平均数
# 举例
假如有1T的数据,对里面的数据求和,这一个T的数据被分成很多Block,再Map端进行读取之后全部送入Reducer端,这样的话Reducer处理的数据>=1T
但是如果再map端进行Combiner合并之后再传到Reducer之后,那么Reducer端处理的数据就很少了,这样就体现了分布式的优势。(相反不用Combiner就根部体现不了分布式的优势)

# Partitioner编程
Partitionerpartitioner的基类,如果需要定制partitioner也需要继承该类。

HashPartitioner
mapreduce的默认partitioner。计算方法是
which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
,得到当前的目的reducer

(
例子以jar形式运行)

# 来看下默认的HashPartitioner
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

 
 public void configure(JobConf job) {}

 
 /** Use {@link Object#hashCode()} to partition. */
 
 public int getPartition(K2 key, V2 value,
                         
 int numReduceTasks) {
   
 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
注意:这里的getPartition默认的返回值是0,返回值是分区的编号
如果我们没有自定义分区的话,默认就只有一个分区
适合场景:城市的分区,IP地址的分区,电话号码的分区等等 
# 分组跟排序
分组要实现RaoComparator接口 
mapreduce阶段进行排序时,比较的是k2v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2v2组装成新的类,作为k2,才能参与比较。

分组时也是按照
k2进行比较的。
# Shuffle
pastedGraphic_15.png
1
 每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MBio.sort.mb属性),一旦达到阀值0.8io.sort.spill.percent,一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
2 写磁盘前,要partition,sort。如果有combinercombine排序后数据。
3 等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
2.1 Reducer通过Http方式得到输出文件的分区。
2.2 TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。
2.3 排序阶段合并map输出。然后走Reduce阶段。
只看这个图,不看
other mapsother reducers,有4map任务,3reducer
Reducer
的源码中有Shuffle的定义




























































# Day5 介绍Hadoop集群、zookeeper操作
# hadoop集群
1.hadoop集群搭建
1.1 hadoop集群式多台机器物理分布的,每一台物理机器都是承担不同的角色(NameNode\DataNode\SecondarynaemNode\JobTracker\TaskTracker)
    搭建三个节点的
hadoop集群:hadoop0(NameNodeJobTrackerSecondarynaemNode)hadoop1hadoop2(DataNodeTaskTracker)
1.2 集群的各个节点通常都是物理机器,但是也可以包含虚拟机。
1.3 VMWare中复制出两个节点,删除这两个节点中的.ssh配置信息和/usr/local/目录下的所有内容
1.4 设置这两个新节点的ip(分别是192.168.80.101192.168.80.102)hostname(hadoop1hadoop2)
1.5 把所有的iphostname的映射信息配置在每一个节点的/etc/hosts中。
1.6 集群的各个节点之间SSH免密码登陆
    (1)在两个新节点中产生ssh秘钥文件
(2)ssh-copy-id -i hadoop0
1.7
 hadoop0上原来的/usr/local/hadoop/logs/usr/local/hadoop/tmp目录删除
1.8 hadoop0上的jdk目录和hadoop目录复制到其他节点
    scp -r /usr/local/jdk  hadoop1:/usr/local
scp -r /usr/local/hadoop  hadoop1:/usr/local
1.9
 hadoop0节点上配置主从关系,修改$HADOOP_HOME/conf/slaves文件,删除原有内容,修改为hadoop1hadoop2.
1.10
 hadoop0上格式化文件系统  hadoop namenode -format
1.11
 hadoop0上执行命令  start-all.sh

2.
动态增加hadoop的从节点
2.1 假设hadoop0是将要被新增的从节点
2.2 修改主节点hadoop0slaves文件,增加hadoop0
2.3
 在从节点hadoop0上手工启动DataNodeTaskTracker进程
    hadoop-daemon.sh start datanode|tasktracker
2.4
 让主节点加载新的从节点信息
    hadoop dfsadmin -refreshNodes

3.
动态修改副本数
  hadoop fs -setrep 2  /core-site.xml
# zookeeper
pastedGraphic_16.png
大部分分布式应用需要一个主控、协调器或控制器来管理物理分布的子进程(如资源、任务分配等)
目前,大部分应用需要开发私有的协调程序,缺乏一个通用的机制
协调程序的反复编写浪费,且难以形成通用、伸缩性好的协调器
ZooKeeper:提供通用的分布式锁服务,用以协调分布式应用
Hadoop,使用Zookeeper的事件处理确保整个集群只有一个NameNode,存储配置信息等.
HBase,
使用Zookeeper的事件处理确保整个集群只有一个HMaster,察觉HRegionServer联机和宕机,存储访问控制列表等.
Zookeeper
是简单的
Zookeeper
是富有表现力的
Zookeeper
具有高可用性
Zookeeper
采用松耦合交互方式
Zookeeper
是一个资源库
# zookeeper的安装
下载ZooKeeperhttp://labs.renren.com/apache-mirror/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz
解压:tar xzf zookeeper-3.4.3.tar.gz
conf目录下创建一个配置文件zoo.cfgtickTime=2000
dataDir=/Users/zdandljb/zookeeper/data
dataLogDir=/Users/zdandljb/zookeeper/dataLog         clientPort=2181
启动ZooKeeperServersh bin/zkServer.sh start, 如果想要关闭,输入:zkServer.sh stop
创建myid文件,server1机器的内容为:1server2机器的内容为:2server3机器的内容为:3
conf目录下创建一个配置文件zoo.cfgtickTime=2000
dataDir=/Users/zdandljb/zookeeper/data
dataLogDir=/Users/zdandljb/zookeeper/dataLog        
clientPort=2181        
initLimit=5
syncLimit=2  
server.1=server1:2888:3888                server.2=server2:2888:3888                server.3=server3:2888:3888

建了3个文件夹,server1 server2 server3然后每个文件夹里面解压一个zookeeper的下载包
进入data目录,创建一个myid的文件,里面写入一个数字,server1,就写一个1server2对应myid文件就写入2server3对应myid文件就写个3

#












# Day6 介绍HBase体系结构及基本操作
# 单机模式 Hbase的安装(不同版本的hadoop安装不同版本的hbase
# hbase-0.90.5.tar.gz解压到目录 /opt/modules/hbase下,然后解压
[hadoop@master hbase]$ tar -zxvf hbase-0.90.5.tar.gz                                                                                         
[hadoop@master hbase]$ pwd
/opt/modules/hbase
[hadoop@master hbase]$ ll
total 30964
drwxrwxr-x    8 hadoop   hadoop       4096 Feb 25 17:44 hbase-0.90.5
[hadoop@master hbase]$

# 修改hbase-env.sh 文件
[root@master conf]# pwd
/opt/modules/hbase/hbase-0.90.5/conf
[root@master conf]# vi hbase-env.sh
 JAVA_HOME 的注释去掉,并把路径修改正确
hbase-env.sh
由于是用的虚拟机,伪分布式,所以把对堆大小改成32M
pastedGraphic_17.png
# 配置hbase-site.xml 文件
先创建用于存放数据的目录 /data/hbase/
# 启动 hbase
[hadoop@master bin]$ ./start-hbase.sh
这里需要注意一下,刚刚启动的使用了root用户,应该用hadoop用户启动,所以会报后面的异常
# 查看日志,发现有异常信息
[hadoop@master bin]$ ./start-hbase.sh
starting master, logging to /opt/modules/hbase/hbase-0.90.5/bin/../logs/hbase-hadoop-master-master.out
[hadoop@master bin]$ jps
8653 TaskTracker
8194 NameNode
14013 Jps
8436 SecondaryNameNode
8322 DataNode
8527 JobTracker
13942 HMaster
[hadoop@master bin]$ tail -100f  /opt/modules/hbase/hbase-0.90.5/bin/../logs/hbase-hadoop-master-master.log
java.io.IOException: Unable to mkdir file:/data/hbase/.logs/master,43896,1393324144915
        at org.apache.hadoop.hbase.regionserver.wal.HLog.<init>(HLog.java:372)
        at org.apache.hadoop.hbase.regionserver.wal.HLog.<init>(HLog.java:325)
        at
2014-02-25 18:29:11,303 ERROR org.apache.zookeeper.server.NIOServerCnxn: Thread Thread[RegionServer:0;master,43896,1393324144915,5,main] died
java.lang.NullPointerException
        at org.apache.hadoop.hbase.regionserver.HRegionServer.join(HRegionServer.java:1445)
        at org.apache.hadoop.hbase.regionserver.HRegionServer.run(HRegionServer.java:682)
        at java.lang.Thread.run(Thread.java:662)
原因:在这之前使用了root用户启动,权限问题,不能创建目录
解决:将目录 /data/hbase 这个目录删除,重新创建一个目录,修改用户跟组为hadoop,然后再使用hadoop用户启动hbase




# 伪分布模式
pastedGraphic_18.png
# 单点模式弄好之后再来弄伪分布模式
# 编辑 hbase-env.sh 增加HBASE_CLASSPATH环境变量(hadoopconf目录)
pastedGraphic_19.png
# 编辑hbase-site.xml打开分布模式
pastedGraphic_20.png
# 覆盖 hadoop 核心jar包(这个有危险,万一弄错就完了,还是先把jar包备份下先)
pastedGraphic_21.png
# 没覆盖jar包之前看下面的日志会有错误
localhost: starting zookeeper, logging to /opt/modules/hbase/hbase-0.90.5/bin/../logs/hbase-hadoop-zookeeper-master.out
starting master, logging to /opt/modules/hbase/hbase-0.90.5/bin/../logs/hbase-hadoop-master-master.out
localhost: starting regionserver, logging to /opt/modules/hbase/hbase-0.90.5/bin/../logs/hbase-hadoop-regionserver-master.out
饿看到了错误的,但是后面就看不到了,竟然正常运行,真是见鬼了,后面错误应该还是会出来的,饿并没有覆盖jar
pastedGraphic_22.png
pastedGraphic_23.png
真是不科学啊饿靠!坐等后面出错

# hbaseshell命令
# 查看数据库状态
hbase(main):001:0> status
1 servers, 0 dead, 2.0000 average load
# 查看数据库版本
hbase(main):002:0> version
0.90.5, r1212209, Fri Dec  9 05:40:36 UTC 2011
# hbase中表的操作
# 创建表
hbase(main):003:0> create 'member','member_id','address','info'
0 row(s) in 1.5910 seconds
# 查看表信息
hbase(main):004:0> list
TABLE
member
1 row(s) in 0.0960 seconds
hbase(main):005:0> describe 'member'
DESCRIPTION                                          ENABLED
 {NAME => 'member', FAMILIES => [{NAME => 'address', true
  BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', C
 OMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147
 483647', BLOCKSIZE => '65536', IN_MEMORY => 'false'
 , BLOCKCACHE => 'true'}, {NAME => 'info', BLOOMFILT
 ER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION
  => 'NONE', VERSIONS => '3', TTL => '2147483647', B
 LOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCAC
 HE => 'true'}, {NAME => 'member_id', BLOOMFILTER =>
  'NONE', REPLICATION_SCOPE => '0', COMPRESSION => '
 NONE', VERSIONS => '3', TTL => '2147483647', BLOCKS
 IZE => '65536', IN_MEMORY => 'false', BLOCKCACHE =>
  'true'}]}
1 row(s) in 0.3810 seconds
 
hbase(main):006:0>
# 删除列族 alterdisableenable命令
hbase(main):005:0> alter 'member',{NAME=>'memberalter',METHOD=>'delete'}
 
ERROR: Table member is enabled. Disable it first before altering.
hbase(main):007:0> disable 'member'
0 row(s) in 2.1130 seconds
hbase(main):008:0> alter 'member',{NAME=>'memberalter',METHOD=>'delete'}
0 row(s) in 0.1280 seconds
hbase(main):010:0> enable 'member'
0 row(s) in 2.8070 seconds
# 删除表
hbase(main):013:0> disable 'member'
0 row(s) in 2.1990 seconds
 
hbase(main):014:0> drop 'member'
0 row(s) in 0.3720 seconds
 
hbase(main):015:0> list
TABLE
0 row(s) in 0.0210 seconds
# 查询一个表是否存在
hbase(main):016:0> exists 'member'
Table member does not exist
0 row(s) in 0.0420 seconds
# 判断表是否enabledisable(先创建一个表)
hbase(main):017:0>  create 'member','member_id','address','information'
0 row(s) in 1.7970 seconds
 
hbase(main):018:0> list
TABLE
member
1 row(s) in 0.0290 seconds
 
hbase(main):019:0> is_enabled 'member'
true
0 row(s) in 0.2320 seconds
 
hbase(main):020:0> is_disabled 'member'
false
0 row(s) in 0.1080 seconds
# user表中插入记录
hbase(main):006:0> create 'user','id','name','address','info'
0 row(s) in 1.2000 seconds
hbase(main):007:0> put 'user','zhangsan','info:age','24'
0 row(s) in 0.4150 seconds
 
hbase(main):008:0> put 'user','zhangsan','info:city','beijing'
0 row(s) in 0.0230 seconds

hbase(main):010:0> put 'user','zhangsan','address:city','shanghai'
0 row(s) in 0.0270 seconds
 
hbase(main):011:0> put 'user','zhangsan','address:province','hunan'
0 row(s) in 0.2020 seconds
 
hbase(main):012:0> put 'user','zhangsan','id:user_id','001'
0 row(s) in 0.0450 seconds
# 获取一个行键的所有数据
hbase(main):013:0> get 'user','zhangsan'
COLUMN                CELL
 address:city         timestamp=1393330663923, value=shanghai
 address:province     timestamp=1393330680719, value=hunan
 id:user_id           timestamp=1393330698149, value=001
 info:age             timestamp=1393330268426, value=24
 info:city            timestamp=1393330284815, value=beijing
5 row(s) in 0.0570 seconds
# 获取一个行键,一个列族的所有数据
hbase(main):014:0> get 'user','zhangsan','info'
COLUMN                CELL
 info:age             timestamp=1393330268426, value=24
 info:city            timestamp=1393330284815, value=beijing
2 row(s) in 0.1500 seconds
# 获取一个行键,一个列族中一个列的所有数据
hbase(main):017:0> get 'user','zhangsan','info:age'
COLUMN                CELL
 info:age             timestamp=1393330268426, value=24
1 row(s) in 0.1200 seconds
# 更新一条记录(也就是往记录中再插数据,timestamp的更新)
hbase(main):002:0> put 'user','zhangsan','info:age','999'
0 row(s) in 0.4670 seconds
hbase(main):006:0> get 'user','zhangsan','info:age'
COLUMN                CELL
 info:age             timestamp=1393331181973, value=999
1 row(s) in 0.0540 seconds
# 通过timestamp来获取数据
hbase(main):004:0> get 'user','zhangsan',{COLUMN=>'info:age',TIMESTAMP=>1393331181973}
COLUMN                      CELL
 info:age                   timestamp=1393331181973, value=999
1 row(s) in 1.7380 seconds
# 全表扫描
hbase(main):006:0> scan 'user'
ROW                         COLUMN+CELL
 lisi                       column=info:age, timestamp=1393333032066, value=12
 zhangsan                   column=address:city, timestamp=1393330663923, value=shanghai
 zhangsan                   column=address:province, timestamp=1393330680719, value=hunan
 zhangsan                   column=id:user_id, timestamp=1393330698149, value=001
 zhangsan                   column=info:age, timestamp=1393331181973, value=999
 zhangsan                   column=info:city, timestamp=1393330284815, value=beijing
2 row(s) in 0.3290 seconds
# 删除指定行键的字段
hbase(main):007:0> get 'user','zhangsan','info'
COLUMN                      CELL
 info:age                   timestamp=1393331181973, value=999
 info:city                  timestamp=1393330284815, value=beijing
2 row(s) in 0.1540 seconds
 
hbase(main):008:0> delete 'user','zhangsan','info:city'
0 row(s) in 0.0640 seconds
 
hbase(main):009:0> get 'user','zhangsan','info'
COLUMN                      CELL
 info:age                   timestamp=1393331181973, value=999
1 row(s) in 0.1900 seconds
# 删除整行
pastedGraphic_24.png
# 查询表中有多少行
hbase(main):003:0> count 'user'
2 row(s) in 0.0590 seconds
# 清空表
pastedGraphic_25.png


# 什么情况下使用hbase
pastedGraphic_26.png
# 关系型数据库的困难
pastedGraphic_27.png
# hbase的优点
pastedGraphic_28.png
# 模式设计
pastedGraphic_29.png
# 表设计与查询实现(一个网站推荐信息的例子)
pastedGraphic_30.png
# 辅助索引
pastedGraphic_31.png
# 复合行键的设计
pastedGraphic_32.png
# 复合行键的好处
pastedGraphic_33.png

# Day7 介绍Hivesqoop体系结构及基本操作和最后项目
1.Hive
1.1 hadoop领域中的数据仓库。Hive是一个SQL解析引擎,接收用户的SQL操作,转换为MapReducejob来运行。
1.2 SQL
中的表、字段需要和HDFS中的文件夹、数据文件进行映射。Hive有专门的映射机制,称作metastore。这些映射信息通常保存在RDBMS中,如derbymysql

2.
操作
2.1
 配置文件hive-site.xml中的hive.metastore.warehouse.dir的值表示默认数据库在hdfs中的位置。

3.
安装MySQL
3.1
 使用命令rpm -qa |grep mysql检查系统是否已经安装,发现一个类库mysql-libs-5.1.66-2.el6_3.i686
3.2
 使用命令rpm -e mysql-libs-5.1.66-2.el6_3.i686 --nodeps 删除刚才的依赖
3.3
 使用命令rpm -i MySQL-server-**************
3.4
 使用命令mysqld_safe & 启动mysql服务
3.5
 使用命令rpm -i MySQL-client-***********
3.6
 使用命令mysql_secure_installation 配置MySQL



4.
使用MySQL作为hivemetastore
4.1
 jdbc驱动放到hivelib目录下
4.2
 修改hivehive-site.xml文件
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop0:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>admin</value>
</property>

5.

5.1
 内部表
    CREATE TABLE t1(id int);
    LOAD DATA LOCAL INPATH '/root/Desktop/id'  INTO TABLE t1;

CREATE TABLE t2(id int , name String) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
5.2
 分区表
    CREATE TABLE t3(id int) PARTITIONED BY (city string);
    LOAD DATA LOCAL INPATH '/root/Desktop/id'  INTO TABLE t3 PARTITION(city='beijing');
5.3
 桶表
    CREATE TABLE t5(id int) CLUSTERED BY (id) INTO 4 BUCKETS;
set hive.enforce.bucketing = true;
insert into table t5 select name from stu;
5.4
 外部表
    CREATE EXTERNAL TABLE t6(id int) LOCATION '/ids';
*******************************************
授权可以远程访问mysql
grant all on *.* to 'root'@'%'  identified by 'admin';
flush privileges;
*******************************************
---
使用hivesql实现word count功能
SELECT SPLIT(id, '\t') FROM t2;
SELECT EXPLODE(SPLIT(id, '\t')) AS t FROM t2;
SELECT temp.t, COUNT(t) FROM (SELECT EXPLODE(SPLIT(id, '\t')) AS t FROM t2) temp GROUP BY temp.t ORDER BY t;
# NameNode源码分析(RPC是基础)
# namenode注释翻译
/**********************************************************
 * NameNode serves as both directory namespace manager and
 * "inode table" for the Hadoop DFS.  There is a single NameNode
 * running in any DFS deployment.  (Well, except when there
 * is a second backup/failover NameNode.)
 *
 * The NameNode controls two critical tables:
 *   1)  filename->blocksequence (namespace)
 *   2)  block->machinelist ("inodes")
 *
 * The first table is stored on disk and is very precious.
 * The second table is rebuilt every time the NameNode comes
 * up.
 *
 * 'NameNode' refers to both this class as well as the 'NameNode server'.
 * The 'FSNamesystem' class actually performs most of the filesystem
 * management.  The majority of the 'NameNode' class itself is concerned
 * with exposing the IPC interface and the http server to the outside world,
 * plus some configuration management.
 *
 * NameNode implements the ClientProtocol interface, which allows
 * clients to ask for DFS services.  ClientProtocol is not
 * designed for direct use by authors of DFS client code.  End-users
 * should instead use the org.apache.nutch.hadoop.fs.FileSystem class.
 *
 * NameNode also implements the DatanodeProtocol interface, used by
 * DataNode programs that actually store DFS data blocks.  These
 * methods are invoked repeatedly and automatically by all the
 * DataNodes in a DFS deployment.
 *
 * NameNode also implements the NamenodeProtocol interface, used by
 * secondary namenodes or rebalancing processes to get partial namenode's
 * state, for example partial blocksMap etc.
 **********************************************************/
对于HDFS来说NameNode是一个目录命名空间管理器和”inode table”,它是一个单个的NameNode运行在任何的DFS的部署环境中(好吧,除非是有第二个备份/故障转移NameNode。)

NameNode控制着两个关键表:
1)  filename->blocksequence (namespace) 第一个表名为文件名,放的是block的顺序
2)  block->machinelist ("inodes") 第二个表是block,放的是block所存放的机器列表
第一个表存放在硬盘上并且非常珍贵
第二个表在每次
NameNode重启的时候会被重构

NameNode指的是这个类以及NameNode serverFSNamesystem这个类实际上执行的大多数文件系统管理。NameNode这个类主要的工作就是关注暴露的IPC接口以及向外部世界(用户)提供一些http服务,加上一些配置管理。

NameNode实现了ClientProtocol接口,它允许客户请求DFS服务。ClientProtocol不是专门为直接使用DFS客户机代码的作者设计的,终端用户(程序员)应该使用FileSystem这个类。
NameNode也实现了DatanodeProtocol接口,DataNode的程序使用去完成DFS数据块的存储。在一个DFS的环境中NameNode实现了DatanodeProtocol接口中的方法会被所有的DataNode自动重复的调用执行。

NameNode也实现了NamenodeProtocol接口,secondarynamenode使用或在平衡过程的进程中得到NameNode 的部分状态,例如部分blocksMap等等

# 知道了RPC原理才能更好的理 NameNode
# 首先看namenode类的结构,主要实现了ClientProtocol, DatanodeProtocol, NamenodeProtocol这三个接口
# 进入NameNode的源代码找到public class NameNode implements ClientProtocol, DatanodeProtocol, NamenodeProtocol, FSConstants, RefreshAuthorizationPolicyProtocol,
                                 RefreshUserMappingsProtocol {

# 接下来进入main方法(由于NameNode是一个RPC的服务端,所以我们进入RPCmain方法,为了证明NameNode是一个RPC的服务端)
public static void main(String argv[]) throws Exception {
    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null)
        namenode.join();
    } catch (Throwable e) {
      LOG.error(StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }
# 进入createNameNode方法(只看重点,会有下面这么一行)
NameNode namenode = new NameNode(conf);
# 再点进去
public NameNode(Configuration conf) throws IOException {
   
 try {
      initialize(conf);
    }
 catch (IOException e) {
     
 this.stop();
     
 throw e;
    }
  }

#
 进入initialize(conf)方法(只看重点代码)
this.namesystem = new FSNamesystem(this, conf);

   
 if (UserGroupInformation.isSecurityEnabled()) {
     
 namesystem.activateSecretManager();
    }
// create rpc server
    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
   
 if (dnSocketAddr != null) {
     
 int serviceHandlerCount =
        conf.getInt(DFSConfigKeys.
DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFSConfigKeys.
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
     
 this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
          dnSocketAddr.getPort(), serviceHandlerCount,
         
 false, conf, namesystem.getDelegationTokenSecretManager());
     
 this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
      setRpcServiceServerAddress(conf);
    }
   
 this.server = RPC.getServer(this, socAddr.getHostName(),
        socAddr.getPort(), handlerCount,
 false, conf, namesystem
        .getDelegationTokenSecretManager());

   
 // The rpc-server port can be ephemeral... ensure we have the correct info
   
 this.serverAddress = this.server.getListenerAddress();
    FileSystem.setDefaultUri(conf, getUri(
serverAddress));
   
 LOG.info("Namenode up at: " + this.serverAddress);

   

    startHttpServer(conf);
   
 this.server.start();  //start RPC server   
   
 if (serviceRpcServer != null) {
     
 serviceRpcServer.start();     
    }
    startTrashEmptier(conf);
namesystem后面再解释(namenode的初始化,namenode启动加载fsimage以及一些配置,后面详细解释)
// create rpc server
 意思就是创建 RPC服务端,也就是说NameNode是一个RPC的服务端
注意:这里不是启动了一个rpc的服务端,而是启动了两个rpc的服务端。
serviceRpcServerRPC服务器为了HDFS服务通信。备份节点(secondarynamenode),Datanodes和所有其他服务应该连接到这个服务器配置。客户应该只去调用NameNode 下的server的RPC服务端(这个是程序内部调用的)
server主要是用来给客户端调用的
# 然后再来看startHttpServer(conf);(开启一个Http的服务器)这个方法,跟进去
try {
     
 this.httpServer = ugi.doAs(new PrivilegedExceptionAction<HttpServer>() {
       
 @Override
       
 public HttpServer run() throws IOException, InterruptedException {
          String infoHost = infoSocAddr.getHostName();
         
 int infoPort = infoSocAddr.getPort();
         
 httpServer = new HttpServer("hdfs", infoHost, infoPort,
              infoPort == 0, conf,
              SecurityUtil.getAdmin

#
 在进入HttpServer 的构造方法,跟进new HttpServer("hdfs", infoHost, inf
public HttpServer(String name, String bindAddress, int port,
     
 boolean findPort, Configuration conf, AccessControlList adminsAcl)
     
 throws IOException {
   
 this(name, bindAddress, port, findPort, conf, adminsAcl, null);
  }

#
 再跟进this(name, bindAddress, port, findPort, conf, adminsAcl, null);
public HttpServer(String name, String bindAddress, int port,
     
 boolean findPort, Configuration conf, AccessControlList adminsAcl,
      Connector connector)
 throws IOException{
   
 webServer = new Server();
   
 this.findPort = findPort;
   
 this.conf = conf;
   
 this.adminsAcl = adminsAcl;

   
 if(connector == null) {
到这里就行了,把鼠标放到new Server()上面去,可以看到这么一行提示信息
pastedGraphic_34.pngorg.mortbay.jetty.Server.Server()
pastedGraphic_35.png
这里额外说明一下jettyjettytomcat一样,也是一个服务器,只是更小而已,被内置到NameNode中去了。
到了这里之后就可以看出来NameNode开启了一个jetty服务,也就是可以通过浏览器访问,也就是我们经常访问的http://hadoopip:50070(这里hadoopip表示你的hadoop机器的ip地址)出现的web界面
# NameNode启动过程详细剖析
# NameNode中几个关键的数据结构
# FSImage
Namenode会将HDFS的文件和目录元数据存储在一个叫fsimage的二进制文件中,每次保存fsimage之后到下次保存之间的所有hdfs操作,将会记录在editlog文件中,当editlog达到一定的大小(bytes,由fs.checkpoint.size参数定义)或从上次保存过后一定时间段过后(sec,由fs.checkpoint.period参数定义),namenode会重新将内存中对整个HDFS的目录树和文件元数据刷到fsimage文件中。Namenode就是通过这种方式来保证HDFS中元数据信息的安全性。
Fsimage
是一个二进制文件,当中记录了HDFS中所有文件和目录的元数据信息,在我的pastedGraphic_36.pnghadoopHDFS版中,该文件的中保存文件和目录的格式如下:
 
namenode重启加载fsimage时,就是按照如下格式协议从文件流中加载元数据信息。从fsimag的存储格式可以看出,fsimage保存有如下信息:
1.        
 首先是一个image head,其中包含:
a)         imgVersion(int)
:当前image的版本信息
b)        namespaceID(int)
:用来确保别的HDFS instance中的datanode不会误连上当前NN
c)         numFiles(long)
:整个文件系统中包含有多少文件和目录
d)        genStamp(long)
:生成该image时的时间戳信息。
2.        
 接下来便是对每个文件或目录的源数据信息,如果是目录,则包含以下信息:
a)         path(String)
:该目录的路径,如”/user/build/build-index”
b)        replications(short)
:副本数(目录虽然没有副本,但这里记录的目录副本数也为3
c)         mtime(long)
:该目录的修改时间的时间戳信息
d)        atime(long)
:该目录的访问时间的时间戳信息
e)         blocksize(long)
:目录的blocksize都为0
f)         numBlocks(int)
:实际有多少个文件块,目录的该值都为-1,表示该item为目录
g)        nsQuota(long)
namespace Quota值,若没加Quota限制则为-1
h)        dsQuota(long)
disk Quota值,若没加限制则也为-1
i)          username(String)
:该目录的所属用户名
j)          group(String)
:该目录的所属组
k)        permission(short)
:该目录的permission信息,如644等,有一个short来记录。
3.        
 若从fsimage中读到的item是一个文件,则还会额外包含如下信息:
a)         blockid(long)
:属于该文件的blockblockid
b)        numBytes(long)
:该block的大小
c)         genStamp(long)
:该block的时间戳
当该文件对应的numBlocks数不为1,而是大于1时,表示该文件对应有多个block信息,此时紧接在该fsimage之后的就会有多个blockidnumBytesgenStamp信息。
因此,在namenode启动时,就需要对fsimage按照如下格式进行顺序的加载,以将fsimage中记录的HDFS元数据信息加载到内存中。
# BlockMap
从以上fsimage中加载如namenode内存中的信息中可以很明显的看出,在fsimage中,并没有记录每一个block对应到哪几个datanodes的对应表信息,而只是存储了所有的关于namespace的相关信息。而真正每个block对应到datanodes列表的信息在hadoop中并没有进行持久化存储,而是在所有datanode启动时,每个datanode对本地磁盘进行扫描,将本datanode上保存的block信息汇报给namenodenamenode在接收到每个datanode的块信息汇报后,将接收到的块信息,以及其所在的datanode信息等保存在内存中。HDFS就是通过这种块信息汇报的方式来完成 block -> datanodes list的对应表构建。Datanodenamenode汇报块信息的过程叫做blockReport,而namenodeblock -> datanodes list的对应表信息保存在一个叫BlocksMap的数据结构中。
BlocksMap
的内部数据结构如下:   
 
pastedGraphic_37.png
如上图显示,BlocksMap实际上就是一个Block对象对BlockInfo对象的一个Map表,其中Block对象中只记录了blockidblock大小以及时间戳信息,这些信息在fsimage中都有记录。而BlockInfo是从Block对象继承而来,因此除了Block对象中保存的信息外,还包括代表该block所属的HDFS文件的INodeFile对象引用以及该block所属datanodes列表的信息(即上图中的DN1DN2DN3,该数据结构会在下文详述)。
因此在namenode启动并加载fsimage完成之后,实际上BlocksMap中的key,也就是Block对象都已经加载到BlocksMap中,每个key对应的value(BlockInfo)中,除了表示其所属的datanodes列表的数组为空外,其他信息也都已经成功加载。所以可以说:fsimage加载完毕后,BlocksMap中仅缺少每个块对应到其所属的datanodes list的对应关系信息。所缺这些信息,就是通过上文提到的从各datanode接收blockReport来构建。当所有的datanode汇报给namenodeblockReport处理完毕后,BlocksMap整个结构也就构建完成。
# BlockMapdatanode列表数据结构
BlockInfo中,将该block所属的datanodes列表保存在一个Object[]数组中,但该数组不仅仅保存了datanodes列表,。实际上该数组保存了如下信息:
 
pastedGraphic_38.png
上图表示一个block包含有三个副本,分别放置在DN1DN2DN3三个datanode上,每个datanode对应一个三元组,该三元组中的第二个元素,即上图中prev block所指的是该block在该datanode上的前一个BlockInfo引用。第三个元素,也就是上图中next Block所指的是该block在该datanode上的下一个BlockInfo引用。每个block有多少个副本,其对应的BlockInfo对象中就会有多少个这种三元组。
       Namenode
采用这种结构来保存block->datanode list的目的在于节约namenode内存。由于namenodeblock->datanodes的对应关系保存在了内存当中,随着HDFS中文件数的增加,block数也会相应的增加,namenode为了保存block->datanodes的信息已经耗费了相当多的内存,如果还像这种方式一样的保存datanode->block list的对应表,势必耗费更多的内存,而且在实际应用中,要查一个datanode上保存的block list的应用实际上非常的少,大部分情况下是要根据block来查datanode列表,所以namenode中通过上图的方式来保存block->datanode list的对应关系,当需要查询datanode->block list的对应关系时,只需要沿着该数据结构中next Block的指向关系,就能得出结果,而又无需保存datanode->block list在内存中。
# NameNode启动过程
# fsimage加载过程
Fsimage加载过程完成的操作主要是为了:
1.        
 fsimage中读取该HDFS中保存的每一个目录和每一个文件
2.        
 初始化每个目录和文件的元数据信息
3.        
 根据目录和文件的路径,构造出整个namespace在内存中的镜像
4.        
 如果是文件,则读取出该文件包含的所有blockid,并插入到BlocksMap中。
整个加载流程如下图所示:
 
pastedGraphic_39.png
如上图所示,namenode在加载fsimage过程其实非常简单,就是从fsimage中不停的顺序读取文件和目录的元数据信息,并在内存中构建整个namespace,同时将每个文件对应的blockid保存入BlocksMap中,此时BlocksMap中每个block对应的datanodes列表暂时为空。当fsimage加载完毕后,整个HDFS的目录结构在内存中就已经初始化完毕,所缺的就是每个文件对应的block对应的datanode列表信息。这些信息需要从datanodeRPC远程调用blockReport中获取,所以加载fsimage完毕后,namenode进程进入rpc等待状态,等待所有的datanodes发送blockReports
# blockReport阶段
每个datanode在启动时都会扫描其机器上对应保存hdfs block的目录下(dfs.data.dir)所保存的所有文件块,然后通过namenoderpc调用将这些block信息以一个long数组的方式发送给namenodenamenode在接收到一个datanodeblockReport rpc调用后,从rpc中解析出block数组,并将这些接收到的blocks插入到BlocksMap表中,由于此时BlocksMap缺少的仅仅是每个block对应的datanode信息,而namenoe能从report中获知当前report上来的是哪个datanode的块信息,所以,blockReport过程实际上就是namenode在接收到块信息汇报后,填充BlocksMap中每个block对应的datanodes列表的三元组信息的过程。其流程如下图所示:
 
pastedGraphic_40.png
当所有的datanode汇报完blocknamenode针对每个datanode的汇报进行过处理后,namenode的启动过程到此结束。此时BlocksMapblock->datanodes的对应关系已经初始化完毕。如果此时已经达到安全模式的推出阈值,则hdfs主动退出安全模式,开始提供服务。
# 启动过程数据采集和瓶颈分析
namenode的整个启动过程有了详细了解之后,就可以对其启动过程中各阶段各函数的调用耗时进行profiling的采集,数据的profiling仍然分为两个阶段,即fsimage加载阶段和blockReport阶段。
# fsimage加载阶段性能数据采集和瓶颈分析
以下是对建库集群真实的fsimage加载过程的的性能采集数据:
 
pastedGraphic_41.png
从上图可以看出,fsimage的加载过程那个中,主要耗时的操作分别分布在FSDirectory.addToParentFSImage.readString,以及PermissionStatus.read三个操作,这三个操作分别占用了加载过程的73%15%以及8%,加起来总共消耗了整个加载过程的96%。而其中FSImage.readStringPermissionStatus.read操作都是从fsimage的文件流中读取数据(分别是读取Stringshort)的操作,这种操作优化的空间不大,但是通过调整该文件流的Buffer大小来提高少许性能。而FSDirectory.addToParent的调用却占用了整个加载过程的73%,所以该调用中的优化空间比较大。
      
 以下是addToParent调用中的profiling数据:
 
pastedGraphic_42.png
从以上数据可以看出addToParent调用占用的73%的耗时中,有66%都耗在了INode.getPathComponents调用上,而这66%分别有36%消耗在INode.getPathNames调用,30%消耗在INode.getPathComponents调用。这两个耗时操作的具体分布如以下数据所示:
 
pastedGraphic_43.png
可以看出,消耗了36%的处理时间的INode.getPathNames操作,全部都是在通过String.split函数调用来对文件或目录路径进行切分。另外消耗了30%左右的处理时间在INode.getPathComponents中,该函数中最终耗时都耗在获取字符串的byte数组的java原生操作中。
# blockReport阶段性能数据采集和瓶颈分析
由于blockReport的调用是通过datanode调用namenoderpc调用,所以在namenode进入到等待blockreport阶段后,会分别开启rpc调用的监听线程和rpc调用的处理线程。其中rpc处理和rpc鉴定的调用耗时分布如下图所示:
 
pastedGraphic_44.png
而其中rpc的监听线程的优化是另外一个话题,在其他的issue中再详细讨论,且由于blockReport的操作实际上是触发的rpc处理线程,所以这里只关心rpc处理线程的性能数据。
      
 namenode处理blockReport过程中的调用耗时性能数据如下:
 
pastedGraphic_45.png
可以看出,在namenode启动阶段,处理从各个datanode汇报上来的blockReport耗费了整个rpc处理过程中的绝大部分时间(48/49)blockReport处理逻辑中的耗时分布如下图:
 
pastedGraphic_46.png
从上图数据中可以发现,blockReport阶段中耗时分布主要耗时在FSNamesystem.addStoredBlock调用以及DatanodeDescriptor.reportDiff过程中,分别耗时37/4810/48,其中FSNamesystem.addStoredBlock所进行的操作时对每一个汇报上来的block,将其于汇报上来的datanode的对应关系初始化到namenode内存中的BlocksMap表中。所以对于每一个block就会调用一次该方法。所以可以看到该方法在整个过程中调用了774819次,而另一个耗时的操作,即DatanodeDescriptor.reportDiff,该操作的过程在上文中有详细介绍,主要是为了将该datanode汇报上来的blocksnamenode内存中的BlocksMap中进行对比,以决定那个哪些是需要添加到BlocksMap中的block,哪些是需要添加到toRemove队列中的block,以及哪些是添加到toValidate队列中的block。由于这个操作需要针对每一个汇报上来的block去查询BlocksMap,以及namenode中的其他几个map,所以该过程也非常的耗时。而且从调用次数上可以看出,reportDiff调用在启动过程中仅调用了14(14datanode进行块汇报),却耗费了10/48的时间。所以reportDiff也是整个blockReport过程中非常耗时的瓶颈所在。
      
 同时可以看到,出了reportDiffaddStoredBlock的调用耗费了37%的时间,也就是耗费了整个blockReport时间的37/48,该方法的调用目的是为了将从datanode汇报上来的每一个block插入到BlocksMap中的操作。从该方法调用的运行数据如下图所示:
 
pastedGraphic_47.png
从上图可以看出,addStoredBlock中,主要耗时的两个阶段分别是FSNamesystem.countNodeDatanodeDescriptor.addBlock,后者是java中的插表操作,而FSNamesystem.countNode调用的目的是为了统计在BlocksMap中,每一个block对应的各副本中,有几个是live状态,几个是decommission状态,几个是Corrupt状态。而在namenode的启动初始化阶段,用来保存corrput状态和decommission状态的blockmap都还是空状态,并且程序逻辑中要得到的仅仅是出于live状态的block数,所以,这里的countNoes调用在namenode启动初始化阶段并无需统计每个block对应的副本中的corrrput数和decommission数,而仅仅需要统计live状态的block副本数即可,这样countNodes能够在namenode启动阶段变得更轻量,以节省启动时间。
# 瓶颈分析总结
profiling数据和瓶颈分歧情况来看,fsimage加载阶段的瓶颈除了在分切路径的过程中不够优以外,其他耗时的地方几乎都是在java原生接口的调用中,如从字节流读数据,以及从String对象中获取byte[]数组的操作。
      
 blockReport阶段的耗时其实很大的原因是跟当前的namenode设计以及内存结构有关,比较明显的不优之处就是在namenode启动阶段的countNodereportDiff的必要性,这两处在namenode初始化时的blockReport阶段有一些不必要的操作浪费了时间。可以针对namenode启动阶段将必要的操作抽取出来,定制成namenode启动阶段才调用的方式,以优化namenode启动性能。
# NameNode源码分析总结
一个hdfscluster包含了一个NameNode和若干个DataNodeNameNodemaster,主要负责管理hdfs文件系统,具体的包括namespace管理(目录结构)和block管理(具体filename->blocksequencenamespace),block->datanode list(“inodes”))。前者是通过FSImage写入到本地文件系统中,而后者是通过每次hdfs启动时,datanode进行blockreport后在内存中重构的数据结构。在hdfs的程序代码中,namenode类其实只是一个用来被动接收调用的服务的包装,它实现了ClientProtocol接口,用来接收来自DFSClientRPC请求;它实现了DatanodeProtocol接口,用来接收来自datanode的各种服务请求;同时还实现了NamenodeProtocol,用来提供跟SeconddaryNameNode之间的RPC的请求和通信。对以上数据结构进行维护的是hdfs中的FSNamesystem类。对于NameNode的各种请求,比如创建,修改,删除,移动,getLocations的操作,在NameNode内部都是通过FSNamesystem提供的接口对内部数据结构进行的访问。

NameNode 是一个目录命名空间的管理器,NameNodehdfs中只有一个。(当启动一个NameNode的时候,会产生一个锁文件,是锁住的,所以起不了第二个NameNode了)
NameNode维护这两张核心表:
1. Filename------blocksequence (“namespace”) 也就是block的顺序
2. block------machinelist(“inodes”) 每个block的存储的机器(dataNode)列表
NameNode其实就是一个RPC的服务端,并且启动了两个RPC服务端(这里又涉及到了RPC原理了,看不懂的话就看下RPC的原理),并且还开启了一个jetty服务器,对外界提供了WEB的访问方式。

# DataNode源码分析
# datanode注释翻译
/**********************************************************
 * DataNode is a class (and program) that stores a set of
 * blocks for a DFS deployment.  A single deployment can
 * have one or many DataNodes.  Each DataNode communicates
 * regularly with a single NameNode.  It also communicates
 * with client code and other DataNodes from time to time.
 *
 * DataNodes store a series of named blocks.  The DataNode
 * allows client code to read these blocks, or to write new
 * block data.  The DataNode may also, in response to instructions
 * from its NameNode, delete blocks or copy blocks to/from other
 * DataNodes.
 *
 * The DataNode maintains just one critical table:
 *   block-> stream of bytes (of BLOCK_SIZE or less)
 *
 * This info is stored on a local disk.  The DataNode
 * reports the table's contents to the NameNode upon startup
 * and every so often afterwards.
 *
 * DataNodes spend their lives in an endless loop of asking
 * the NameNode for something to do.  A NameNode cannot connect
 * to a DataNode directly; a NameNode simply returns values from
 * functions invoked by a DataNode.
 *
 * DataNodes maintain an open server socket so that client code 
 * or other DataNodes can read/write data.  The host/port for
 * this server is reported to the NameNode, which then sends that
 * information to clients or other DataNodes that might be interested.
 *
 **********************************************************/
# 首先看datanode结构,实现了Runnable接口(run方法)。
public class DataNode extends Configured 
    implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable, DataNodeMXBean {
#
 找到main方法,进入
public static void main(String args[]) {
    secureMain(args,
 null);
  }

# 进入scureMain
public static void secureMain(String [] args, SecureResources resources) {
   
 try {
      StringUtils.startupShutdownMessage(DataNode.
class, args, LOG);
      DataNode datanode = createDataNode(args,
 null, resources);
     
 if (datanode != null)
        datanode.
join();
创建datanode跟调用datanode.join()(Java Threadjoin() 方法主要是让调用改方法的thread完成run方法里面的东西后, 在执行join()方法后面的代码。)
#
 进入 createDataNode
/** Instantiate & Start a single datanode daemon and wait for it to finish.
   *  If this thread is specifically interrupted, it will stop waiting.
   *  LimitedPrivate for creating secure datanodes
   */
 
 public static DataNode createDataNode(String args[],
            Configuration conf, SecureResources resources)
 throws IOException {
    DataNode dn =
 instantiateDataNode(args, conf, resources);
    runDatanodeDaemon(dn);
   
 return dn;
  }
注释说:实例化和开始一个datanode守护进程(runDatanodeDaemon(dn),等待它完成。如果专门打断这个线程,它将停止等待。创建安全datanodes LimitedPrivate
# startDataNode
方法,一直跟进去(省略中间的代码)最后进入到这么一个主要的方法,该方法代码很多
void startDataNode(Configuration conf,
                     AbstractList<File> dataDirs, SecureResources resources
                     )
 throws IOException {
第一个重要的地方:
// connect to name node
   
 this.namenode = (DatanodeProtocol)
      RPC.waitForProxy(DatanodeProtocol.
class,
                       DatanodeProtocol.
versionID,
                       nameNodeAddr,
                       conf);
datanode中起了一个RPC的客户端,得到一个服务端的代理对象,这里被强转为DatanodeProtocol,实际上就是NameNode这个类,因为NameNode类实现了DatanodeProtocol接口,然后就可以调用NameNode里面的方法了
第二个重要的地方:
this.threadGroup = new ThreadGroup("dataXceiverServer");
   
 this.dataXceiverServer = new Daemon(threadGroup,
       
 new DataXceiverServer(ss, conf, this));
下面我们可以来开始分析DataNode上的动态行为。首先我们来分析DataXceiverServerDataXceiverDataNode上数据块的接受/发送并没有采用我们前面介绍的RPC机制,原因很简单,RPC是一个命令式的接口,而DataNode处理数据部分,往往是一种流式机制。DataXceiverServerDataXceiver就是这个机制的实现。其中,DataXceiver还依赖于两个辅助类:BlockSenderBlockReceiver。  DataXceiverServer很简单,它打开一个端口,然后每接收到一个连接,就创建一个DataXceiver,服务于该连接,DataXceiver是一个线程读一次操作请求进行操作之后就返回,并记录该连接的socket,对应的实现在DataXceiverServerrun方法里。当系统关闭时,DataXceiverServer将关闭监听的socket和所有DataXceiversocket,这样就导致了DataXceiver出错并结束线程。DataXceiverServer接受到的数据主要有操作码+操作数据+用户名。  (1BlockSender用来发送block数据,返回给用户的是:成功与否+校验类型+实际offset(因为校验块的原因和用户请求的offset不一致)。BlockSender有配置参数corruptChecksumOk(校验数据读入出错忽略,出错用零填充),chunkOffsetOK(是否要告知实际的offset,如上所述),verifyChecksum(是否要求在把校验数据和实际数据读入包缓存中时校验数据,也就是在发送之前),向客户端传包的时候第一、二个参数为true,第三为false,为的是尽快发送数据。而用来校验已有数据时使用第一二参数为false,第三参数为true,为了及时发现错误数据。readBlock完成实际读数据的操作,比较简单。sendChunks方法中,对于客户端传包的包只有校验和而实际数据通过管道传输,具体见函数。
第三个重要的地方:
//create a servlet to serve full-file content
    InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
    String infoHost = infoSocAddr.getHostName();
   
 int tmpInfoPort = infoSocAddr.getPort();
   
 this.infoServer = (secureResources == null)
       ?
 new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
           conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.
DFS_ADMIN))
       :
 new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
           conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.
DFS_ADMIN),
          
 secureResources.getListener());
这里new了一个infoServernew HttpServer里面是一个jettyserver,就是为了向用户提供web界面的,然后再后面再启动infoServer
第四个重要的地方:
//init ipc server
    InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
        conf.get(
"dfs.datanode.ipc.address"));
   
 ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(),
        conf.getInt(
"dfs.datanode.handler.count", 3), false, conf,
       
 blockTokenSecretManager);
这里datanode中起了一个RPC的服务端(暂时不知给谁调用的)

#
 接下来就会调用dtanode.join()方法,datanoderun方法
/**
   * No matter what kind of exception we get, keep retrying to offerService().
   * That's the loop that connects to the NameNode and provides basic DataNode
   * functionality.
   *
   * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
   */
 
 public void run() {
LOG.info(dnRegistration + "In DataNode.run, data = " + data);

   
 // start dataXceiveServer
   
 dataXceiverServer.start();
   
 ipcServer.start();
       
   
 while (shouldRun) {
     
 try {
        startDistributedUpgradeIfNeeded();
        offerService();
      }
 catch (Exception ex) {
       
 LOG.error("Exception: " + StringUtils.stringifyException(ex));
       
 if (shouldRun) {
         
 try {
            Thread.sleep(5000);
          }
 catch (InterruptedException ie) {
          }
        }
      }
    }
       
   
 LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
    shutdown();
  }
#
 我们主要看offerService();这个方法
/**
   * Main loop for the DataNode.  Runs until shutdown,
   * forever calling remote NameNode functions.
   */
 
 public void offerService() throws Exception {
里面有一个死循环,主要是调用了一个方法sendHeartbeat
DatanodeCommand[] cmds =
 namenode.sendHeartbeat(dnRegistration,
                                                      
 data.getCapacity(),
                                                      
 data.getDfsUsed(),
                                                      
 data.getRemaining(),
                                                      
 xmitsInProgress.get(),
                                                       getXceiverCount());
         
 myMetrics.addHeartBeat(now() - startTime);
         
 //LOG.info("Just sent heartbeat, with name " + localName);
         
 if (!processCommand(cmds))
           
 continue;
这里的namenode在上面已经解释过了,其实就是DataNode,这里会调用DataNodesendHeartbeat方法,将自己的状态作为参数(容量,空间使用了多少,剩余多少等等)传递给了namenode。然后namenode调用方法得到一些返回值给datanodedatanode处理这些命令,然后再处理这些指令processCommand(cmds),放送心跳基本分析完毕。
# 接下来看发送心跳以及处理的namenode的指令后datanode还干了些什么
DatanodeCommand cmd = namenode.blockReport(dnRegistration,
                    BlockListAsLongs.convertToArrayLongs(bReport));
看到这里了,namenodeDatanodeProtocol接口也就是NameNode类。这里是RPC客户端的远程调用,datanode会扫描其机器上对应保存hdfs block的目录下(dfs.data.dir)所保存的所有文件块,然后通过namenoderpc调用将这些block的信息以一个long数组的方式发送给namenodenamenode在接收到一个datanodeblockReport rpc调用后,从rpc中解析出block数组,并将这些接收到的blocks插入到BlocksMap表中,由于此时BlocksMap缺少的仅仅是每个block对应的datanode信息,而namenoe能从report中获知当前report上来的是哪个datanode的块信息,所以,blockReport过程实际上就是namenode在接收到块信息汇报后,填充BlocksMap中每个block对应的datanodes列表的三元组信息的过程。其流程如下图所示:
 pastedGraphic_40.png
当所有的datanode汇报完blocknamenode针对每个datanode的汇报进行过处理后,namenode的启动过程到此结束。此时BlocksMapblock->datanodes的对应关系已经初始化完毕。如果此时已经达到安全模式的推出阈值,则hdfs主动退出安全模式,开始提供服务。
// If we have sent the first block report, then wait a random
// time before we start the periodic block reports.
如果我们把第一块报告发送了之后,在我们开始定期块报告之前会等待一个随机时间。

发送完毕之后NameNode会给DataNode发送一些指令,然后DataNode会处理这些指令
processCommand(cmd);
基本上也就分析完了
# DataNode启动过程分析
DataNode一启动,会对DataNode进行初始化,最终进入到startDataNode方法中,该方法
主要有4个重要的地方。
1起了一个RPC客户端,namenode,实现的接口是DataProtocol,主要是DataNodeNameNode通信用的,DataNode将自己的一些状态(容量,使用,未使用等)告诉NameNode,然后NameNode返回DataNode一些指令,告诉DataNode要去做什么。
2dataXceiverServer的启动,是一个线程组(流服务)
3
起了一个jetty服务器,提供web方式的访问
4起了一个RPC的服务端,给NameNode调用(应该是这样)

接着会实例化datanode和开始一个datanode守护进程(runDatanodeDaemon(dn),然后会调用datanodejoin方法,进入到run方法,这个方法将刚刚startDataNode方法中的RPC服务端开启,然后调用了一个offerService方法,里面是一个死循环,最先开始是datanode RPC远程调用namenode.sendHeartbeat方法,这里的namenode在上面已经解释过了,其实就是DataNode,这里会调用DataNodesendHeartbeat方法,将自己的状态作为参数(容量,空间使用了多少,剩余多少等等)传递给了namenode。然后namenode调用方法得到一些返回值给datanodedatanode处理这些命令,然后再处理这些指令processCommand(cmds)
接下来datanode会调用namenode.blockReport的方法,datanode开始扫描自己目录下所保存的所有文件块,然后通过namenoderpc调用将这些block信息以一个long数组的方式发送给namenodenamenode在接收到一个datanodeblockReport rpc调用后,从rpc中解析出block数组,并将这些接收到的blocks插入到BlocksMap表中,由于NameNode启动时BlocksMap缺少的仅仅是每个block对应的datanode信息,而namenoe能从blocReport中获知当前blockReport上来的是哪个datanode的块信息,所以,blockReport过程实际上就是namenode在接收到块信息汇报后,填充BlocksMap中每个block对应的datanodes列表的三元组信息的过程。当所有的datanode汇报完blocknamenode针对每个datanode的汇报进行过处理后,namenode的启动过程到此结束。此时BlocksMapblock->datanodes的对应关系已经初始化完毕。如果此时已经达到安全模式的退出阈值,则hdfs主动退出安全模式,开始提供服务。
调用完namenode.blockReport方法之后 namenode会给一些指令给datanode,然后datanode再处理这些指令。
在这个过程中是datanode主动与namenode通信,然后namenode传给datanode一些函数的返回值,告诉datanode该做什么。
DataProtocol
中的注释:
* The only way a NameNode can communicate with a DataNode is by
 * returning values from thesefunctions.
# FileSystem源码分析(如何与NameNode通信ClientProtocol
# FileSystemcreate方法
public FSDataOutputStream create(Path f) throws IOException {
   
 return create(f, true);
  }
# 进入create(f, true);
public FSDataOutputStream create(Path f, boolean overwrite)
   
 throws IOException {
   
 return create(f, overwrite,
                  getConf().getInt(
"io.file.buffer.size", 4096),
                  getDefaultReplication(),
                  getDefaultBlockSize());
  }
# 进入create(一直点下去,这里省略一些重复的步骤),会看到create是一个接口
  public abstract FSDataOutputStream create(Path f,
      FsPermission permission,
     
 boolean overwrite,
     
 int bufferSize,
     
 short replication,
     
 long blockSize,
      Progressable progress)
 throws IOException;
看到这里FielSystemcreate方法是一个接口,子类肯定实现了,想要知道是什么子类,可以在FileSystem执行create的那里打个断点就知道了具体子类是什么了。具体子类就是DistributedFileSystem
# DistributedFileSystem中的create方法
public FSDataOutputStream create(Path f, FsPermission permission,
   
 boolean overwrite,
   
 int bufferSize, short replication, long blockSize,
    Progressable progress)
 throws IOException {

   
 statistics.incrementWriteOps(1);
   
 return new FSDataOutputStream
       (
dfs.create(getPathName(f), permission,
                   overwrite,
 true, replication, blockSize, progress, bufferSize),
       
 statistics);
  }
dfsorg.apache.hadoop.hdfs.DFSClient这个类
#
 进入new FSDataOutputStream 看看FSDataOutputStream的构造方法
在类的注释中可以看到这么一句话Utility that wraps a {@link OutputStream} in a {@link DataOutputStream},说明这个类是对OutputStream的一个封装,我们就不需再看这个类了,因为它只是对OutputStream的一个封装,只需要看OutputStream就行了,返回到上一个步骤
# 进入dfs.create方法看看
LOG.debug(src + ": masked=" + masked);
   
 final DFSOutputStream result = new DFSOutputStream(src, masked,
        overwrite, createParent, replication, blockSize, progress, buffersize,
       
 conf.getInt("io.bytes.per.checksum", 512));
    beginFileLease(src, result);
   
 return result;
# 进入new DFSOutputStream(src, masked…
/**
     * Create a new output stream to the given DataNode.
     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
     */
    DFSOutputStream(String src, FsPermission masked,
 boolean overwrite,
       
 boolean createParent, short replication, long blockSize, Progressable progress,
       
 int buffersize, int bytesPerChecksum) throws IOException {
     
 this(src, blockSize, progress, bytesPerChecksum, replication);

      computePacketChunkSize(
writePacketSize, bytesPerChecksum);

     
 try {
       
 // Make sure the regular create() is done through the old create().
       
 // This is done to ensure that newer clients (post-1.0) can talk to
       
 // older clusters (pre-1.0). Older clusters lack the new  create()
       
 // method accepting createParent as one of the arguments.
       
 if (createParent) {
         
 namenode.create(
            src, masked,
 clientName, overwrite, replication, blockSize);
        }
 else {
         
 namenode.create(
            src, masked,
 clientName, overwrite, false, replication, blockSize);
        }
      }
 catch(RemoteException re) {
       
 throw re.unwrapRemoteException(AccessControlException.class,
                                       FileAlreadyExistsException.
class,
                                       FileNotFoundException.
class,
                                       NSQuotaExceededException.
class,
                                       DSQuotaExceededException.
class);
      }
     
 streamer.start();
    }
该类的构造方法上说/**
     * Create a new output stream to the given DataNode.
     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
     */(创建一个指向DataNode的输出流)
这个DFSOutputStreamDFSClient.的内部类
# 进入namenode.create方法
/**
   * Create a new file entry in the namespace.
   */
 
 public void create(String src,
                     FsPermission masked,
                             String clientName,
                            
 boolean overwrite,
                            
 short replication,
                            
 long blockSize
                             )
 throws IOException;
看注释说在namespace创建一个entrynamespace就是NameNode中维护的两张核心表之一
The NameNode controls two critical tables:
 *   1)  filename->blocksequence (namespace))这就是namespace
#
 回到上一步,这里的那么namenode是什么呢?
Namenode就是  public final ClientProtocol namenode;是一个接口,说明了DFSClient是一个分布式文件系统的客户端,也就是RPC的客户端,就是说RPC客户端含有一个RPC服务端的代理对象。这个代理对象就是ClientProtocol,这个ClientProtocol就是NameNode的一个接口。结论就是这里看到的namenodeClientProtocol,实质上就是NameNode的代理对象。文件创建好了,现在来看看流
#
 点击查看streamer.start();可以看到streamer是一个线程类,streamer的是pastedGraphic_48.pngDataStreamer,现在看看pastedGraphic_48.pngDataStreamer的源码,看看它的run方法
// The DataStreamer class is responsible for sending data packets to the
   
 // datanodes in the pipeline. It retrieves a new blockid and block locations
   
 // from the namenode, and starts streaming packets to the pipeline of
   
 // Datanodes. Every packet has a sequence number associated with
   
 // it. When all the packets for a block are sent out and acks for each
   
 // if them are received, the DataStreamer closes the current block.
rivate class DataStreamer extends Daemon {
从注释上可以看出D啊他Streamer类负责发送数据包给datanode,它从namenode中渠道blockidblocklocations。每一个packet有一个它自身的序列号,当一个block的所有包发送并被确认之后,关闭当前block,实际上就是关闭指向当前快的输出流。
问题:
谁发送这些包?
------DataStreamer
发送给谁?-------------DataNode
谁发送确认?---------- DataNode(就是发送给DataStreamer一个确认的收到数据的信息)
谁接收确认?
----------- DataStreamer(接收DataNode发送的确认)
也就是把数据写到
DataNode节点的block文件中(linux文件系统中的)
pastedGraphic_49.png
# 总结
FileSystemcreate方法其实就是把一个数据发送到linux文件系统中,但是HDFSAPI封装的非常好,对于用户来说是透明的,也就是通透性。
从上面的流程图可以看出:
到了
DFSOutputStream中后分两条路线执行了,先是namenode.create方法的执行创建了一个从客户端到datanode的一条输出流,然后DataDtreamer类又得到了blockidblocklocation之后,就能把文件上传到指定的位置了,但是真正干事的是FSNamesystem这个类(Create a new file entry in the namespace.,就是往、维护的那两张表中加入信息)
# JobTracker源码分析
# JobTrackerhadoopmapreduce框架中最重要的一个类,这个类负责整个集群的作业控制和资源管理。
# JobTracker的启动是在用户启动hadoop集群时启动的,启动JobTracker是通过调用JobTrackermain()方法启动。接下来看看源码
* Start the JobTracker process.  This is used only for debugging.  As a rule,
   * JobTracker should be run as part of the DFS Namenode process.
   */
 
 public static void main(String argv[]
                          )
 throws IOException, InterruptedException {
    StringUtils.startupShutdownMessage(JobTracker.
class, argv, LOG);
   
   
 try {
     
 if(argv.length == 0) {
        JobTracker tracker = startTracker(
new JobConf());
        tracker.offerService();
# JobTracker tracker = startTracker(new JobConf());实例化了一个JobTracker,进入构造方法看看
public static JobTracker startTracker(JobConf conf
                                        )
 throws IOException,
                                                 InterruptedException {
   
 return startTracker(conf, generateNewIdentifier());
  }
# 进入startTracker(conf, generateNewIdentifier());最后进入下面的方法
public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize)
 
 throws IOException, InterruptedException {
    DefaultMetricsSystem.initialize(
"JobTracker");
    JobTracker result =
 null;
   
 while (true) {
     
 try {
        result =
 new JobTracker(conf, identifier);
        result.
taskScheduler.setTaskTrackerManager(result);
       
 break;
      }
 catch (VersionMismatch e) {
#进入result = new JobTracker(conf, identifier);(这里主要看调度器跟RPC服务端)
# 创建一个调度器
// Create the scheduler
    Class<?
 extends TaskScheduler> schedulerClass
      = conf.getClass(
"mapred.jobtracker.taskScheduler",
          JobQueueTaskScheduler.
class, TaskScheduler.class);
   
 taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
# 起一个RPC的服务端
his.interTrackerServer =
      RPC.getServer(
this, addr.getHostName(), addr.getPort(), handlerCount,
         
 false, conf, secretManager);
# web服务器jetty(跟namenode一样,提供webui方式访问,更方便)
infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort,
        tmpInfoPort == 0, conf,
 aclsManager.getAdminsAcl());
   
 infoServer.setAttribute("job.tracker", this);
这里的重点是创建了一个调度器跟起了一个RPC服务端和web服务器。
pastedGraphic_50.png
jobTrackertaskTracker的通信是使用周期的调用heartbeat方法
JobClientJobSubmissionProtocolJobTracker
JobTracker
的职责
资源分配,具体是资源调度器
监控作业运行
pastedGraphic_51.png
提交作业就是把jobID放进一个map集合,keyjobIDvalueJobInProgress
然后就有一个调度器,不断的去这个map集合中获取job,然后处理
MapTaskrun方法
ReduceTaskrun方法

jobClient提交作业,调用JobSubmisionProtocol的方法,将作业放入JobTrackermap之后就完成工作了。再然后由调度器去那个放了作业的map方法中获取job(获取作业),然后执行

jobTrackertaskTracker通信走的是InterTrackerProtocol
看一下heartbeat方法,接下来可以看taskTracker
……getHeartbeatInterval
得到心跳间隔 心跳传送给jobTracer,将自己的状态传给jobTracker(跟datanode一样),然后taskTracker得到一个返回值(jobTracker传给taskTracker的东西)然后再处理这个返回值(这个返回值也就是jobTracker告诉taskTracker要做的事情)

#TaskTracker源码分析
pastedGraphic_52.png
# taskTracker主要完成以下工作
1 负责向JobTracker定期的发送心跳消息。消息中有说明是否要申请新的任务,并接收Job下达的任务。
2
 如果jobTracker下达了task任务要执行,则执行该任务。
# 先看看类的注释跟类结构
* TaskTracker is a process that starts and tracks MR Tasks
 * in a networked environment.  It contacts the JobTracker
 * for Task assignments and reporting results.
public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
    Runnable, TaskTrackerMXBean {
TaskTracker是一个在网络环境处理开始和跟踪mapreduce任务(英语不好,只能这么翻译了),它联系着JobTracker为了任务分配和报告结果。
这个类实现了
Runnable接口,开启线程,你懂的,哼哼!←_←

#
 首先进入taskTrackermain方法,看看它都干了些什么
TaskTracker tt = new TaskTracker(conf);
#
 进入
int httpPort = infoSocAddr.getPort();
 
   this.server = new HttpServer("task", httpBindAddress, httpPort,
        httpPort == 0, conf, aclsManager.getAdminsAcl());
    workerThreads = conf.getInt("tasktracker.http.threads", 40);
   
 server.setThreads(1, workerThreads);
   
 // let the jsp pages get to the task tracker, config, and other relevant
   
 // objects
    FileSystem local = FileSystem.getLocal(conf);
   
 this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
    Class<?
 extends TaskController> taskControllerClass =
      conf.getClass(
"mapred.task.tracker.task-controller",
                     DefaultTaskController.
class, TaskController.class);

   
 fConf = new JobConf(conf);
   
 localStorage = new LocalStorage(fConf.getLocalDirs());
   
 localStorage.checkDirs();
   
 taskController =
      (TaskController)ReflectionUtils.newInstance(taskControllerClass,
 fConf);
   
 taskController.setup(localDirAllocator, localStorage);
   
 lastNumFailures = localStorage.numFailures();

   
 // create user log manager
    setUserLogManager(
new UserLogManager(conf, taskController));
    SecurityUtil.login(
originalConf, TT_KEYTAB_FILE, TT_USER_NAME);

    initialize();
这里开启了一个jetty服务器,web方式访问
# 进入 initialize方法
//set the num handlers to max*2 since canCommit may wait for the duration
   
 //of a heartbeat RPC
   
 this.taskReportServer = RPC.getServer(this, bindAddress,
        tmpPort, 2 * max,
 false, this.fConf, this.jobTokenSecretManager);
   
 this.taskReportServer.start();
开启一个RPC的服务端taskReportServer(谁来调用?)
# 抓取map任务完成的事件,mapLanacherreduceLanacher开始运行处理mapreduce任务
// start the thread that will fetch map task completion events
   
 this.mapEventsFetcher = new MapEventsFetcherThread();
   
 mapEventsFetcher.setDaemon(true);
   
 mapEventsFetcher.setName(
                            
 "Map-events fetcher for all reduce tasks " + "on " +
                            
 taskTrackerName);
   
 mapEventsFetcher.start();
mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
   
 reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
   
 mapLauncher.start();
   
 reduceLauncher.start();
# 然后进入 TackTrackermain方法里面,进入tt.run();看看该线程干了些什么(run方法)
 public void run() {
   
 try {
      getUserLogManager().start();
      startCleanupThreads();
     
 boolean denied = false;
     
 while (running && !shuttingDown) {
       
 boolean staleState = false;
       
 try {
         
 // This while-loop attempts reconnects if we get network errors
         
 while (running && !staleState && !shuttingDown && !denied) {
           
 try {
              State osState = offerService();
# Run方法里面主要是offerService();在干事,接下来看offerService();吧! (只看主要的代码)
  // If the TaskTracker is just starting up:
       
 // 1. Verify the versions matches with the JobTracker
       
 // 2. Get the system directory & filesystem
       
 if(justInited) {
          String jtBuildVersion =
 jobClient.getBuildVersion();
          String jtVersion =
 jobClient.getVIVersion();
这里的jobClientInterTrackerProtocol这样一个接口,JobTracker实现了这个接口,也就是说这里是RPC客户端远程调用了RPC服务端JobTracker的方法了,得到了这些版本,,第二行注释说了要核实核实匹配这些JobTracker的版本
# 重点来了,发送心跳
// Send the heartbeat and process the jobtracker's directives
        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
# 进入transmitHeartBeatnow,看看在干嘛
//
   
 // Check if we should ask for a new Task
   
 //
   
 boolean askForNewTask;
   
 long localMinSpaceStart;
   
 synchronized (this) {
      askForNewTask =
        ((
status.countOccupiedMapSlots() < maxMapSlots ||
         
 status.countOccupiedReduceSlots() < maxReduceSlots) &&
        
 acceptNewTasks);
      localMinSpaceStart =
 minSpaceStart;
    }
   
 if (askForNewTask) {
先将自己的状态用对象封装起来,比如taskTrackerNamelocalHostnamemaxMapSlotsmaxReduceSlots等。
然后检查
taskTracker是否应该寻求一个新任务boolean askForNewTask;
如果mapreduced的个数没有超过最大值,可以接收新任务的基础上,再根据minSpaceStart
 的值来确定是否可以领取新的任务。minSpaceStartmapred.local.dir.minspacestart属性指定。默认为0,如果minSpaceStart的值小于磁盘空闲的空间值,则可以,否则不能。
并记录生成消息的时间。
# 得到了heartbeatResponse这个对象之后,就可以开始任务了,里面有jobid
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                             
 justStarted,
                                                             
 justInited,
                                                              askForNewTask,
                                                             
 heartbeatResponseId);
# 在这里得到acitons,得到要处理的东西
   TaskTrackerAction[] actions = heartbeatResponse.getActions();
#
 循环actions
if (actions != null){
         
 for(TaskTrackerAction action: actions) {
           
 if (action instanceof LaunchTaskAction) {
              addToTaskQueue((LaunchTaskAction)action);
            }
 else if (action instanceof CommitTaskAction) {
              CommitTaskAction commitAction = (CommitTaskAction)action;
             
 if (!commitResponses.contains(commitAction.getTaskID())) {
               
 LOG.info("Received commit task action for " +
                          commitAction.getTaskID());
               
 commitResponses.add(commitAction.getTaskID());
              }
            }
 else {
              addActionToCleanup(action);
            }
          }
        }
        markUnresponsiveTasks();
        killOverflowingTasks();
# 进入addToTaskQueue((LaunchTaskAction)action);这里就是把mapreduce任务添加到mapLauncher或者reduceLauncher中去(这两个线程一开始初始化的时候就启动了
private void addToTaskQueue(LaunchTaskAction action) {
   
 if (action.getTask().isMapTask()) {
     
 mapLauncher.addToTaskQueue(action);
    }
 else {
     
 reduceLauncher.addToTaskQueue(action);
    }
  }
# 看看mapLauncher是什么类
class TaskLauncher extends Thread {
也就是说mapLauncherTaskLauncher这个类,这个类继承了Thread,也就是一个线程,下面看看它的run方法
# TaskLauncher run方法(重点代码)
synchronized (tip) {
           
 //to make sure that there is no kill task action for this
           
 if (!tip.canBeLaunched()) {
             
 //got killed externally while still in the launcher queue
             
 LOG.info("Not launching task " + task.getTaskID() + " as it got"
                +
 " killed externally. Task's state is " + tip.getRunState());
              addFreeSlots(task.getNumSlotsRequired());
             
 continue;
            }
            tip.
slotTaken = true;
          }
         
 //got a free slot. launch the task
          startNewTask(tip);
这里得到了一个空闲的slots,然后就运行任务了,这里的重点是把任务给运行了(map或者reduce任务)
到了这里基本上taskTracker的工作就完了,后面只是一些其它的事情了
# 除掉在汇报周期内没有回报进展的task,则认定为失败的taskkill掉。
# 检查本节点的磁盘空间是否处于危险阶段。空闲磁盘空间危险的临界值是通过mapred.local.dir.minspacekill
     设定的。默认为0,则不做处理,如果不为0,空闲的空间比该值小,则该节点不再接受新的task
    acceptNewTasks设为false,直到所有的task运行完,清理掉。同时会kill掉一个task,假如有多个task在运行,那kill掉那个呢,taskTracker是根据优先级来kill的,优先级最低的会kill掉。
# 检查当前节点是否空闲,如果是空闲而且acceptNewTasksfalse,则更新acceptNewTaskstrue,判断空闲的根据是tasks.isEmpty() && tasksToCleanup.isEmpty();

# 总结 TaskTracker
taskTracker 启动一个jetty服务器,启动了一个RPC的服务端,然后调用了run方法开启了一个线程,线程的主要方法是offerService这个方法,发送心跳,寻求一个新任务,得到heartbeatResponse这个对象,里面有任务相关的东西(比如任务id),然后通过TaskTrackerAction[] actions = heartbeatResponse.getActions();得到需要处理的任务,之后循环actions,将任务添加到任务队列中去addToTaskQueue((LaunchTaskAction)action)
这个队列会把action放入一个TaskLauncher类中,该类是一个线程类,运行run方法,就把这个action(任务)给完成了。最后面再做一些其他事情,比如检查当前节点是否为空闲什么的。
# 网站日志分析项目
# 项目介绍
# 项目环境以及所需工具
虚拟机:Vmare9.0.2
操作系统:CentOS 6.4hadoop用户登录,root用户权限过大
集群环境:完全分布,3台虚拟机,masterslave1slave2
主要软件:
Hadoop-1.1.2
Hbase-0.94.7
Hive-0.9.0
Zookeeper-3.4.5
Sqoop-1.4.3
辅助工具:
Putty
puttyMain
vmare启动了之后,用这个工具登录centos进行操作,该工具依赖putty
Scure shell client(主要用来传输文件)
# 项目描述
通过对黑马技术论坛的apache common日志进行分析,
计算论坛关键指标,供运营者决策。
# 数据情况
pastedGraphic_53.png
# 关键指标
# 浏览量PV 
 
定义:页面浏览量即为PV(Page View),是指所有用户浏览页面的总和,一个独立用户每打开一个页面就被记录1 次。

分析:网站总浏览量,可以考核用户对于网站的兴趣,就像收视率对于电视剧一样。但是对于网站运营者来说,更重要的是,每个栏目下的浏览量。



公式:记录计数
# 访客数UV(包括新访客数、新访客比例) 
 
定义:访客数(UV)即唯一访客数,一天之内网站的独立访客数( Cookie 为依据),一天内同一访客多次访问网站只计算1 个访客。

分析:在统计工具中,我们经常可以看到,独立访客和
IP数的数据是不一样的,独立访客都多于IP数。那是因为,同一个IP地址下,可能有很多台电脑一同使用,这种情况,相信都很常见。

还有一种情况就是同一台电脑上,用户清空了缓存,使用
360等工具,将cookie删除,这样一段时间后,用户再使用该电脑,进入网站,这样访问数UV也被重新加一。

当然,对于网站统计来说,关于访客数需要注意的另一个指标就是新访客数,新访客数据可以衡量,网站通过推广活动,所获得的用户数量。新访客对于总访客数的比值,可以看到网站吸引新鲜血液的能力,及如何保留旧有用户。

注册用户计算公式:对访问
member.php?mod=register的不同ip,计数
# IP数  
 
定义:一天之内,访问网站的不同独立IP 个数加和。其中同一IP无论访问了几个页面,独立IP 数均为1

分析:这是我们最熟悉的一个概念,无论同一个
IP上有多少电脑,或者其他用户,从某种程度上来说,独立IP的多少,是衡量网站推广活动好坏最直接的数据。

公式:对不同
ip,计数
# 跳出率 
 
定义:只浏览了一个页面便离开了网站的访问次数占总的访问次数的百分比,即只浏览了一个页面的访问次数 / 全部的访问次数汇总。

分析:跳出率是非常重要的访客黏性指标,它显示了访客对网站的兴趣程度:跳出率越低说明流量质量越好,访客对网站的内容越感兴趣,这些访客越可能是网站的有效用户、忠实用户。

该指标也可以衡量网络营销的效果,指出有多少访客被网络营销吸引到宣传产品页或网站上之后,又流失掉了,可以说就是煮熟的鸭子飞了。比如,网站在某媒体上打广告推广,分析从这个推广来源进入的访客指标,其跳出率可以反映出选择这个媒体是否合适,广告语的撰写是否优秀,以及网站入口页的设计是否用户体验良好。

公式:
(1)统计一天内只出现一条记录的ip,称为跳出数
          (2)跳出数/PV
# 版块热度排行榜 
 
定义:版块的访问情况排行。

分析:巩固热点版块成绩,加强冷清版块建设。同时对学科建设也有影响。

公式:按访问次数、停留时间统计排序
# 开发步骤
# 把日志数据导入到hdfs中(ssh工具上传)
# 将日志文件上传到hadoop用户下的yting_logs文件夹下。
hadoop用户下新建mkdir yting_logs目录用来存放需要分析的日志文件,然后将文件的用户修改为hadoop
[hadoop@master yting_logs]$ mkdir yting_logs
[hadoop@master yting_logs]$ cd yting_logs
[hadoop@master yting_logs]$ pwd
/home/hadoop/yting_logs
[hadoop@master yting_logs]$ ll
total 213052
-rw-r--r--. 1 root root  61084192 May 31  2013 access_2013_05_30.log
-rw-r--r--. 1 root root 157069653 Jun  1  2013 access_2013_05_31.log
[hadoop@master yting_logs]$ su root
Password:
[root@master yting_logs]# chown hadoop:hadoop *
[root@master yting_logs]# ll
total 213052
-rw-r--r--. 1 hadoop hadoop  61084192 May 31  2013 access_2013_05_30.log
-rw-r--r--. 1 hadoop hadoop 157069653 Jun  1  2013 access_2013_05_31.log
[root@master yting_logs]# exit
exit
[hadoop@master yting_logs]$ ll
total 213052
-rw-r--r--. 1 hadoop hadoop  61084192 May 31  2013 access_2013_05_30.log
-rw-r--r--. 1 hadoop hadoop 157069653 Jun  1  2013 access_2013_05_31.log
[hadoop@master yting_logs]$
pastedGraphic_54.png
# 将日志文件存入hdfs中去
# 启动集群
[hadoop@master yting_logs]$ jps
3392 Jps
[hadoop@master yting_logs]$ start-all.sh
starting namenode, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-namenode-master.out
slave2: starting datanode, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-datanode-slave2.out
master: starting datanode, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-datanode-master.out
slave1: starting datanode, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-datanode-slave1.out
master: starting secondarynamenode, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-secondarynamenode-master.out
starting jobtracker, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-jobtracker-master.out
slave2: starting tasktracker, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-tasktracker-slave2.out
master: starting tasktracker, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-tasktracker-master.out
slave1: starting tasktracker, logging to /usr/local/hadoop/hadoop-1.1.2/libexec/../logs/hadoop-hadoop-tasktracker-slave1.out
[hadoop@master yting_logs]$ jps
3688 SecondaryNameNode
3462 NameNode
3580 DataNode
3894 TaskTracker
3940 Jps
3769 JobTracker
[hadoop@master yting_logs]$
#
 hdfs中新建一个yting_logs然后上传日志
[hadoop@master yting_logs]$ hadoop fs -mkdir /yting_logs
[hadoop@master yting_logs]$ ls
access_2013_05_30.log  access_2013_05_31.log
[hadoop@master yting_logs]$ hadoop fs -put ./access_*.log /yting_logs
[hadoop@master yting_logs]$
# 数据清洗
数据清洗,把原始数据中与业务无关的信息、脏数据等处理掉,统一放到/hmbbs_cleaned/YYYY_mm_dd
# 使用mapred进行清洗

package com.itheima.logs;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 *
 * @author
 妳那伊抹微笑
 *
 *
 数据清洗,得到想要的数据,当下面一条数据进来之后
 * 27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1" 200 582
 *
 经过清洗,输出后为
 * 27.19.74.143 1369906700000 data/attachment/common/c8/common_2_verify_icon.png
 */
public class DataClear {
private static LogParsertUtils logUtils = new LogParsertUtils();

enum Counter {
LINESKIP,         //
 出错的行
STATIC_RESOURCE   // 静态资源
}

public static class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
String[] parses = logUtils.parse(value.toString());
String ip = parses[0];        // ip
String logTime = parses[1];   //
 登录时间
String url = parses[2];       // url

//
 过滤所有静态请求
if(url.startsWith("GET /static") || url.startsWith("GET /uc_server")){
context.getCounter(Counter.STATIC_RESOURCE).increment(1); //
 静态资源的计数器加1
return ;   //
 不处理静态资源的请求了
}

//
 只要iplogTimeurl
context.write(key, new Text(ip+"\t"+logTime+"\t"+url));
} catch (Exception e) {
context.getCounter(Counter.LINESKIP).increment(1); //
 出错行计数器加 1(数据不完整)
}
}
}

public static class Reducer extends org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>{
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text text : values) {
context.write(text, NullWritable.get());
}
}
}

static class LogParsertUtils {
public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
"d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);

/**
*
 解析英文时间字符串
*
* @param string
* @return
* @throws ParseException
*/
public Date parseDateFormat(String string) {
Date parse = null;
try {
parse = FORMAT.parse(string);
} catch (ParseException e) {
e.printStackTrace();
}
return parse;
}

/**
*
 解析日志的行记录
*
* @param line
* @return
 数组含有5个元素,分别是ip、时间、url、状态、流量
*/
public String[] parse(String line) {
String ip = parseIP(line);
String time = parseTime(line);
String url = parseURL(line);
String status = parseStatus(line);
String traffic = parseTraffic(line);

return new String[] { ip, time, url, status, traffic };
}

private String parseTraffic(String line) {
final String trim = line.substring(line.lastIndexOf("\"") + 1)
.trim();
String traffic = trim.split(" ")[1];
return traffic;
}

private String parseStatus(String line) {
final String trim = line.substring(line.lastIndexOf("\"") + 1)
.trim();
String status = trim.split(" ")[0];
return status;
}

private String parseURL(String line) {
final int first = line.indexOf("\"");
final int last = line.lastIndexOf("\"");
String url = line.substring(first + 1, last);
if (url.startsWith("GET ")) {
//
 url中的"GET "跟后面的" HTTP/1.1"给删除掉,这些东西没用(根据业务需求来定)
url = url.substring("GET ".length() + 1, url.length()
- " HTTP/1.1".length());
}
if (url.startsWith("POST ")) {
//
 url中的"GET "跟后面的" HTTP/1.1"给删除掉,这些东西没用(根据业务需求来定)
url = url.substring("POST ".length() + 1, url.length()
- " HTTP/1.1".length());
}
return url;
}

private String parseTime(String line) {
final int first = line.indexOf("[");
final int last = line.indexOf("+0800]");
String time = line.substring(first + 1, last).trim();
return parseDateFormat(time).getTime() + ""; //
 将时间转换成long类型的数据
}

private String parseIP(String line) {
String ip = line.split("- -")[0].trim();
return ip;
}

}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: data clear <in> <out>");
      System.exit(2);
    }
   
    Job job = new Job(conf, "data clear");   //
 任务命
    job.setJarByClass(DataClear.class);   // 指定Class
   
    job.setMapperClass(Mapper.class);   // Mapper
    job.setMapOutputKeyClass(LongWritable.class);   // Map
的输出key格式
    job.setMapOutputValueClass(Text.class);    // Map的输出value格式
   
    job.setReducerClass(Reducer.class);   // Reducer
    job.setOutputKeyClass(Text.class);   //
 输出key格式
    job.setOutputValueClass(NullWritable.class);   // 输出value格式
   
    job.setOutputFormatClass(TextOutputFormat.class);   //
 输出格式
   
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));     //
 文件的输入路径
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));   // 文件的输出路径
   
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}
common.sh
# upload and clear daily log

logdate=$1

# upload daily logs fron linux to hdfs
hadoop fs -put /home/hadoop/yting_logs/access_${logdate}.log /yting_logs

# clear daily log
hadoop jar dataclearr.jar com.itheima.logs.DataClear hdfs://master:9000/yting_logs/access_${logdate}.log hdfs://master:9000/yting_logs_cleared/${logdate}
dailly.sh
# upload log file from linux to hdfs
# call ./common.sh

logdate='date + %y_%m_%d'

./common.sh $logdate
# 使用hive的外部分区表处理数据
# 创建一个外部分区表(yting)
[hadoop@master ~]$ hive -e "CREATE EXTERNAL TABLE YTING(id string, logtime string, url string) PARTITIONED BY (logdate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/yting_logs_cleared';"
[hadoop@master ~]$ hive -e "show tables;"
OK
inner_table
sqoop_table1
t1
yting
# 添加分区(每天要修改分区字段)
hive -e "ALTER TABLE yting ADD PARTITION (logdate=\"2013_05_05\") LOCATION \"/yting_logs_cleared/2013_05_05\";"
# 使用hive进行数据的多维分析
# 开始统计(测试数据是5M,为了效率就弄这么少了)
# PV (浏览量 Page View
[hadoop@master ~]$ hive -e "select count(1) from yting;"
运行结果:43094
将查出来的结果放入到另一张表中去(pv_2013_05_05
hive> CREATE TABLE pv_2013_05_05 AS SELECT '2013_05_05', COUNT(1) FROM yting WHERE logdate='2013_05_05';
# reguser (一天里面的注册用户)
hive>  SELECT COUNT(1) FROM yting WHERE logdate='2013_05_05' AND instr(url, 'member.php?mod=register')>0;
将查出来的结果放入到另一张表去(reguser_2013_05_05
hive> create table reguser_2013_05_05 as SELECT '2013_05_05',COUNT(1) FROM yting WHERE logdate='2013_05_05' AND instr(url, 'member.php?mod=register')>0;
运行结果是:
4
# ip数(去重复distinct
hive> select count(distinct ip) from yting where logdate=’2013_05_05’;
运行结果:1738
不去重复的运行结果:43094
将查出来的结果放入到另一张表中去(ip_2013_05_05),这里退出了hive,直接在shell下使用的命令了
hive -e "create table ip_2013_05_05 as select '2013_05_05',count(distinct ip) from yting where logdate='2013_05_05'"
# jumper (跳出率)
hive> select count(1) from (select id,count(1) from yting where logdate=’ 2013_05_05’ group by id having count(1)=1);
运行结果:628
将查询出来的结果放入到另一张表中去(jumper_2013_05_05
create table jumper_2013_05_05 as select '2013_05_05',count(1) from (select id,count(1) from yting where logdate='2013_05_05' group by id having count(1)=1) t;
# hive分析结果导出到mysql中(sqoop导出之前mysql中一定要有表,不然会有异常)
# 导出pv(浏览量)
sqoop export --connect jdbc:mysql://master:3306/itheima_bbs --username root --password root --table daily_pv --export-dir '/user/hive/warehouse/pv_2013_05_05'  --fields-terminated-by '\001'
#
 导出reguser(注册用户)
sqoop export --connect jdbc:mysql://master:3306/itheima_bbs --username root --password root --table daily_reguser --export-dir '/user/hive/warehouse/reguser_2013_05_05'  --fields-terminated-by '\001'
#
 导出 ip
sqoop export --connect jdbc:mysql://master:3306/itheima_bbs --username root --password root --table daily_ip --export-dir '/user/hive/warehouse/ip_2013_05_05'  --fields-terminated-by '\001'
#
 导出 跳出率
sqoop export --connect jdbc:mysql://master:3306/itheima_bbs --username root --password root --table daily_ip --export-dir '/user/hive/warehouse/ip_2013_05_05'  --fields-terminated-by '\001'


# 提供视图工具供用户使用,指标查询mysql、明细查询hbase
# 网站日志分析中出的异常
# Faile with exception Unable to rename: hdfs://… to hdfs://…
原因:hivewarehouse目录被删除了
解决:新建一个表,这个目录就有了
# java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.TableOutputFormat
原因:slaves缺少jar
解决:masterjar包复制到slaves中去
# 找不到一个静态类,见鬼
14/03/11 14:12:49 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
java.lang.RuntimeException: java.lang.ClassNotFoundException: com.itheima.logs.HBaseBatchImport$BatchImportMapper
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:849)
        at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:199)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:719)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.ClassNotFoundException: com.itheima.logs.HBaseBatchImport$BatchImportMapper
        at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:249)
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:802)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:847)
        ... 8 more

原因:java文件中缺少了job.setJarByClass(HBaseBatchImport.class);这样一行代码
解决:上面的异常信息最前面的饿没有截取
14/03/11 14:12:49 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
是这么一行,java代码里面没有job.setJarByClass(HBaseBatchImport.class);这样一行代码
这样一行代码
# java.lang.ArrayIndexOutOfBoundsException:
java.lang.ArrayIndexOutOfBoundsException: 3
        at com.itheima.logs.HBaseBatchImport$BatchImportReducer.reduce(HBaseBatchImport.java:50)
        at com.itheima.logs.HBaseBatchImport$BatchImportReducer.reduce(HBaseBatchImport.java:1)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:650)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)
原因:数组下标越界
解决:mapreduce程序中trycatch就行了


以后出异常的时候记得把最前面的异常信息看看



# 妳那伊抹微笑搭建hadoop环境出现的问题(仅供参考)
# 启动NameNode出现的错误hadoop-daemon.sh start dataNode
# [hadoop@master modules]$ hadoop-daemon.sh start namenode
bash: /root/.bashrc: Permission denied
Warning: $HADOOP_HOME is deprecated.
 
starting namenode, logging to /opt/modules/hadoop/hadoop-1.0.3/bin/../logs/hadoop-hadoop-namenode-master.out
bash: /root/.bashrc: Permission denied

下面是百度的结果:
(前些天根据“EasyHadoop集群部署和入门文档配置Hadoop集群时出现了错误,本人使用的reshat4,hadoop-1.0.3)
提示是:
bash: /root/.bashrc:
 权限不够
Warning: $HADOOP_HOME is deprecated.
bash: /root/.bashrc:
权限不够                            这里将将对应目录/root文件权限更改为777即可
bash: /root/.bashrc:
 权限不够
starting  namenode,logging to
/opt/modules/hadoop/hadoop-1.0.3/bin/../logs/hadoop-hadoop-namenode-master.out
这个问题我头疼了一个多星期,一直悬而未决,一直给予haddop-1.0.3目录777权限却还是有问题。偶尔看到某个网站试看视频中有个处理权限的镜头,今天照着命令敲了下。成功执行。因此要注意使用对应用户创建解压文件。并随时使用ll –a 查看文件权限。
[root@master hadoop-1.0.3]# chmod 777 logs/*
[root@master hadoop-1.0.3]# chmod 777 logs
[root@master hadoop-1.0.3]# chown root logs/*
[root@master hadoop-1.0.3]# chown root logs
[root@master hadoop-1.0.3]# bin/start-all.sh
弄了这之后发现依然namenode打不开,我就格式化了namenodehadoopnamenode –format,发现里面原来有抛出例外:
ERRORnamenode.NameNode: java.io.IOException: Cannot create directory
/var/hadoop/hadoop-hadoop/dfs/name/current at org.apache.hadoop.hdfs.server.common.Storage$
StorageDirectory.clearDirectory(Storage.java:297)
atorg.apache.hadoop.hdfs.server.namenode.FSImage.format(FSImage.java:1320)
at org.apache.hadoop.hdfs.server.namenode.FSImage.format(FSImage.java:1339)
at.org.apache.hadoop.hdfs.server.namenode.NameNode.format(NameNode.java:1164)
at.org.apache.hadoop.hdfs.server.namenode.NameNode.createNameNode(NameNode.java:1271)
atorg.apache.hadoop.hdfs.server.namenode.NameNode.main(NameNode.java:1288)
根据列外是不能创建一个目录,有了上面的经验我就知道了是没有建目录的权限。所以我直接就在/var/下建了一个目录叫做hadoop/,并授予了权限。再格式化namenode,成功启动。但是转到子节点上输入jps查看,发现并没有完全启动。知道node也有同样问题。
而为了node不出现同样问题,我们最好是在虚拟机中配置好后将master复制过去。然后改ip,改主机名。

# 然后会出现一个sax解析xml文件”<!--” 不能解析的错误。
解决:进入配置文件把Core-site.xml hdfs-site.xml 中的<!-- -->注释都删了,应该是中文注释的问题
# 然后又会报错
2014-02-10 21:14:24,393 ERROR org.apache.hadoop.hdfs.server.namenode.FSNamesystem: FSNamesystem initialization failed.
java.io.IOException: NameNode is not formatted.
        at org.apache.hadoop.hdfs.server.namenode.FSImage.recoverTransitionRead(FSImage.java:330)
解决:[hadoop@master conf]$ hadoop namenode -format
Warning: $HADOOP_HOME is deprecated.
#再报错,格式化终止Format aborted in /data/hadoop/hdfs/name,下面是执行的命令
14/02/10 21:30:03 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = master/192.168.1.100
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 1.0.3
STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1335192; compiled by 'hortonfo' on Tue May  8 20:31:25 UTC 2012
************************************************************/
Re-format filesystem in /data/hadoop/hdfs/name ? (Y or N) y
Format aborted in /data/hadoop/hdfs/name
14/02/10 21:30:05 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at master/192.168.1.100
************************************************************/
[hadoop@master conf]$
解决:在对namenode格式化之前,要确保dfs.name.dir(也就是/data/hadoop/hdfs/name )参数指定的目录不存在。
Hadoop
这样做的目的是防止错误地将已存在的集群格式化了
# 继续报错 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at master/192.168.1.100
************************************************************/
2014-02-10 21:55:58,808 INFO org.apache.hadoop.hdfs.server.namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = master/192.168.1.100
STARTUP_MSG:   args = []
STARTUP_MSG:   version = 1.0.3
STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1335192; compiled by 'hortonfo' on Tue May  8 20:31:25 UTC 2012
************************************************************/
2014-02-10 21:55:59,478 INFO org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2014-02-10 21:55:59,534 INFO org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source MetricsSystem,sub=Stats registered.
2014-02-10 21:55:59,538 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 10 second(s).
2014-02-10 21:55:59,538 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: NameNode metrics system started
2014-02-10 21:55:59,926 INFO org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source ugi registered.
2014-02-10 21:55:59,951 INFO org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source jvm registered.
2014-02-10 21:55:59,954 INFO org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source NameNode registered.
2014-02-10 21:56:00,202 INFO org.apache.hadoop.hdfs.util.GSet: VM type       = 32-bit
2014-02-10 21:56:00,202 INFO org.apache.hadoop.hdfs.util.GSet: 2% max memory = 0.61875 MB
2014-02-10 21:56:00,203 INFO org.apache.hadoop.hdfs.util.GSet: capacity      = 2^17 = 131072 entries
2014-02-10 21:56:00,203 INFO org.apache.hadoop.hdfs.util.GSet: recommended=131072, actual=131072
2014-02-10 21:56:00,305 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: fsOwner=hadoop
2014-02-10 21:56:00,305 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: supergroup=supergroup
2014-02-10 21:56:00,305 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: isPermissionEnabled=false
2014-02-10 21:56:00,322 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: dfs.block.invalidate.limit=100
2014-02-10 21:56:00,323 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
2014-02-10 21:56:01,295 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Registered FSNamesystemStateMBean and NameNodeMXBean
2014-02-10 21:56:01,375 INFO org.apache.hadoop.hdfs.server.namenode.NameNode: Caching file names occuring more than 10 times
2014-02-10 21:56:01,406 INFO org.apache.hadoop.hdfs.server.common.Storage: Number of files = 1
2014-02-10 21:56:01,447 INFO org.apache.hadoop.hdfs.server.common.Storage: Number of files under construction = 0
2014-02-10 21:56:01,448 INFO org.apache.hadoop.hdfs.server.common.Storage: Image file of size 112 loaded in 0 seconds.
2014-02-10 21:56:01,450 INFO org.apache.hadoop.hdfs.server.common.Storage: Edits file /data/hadoop/hdfs/name/current/edits of size 4 edits # 0 loaded in 0 seconds.
2014-02-10 21:56:01,456 INFO org.apache.hadoop.hdfs.server.common.Storage: Image file of size 112 saved in 0 seconds.
2014-02-10 21:56:02,008 INFO org.apache.hadoop.hdfs.server.common.Storage: Image file of size 112 saved in 0 seconds.
2014-02-10 21:56:02,124 INFO org.apache.hadoop.hdfs.server.namenode.NameCache: initialized with 0 entries 0 lookups
2014-02-10 21:56:02,125 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Finished loading FSImage in 1843 msecs
2014-02-10 21:56:02,192 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Total number of blocks = 0
2014-02-10 21:56:02,192 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Number of invalid blocks = 0
2014-02-10 21:56:02,192 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Number of under-replicated blocks = 0
2014-02-10 21:56:02,192 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Number of  over-replicated blocks = 0
2014-02-10 21:56:02,192 INFO org.apache.hadoop.hdfs.StateChange: STATE* Safe mode termination scan for invalid, over- and under-replicated blocks completed in 53 msec
2014-02-10 21:56:02,192 INFO org.apache.hadoop.hdfs.StateChange: STATE* Leaving safe mode after 1 secs.
2014-02-10 21:56:02,194 INFO org.apache.hadoop.hdfs.StateChange: STATE* Network topology has 0 racks and 0 datanodes
2014-02-10 21:56:02,194 INFO org.apache.hadoop.hdfs.StateChange: STATE* UnderReplicatedBlocks has 0 blocks
2014-02-10 21:56:02,217 INFO org.apache.hadoop.util.HostsFileReader: Refreshing hosts (include/exclude) list
2014-02-10 21:56:02,243 INFO org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source FSNamesystemMetrics registered.
2014-02-10 21:56:02,342 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: ReplicateQueue QueueProcessingStatistics: First cycle completed 0 blocks in 122 msec
2014-02-10 21:56:02,342 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: ReplicateQueue QueueProcessingStatistics: Queue flush completed 0 blocks in 122 msec processing time, 122 msec clock time, 1 cycles
2014-02-10 21:56:02,342 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: InvalidateQueue QueueProcessingStatistics: First cycle completed 0 blocks in 0 msec
2014-02-10 21:56:02,343 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: InvalidateQueue QueueProcessingStatistics: Queue flush completed 0 blocks in 0 msec processing time, 0 msec clock time, 1 cycles
2014-02-10 21:56:02,565 INFO org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source RpcDetailedActivityForPort9000 registered.
2014-02-10 21:56:02,567 INFO org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source RpcActivityForPort9000 registered.
2014-02-10 21:56:02,575 INFO org.apache.hadoop.hdfs.server.namenode.NameNode: Namenode up at: master/192.168.1.100:9000
2014-02-10 21:56:02,595 INFO org.apache.hadoop.ipc.Server: Starting SocketReader
2014-02-10 21:56:03,125 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2014-02-10 21:56:03,534 INFO org.apache.hadoop.http.HttpServer: Added global filtersafety (class=org.apache.hadoop.http.HttpServer$QuotingInputFilter)
2014-02-10 21:56:03,638 INFO org.apache.hadoop.http.HttpServer: dfs.webhdfs.enabled = false
2014-02-10 21:56:03,665 INFO org.apache.hadoop.http.HttpServer: Port returned by webServer.getConnectors()[0].getLocalPort() before open() is -1. Opening the listener on 50070
2014-02-10 21:56:03,689 INFO org.apache.hadoop.http.HttpServer: listener.getLocalPort() returned 50070 webServer.getConnectors()[0].getLocalPort() returned 50070
2014-02-10 21:56:03,689 INFO org.apache.hadoop.http.HttpServer: Jetty bound to port 50070
2014-02-10 21:56:03,689 INFO org.mortbay.log: jetty-6.1.26
2014-02-10 21:56:05,407 INFO org.mortbay.log: Started SelectChannelConnector@master:50070
2014-02-10 21:56:05,407 INFO org.apache.hadoop.hdfs.server.namenode.NameNode: Web-server up at: master:50070
2014-02-10 21:56:05,409 INFO org.apache.hadoop.ipc.Server: IPC Server Responder: starting
2014-02-10 21:56:05,427 INFO org.apache.hadoop.ipc.Server: IPC Server listener on 9000: starting
2014-02-10 21:56:05,444 INFO org.apache.hadoop.ipc.Server: IPC Server handler 0 on 9000: starting
2014-02-10 21:56:05,473 INFO org.apache.hadoop.ipc.Server: IPC Server handler 2 on 9000: starting
2014-02-10 21:56:05,474 INFO org.apache.hadoop.ipc.Server: IPC Server handler 1 on 9000: starting
2014-02-10 21:56:05,478 INFO org.apache.hadoop.ipc.Server: IPC Server handler 3 on 9000: starting
2014-02-10 21:56:05,478 INFO org.apache.hadoop.ipc.Server: IPC Server handler 4 on 9000: starting
2014-02-10 21:56:05,509 INFO org.apache.hadoop.ipc.Server: IPC Server handler 5 on 9000: starting
2014-02-10 21:56:05,509 INFO org.apache.hadoop.ipc.Server: IPC Server handler 6 on 9000: starting
2014-02-10 21:56:05,510 INFO org.apache.hadoop.ipc.Server: IPC Server handler 7 on 9000: starting
2014-02-10 21:56:05,510 INFO org.apache.hadoop.ipc.Server: IPC Server handler 8 on 9000: starting
2014-02-10 21:56:05,529 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 9000: starting
解决:好吧!这不是错误,是启动成功了 - - !使用一下命令,可以发现成功了
pastedGraphic_55.png

# ssh有警告
Warning: No xauth data; using fake authentication data for X11 forwarding.
原因:  this means that your authorized keys file is configured correctly, but your ssh configuration has X11 forwarding enabled. 
解决:# vi /etc/ssh/ssh_config
   ForwardX11 no #
这里设置为‘no’即可。
然后执行:# service sshd restart 即可。



















2.
启动dataNode出现的错误 hadoop-daemon.sh start dataNode
2.1 权限不匹配Incorrect permission for /data/hadoop/hdfs/data, expected: rwxr-xr-x, while actual: rwxrwxr-x
/************************************************************
STARTUP_MSG: Starting DataNode
STARTUP_MSG:   host = master/192.168.1.100
STARTUP_MSG:   args = []
STARTUP_MSG:   version = 1.0.3
STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1335192; compiled by 'hortonfo' on Tue May  8 20:31:25 UTC 2012
************************************************************/
2014-02-10 22:10:55,197 INFO org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2014-02-10 22:10:55,287 INFO org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source MetricsSystem,sub=Stats registered.
2014-02-10 22:10:55,308 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 10 second(s).
2014-02-10 22:10:55,308 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: DataNode metrics system started
2014-02-10 22:10:55,902 INFO org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source ugi registered.
2014-02-10 22:10:56,238 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Invalid directory in dfs.data.dir:
 Incorrect permission for /data/hadoop/hdfs/data, expected: rwxr-xr-x, while actual: rwxrwxr-x
2014-02-10 22:10:56,239 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: All directories in dfs.data.dir are invalid.
2014-02-10 22:10:56,239 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Exiting Datanode
2014-02-10 22:10:56,240 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down DataNode at master/192.168.1.100
************************************************************/
解决:修改权限:
[hadoop@master hdfs]$ chmod 755/data/hadoop/hdfs/data
查看,启动成功
pastedGraphic_56.png
# Jps出现多余的错误
[root@master root]#            jps
3672 DataNode
3569 NameNode
14811 -- process information unavailable  
 @yting_xmei1129 这里不对
3845 Jps 
解决:
[root@master root]# cd /tmp/hsperfdata_hadoop
[root@master hsperfdata_hadoop]# pwd
/tmp/hsperfdata_hadoop
[root@master hsperfdata_hadoop]# ll
total 32
-rw-------    1 hadoop   hadoop      32768 Feb 10 21:55 14811
[root@master hsperfdata_hadoop]# jps
3672 DataNode
3569 NameNode
14811 -- process information unavailable
3845 Jps
[root@master hsperfdata_hadoop]# rm -f 14811
[root@master hsperfdata_hadoop]# jps
3855 Jps
3672 DataNode
3569 NameNode
[root@master hsperfdata_hadoop]# 
需要注意的是 这个用户其他正常的进程号也在里面 删除了就等于把进程杀掉了,慎用。
jps 就没有了


# jobtracker 启动报错
2014-02-12 23:46:21,648 ERROR org.apache.hadoop.hdfs.server.namenode.NameNode: java.io.FileNotFoundException: /data/hadoop/hdfs/name/current/VERSION (Permission denied)
原因:应该是root用户启动namenode时,更改了current文件的权限,root用户修改文件权限:
解决:
[root@master root]# ll /data/hadoop/hdfs/name/current
total 16
-rw-r--r--    1 root     root            4 Feb 12 23:01 edits
-rw-r--r--    1 root     root          927 Feb 12 23:01 fsimage
-rw-r--r--    1 root     root            8 Feb 12 23:01 fstime
-rw-r--r--    1 root     root           99 Feb 12 23:01 VERSION
[root@master root]# chown hadoop:hadoop -R /data/hadoop/hdfs/name/current/
[root@master root]# ll /data/hadoop/hdfs/name/current
total 16
-rw-r--r--    1 hadoop   hadoop          4 Feb 12 23:01 edits
-rw-r--r--    1 hadoop   hadoop        927 Feb 12 23:01 fsimage
-rw-r--r--    1 hadoop   hadoop          8 Feb 12 23:01 fstime
-rw-r--r--    1 hadoop   hadoop         99 Feb 12 23:01 VERSIO
4.2紧接着启动namenode时又报错 hadoop-daemon.sh start namenode
2014-02-12 23:57:46,480 ERROR org.apache.hadoop.hdfs.server.common.Storage: Unable to move last checkpoint for /data/hadoop/hdfs/name
java.io.IOException: Failed to delete /data/hadoop/hdfs/name/previous.checkpoint
2014-02-12 23:57:46,483 FATAL org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Edit streams not yet initialized
java.lang.Exception: Edit streams not yet initialized
原因:/data/hadoop/hdfs/name 这个name目录的权限为root的,hadoop没有权限,所以要修改它的权限,然后启动namenode就行了
解决:
[root@master root]# ll /data/hadoop/hdfs/name
total 16
drwxrwxr-x    2 hadoop   hadoop       4096 Feb 12 23:57 current
drwxrwxr-x    2 hadoop   hadoop       4096 Feb 10 21:51 image
drwxr-xr-x    2 hadoop   hadoop       4096 Feb 12 23:01 lastcheckpoint.tmp
drwxr-xr-x    2 root     root         4096 Feb 11 22:35 previous.checkpoint
[root@master root]# chown -R hadoop:hadoop /data/hadoop/hdfs/
[root@master root]# ll /data/hadoop/hdfs/name
total 16
drwxrwxr-x    2 hadoop   hadoop       4096 Feb 12 23:57 current
drwxrwxr-x    2 hadoop   hadoop       4096 Feb 10 21:51 image
drwxr-xr-x    2 hadoop   hadoop       4096 Feb 12 23:01 lastcheckpoint.tmp
drwxr-xr-x    2 hadoop   hadoop       4096 Feb 11 22:35 previous.checkpoint


# jdk1.6.0_45安装成功后,java命令可以用,javac命令可以用,javac编译Hello.class,java Hello 却不可以运行,看下面异常
Exception in thread "main" java.lang.NoClassDefFoundError: Hello
Caused by: java.lang.ClassNotFoundException: Hello
        at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Could not find the main class: Hello.  Program will exit.

原因:/etc/profile 文件中的CLASSPATH配置错误,忘记配置.;
解决:配置好CLASSPATH,正确配置为CLASSPATH=.:=/usr/java/jdk1.6.0_45/jre/lib/rt.jar
# java configuration start
JAVA_HOME=/usr/java/jdk1.6.0_45
PATH=$PATH:/usr/java/jdk1.6.0_45/bin
CLASSPATH=.:=/usr/java/jdk1.6.0_45/jre/lib/rt.jar
export JAVA_HOME PATH CLASSPATH
"/etc/profile" 61L, 1140C written                                                                                                                                           
[root@master root]# source /etc/profile

# 启动secondarynamenode失败
ERROR org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:hadoop cause:java.net.BindException: Cannot assign requested address
原因:我觉得原因应该是饿的实验室为分布是安装,没有虚拟第二台机器,所以出这样的异常
解决:secondarynamenode配置有问题,饿原先配的是node1,但是并没有这个节点,应该改成饿自己的master(伪分布式安装)


踩踩空间、、、

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐