Hadoop--炼数成金
//note01 Hadoop介绍与安装
。。。
倒排索引
(1;1) 单词出现在标识号为1的网页的编辑量是第1的位置
分词难度:
字典;
Page Rank
用于给每个网页价值评分
Map-reduce思想:
计算PR
Lucene
hadoop的起源,提供了全文检索引擎的架构
nutch
HBase
列式存储(面向数据分析)(提高响应速度及I/O)
Namenode
HDFS的守护程序
记录文件是如何分割成数据块的,以及这些数据块被存储到哪些节点上
对内存和I/O进行集中管理
是个单点,发生故障将使集群崩溃
Secondary Namenode
监控HDFS状态的辅助后台程序
每个集群都有一个
与NameNode进行通讯,定期保存HDFS元数据快照
当NameNode故障可以作为备用NameNode使用
DataNode
每台从朋务器都运行一个
负责把HDFS数据块读写到本地文件系统
JobTracker
用于处理作业(用户提交代码)的后台程序
决定有哪些文件参不处理,然后切割task幵分配节点
监控task,重启失败的task(于不同的节点)
每个集群只有唯一一个JobTracker,位于Master节点
TaskTracker
位于slave节点上,与datanode结合(代码与数据一起的原则)
管理各自节点上的task(由jobtracker分配)
每个节点只有一个tasktracker,但一个tasktracker可以启动多个JVM, 用于并行执行map或reduce仸务
与jobtracker交互
Master与Slave
Master:Namenode、Secondary Namenode、Jobtracker。浏览器(用于观看管理界面),其它Hadoop工具
Slave:Tasktracker、Datanode
Master不是唯一的
安装:
ssh-keygen -t rsa
scp ./id_rsa.pub huang@192.168.04:/home/huang/.ssh
(名称节点的服务器)
hadoop/conf/hadoop-env.sh //只改JAVA_HOME就行
hadoop/conf/core-site.xml
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://backup01:9000</value> //指定名称节点位置
</property>
<property>
<name>hadoop.tmp.dir</name> //临时路径,不指定会默认用root下的./tmp目录,一定要设置这个参数
<value>/home/huang/hadoop/tmp</value>
</property>
</configuration>
hadoop/conf/hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name> //服务器因子
<value>1</value> //2代表复制2份,1不复制
</property>
</configuration>
hadoop/conf/mapred-site.xml
<property>
<name>mapred.job.tracker</name>
<value>backup01:9001</value>
</property>
hadoop/conf/masters.xml
//填master主机名称
hadoop/conf/slaves.xml
//填slaves主机名称
vi /etc/hosts
//检查防火墙
chkconfig iptables off
(集群里面的配置几乎都一样)
scp -r ./hadoop-1.1.2 huang@192.168.0.4:/home/huang/
格式化名称节点:
bin/hadoop namenode -format
bin/start-all.sh
//处理自己连自己免密码问题
.ssh id_rsa.pub -> copy -> authorized_keys 在末尾粘贴
bin/start-all.sh //启动所有的节点
//检查系统是否正常启动
/usr/jdk1.7.0_25/bin/jps //jps:java相关进程统计
伪分布式:
到本地自己给自己免密码:
只需要将.ssh/id_rsa.pub 复制成 authorized_keys
CentOS,安装与编译有关的包:
yum install svn //可以部署到其他服务器
yum install autoconfautomake libtool cmake
yum install ncurses-devel
yum install openssl-devel
yum install gcc*
安装maven
//优点:参数可以结构化写在一个xml里面
需要安装protobuf这个插件
/usr/local/bin/protoc
svn
mvn //之后有一个本地库问题
//测试hello world
//建立一个子目录
mkdir input
cd input/
echo "hello world" > test1.txt
echo "hello hadoop" > test2.txt
bin/hadoop fs -ls
bin/hadoop fs -put ../input ./in //复制input到/.in
bin/hadoop fs -ls
bin/hadoop fs -ls ./in/* //hadoop是没有当前路径一说的
bin/hadoop fs -cat ./in/test1.txt
jar包统计:
bin/hadoop jar hadoop-examples-1.1.2.jar wordcount in out
//out 输出文件名称 in 源文件名称
bin/hadoop fs -ls ./out
/out/part-r-00000 //放的是结果
port:50070
port:50030
219.232.252.17:50070
CDH安装
//note02 HDFS
提供分布式存储机制,提供可线性增长的海量存储能力
自动数据冗余,无须使用Raid,无须另行备份
为进一步分析计算提供数据基础
MR在HDFS基础上进行快速分析
PC组成集群即可。在任何节点,只要发布操作命令,就可以对整个HDFS系统进行统一操作
//本地化数据计算,节省传输花费的时间,也是HDFS设计的原则所在
包含:
NameNode
DataNode
事务日志
映像文件
SecondaryNameNode
cat tmp/dfs/name/current/VERSION
namespaceID= //记录命名空间的标识号,就是整个集群的标识
cTime=0 //这个HDFS创建的时间
storageType=NAME_NODE //存储的类型
layoutVersion=-32 //-32 构造版本
还有影像文件,编辑日志
//每隔一段时间会有一个检查点将内存的数据写到fs里面实地保存
//edits会记录用户的各个操作,当系统如果有异常崩溃的话,系统恢复时它会先加载fsimage,调用edits重做一遍;如果写过一次fsimage,在这之前的操作就没用了;
cat dfs/namesecondary/current/VERSION //备份
//blk开头的文件是数据块
一个文件的写是写到不同的datanode里面的
冗余副本策略
//在复制冗余副本的时候用户是不能操作的
机架策略
//机架一般放20多个服务器,每个机架之间用交换机相连,交换机通过一个上级交换机来连接;同一机架下的节点只经过一个交换机,所以传输速度快
core-site.xml //设置机架
心跳机制
//每隔一段时间给Namenode发送一次
安全模式
//安全模式下用户不能写数据,节点多的话可能会长达10多分钟
bin/hadoop dfsadmin -safemode enter //强制进入安全模式
校验和
blk_xxxx.meta //crc校验然后写到.meta文件里,缺省值512字节产生4个字节校验和
//校验和本身很消耗性能,使用的是jvm的软件进行软计算算的。可以直接改成cpu硬计算
//性能优化一般都会到jvm里面去操作
回收站
//如果打开的话需要先配置core-site.xml
//测试
bin/hadoop fs -rmr ./in/test1.txt
//会出现./Trash这个文件,相当于移到了回收站的目录
恢复和清空
mv 将.Trash的文件移回来就OK了
fs -expunge //清空
元数据保护
//配置多个副本会影响namenode处理速度,但是会增加安全性。
快照
HDFS文件操作
hadoop没有当前目录的概念,也没有cd命令,需要绝对地址
//查看HDFS下某个文件的内容
bin/hadoop fs -cat ./in/test1.txt
//查看HDFS基本统计信息
bin/hadoop dfsadmin -report
//HDFS是不能修改,下载到linux文件改完再传回去
怎么添加节点?
在新节点安装好hadoop
把namenode的有关配置文件复制到该节点
修改masters和slaves文件,增加该节点
设置ssh免密码迕出该节点
单独启劢该节点上的datanode和tasktracker(hadoop-daemon.sh start datanode/tasktracker)
运行start-balancer.sh迕行数据负载均衡
//start-dfs.sh,不需要重启集群
java操作HDFS:
URLCat.java
public class URLCat{
static{
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) throws Exception{
inputStream in = null;
try{
in = new URL(args[0]).openStream();
IOUtils.copyBytes(in,System.out,4096,false);
}finally{
IOUtils.closeStream(in);
}
}
}
设置Hadoop类目录
Hadoop-env.sh
export HADOOP_CLASSPATH=xxxxx/myclass
设置搜索目录
ls -a //查看隐藏文件
.bash_profile //脚本
javac URLCat.java //会报错,因为没有import Hadoop的包
cd hadoop/lib
//导入包之后还会报错,再指定classpath的jar路径就OK了
javac -classpath ../hadoop-core-1.1.2.jar URLCat.java
bin/hadoop URLCat hdfs://backup01/usr/huang/in/test1.txt //运行jar
//这里没有指定端口
bin/hadoop URLCat hdfs://backup01:9000/usr/huang/in/test1.txt
下载ant
下载参考书的代码并上传解开
7287OS_Code/
cd chapter2
cd HDFS_JAVA_API/
cd src
设置HADOOP_HOME环境变量
build.xml
loaction="build" //输出到build文件夹里
/home/huang/apache-ant-1.9.2/bin/ant //在HDFS_Java_API目录下执行
~/hadoop-1.1.2/bin/hadoop jar HDFSJavaAPI.jar HDFSJavaAPIDemo
C_API
安装gcc (c语言的编译器)
yum -y install gcc gcc-c++ autoconf make
测试HDFS C_API
hdfs_cpp_demo.c
#inlude "hdfs.h"
int main(int argc,char **argv){
hdfsFS fs = hdfsConnect("backup01",9000);
if(!sf){
fprintrf(stderr,"Cannot connect to HDFS.\n");
exit(-1);
}
char* fileName = "demo_txt";
char* message="Welcome to HDFS C API!";
int size = strlen(message);
}
//bin/tar.zz.mds 没有源码的包
个人配置的话
编译
gcc hdfs_app_demo.c \
-I $HADOOP_HOME/src/c++/libhdfs \ //-I 包含
-I $JAVA_HOME/include \
-I $JAVA_HOME/include/linux/ \
-L $HADOOP_HOME/c++/Linux-amd64-64/lib/ -lhdfs \ //-L 连接库的路径
-L $JAVA_HOME/jre/lib/amd64/server -ljvm \
-o hdfs_cpp_demo // -o 输出
利用之前ant输出设置CLASSPATH环境变量
要把Hadoop所有的jar包都列进去
利用ant打印环境变量
/home/xxx/ant print -cp
export CLASSPATH=xxxxx
//要在同一命令行下执行
LD_LIBRARY_PATH=$HADOOP_HOME/xxx/amd64/server ./hdfs_cpp_demo
java解读:
FileSystem
public class FileSystemCat{
main() throws Exception{
String uri = args[0];
Configuration conf = new Configuration();
FIleSystem fs = FileSystem.get(URI.create(uri),conf);
InputStream in = null;
try{
in = fs.open(new Path(uri));
IOUtils.copyBytes(in,System.out,4096,false); //hadoop包的
}finally{
IOUtils.closeStream(in);
}
}
}
hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
//C程序可以查看hdfs.h这个文件了解API
Hadoop 2.x (namenode不再是单点)
HDFS HA
管理命令手册
块池
同一个datanode可以存着属于多个block pool的多个块
//hadoop_v4_02g
hdfs-site.xml
dfs.nameservices
<value>ns1,ns2</value>
...
格式化名称节点
HDFS快照
快照位置
hdfs dfs -ls /foo/.snapshot
//note03 HDFS HA联邦安装
DNS安装
yum -y install bind bind-utils bind-chroot
rpm -qa | grep '^bind' //查看是否安装成功
vim /etc/named.conf
options{
listen-on port 53 { 127.0.0.1;}->
{ any; }
allow-query {localhost;} -> { any; } //对所有用户开放
}
vim /etc/named.rfc1912.zones
//末尾添加
zone "hadoop.com" IN { //正解区域
type master;
file "named.hadoop.com";
allow-update { none; };
}
zone "0.168.192.in-addr.arpa" IN { //反解区域
type master;
file "named.192.168.0.zone";
allow-update { none; };
}
cp -p named.localhost named.hadoop.com //复制的时候保持文件权限不变
vim named.hadoop.com
IN SOA user3.hadoop.com. grid.user3.hadoop.com. (
0 ; serial
1D ; refresh
1H ; retry
1W ; expire
3H ) ; minimum
IN NS user3.hadoop.com.
user3.hadoop.com. IN A 192.168.0.109
user3.hadoop.com. IN A 192.168.0.110
user3.hadoop.com. IN A 192.168.0.111
user3.hadoop.com. IN A 192.168.0.112
user3.hadoop.com. IN A 192.168.0.113
user3.hadoop.com. IN A 192.168.0.114
[named] cp -p named.localhost named.192.168.0.zone
vim named.192.168.0.zone
IN SOA user3.hadoop.com. grid.user3.hadoop.com. (
0 ; serial
1D ; refresh
1H ; retry
1W ; expire
3H ) ; minimum
IN NS user3.hadoop.com.
109 IN PTR user3.hadoop.com.
110 IN PTR user3.hadoop.com.
111 IN PTR user3.hadoop.com.
112 IN PTR user3.hadoop.com.
113 IN PTR user3.hadoop.com.
114 IN PTR user3.hadoop.com.
//slave vim /etc/sysconfig/network-scripts/ifcfg-eth0
//末尾添加DNS服务器IP地址
DNS1=192.168.0.109
//在每台slave添加
service network restart
service named start //启动DNS
chkconfig named on //开机就将DNS服务启动
//检查
chkconfig --list named
chkconfig --level 123456 named off
tail -n /var/log/messages | grep named
//测试主机名解析
nslookup user3.hadoop.com
HDFS HA + 联邦 + Resource Manager HA
//安装好DNS之后
cat /etc/resolv.conf //查看
nslookup www.dataguru.cn //测试
NFS (网络文件系统)
可以设置配置文件把某些目录共享出去,并可以设置权限
scp -rp ./hadoop-0.20.2 grid@h1:/home/grid
awk脚本
awk '{print $1}' //awk 都用单引号,以空格/制表符分隔 ,一般处理表格等文件,reg:日志文件
awk '$9~/rr/{print $9}' //~包含 {}里面放执行语句
//除了awk,还要学sed
cat slave
h1
h2
h3
h4
h5
h6
cat ./slave | awk '{print "scp -rp ./hadoop-0.20.2 grid@"$1":/home/grid"}' > scp_test
chmod a+x scp_test //变成可执行文件
sh ./scp_test
//note04 MR
//超级计算机结构是非开放的,每一台都是定制的
//reg:日志分析,通过hdfs切割成很多的块,分散到各个节点上,然后各个节点一起来并行计算,再把结果加起来
//并行计算框架
MPI
c语言的函数库
计算密集型,算是瓶颈
PVM
CUDA
英伟达配合显卡GPU推出来的包,利用GPU多核心来处理
BOINC
互联网计算(可以当分析志愿者)
Map-Reduce
负担主要在I/O
云计算
目前流行的开源云计算解决方案
reg:气象数据集:
zcat xxx-xx-x.gz //查看.gz文件
解压及合并: zcat *.gz > sample.txt //按数据量大小来解压合并
分析MR
input -> |有偏移量key| -> map -> |(key,value)| -> shuffle -> |聚合操作| -> reduce -> |求value的最大值/平均值等| -> output -> hdfs
Java MapReduce 通常需要3段程序
1.映射器 (从原始数据读成key,value)MaxTemperatureMapper
2.reducer(key,value变成最后需要的形式) MaxTemperatureReducer
3.作业程序,总调度 MaxTemperature
运行MR
cd myclass //讲课创建的jar测试目录
cat MaxTemperatureMapper.java
cat MaxTemperature.java
cat MaxTemperatureReducer.java
javac -classpath ../hadoop-core-1.1.2.jar *.java
../bin/hadoop MaxTemperature ./user/huang/in/723440-13964 ./out6 //数据文件. 输出到out6
//没有找到Mapper,解决方式,打成jar包
jar cvf ./MaxTemperature.jar *.class
mv MaxTemperature.jar ..
//需要删掉之前的class文件
rm *.class
cd ..
bin/hadoop jar ./MaxTemperature.jar MaxTemperature ./user/huang/in/723440-13964 ./out6
../bin/hadoop fs -ls ./out6
../bin/hadoop fs -cat ./out6/part-r-00000
//分析计算过程
Mapper
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String line = value.toString();
String year = line.substring(15,19);
int airTemperature;
if(line.charAt(87) == '+'){
//parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88,92));
}else{
airTemperature = Integer.parseInt(line.substring(87,92));
}
String quality = line.substring(92,93);
if(airTemperature != MISSING && quality.matches("[01459]")){
context.write(new Text(year),new IntWritable(airTemperature));
}
}
Reducer
public class MaxTemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
int maxValue = Integer.MIN_VALUE;
for(IntWritable value:values){
maxValue = Math.max(maxValue,value.get();
}
context.write(key,new IntWritable(maxValue));
}
}
M-R job
public class MaxTemperature{
main() throws Exception{
if(args.length != 2){
System.err.println("Usage:MaxTemperature<input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true)?0:1);
}
}
分片的问题
Combiner 做预计算的。
Map-Reduce工作机制
//java跟c相比,慢的是启动jvm这个过程。jvm启动不挂的话还是很快的
有一个心跳是3秒一次,jobtracker周期是1分钟一次
主要工作:SQL或PL/SQL改写为Map-Reduce程序
Eclipse:
hadoop/contrib/eclipse-plugin/xxx.jar
windows->preferences->hadoop map/reduce
show Map-Reduce 显示MR视图
在视图右键->new Hadoop loaction->
Location name:xxx
hadoop/conf/mapred-site.xml ->找端口
端口填写一致
左侧右键DFS Loactions->Disconnect->home user
new -> MapReduce Project -> name:xxx
src->new example.java
@Override
public int run(String[] args)throws Exception{
Configuration conf = getConf();
Job job = new Job(conf,"example"); //任务名
job.setJarByClass(example.class); //指定Class
FileInputFormat.addInputPath(job,new Path(args[0])); //输入路径
FileOutputFormat.setOutputPath(job,new Path(args[1])); //输出路径
job.setMapperClass(Map.class); //调用上面Map类作为Map任务代码
job.setReducerClass(Reduce.class); //调用上面Reduce类作为Reduce任务代码
job.setOutputFormatClass(TextOutputFormat.class); //指定输出的KEY的格式
job.setOutputKeyClass(Text.class); //指定输出的KEY的格式
job.setOutputValueClass(Text.class); //指定输出的VALUE的格式
job.waitForCompletion(true);
return job.isSuccessful()?0:1;
}
main(){
int res = ToolRunner.run(new Configuration(),new example(),args);
System.exit(res);
}
源文件:
->
Mapper
1.分割原始数据
2.输出所需数据
3.处理异常数据
->
输出到HDFS
public class Test_1 extends Configured implements Tool{
enum Counter{ //可以对其自增操作
LINESKIP, //出错的行
}
public static class Map extends Mapper<LongWritable,Text,NullWritable,Text>{//变量为: 输入,输出key,value格式
//Text主要记录字符串的,NullWritable 空值
//key 偏移量 ,内容 value
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String line = value.toString(); //读取源数据
try{
//数据处理
String[] lineSplit = line.split(" ");
String month = lineSplit[0];
String time = lineSplit[1];
String mac = lineSplit[6];
Text out = new Text(month + '' + time + '' + mac);
//如果是context.write(key,out);则会出现\t
//用了NullWritable.get() 之后不会出现 \t
context.write(NullWritable.get(),out);//输出 key \t value
}catch(java.lang.ArrayIndexOutOfBoundsExecption e){
context.getCounter(Counter.LINESKIP).increment(1);//出错令计数器+1
return;
}
}
}
@Override
public int run(String[] args)throws Exception{
Configuration conf = getConf();
Job job = new Job(conf,"example"); //任务名
job.setJarByClass(Test_1.class); //指定Class
FileInputFormat.addInputPath(job,new Path(args[0])); //输入路径
FileOutputFormat.setOutputPath(job,new Path(args[1])); //输出路径
job.setMapperClass(Map.class); //调用上面Map类作为Map任务代码
job.setReducerClass(Reduce.class); //调用上面Reduce类作为Reduce任务代码
job.setOutputFormatClass(TextOutputFormat.class); //指定输出的KEY的格式
job.setOutputKeyClass(Text.class); //指定输出的KEY的格式
job.setOutputValueClass(Text.class); //指定输出的VALUE的格式
job.waitForCompletion(true);
return job.isSuccessful()?0:1;
}
main(){
//运行任务
int res = ToolRunner.run(new Configuration(),new Test_1(),args);
System.exit(res);
}
}
Run_Configurations->Test_1->Arguments:
hdfs://localhost:9000/user/james/input hdfs://localhost:9000/user/james/output
//output必须是不存在的
倒排索引
new->Haodoop Project->Test_2
public calss Test_2 extends Configured implements Tool{
enum Counter{
LINESKIP
}
public static class Map extends Mapper<LongWritable,Text,Text,Text>{//变量为: 输入,输出格式
String line = value.toString();
try{
//数据处理
String[] lineSplit = line.split(" ");//135,10085
String anum = lineSplit[0];
String bnum = lineSplit[1];
context.write(new Text(bnum),new Text(anum));
}catch(java.lang.ArrayIndexOutOfBoundsExecption e){
context.getCounter(Counter.LINESKIP).increment(1);//出错令计数器+1
return;
}
}
public static class Reduce extends Reducer<Text,Text,Text,Text>{
public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
String valueString;
String out = "";
for (Text value:values){
valueString = value.toString();
out += valueString + "|";
}
context.write(key,new Text(out));
}
}
run(){
job.setReducerClass(Reduce.class);
}
main(){}
}
Export->JAR->JAR file path->next->Main Class填写->Clone
//note05 MR实战
性能调优:
究竟需要多个reducer?
输入:大文件(上G的)优于小文件
减少网络传输:压缩map的输出
优化每个节点能运行的任务数:mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum(缺省值均为2)
hadoop流与脚本
wordcount:
//数单词
cat install.log | wc
web Apache日志分析:
PV IP
图片/日志点击先区分开
爬虫/日志点击区分开
//排除爬虫
探针设计 //在网站点击一下,只用算法排除的话,依然是多出3-5倍无用点击
//不直接分析网站日志,而是间接分析探针日志,来计算pv
<script type="text/javascriopt">
var _gaq = _gaq || [];
_gaq.push(['_setAccount','UA-20237423-4']);
_gaq.push(['_setDomainName','.itpub.net']);
_gaq.push(['_trackPageview']);
(function(){
var ga = document.createElement('script');
ga.type = 'text/javascript';
ga.async = true;
ga.src = ('https:' == doucment.location.protocol? 'https://ssl':'http://www') + '.google-a???';
var s = document.getElementsByTagName('script')[0];
s.parentNode.insertBefore(ga,s);
})();
</script>
<div style="display:none">
<script type="text/javascript">
var _bdhmProtocol = (("https:" == document.location.protocol) ? "https://" : "http://");
document.write(unescape("%3Cscript src='" + _bdhmProtocol + "hm.baidu.com/h.js%3F5016281862f595e78"));
</script></div>
<!-- END STAT PV --></body>
</html>
排除爬虫和程序点击,对抗作弊
·用鼠标测动对抗爬虫
·常用流量作弊手段
·跟踪用户
### 一边点击一边换IP ###
拿搜索词
??纯真88
统计浏览器类型
少量数据的情况下:
awk,grep,sort,join等,perl,python,正则等
海量数据的情况下: 10G,100G 增长的时候
CDN
(反向代理加速)
ip去重
//note06 复杂应用/hadoop流
InputFormat()
OutputFormat()
// 06 hadoop_v4_06c 04:00
//note07 Pig
set命令检查环境变量
进入grunt shell
pig -x local
PIG_CLASSPATH
pig
Pig的运行方法:
脚本
Grunt
嵌入式
pig转换为java,再由jvm执行
Grunt
自动补全机制
Autocomplete文件
Eclipse插件PigPen
help
ls,cat,cd
copyToLocal test1.txt ttt (复制到grunt外面的当前路径)
sh /usr/java/jdk1.7.0.0_26/bin/jps //直接执行命令
Bag,Tuple,Field,Pig不要求具有各tuple相同数量或相同类型的field
pig -x local
A = LOAD '/home/grid/csdn.txt'
USING PigStorage('#')
B = FOREACH A
STORE B INTO '/home/grid/emmail.txt'
USING PiagStorage();
脚本:
grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'
>> AS (year:chararray,temperature:int,quality:int); //如果没有定义分隔符,则默认是制表符。
DUMP records; //输出
DESCRIBE records; //输出查看结构
filtered_records = FILTER records BY temperature != 9999 AND
>> (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
DUMP filtered_records;
GROUP
FOREACH //对每一行进行扫描处理
//有点类似面向数据流的处理语言,Mapp-Reduce有点面向计算
UDF 用户自定义函数
guoyunsky.iteye.com/blog/1317084
reg:
pig
cat score.txt
A = LOAD 'score.txt' USING PigStorage(',') AS (student,course,teacher,score:int);
DESCRIBE A;
B = FOREACH A CENERATE student,teacher;
DESCRIBE B;
C = DISTINCT //去重
C = DISTINCT B;
D = GROUP C BY student;
D = FOREACH (GROUP C BY student) CENERATE group AS student,COUNT(C);
DUMP D;
//第二种方法
DESCRIBE B;
E = GROUP B BY student;
DESCRIBE E
F = FOREACH E
{
T = B.teacher;
uniq = DISTINCT T;
GENERATE group AS student,COUNT(uniq) AS cnt;
}
//note08 Hive
数据仓库工程师
NoSQL -> Not Only SQL
Hive安装
配置文件
cd hive/conf
hive-env.sh.template -> hive-env.sh
HADOOP_HOME=xxx
export HIVE_CONF_DIR=xxx
hive-site.xml
hadoop-env.sh
export HADOOP_CLASSTHAN= xxx;
./hive
show tables
create table abc (c1 string);
drop table abc;
/user/hive/warehouse/abc/数据
insert overwrite table result
select xxx frrom loc
thrift server / JDBC
reg:
main()throws Exception{
CLass.forNmae("org.apache.hadoop.hive.jdbc.HiveDriber");
String dropSql="drop table pokes";
String createSql="create table pokes (foo int,bar string";
String insertSql="load data local inpath '/home/zhangxin/hive/kv1.txt' overwrite into table pokes";
String querySql="select bar from pokes limit 5";
Connection connection=DriverManager.getConnection("jdbc:hive://localhost:10000/default","","");
Statement statement = connection.createStatement();
statement.execute(dropSql);
statement.execute(createSql);
statement.execute(insertSql);
ResultSet rs = statement.executeQuery(querySql);
while(rs.next()){
sout(rs.getString("bar"));
}
}
http://10.20.151.7:9999/hwi/ //ip改成自己的,默认web路径访问
元数据
//note09 Hive
sql不同的,有3种独特的类型
struct
map
array
reg:
create table employees(
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING,FLOAT>,
address STRUCT<street:STRING,city:STRING,state:STRING,zip:INT>
);
缺省分隔符:
\n
^A \001 分离不同的列(字段)
^B \002 分割数组/集合里面的元素
^C \003 map的key,value之间分割
create table employees(
xxx
)
ROW format delimited
fields terminated by '\001'
collection items terminated by '\002'
map keys terminated by '\003'
lines terminated by '\n'
stored as textfile;
DDL:
create database if not exists financials;
show databases like 'h.*';
存放目录
缺省存放目录由hive.metastore.warehouse.dir指定
可以使用以下命令覆盖
create database financials
location '/my/preferred/directory'
观看数据库描述
create database financials
comment 'Holds all financial tables';
describe database financials;
create database financials
with dbproperties('creator'='Mark Moneybags','date'='2012-01-02');
describe database extended financials;
切换数据库
USE financials;
set hive.cli.print.current.db=true;
hive (financials)> USE default;
hive (default)> set hive.cli.print.current.db=false;
hive> ...
删除和更改数据库
drop database if exists financials;
drop database if exists financials cascade; //连数据一起删掉
alter database financials set dbproperties('edited-by'='Joe');
创建表
create table employees(
name STRING comment 'Employee name',
salary FLOAT comment 'Employee salary',
subordinates ARRAY<STRING> comment 'Names of xx',
deductions MAP<STRING,FLOAT>,
address STRUCT<street:STRING,city:STRING,state:STRING,zip:INT>
)
comment 'Description of the table'
tblproperties('creator'='Mark Moneybags','date'='2012-01-02')
location '/user/hive/warehouse/mydb.db/employees';
create table if not exists mydb.employees2
like mydb.employees;
列出表
USE mydb;
show tables;
use default;
show tables in mydb;
use mydb;
show tables 'empl.*'
观看表的描述
describe extended mydb.employees;
外部表
create table employees(
name STRING comment 'Employee name',
salary FLOAT comment 'Employee salary',
subordinates ARRAY<STRING> comment 'Names of xx',
deductions MAP<STRING,FLOAT>,
address STRUCT<street:STRING,city:STRING,state:STRING,zip:INT>
)
row format delimited fields terminated by ','
location '/data/stocks';
create external table if not exists mydb.employee3
like mydb.employees
location '/path/to/data';
分区表:
create table employees(
xxx
)
partitioned by (country STRING,state STRING);
分区表的存储:会变成一个子目录里面的一系列文件
set hive.mapred.mode=strict;
select e.name,e.salalry from employees e limit 100;
//报错,然后
set hive.mapred.mode=nonstrict;
select e.name,e.salalry from employees e limit 100;
指定存储格式
create table kst
partitioned by (ds string)
row format serde 'com.linkedin.haivvreo.AvroSerDe'
with serdeproperties ('schema.url'='http://schema_provider/kst.avsc')
stored as
inputformat 'com.linkedin.haivvreo.AvroContainerInputFormat'
outputformat 'com.linkedin.haivvreo.AvroContainerOutputFormat';
create external table if not exists stocks(
xxx
)
clustered by (exchange,symbol)
sorted by (ymd asc)
into 96 buckets
删除和更改表
alter table log_messages partition(year=2011,month=12,day=2)
set location 's3n://ourbucket/logs/2011/01/02';
alert table log_messages set tblproperties('notes'='The xxxx');
列操作
alter table log_messages
change column hms hours_minutes_secondes int
comment 'xx'
after severity;
alter table log_messages add columns( app_name STRING);
alter tbal log_messages replace columns(message STRING);
DML操作
Hive不支持行级别,将数据放入表中的唯一办法是批量载入
LOAD DATA LOCAL inpath '{env:HOME}/california-employees'
overwrite into table employees
partition(country = 'US',state='CA');
Insert overwrite语句
insert overwrite table employees
partition(country='US')
select * from staged_employees se
where se.cnty='US'
from staged_employees se //非分区表
insert overwrite table employees
partition (country='US')
select * from xxx s
where s.cnty='US' //将非分区的表变成了分区表
动态分区插入
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
创建表的同时把数据放进去
create table ca_employees
as select name,salary,address
from employees
where se.state='CA';
导出数据
直接复制粘贴
如果需要改动数据格式,可以使用insert overwrite
insert overwrite local directory '/tmp/ca_employees'
select name,salary,address
from employees
where se.state = 'CA';
SELECT:
使用正则表达式
select symobl,'price.*' from stocks;
select name from employees where address.street like '%Ave.'
select name,address.street from employees where address.street rlike '.*(Chicage|Ontario).*'; //rlike 来正则匹配
函数
//求各种统计指标的函数
explode //可以把数组元素展开成很多行
select explode(array(1,2,3)) as element from src;
嵌套select
from (
select upper(name),deductions["Federal Taxes"] as fed_taxes
from employees
) e
select e.name,e.salary_minus_fed_taxes
where e.salary_minus_fed_taxes > 70000;
连接操作(缓慢)
set hive.auto.convert.join=true;
select s.ymd,s.symbol,s.price_close,d.dividend
from stocks s join dividends d on s.ymd = d.ymd and
s.symbol = d.symbol where s.symbol='AAPL'
排序
order by and sort by
distribute by
cluster by
bucket : 桶 hash (抽样查询)
select * from numbers TABLESAMPLE(BUCKET 3 OUT OF 10 ON rand())s;
视图与索引
create index employees_index
on tbale employees(country)
as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
with deferred rebuild
idxproperties('creator = 'me','created_at'='some_time')
in table employees_index_table
partitioned by (country,name)
comment 'Employees indexd by country and name.';
位图索引
建模
执行计划
Google Dremel
山寨货:Apache Drill
Cloudera Impala (Hive的替代方案)
yarm是底层
架构在yarm上面就可以不用map-reduce了,可以用hadoop流什么的
//note10 Hive-impala子项目 HBase
Google Bigtable的开源实现
列式数据库
可集群化
可以使用shell、web、api等多种方 式访问
适合高读写(insert)的场景
HQL查询语言
NoSQL的典型代表产品
//不是使用的SQL语言
mysql是什么东西限制了其扩展,noSQL怎么解决的这个问题
Sqoop
用于在Hadoop和关系型数据库之间交换数据
通过JDBC接口连入关系型数据库
Avro
数据序列化工具
Chukwa
架构在Hadoop之上的数据采集与分析框架
主要进行日志采集和分析
Cassandra (几乎淘汰)
与Hbase类似,借鉴Google Bigtable的思想体系
特点:无中心的数据库 缺点:效率比较低
Zookeeper:
//还是一个资源库
文件没有上下级之分
znode可以存放data,上限1M;还可以放ACL,访问控制列表。
一般是奇数个节点,节点都是平等的,表面看起来zookeeper是无中心的但是会选举一个出来。
zab协议
阶段1:领导者选举
阶段2:原子广播
每个节点放一个watch,leader修改了信息后其他节点可以查看到修改的信息,并更改自己的信息
分布锁:
znode---leader
->
lock 观察
羊群效应
//note11 zookeeper
Big Table想法:
(S#,sn,sd,sa)
/*
1.行列:key值
2.属性
3.value
*/
hbase
删除不是正常删,而且给某个时间戳的一行插入一个新的行键打个标记作为删除 (HDFS不能进行文件修改,追加也很麻烦,Hbase做了一个折中的方式来insert。所有数据都是往内存里面插,在一定时间内存满了之后才可以写,即收集一定数据后就可以往里写,一写就是一块。)(Hbase每隔一段时间进行重整操作,会把一些比较小的时间拿出来合并成比较大的文件。抛弃是在重整操作过程中操作的,打标记的都扔掉,然后再重新写成一个大的文件。)
行键也是可以重复的
面向时间查询
行键
列族与列
时间戳
可以由用户显式赋值
行键,列族:限定符,时间戳 来唯一决定
列族元素在物理上存放的是同一个地方,不同的列族是不同的物理存放
store
memoryStore (先)
storeFile (后)
当内存里面的东西足够多时会存到storeFile(物理)
读取是在memoryStore里面的
每过一段时间会触发合并过程,会把小的storeFile合并成大的storeFile,合并过程中会删除标记的行及过期的行
每一个storeFile对应一个HDFS的文件,会分散在不同的物理节点里面
Region和Region服务器
HLog
-ROOT-和.META.表
HBase中有两张特殊的Table,-ROOT-和.META.
Ø .META.:记录了用户表的Region信息,.META.可以有多个regoin
Ø -ROOT-:记录了.META.表的Region信息,-ROOT-只有一个region
Ø Zookeeper中记录了-ROOT-表的location
Memstore与storeFile
一个store包含一个列族的所有数据,列族存放是在临近的区域里面
传统数据库的行式存储
为了读某个列的数据,必须要把整个行读完才能对其读取
联机事务处理随机读写还是要用行式数据库。
行式数据库存储问题
行标识访问:B树索引
B树索引原理:树形
oracle行式存储的访问形式
BigTable的LSM索引 (日志及数据)
L:log S:结构 M:merge(合并)
日志就是数据
zookeeper:
安装:单机模式
配置
安装:集群模式
hadoop与hbase版本问题
hbase->hadoop-core-x.x.x.jar //可以看hadoop匹配版本
修改hbase-env.sh
配置hbase-site.xml
启动Hbase及验证
bin/start-hbase.sh
/usr/java/jdk1.6.0_26/bin/jps
Hbase安装:伪分布模式
编辑hbase-env.sh增加HBASE_CLASSPATH环境变量
编辑hbase-site.xml打开分布模式
覆盖hadoop核心jar包
Hbase安装:完全分布模式
192.168.5.134:60010/master.jsp
//note12
Hbase操作命令复杂
Hbase数据建模问题
关系型数据库的弱点
CAP定律:
NoSQL运动
NoSQL数据库家族
redis一半内存一半硬盘
列式数据库在数据分析时工作特别快
满足一致性,可用性的系统
Redis
key-value类型的数据库
Hbase
不能group by等连接
Cassandra
MongoDB
擅长处理非结构化数据
Neo4J
适用于社交网站
NoSQL与CAP
密切相关的。真的要做成分布式的话必须要在其中放弃一种,一般都选一致性
Hbase存储架构理解
Key Length Value Length Key Value //key,value的长度比较重要
什么情况下使用Hbase?
成熟的数据分析主题,查询模式已经确立并且不轻易改变
传统的关系型数据库已经无法承受负荷,高速插入,大量读取
适合海量的,但同时也是简单的操作(例如key-value)
关系型数据库的困难
模式设计
Hbase:表设计与查询实现
搜索优化:
u-t
t-u
辅助索引
复合行键设计
//note13 数据集成
Sqoop
mysql hadoop 连接
Flume
Chukwa
日志收集
ODCH/OLH
oracle hadoop 连接
Oracle大数据连接器
Sqoop
SQL-to-HDFS工具
JDBC
hadoop-0.20.2下Squoop是不支持此版本的
配置
sqoop命令选项:
% sqoop help
% sqoop help import
从mysql导入数据的例子
% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
//连入mysql
>--table widgets -m 1
% hadoop fs -cat widgets/part-m-00000
//间隔符用的,
导入到Hbase
sqoop import --connect jdbc:mysql//mysqlserver_IP/databaseName --table datatable --hbase-create-table --hbase-table hbase_tablename --column-family col_fam_name --hbase-row-key key_col_name
其中,databaseName和datatable是mysql的数据库和表名,hbase_tablename是要导成hbase的表名,key_col_name可以指定datatable中哪一列作为hbase新表的rowkey,col_fam_name是除rowkey之外的所有列的列族名
从oracle导入数据
需要有ojdbc6.jar放在$SQOOP_HOME/lib里,不需要添加到classpath
connecturl=jdbc:oracle:thin:@172.7.10.16:1521:orcl
oraclename=scott
oraclepassword=wang123456
oracleTableName=test
#需要从oracle中导入的表中的字段名
columns=ID,STATE
#导出到HDFS后的存放路径
hdfsPath=/tmp/
sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --m 1 --table $oracleTableName --columns $columns --hbase-create-table --hbase-table orl --hbase-row-key STATE --column-family orl
oracle big data connectors
HDFS直接连接器
可以把带有分隔符的文件作为oracle的外部表访问
还可以直接hadoop的文本文件作为数据源
hadoop装载器
直接把hadoop里面的东西装载过去
Oracle HDFS直接连接器(ODCH)实验
Oracle Enterprise Linux
配置hdfs_steam script文件
... (hadoop_v4_13d)
/logs
/extdir
!cat lab4.2_setup_DB_dir.sql
set echo on
create or replace directory ODCH_LOG_DIR as '/home/hadoop/.../logs'
grant read,write on directory ODCH_LOG_DIR to SCOTT;
@lab4.2_setup_DB_dir.sql
sqlplus scott/tiger
!cat lab4.3_ext_tab.sql
SQL创建外部表
preprocessor HDFS_BIN_PATH:hdfs_stream //先预处理数据再读
PROMPT>sqlplus scott/tiger
select count(*) from odch_ext_table;
set autotrace trace exp //设置追踪,来观察执行计划
select count(*) from odch_ext_para_table;
CDN加速
Flume
提供分布式,可靠和高可用的海量日志采集,聚合和传输的系统
Chukwa
//note14 扩展开发,与应用集成
UDF (用户定义函数) (reg:Pig,Hive)
Thrift接口
Rhadoop
UDF
写个自定义jar
create temporary function strip as 'com.hadoopbook.hive.[ClassName]'
% hive --auxpath /path/to/hive-example.jar
select xxx('name') from student
filter:
jar
register pig-examples.jar
grunt>filter.... com.hadoopbook.pig.xxxx(xxx);
DEFINE isGood com.hadoopbook.pig.xxxx();
然后就可以用 isGood(xxx);
应用与Hbase的对接:通过Thrift
Thrift是一个跨语言的服务部署框架
通过一个中间语言(IDL,接口定义语言)来定义RPC的接口和数据类型
//note15 与应用层连接
并行计算框架
MPI
PVM
Mesos
Map-Reduce
YARN
可以同时支持Map-Reduce,Storm,Spark,MPI等多种流行计算模型
Spark
YARN配置
//note16 hadoop源代码
//note17 hadoop与机器学习
Hadoop与机器学习
Mahout (封装各种算法)(天生适合做离线数据分析)
Hadoop在互联网企业中的应用
spark基于内存来计算。成本比hadoop高
不要求实时得出结果,可以不选spark
spark不太好转,跟java几乎无关
Mahout (在Data Mining上)
数据金字塔:从下往上
Making Decisions (决策层)
Data Presentations (数据展示层)
Data Mining (数据挖掘,建立数学模型/算法,找到一种合适的算法)
Data Exploration (对数据进行简单的查询)
Data Warehouses/Data Marts ETL(数据仓库)
Data Sources (数据源)
回归
样本这里叫学习集
用来做预测
分类器
决策树
贝叶斯分类器(顾客流失)(自己设置阈值)(文本分类)(搜索引擎判断两篇文章是否一致,概率多高)
有学习集的进行特征提取就可以自动完成
聚类(没有学习集)
(层次聚类法)
数据挖掘
数据分析
SAS,R,SPSS
SAS数据是经过检验的,R就不太可靠,SAS主要应用于金融
传统数据分析工具的困境
处理数据受限于内存,因此无法处理海量数据(R处理上限可能是100万)(R跟SAS处理不能超过内存数,不然会机器异常)
...
(聚类,推荐系统就无法使用抽样)
解决方向:hadoop集群和Map-Reduce并行计算
常见算法的Map-Reduce化
样本独立性比较强的就可以map-reduce
Lucene 早期搜索引擎的项目
Mahout的特点:
下载和解压Mahout
wget http://mirrors.cnnic.cn/apache/mahout/0.6/mahout-distribution-0.6.tar.gz
tar xzf ./mahout-distribution-0.6.tar.gz
配置环境变量
export HADOOP_HOME=/home/huang/hadoop-1.1.2
export HADOOP_CONF_DIR=/home/huang/hadoop-1.1.2/conf
export MAHOUT_HOME=/home/huang/hadoop-1.1.2/mahout-distribution-0.6
export MAHOUT_CONF_DIR=/home/huang/hadoop-1.1.2/mahout-distribution-0.6/conf
export PATH=$PATH:$MAHOUT_HOME/conf:$MAHOUT_HOME/bin
几个重要环境变量
JAVA_HOME mahout运行需指定jdk的目录 MAHOUT_JAVA_HOME指定此变量可覆盖JAVA_HOME值
HADOOP_HOME 如果配置,则在hadoop分布式平台上运行,否则单机运行 HADOOP_CONF_DIR指定hadoop的配置文件目录
MAHOUT_LOCAL 如果此变量值不为空,则单机运行mahout。
MAHOUT_CONF_DIR mahout配置文件的路径,默认值是$MAHOUT_HOME/src/conf MAHOUT_HEAPSIZE mahout运行时可用的最大heap大小 (堆大小)
验证安装成功
bin/mahout
源码和部分样本数据
装的时候要装源代码包(即-src的包)
将测试数据copy到HDFS
hadoop/bin/hadoop fs -mkdir ./testdata
hadooop/bin/hadoop fs -put ./synthetic_control.data ./testdata
hadoop/bin/hadoop fs -ls ./testdata
做一个kmeans测试(聚类测试)
mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job
观察输出
用mahout输出
mahout vectordump --seqFile ./output/data/part-m-00000
20Newsgroups数据集
使用Mahout进行文本自动分类
上传并解压数据
20news-bydate-test 测试数据
20news-dydate-train 训练数据
建立训练集
mahout org.apache.mahout.classifier.bayes.PrepareTwentyNewsgroups \
-p /home/huang/data/20news-bydate-train \
-o /home/huang/data/bayes-test-input \ (结果输出到了本地。。)
-a org.apache.mahout.vectorizer.DefaultAnalyzer \
-c UTF-8
(作了分词什么的。。)
上传到HDFS
cd ../hadoop-1.1.2
bin/hadoop fs -mkdir ./20news
bin/hadoop fs -put ../data/bayes-train-input ./20news
bin/hadoop fs -put ../data/bayes-test-input ./20news
训练贝叶斯分类器
mahout trainclassifier \
-i /user/huang/20news/bayes-train-input \
-o /user/huang/20news/newsmodel \ (放输出的模型,即统计参数数据)
-type cbayes
-ng 2 \
-source hdfs
生成的模型
bin/hadoop fs -ls ./20news/newsmodel (里面放了一堆模型数据)
测试贝叶斯分类器
mahout testclassifier \
-m /user/huang/20news/newsmodel \
-d /user/huang/20news/bayes-test-input \
-type cbayes
-ng 2 \
-source hdfs \
-method mapreduce
京东
部门结构
运维团队(负责管理维护集群的正常运行)
数据仓库团队(根据业务部门的要求进行数据统计和查询)
成都研究院(负责底层,包括源代码修改和按上层部门要求开发 Map-Reduce程序,比如一些UDF)
淘宝
对Hadoop源码的修改
管理模式
准实时的流数据处理技术
从Oracle, Mysql日志直接读取数据
部分数据源来自应用消息系统
以上数据经由Meta+Storm的流数据处理,写入HDFS,实现实时或准实时的数据分析
数据装载到Hive进行处理,结果写回Oracle和Mysql数据库
Oceanbase
百度
日志的存储和统计;
网页数据的分析和挖掘;
商业分析,如用户的行为和广告关注度等;
在线数据的反馈,及时得到在线广告的点击情况;
用户网页的聚类,分析用户的推荐度及用户之间的关联度。