满天星
Fork me on GitHub

大数据学习笔记01-hadoop

Hadoop--炼数成金

//note01 Hadoop介绍与安装
。。。


倒排索引

(1;1) 单词出现在标识号为1的网页的编辑量是第1的位置

分词难度: 
    字典;
Page Rank
    用于给每个网页价值评分
Map-reduce思想:
    计算PR 
Lucene
    hadoop的起源,提供了全文检索引擎的架构
nutch
HBase
    列式存储(面向数据分析)(提高响应速度及I/O)

Namenode    
    HDFS的守护程序
    记录文件是如何分割成数据块的,以及这些数据块被存储到哪些节点上
    对内存和I/O进行集中管理
    是个单点,发生故障将使集群崩溃
Secondary Namenode
    监控HDFS状态的辅助后台程序
    每个集群都有一个
    与NameNode进行通讯,定期保存HDFS元数据快照
    当NameNode故障可以作为备用NameNode使用
DataNode
    每台从朋务器都运行一个
    负责把HDFS数据块读写到本地文件系统
JobTracker
    用于处理作业(用户提交代码)的后台程序
    决定有哪些文件参不处理,然后切割task幵分配节点
    监控task,重启失败的task(于不同的节点)
    每个集群只有唯一一个JobTracker,位于Master节点
TaskTracker
    位于slave节点上,与datanode结合(代码与数据一起的原则)
    管理各自节点上的task(由jobtracker分配)
    每个节点只有一个tasktracker,但一个tasktracker可以启动多个JVM, 用于并行执行map或reduce仸务
    与jobtracker交互
Master与Slave
    Master:Namenode、Secondary Namenode、Jobtracker。浏览器(用于观看管理界面),其它Hadoop工具
    Slave:Tasktracker、Datanode 
    Master不是唯一的

安装:
    ssh-keygen -t rsa
    scp ./id_rsa.pub huang@192.168.04:/home/huang/.ssh

    (名称节点的服务器)
        hadoop/conf/hadoop-env.sh      //只改JAVA_HOME就行
        hadoop/conf/core-site.xml
        <configuration>
            <property>
                <name>fs.default.name</name>
                <value>hdfs://backup01:9000</value> //指定名称节点位置
            </property>
            <property>
                <name>hadoop.tmp.dir</name>     //临时路径,不指定会默认用root下的./tmp目录,一定要设置这个参数
                <value>/home/huang/hadoop/tmp</value>
            </property>
        </configuration>

        hadoop/conf/hdfs-site.xml
        <configuration>
            <property>
                <name>dfs.replication</name>  //服务器因子
                <value>1</value>        //2代表复制2份,1不复制
            </property>
        </configuration>        

        hadoop/conf/mapred-site.xml
        <property>
            <name>mapred.job.tracker</name> 
            <value>backup01:9001</value>
        </property>

        hadoop/conf/masters.xml
        //填master主机名称
        hadoop/conf/slaves.xml
        //填slaves主机名称
        vi /etc/hosts
        //检查防火墙
        chkconfig iptables off

    (集群里面的配置几乎都一样)
    scp -r ./hadoop-1.1.2 huang@192.168.0.4:/home/huang/

    格式化名称节点:
        bin/hadoop namenode -format
        bin/start-all.sh
        //处理自己连自己免密码问题
        .ssh id_rsa.pub -> copy -> authorized_keys 在末尾粘贴
        bin/start-all.sh     //启动所有的节点
        //检查系统是否正常启动
        /usr/jdk1.7.0_25/bin/jps     //jps:java相关进程统计

伪分布式:
    到本地自己给自己免密码:
        只需要将.ssh/id_rsa.pub 复制成 authorized_keys


CentOS,安装与编译有关的包:
yum install svn     //可以部署到其他服务器
yum install autoconfautomake libtool cmake 
yum install ncurses-devel
yum install openssl-devel
yum install gcc*

安装maven
    //优点:参数可以结构化写在一个xml里面
    需要安装protobuf这个插件
    /usr/local/bin/protoc

svn
mvn     //之后有一个本地库问题

//测试hello world
//建立一个子目录
mkdir input
cd input/
echo "hello world" > test1.txt
echo "hello hadoop" > test2.txt
bin/hadoop fs -ls 
bin/hadoop fs -put ../input ./in     //复制input到/.in
bin/hadoop fs -ls 
bin/hadoop fs -ls ./in/*         //hadoop是没有当前路径一说的
bin/hadoop fs -cat ./in/test1.txt

jar包统计:
bin/hadoop jar hadoop-examples-1.1.2.jar wordcount in out
//out 输出文件名称 in 源文件名称
bin/hadoop fs -ls ./out

/out/part-r-00000             //放的是结果
port:50070
port:50030

219.232.252.17:50070

CDH安装


//note02 HDFS

提供分布式存储机制,提供可线性增长的海量存储能力
自动数据冗余,无须使用Raid,无须另行备份
为进一步分析计算提供数据基础

MR在HDFS基础上进行快速分析

PC组成集群即可。在任何节点,只要发布操作命令,就可以对整个HDFS系统进行统一操作
//本地化数据计算,节省传输花费的时间,也是HDFS设计的原则所在

包含:
    NameNode
    DataNode
    事务日志
    映像文件
    SecondaryNameNode

cat tmp/dfs/name/current/VERSION
namespaceID=         //记录命名空间的标识号,就是整个集群的标识
cTime=0         //这个HDFS创建的时间
storageType=NAME_NODE     //存储的类型
layoutVersion=-32        //-32 构造版本

还有影像文件,编辑日志
//每隔一段时间会有一个检查点将内存的数据写到fs里面实地保存
//edits会记录用户的各个操作,当系统如果有异常崩溃的话,系统恢复时它会先加载fsimage,调用edits重做一遍;如果写过一次fsimage,在这之前的操作就没用了;

cat dfs/namesecondary/current/VERSION     //备份
//blk开头的文件是数据块

一个文件的写是写到不同的datanode里面的

冗余副本策略
    //在复制冗余副本的时候用户是不能操作的

机架策略
    //机架一般放20多个服务器,每个机架之间用交换机相连,交换机通过一个上级交换机来连接;同一机架下的节点只经过一个交换机,所以传输速度快

core-site.xml    //设置机架
心跳机制
    //每隔一段时间给Namenode发送一次
安全模式
    //安全模式下用户不能写数据,节点多的话可能会长达10多分钟
    bin/hadoop dfsadmin -safemode enter     //强制进入安全模式
校验和
    blk_xxxx.meta     //crc校验然后写到.meta文件里,缺省值512字节产生4个字节校验和
    //校验和本身很消耗性能,使用的是jvm的软件进行软计算算的。可以直接改成cpu硬计算
    //性能优化一般都会到jvm里面去操作
回收站
    //如果打开的话需要先配置core-site.xml
    //测试
    bin/hadoop fs -rmr ./in/test1.txt
    //会出现./Trash这个文件,相当于移到了回收站的目录
恢复和清空
    mv 将.Trash的文件移回来就OK了
    fs -expunge     //清空
元数据保护
    //配置多个副本会影响namenode处理速度,但是会增加安全性。
快照


HDFS文件操作
    hadoop没有当前目录的概念,也没有cd命令,需要绝对地址

    //查看HDFS下某个文件的内容
    bin/hadoop fs -cat ./in/test1.txt
    //查看HDFS基本统计信息
    bin/hadoop dfsadmin -report
    //HDFS是不能修改,下载到linux文件改完再传回去

怎么添加节点?
    在新节点安装好hadoop
    把namenode的有关配置文件复制到该节点
    修改masters和slaves文件,增加该节点
    设置ssh免密码迕出该节点
    单独启劢该节点上的datanode和tasktracker(hadoop-daemon.sh start datanode/tasktracker)
    运行start-balancer.sh迕行数据负载均衡
    //start-dfs.sh,不需要重启集群

java操作HDFS:
URLCat.java
public class URLCat{
    static{
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }
    public static void main(String[] args) throws Exception{
        inputStream in = null;
        try{
            in = new URL(args[0]).openStream();
            IOUtils.copyBytes(in,System.out,4096,false);
        }finally{
            IOUtils.closeStream(in);
        }
    }
}

设置Hadoop类目录
    Hadoop-env.sh
    export HADOOP_CLASSPATH=xxxxx/myclass
设置搜索目录
    ls -a         //查看隐藏文件
    .bash_profile     //脚本

    javac URLCat.java     //会报错,因为没有import Hadoop的包
    cd hadoop/lib
    //导入包之后还会报错,再指定classpath的jar路径就OK了
    javac -classpath ../hadoop-core-1.1.2.jar URLCat.java
    bin/hadoop URLCat hdfs://backup01/usr/huang/in/test1.txt             //运行jar
    //这里没有指定端口
    bin/hadoop URLCat hdfs://backup01:9000/usr/huang/in/test1.txt 

下载ant
下载参考书的代码并上传解开
    7287OS_Code/
    cd chapter2
    cd HDFS_JAVA_API/
    cd src

设置HADOOP_HOME环境变量

build.xml
    loaction="build"      //输出到build文件夹里

/home/huang/apache-ant-1.9.2/bin/ant     //在HDFS_Java_API目录下执行
~/hadoop-1.1.2/bin/hadoop jar HDFSJavaAPI.jar HDFSJavaAPIDemo


C_API
安装gcc (c语言的编译器)
yum -y install gcc gcc-c++ autoconf make

测试HDFS C_API
hdfs_cpp_demo.c

#inlude "hdfs.h"
int main(int argc,char **argv){
    hdfsFS fs = hdfsConnect("backup01",9000);
    if(!sf){
        fprintrf(stderr,"Cannot connect to HDFS.\n");
        exit(-1);
    }

    char* fileName = "demo_txt";
    char* message="Welcome to HDFS C API!";
    int size = strlen(message);
}

//bin/tar.zz.mds         没有源码的包
个人配置的话 

编译
gcc hdfs_app_demo.c \
-I $HADOOP_HOME/src/c++/libhdfs \         //-I 包含
-I $JAVA_HOME/include \
-I $JAVA_HOME/include/linux/ \
-L $HADOOP_HOME/c++/Linux-amd64-64/lib/ -lhdfs \     //-L 连接库的路径
-L $JAVA_HOME/jre/lib/amd64/server -ljvm \
-o hdfs_cpp_demo            // -o 输出


利用之前ant输出设置CLASSPATH环境变量
    要把Hadoop所有的jar包都列进去
利用ant打印环境变量
    /home/xxx/ant print -cp
    export CLASSPATH=xxxxx

//要在同一命令行下执行    
LD_LIBRARY_PATH=$HADOOP_HOME/xxx/amd64/server ./hdfs_cpp_demo


java解读:
FileSystem
public class FileSystemCat{
    main() throws Exception{
        String uri = args[0];
        Configuration conf = new Configuration();
        FIleSystem fs = FileSystem.get(URI.create(uri),conf);
        InputStream in = null;
        try{
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in,System.out,4096,false);     //hadoop包的
        }finally{
            IOUtils.closeStream(in);
        }
    }
}

hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt

//C程序可以查看hdfs.h这个文件了解API

Hadoop 2.x     (namenode不再是单点)
    HDFS HA
    管理命令手册
    块池
        同一个datanode可以存着属于多个block pool的多个块
//hadoop_v4_02g

    hdfs-site.xml
        dfs.nameservices
            <value>ns1,ns2</value>
            ...

    格式化名称节点
    HDFS快照
    快照位置
        hdfs dfs -ls /foo/.snapshot

//note03 HDFS HA联邦安装

DNS安装
    yum -y install bind bind-utils bind-chroot
    rpm -qa | grep '^bind'         //查看是否安装成功
    vim /etc/named.conf
    options{
        listen-on port 53 { 127.0.0.1;}->
                            { any; }
        allow-query {localhost;} -> { any; }     //对所有用户开放
    }
    vim /etc/named.rfc1912.zones
    //末尾添加
    zone "hadoop.com" IN {     //正解区域
        type master;
        file "named.hadoop.com";
        allow-update { none; };
    }
    zone "0.168.192.in-addr.arpa" IN {     //反解区域
        type master;
        file "named.192.168.0.zone";
        allow-update { none; };
    }

cp -p named.localhost named.hadoop.com         //复制的时候保持文件权限不变
vim named.hadoop.com
    IN SOA     user3.hadoop.com. grid.user3.hadoop.com. (
                                    0        ; serial
                                    1D        ; refresh
                                    1H        ; retry
                                    1W        ; expire
                                    3H )     ; minimum
    IN NS user3.hadoop.com.
user3.hadoop.com. IN A 192.168.0.109
user3.hadoop.com. IN A 192.168.0.110
user3.hadoop.com. IN A 192.168.0.111
user3.hadoop.com. IN A 192.168.0.112
user3.hadoop.com. IN A 192.168.0.113
user3.hadoop.com. IN A 192.168.0.114                                    

[named] cp -p named.localhost named.192.168.0.zone
vim named.192.168.0.zone
    IN SOA     user3.hadoop.com. grid.user3.hadoop.com. (
                                    0        ; serial
                                    1D        ; refresh
                                    1H        ; retry
                                    1W        ; expire
                                    3H )     ; minimum
    IN NS user3.hadoop.com.
109 IN PTR user3.hadoop.com.
110 IN PTR user3.hadoop.com.
111 IN PTR user3.hadoop.com.
112 IN PTR user3.hadoop.com.
113 IN PTR user3.hadoop.com.
114 IN PTR user3.hadoop.com.

//slave     vim /etc/sysconfig/network-scripts/ifcfg-eth0
            //末尾添加DNS服务器IP地址
            DNS1=192.168.0.109
            //在每台slave添加
service network restart
service named start     //启动DNS
chkconfig named on         //开机就将DNS服务启动
//检查
chkconfig --list named
chkconfig --level 123456 named off
tail -n /var/log/messages | grep named
//测试主机名解析
nslookup user3.hadoop.com

HDFS HA + 联邦 + Resource Manager HA
//安装好DNS之后
cat /etc/resolv.conf     //查看
nslookup www.dataguru.cn     //测试
NFS     (网络文件系统)
    可以设置配置文件把某些目录共享出去,并可以设置权限
    scp -rp ./hadoop-0.20.2 grid@h1:/home/grid
    awk脚本
    awk '{print $1}'         //awk 都用单引号,以空格/制表符分隔 ,一般处理表格等文件,reg:日志文件
    awk '$9~/rr/{print $9}'             //~包含 {}里面放执行语句
    //除了awk,还要学sed

cat slave
h1
h2
h3
h4
h5
h6
cat ./slave | awk '{print "scp -rp ./hadoop-0.20.2 grid@"$1":/home/grid"}' > scp_test
chmod a+x scp_test         //变成可执行文件
sh ./scp_test


//note04 MR

//超级计算机结构是非开放的,每一台都是定制的

//reg:日志分析,通过hdfs切割成很多的块,分散到各个节点上,然后各个节点一起来并行计算,再把结果加起来
//并行计算框架
    MPI
        c语言的函数库
        计算密集型,算是瓶颈
    PVM
    CUDA
        英伟达配合显卡GPU推出来的包,利用GPU多核心来处理
    BOINC
        互联网计算(可以当分析志愿者)
    Map-Reduce
        负担主要在I/O

云计算

目前流行的开源云计算解决方案

reg:气象数据集:
    zcat xxx-xx-x.gz         //查看.gz文件

    解压及合并: zcat *.gz > sample.txt     //按数据量大小来解压合并
    分析MR
    input -> |有偏移量key| -> map -> |(key,value)| -> shuffle -> |聚合操作| -> reduce -> |求value的最大值/平均值等| -> output -> hdfs

    Java MapReduce 通常需要3段程序
    1.映射器 (从原始数据读成key,value)MaxTemperatureMapper
    2.reducer(key,value变成最后需要的形式) MaxTemperatureReducer
    3.作业程序,总调度 MaxTemperature

运行MR
cd myclass             //讲课创建的jar测试目录
cat MaxTemperatureMapper.java     
cat MaxTemperature.java
cat MaxTemperatureReducer.java

javac -classpath ../hadoop-core-1.1.2.jar *.java
../bin/hadoop MaxTemperature ./user/huang/in/723440-13964 ./out6  //数据文件. 输出到out6
//没有找到Mapper,解决方式,打成jar包
jar cvf ./MaxTemperature.jar *.class
mv MaxTemperature.jar ..
//需要删掉之前的class文件
rm *.class
cd ..
bin/hadoop jar ./MaxTemperature.jar MaxTemperature ./user/huang/in/723440-13964 ./out6
../bin/hadoop fs -ls ./out6
../bin/hadoop fs -cat ./out6/part-r-00000

//分析计算过程
Mapper
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
    String line = value.toString();
    String year = line.substring(15,19);
    int airTemperature;
    if(line.charAt(87) == '+'){
        //parseInt doesn't like leading plus signs
        airTemperature = Integer.parseInt(line.substring(88,92));
    }else{
        airTemperature = Integer.parseInt(line.substring(87,92));
    }
    String quality = line.substring(92,93);
    if(airTemperature != MISSING && quality.matches("[01459]")){
        context.write(new Text(year),new IntWritable(airTemperature));
    }
}

Reducer
public class MaxTemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    @Override
    public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
        int maxValue = Integer.MIN_VALUE;
        for(IntWritable value:values){
            maxValue = Math.max(maxValue,value.get();
        }
        context.write(key,new IntWritable(maxValue));
    }
}

M-R job
public class MaxTemperature{
    main() throws Exception{
        if(args.length != 2){
            System.err.println("Usage:MaxTemperature<input path> <output path>");
            System.exit(-1);
        }
        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");
        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

分片的问题
Combiner     做预计算的。

Map-Reduce工作机制
    //java跟c相比,慢的是启动jvm这个过程。jvm启动不挂的话还是很快的
    有一个心跳是3秒一次,jobtracker周期是1分钟一次

主要工作:SQL或PL/SQL改写为Map-Reduce程序

Eclipse:
hadoop/contrib/eclipse-plugin/xxx.jar
windows->preferences->hadoop map/reduce
    show Map-Reduce 显示MR视图
在视图右键->new Hadoop loaction->
Location name:xxx
hadoop/conf/mapred-site.xml ->找端口
端口填写一致
左侧右键DFS Loactions->Disconnect->home user

new -> MapReduce Project -> name:xxx
src->new example.java

@Override
public int run(String[] args)throws Exception{
    Configuration conf = getConf();

    Job job = new Job(conf,"example");         //任务名
    job.setJarByClass(example.class);        //指定Class

    FileInputFormat.addInputPath(job,new Path(args[0]));     //输入路径
    FileOutputFormat.setOutputPath(job,new Path(args[1]));     //输出路径

    job.setMapperClass(Map.class);            //调用上面Map类作为Map任务代码
    job.setReducerClass(Reduce.class);        //调用上面Reduce类作为Reduce任务代码
    job.setOutputFormatClass(TextOutputFormat.class);    //指定输出的KEY的格式
    job.setOutputKeyClass(Text.class);         //指定输出的KEY的格式
    job.setOutputValueClass(Text.class);     //指定输出的VALUE的格式

    job.waitForCompletion(true);
    return job.isSuccessful()?0:1;
}

main(){
    int res = ToolRunner.run(new Configuration(),new example(),args);
    System.exit(res);
}

源文件:
->
Mapper
1.分割原始数据
2.输出所需数据
3.处理异常数据
->
输出到HDFS

public class Test_1 extends Configured implements Tool{
    enum Counter{ //可以对其自增操作
        LINESKIP,     //出错的行
    }

    public static class Map extends Mapper<LongWritable,Text,NullWritable,Text>{//变量为: 输入,输出key,value格式
        //Text主要记录字符串的,NullWritable 空值
        //key 偏移量 ,内容 value
        public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
            String line = value.toString();    //读取源数据
            try{
                //数据处理
                String[] lineSplit = line.split(" ");
                String month = lineSplit[0];
                String time = lineSplit[1];
                String mac = lineSplit[6];
                Text out = new Text(month + '' + time + '' + mac);

                //如果是context.write(key,out);则会出现\t
                //用了NullWritable.get() 之后不会出现 \t
                context.write(NullWritable.get(),out);//输出 key \t value
            }catch(java.lang.ArrayIndexOutOfBoundsExecption e){
                context.getCounter(Counter.LINESKIP).increment(1);//出错令计数器+1
                return;
            }
        }
    }
    @Override
    public int run(String[] args)throws Exception{
        Configuration conf = getConf();

        Job job = new Job(conf,"example");         //任务名
        job.setJarByClass(Test_1.class);        //指定Class

        FileInputFormat.addInputPath(job,new Path(args[0]));     //输入路径
        FileOutputFormat.setOutputPath(job,new Path(args[1]));     //输出路径

        job.setMapperClass(Map.class);            //调用上面Map类作为Map任务代码
        job.setReducerClass(Reduce.class);        //调用上面Reduce类作为Reduce任务代码
        job.setOutputFormatClass(TextOutputFormat.class);    //指定输出的KEY的格式
        job.setOutputKeyClass(Text.class);         //指定输出的KEY的格式
        job.setOutputValueClass(Text.class);     //指定输出的VALUE的格式

        job.waitForCompletion(true);
        return job.isSuccessful()?0:1;
    }
    main(){
        //运行任务
        int res = ToolRunner.run(new Configuration(),new Test_1(),args);
        System.exit(res);
    }
}

Run_Configurations->Test_1->Arguments:
hdfs://localhost:9000/user/james/input hdfs://localhost:9000/user/james/output
//output必须是不存在的

倒排索引

new->Haodoop Project->Test_2
public calss Test_2 extends Configured implements Tool{
    enum Counter{
        LINESKIP
    }
    public static class Map extends Mapper<LongWritable,Text,Text,Text>{//变量为: 输入,输出格式
        String line = value.toString();     
        try{
            //数据处理
            String[] lineSplit = line.split(" ");//135,10085
            String anum = lineSplit[0];
            String bnum = lineSplit[1];

            context.write(new Text(bnum),new Text(anum));
        }catch(java.lang.ArrayIndexOutOfBoundsExecption e){
            context.getCounter(Counter.LINESKIP).increment(1);//出错令计数器+1
            return;
        }
    }
    public static class Reduce extends Reducer<Text,Text,Text,Text>{
        public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
            String valueString;
            String out = "";

            for (Text value:values){
                valueString = value.toString();
                out += valueString + "|";
            }
            context.write(key,new Text(out));
        }
    }
    run(){
        job.setReducerClass(Reduce.class);
    }
    main(){}
}

Export->JAR->JAR file path->next->Main Class填写->Clone


//note05     MR实战

性能调优:
    究竟需要多个reducer?
    输入:大文件(上G的)优于小文件
    减少网络传输:压缩map的输出
    优化每个节点能运行的任务数:mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum(缺省值均为2)

hadoop流与脚本

wordcount:
    //数单词
    cat install.log | wc

web Apache日志分析:
    PV     IP
    图片/日志点击先区分开     
    爬虫/日志点击区分开

//排除爬虫
探针设计         //在网站点击一下,只用算法排除的话,依然是多出3-5倍无用点击
//不直接分析网站日志,而是间接分析探针日志,来计算pv
<script type="text/javascriopt">
    var _gaq = _gaq || []; 
    _gaq.push(['_setAccount','UA-20237423-4']);
    _gaq.push(['_setDomainName','.itpub.net']);
    _gaq.push(['_trackPageview']);

    (function(){
        var ga = document.createElement('script');
        ga.type = 'text/javascript';
        ga.async = true;
        ga.src = ('https:' == doucment.location.protocol? 'https://ssl':'http://www') + '.google-a???';
        var s = document.getElementsByTagName('script')[0];
        s.parentNode.insertBefore(ga,s);
    })();
</script>
<div style="display:none">
<script type="text/javascript">
    var _bdhmProtocol = (("https:" == document.location.protocol) ? "https://" : "http://");
    document.write(unescape("%3Cscript src='" + _bdhmProtocol + "hm.baidu.com/h.js%3F5016281862f595e78"));
</script></div>
<!-- END STAT PV --></body>
</html>

排除爬虫和程序点击,对抗作弊
·用鼠标测动对抗爬虫
·常用流量作弊手段
·跟踪用户

### 一边点击一边换IP ###

拿搜索词
??纯真88
统计浏览器类型

少量数据的情况下:
    awk,grep,sort,join等,perl,python,正则等
海量数据的情况下: 10G,100G 增长的时候
CDN
    (反向代理加速)

ip去重


//note06  复杂应用/hadoop流

InputFormat()
OutputFormat()

// 06 hadoop_v4_06c 04:00



//note07 Pig

set命令检查环境变量

进入grunt shell
    pig -x local

PIG_CLASSPATH
    pig

Pig的运行方法:
    脚本
    Grunt
    嵌入式
        pig转换为java,再由jvm执行

Grunt
    自动补全机制
    Autocomplete文件
    Eclipse插件PigPen

    help
    ls,cat,cd
    copyToLocal test1.txt ttt  (复制到grunt外面的当前路径)
    sh /usr/java/jdk1.7.0.0_26/bin/jps       //直接执行命令

    Bag,Tuple,Field,Pig不要求具有各tuple相同数量或相同类型的field


pig -x local
A = LOAD '/home/grid/csdn.txt'
USING PigStorage('#')
B = FOREACH A
STORE B INTO '/home/grid/emmail.txt'
USING PiagStorage();

脚本:
    grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'
    >> AS (year:chararray,temperature:int,quality:int); //如果没有定义分隔符,则默认是制表符。
    DUMP records;         //输出
    DESCRIBE records;     //输出查看结构

    filtered_records = FILTER records BY temperature != 9999 AND 
    >> (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
    DUMP filtered_records;
    GROUP
    FOREACH      //对每一行进行扫描处理

//有点类似面向数据流的处理语言,Mapp-Reduce有点面向计算

UDF 用户自定义函数
    guoyunsky.iteye.com/blog/1317084

reg:
pig
cat score.txt
A = LOAD 'score.txt' USING PigStorage(',') AS (student,course,teacher,score:int);
DESCRIBE A;
B = FOREACH A CENERATE student,teacher;
DESCRIBE B;
C = DISTINCT      //去重
C = DISTINCT B;
D = GROUP C BY student;
D = FOREACH (GROUP C BY student) CENERATE group AS student,COUNT(C);
DUMP D;

//第二种方法
DESCRIBE B;
E = GROUP B BY student;
DESCRIBE E
F = FOREACH E
{
    T = B.teacher;
    uniq = DISTINCT T;
    GENERATE group AS student,COUNT(uniq) AS cnt;
}



//note08      Hive
数据仓库工程师
NoSQL -> Not Only SQL

Hive安装

配置文件
cd hive/conf
hive-env.sh.template -> hive-env.sh
HADOOP_HOME=xxx
export HIVE_CONF_DIR=xxx
hive-site.xml

hadoop-env.sh
export HADOOP_CLASSTHAN= xxx;

./hive
show tables
create table abc (c1 string);
drop table abc;

/user/hive/warehouse/abc/数据

insert overwrite table result
select xxx frrom loc 

thrift server / JDBC
reg:
main()throws Exception{
    CLass.forNmae("org.apache.hadoop.hive.jdbc.HiveDriber");
    String dropSql="drop table pokes";
    String createSql="create table pokes (foo int,bar string";
    String insertSql="load data local inpath '/home/zhangxin/hive/kv1.txt' overwrite into table pokes";
    String querySql="select bar from pokes limit 5";
    Connection connection=DriverManager.getConnection("jdbc:hive://localhost:10000/default","","");
    Statement statement = connection.createStatement();
    statement.execute(dropSql);
    statement.execute(createSql);
    statement.execute(insertSql);
    ResultSet rs = statement.executeQuery(querySql);
    while(rs.next()){
        sout(rs.getString("bar"));
    }
}

http://10.20.151.7:9999/hwi/         //ip改成自己的,默认web路径访问

元数据


//note09     Hive

sql不同的,有3种独特的类型
struct
map
array

reg:
create table employees(
    name     STRING,
    salary    FLOAT,
    subordinates     ARRAY<STRING>,
    deductions         MAP<STRING,FLOAT>,
    address         STRUCT<street:STRING,city:STRING,state:STRING,zip:INT>    
);

缺省分隔符:
\n
^A     \001         分离不同的列(字段)
^B     \002         分割数组/集合里面的元素
^C     \003         map的key,value之间分割

create table employees(
    xxx
)
ROW format delimited
fields terminated by '\001'
collection items terminated by '\002'
map keys terminated by '\003'
lines terminated by '\n'
stored as textfile;

DDL:
create database if not exists financials;
show databases like 'h.*';

存放目录
缺省存放目录由hive.metastore.warehouse.dir指定
可以使用以下命令覆盖
    create database financials
    location '/my/preferred/directory'

观看数据库描述
create database financials
    comment 'Holds all financial tables';
describe database financials;
create database financials
    with dbproperties('creator'='Mark Moneybags','date'='2012-01-02');
describe database extended financials;

切换数据库
USE financials;
set hive.cli.print.current.db=true;
hive (financials)> USE default;
hive (default)> set hive.cli.print.current.db=false;
hive> ...

删除和更改数据库
drop database if exists financials;
drop database if exists financials cascade;     //连数据一起删掉
alter database financials set dbproperties('edited-by'='Joe');

创建表
create table employees(
    name     STRING comment 'Employee name',
    salary    FLOAT comment 'Employee salary',
    subordinates     ARRAY<STRING> comment 'Names of xx',
    deductions         MAP<STRING,FLOAT>,
    address         STRUCT<street:STRING,city:STRING,state:STRING,zip:INT>    
)
comment 'Description of the table'
tblproperties('creator'='Mark Moneybags','date'='2012-01-02')
location '/user/hive/warehouse/mydb.db/employees';

create table if not exists mydb.employees2
like mydb.employees;

列出表
USE mydb;
show tables;
use default;
show tables in mydb;
use mydb;
show tables 'empl.*'

观看表的描述
describe extended mydb.employees;

外部表
create table employees(
    name     STRING comment 'Employee name',
    salary    FLOAT comment 'Employee salary',
    subordinates     ARRAY<STRING> comment 'Names of xx',
    deductions         MAP<STRING,FLOAT>,
    address         STRUCT<street:STRING,city:STRING,state:STRING,zip:INT>    
)
row format delimited fields terminated by ','
location '/data/stocks';

create external table if not exists mydb.employee3
like mydb.employees
location '/path/to/data';

分区表:
create table employees(
    xxx
)
partitioned by (country STRING,state STRING);

分区表的存储:会变成一个子目录里面的一系列文件
set hive.mapred.mode=strict;
select e.name,e.salalry from employees e limit 100;
//报错,然后
set hive.mapred.mode=nonstrict;
select e.name,e.salalry from employees e limit 100;

指定存储格式

create table kst
partitioned by (ds string)
row format serde 'com.linkedin.haivvreo.AvroSerDe'
with serdeproperties ('schema.url'='http://schema_provider/kst.avsc')
stored as
inputformat 'com.linkedin.haivvreo.AvroContainerInputFormat'
outputformat 'com.linkedin.haivvreo.AvroContainerOutputFormat';

create external table if not exists stocks(
    xxx
)
clustered by (exchange,symbol)
sorted by (ymd asc)
into 96 buckets

删除和更改表
alter table log_messages partition(year=2011,month=12,day=2)
set location 's3n://ourbucket/logs/2011/01/02';
alert table log_messages set tblproperties('notes'='The xxxx');

列操作
alter table log_messages
change column hms hours_minutes_secondes int
comment 'xx'
after severity;

alter table log_messages add columns( app_name STRING);
alter tbal log_messages replace columns(message STRING);

DML操作
    Hive不支持行级别,将数据放入表中的唯一办法是批量载入
LOAD DATA LOCAL inpath '{env:HOME}/california-employees'
overwrite into table employees
partition(country = 'US',state='CA');

Insert overwrite语句
insert overwrite table employees
partition(country='US')
select * from staged_employees se
where se.cnty='US'

from staged_employees se     //非分区表
insert overwrite table employees 
    partition (country='US')
    select * from xxx s
    where s.cnty='US'                 //将非分区的表变成了分区表

动态分区插入
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;


创建表的同时把数据放进去
create table ca_employees
as select name,salary,address
from employees
where se.state='CA';

导出数据
直接复制粘贴
如果需要改动数据格式,可以使用insert overwrite
insert overwrite local directory '/tmp/ca_employees'
select name,salary,address
from employees
where se.state = 'CA';

SELECT:
使用正则表达式

select symobl,'price.*' from stocks;
select name from employees where address.street like '%Ave.'
select name,address.street from employees where address.street rlike '.*(Chicage|Ontario).*';     //rlike 来正则匹配

函数
    //求各种统计指标的函数
explode //可以把数组元素展开成很多行
select explode(array(1,2,3)) as element from src;

嵌套select
from (
    select upper(name),deductions["Federal Taxes"] as fed_taxes
    from employees
) e
select e.name,e.salary_minus_fed_taxes
where e.salary_minus_fed_taxes > 70000;

连接操作(缓慢)
set hive.auto.convert.join=true;

select s.ymd,s.symbol,s.price_close,d.dividend
from stocks s join dividends d on s.ymd = d.ymd and 
s.symbol = d.symbol where s.symbol='AAPL'

排序
order by and sort by
distribute by 
cluster by

bucket : 桶         hash     (抽样查询)
select * from numbers TABLESAMPLE(BUCKET 3 OUT OF 10 ON rand())s;

视图与索引

create index employees_index
on tbale employees(country)
as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
with deferred rebuild
idxproperties('creator = 'me','created_at'='some_time')
in table employees_index_table
partitioned by (country,name)
comment 'Employees indexd by country and name.';

位图索引

建模

执行计划

Google Dremel
山寨货:Apache Drill
     Cloudera Impala (Hive的替代方案)

yarm是底层
架构在yarm上面就可以不用map-reduce了,可以用hadoop流什么的


//note10 Hive-impala子项目 HBase

 Google Bigtable的开源实现
 列式数据库
 可集群化
 可以使用shell、web、api等多种方 式访问
 适合高读写(insert)的场景
 HQL查询语言
 NoSQL的典型代表产品

//不是使用的SQL语言
mysql是什么东西限制了其扩展,noSQL怎么解决的这个问题

Sqoop
    用于在Hadoop和关系型数据库之间交换数据
    通过JDBC接口连入关系型数据库

Avro
    数据序列化工具

Chukwa
    架构在Hadoop之上的数据采集与分析框架
    主要进行日志采集和分析

Cassandra (几乎淘汰)
    与Hbase类似,借鉴Google Bigtable的思想体系
    特点:无中心的数据库         缺点:效率比较低

Zookeeper:
    //还是一个资源库

    文件没有上下级之分
    znode可以存放data,上限1M;还可以放ACL,访问控制列表。

    一般是奇数个节点,节点都是平等的,表面看起来zookeeper是无中心的但是会选举一个出来。
    zab协议
    阶段1:领导者选举
    阶段2:原子广播

    每个节点放一个watch,leader修改了信息后其他节点可以查看到修改的信息,并更改自己的信息

    分布锁:
        znode---leader
        ->
        lock     观察
    羊群效应


//note11     zookeeper

Big Table想法:
    (S#,sn,sd,sa)     
    /*
        1.行列:key值    
        2.属性
        3.value
    */

    hbase
    删除不是正常删,而且给某个时间戳的一行插入一个新的行键打个标记作为删除 (HDFS不能进行文件修改,追加也很麻烦,Hbase做了一个折中的方式来insert。所有数据都是往内存里面插,在一定时间内存满了之后才可以写,即收集一定数据后就可以往里写,一写就是一块。)(Hbase每隔一段时间进行重整操作,会把一些比较小的时间拿出来合并成比较大的文件。抛弃是在重整操作过程中操作的,打标记的都扔掉,然后再重新写成一个大的文件。)
    行键也是可以重复的
    面向时间查询

行键
列族与列
时间戳
    可以由用户显式赋值
    行键,列族:限定符,时间戳    来唯一决定
    列族元素在物理上存放的是同一个地方,不同的列族是不同的物理存放
    store
        memoryStore (先)
        storeFile (后)
    当内存里面的东西足够多时会存到storeFile(物理)
    读取是在memoryStore里面的
    每过一段时间会触发合并过程,会把小的storeFile合并成大的storeFile,合并过程中会删除标记的行及过期的行

    每一个storeFile对应一个HDFS的文件,会分散在不同的物理节点里面

Region和Region服务器
HLog
-ROOT-和.META.表
 HBase中有两张特殊的Table,-ROOT-和.META.
 Ø .META.:记录了用户表的Region信息,.META.可以有多个regoin
 Ø -ROOT-:记录了.META.表的Region信息,-ROOT-只有一个region 
 Ø Zookeeper中记录了-ROOT-表的location

Memstore与storeFile
    一个store包含一个列族的所有数据,列族存放是在临近的区域里面

传统数据库的行式存储
为了读某个列的数据,必须要把整个行读完才能对其读取
联机事务处理随机读写还是要用行式数据库。

行式数据库存储问题
行标识访问:B树索引

B树索引原理:树形
oracle行式存储的访问形式

BigTable的LSM索引     (日志及数据)
L:log        S:结构        M:merge(合并)
日志就是数据

zookeeper:
安装:单机模式
配置
安装:集群模式

hadoop与hbase版本问题
    hbase->hadoop-core-x.x.x.jar //可以看hadoop匹配版本
修改hbase-env.sh
配置hbase-site.xml

启动Hbase及验证
bin/start-hbase.sh
/usr/java/jdk1.6.0_26/bin/jps

Hbase安装:伪分布模式
编辑hbase-env.sh增加HBASE_CLASSPATH环境变量
编辑hbase-site.xml打开分布模式
覆盖hadoop核心jar包
Hbase安装:完全分布模式

192.168.5.134:60010/master.jsp


//note12 

Hbase操作命令复杂
Hbase数据建模问题

关系型数据库的弱点

CAP定律:
NoSQL运动
NoSQL数据库家族
    redis一半内存一半硬盘
    列式数据库在数据分析时工作特别快

满足一致性,可用性的系统

Redis
    key-value类型的数据库
Hbase
    不能group by等连接
Cassandra
MongoDB
    擅长处理非结构化数据
Neo4J
    适用于社交网站


NoSQL与CAP
    密切相关的。真的要做成分布式的话必须要在其中放弃一种,一般都选一致性

Hbase存储架构理解
    Key Length Value Length Key Value     //key,value的长度比较重要

什么情况下使用Hbase?
     成熟的数据分析主题,查询模式已经确立并且不轻易改变
     传统的关系型数据库已经无法承受负荷,高速插入,大量读取 
     适合海量的,但同时也是简单的操作(例如key-value)

关系型数据库的困难
模式设计
Hbase:表设计与查询实现
搜索优化:
    u-t
    t-u

辅助索引
复合行键设计

//note13 数据集成
Sqoop
    mysql hadoop 连接
Flume    
Chukwa
    日志收集
ODCH/OLH
    oracle hadoop 连接

Oracle大数据连接器
Sqoop
    SQL-to-HDFS工具
    JDBC

hadoop-0.20.2下Squoop是不支持此版本的
配置
sqoop命令选项:
    % sqoop help
    % sqoop help import

从mysql导入数据的例子
% sqoop import --connect jdbc:mysql://localhost/hadoopguide \     
        //连入mysql
>--table widgets -m 1
% hadoop fs -cat widgets/part-m-00000
//间隔符用的,
导入到Hbase
sqoop import --connect jdbc:mysql//mysqlserver_IP/databaseName --table datatable --hbase-create-table --hbase-table hbase_tablename --column-family col_fam_name --hbase-row-key key_col_name
其中,databaseName和datatable是mysql的数据库和表名,hbase_tablename是要导成hbase的表名,key_col_name可以指定datatable中哪一列作为hbase新表的rowkey,col_fam_name是除rowkey之外的所有列的列族名

从oracle导入数据
需要有ojdbc6.jar放在$SQOOP_HOME/lib里,不需要添加到classpath
connecturl=jdbc:oracle:thin:@172.7.10.16:1521:orcl
oraclename=scott
oraclepassword=wang123456
oracleTableName=test
#需要从oracle中导入的表中的字段名
columns=ID,STATE
#导出到HDFS后的存放路径
hdfsPath=/tmp/

sqoop import --append --connect $CONNECTURL --username $ORACLENAME --password $ORACLEPASSWORD --m 1 --table $oracleTableName --columns $columns --hbase-create-table --hbase-table orl --hbase-row-key STATE --column-family orl

oracle big data connectors
    HDFS直接连接器
        可以把带有分隔符的文件作为oracle的外部表访问
        还可以直接hadoop的文本文件作为数据源
    hadoop装载器
        直接把hadoop里面的东西装载过去


Oracle HDFS直接连接器(ODCH)实验
    Oracle Enterprise Linux

配置hdfs_steam script文件
...    (hadoop_v4_13d)

/logs
/extdir

!cat lab4.2_setup_DB_dir.sql
set echo on
    create or replace directory ODCH_LOG_DIR as '/home/hadoop/.../logs'
    grant read,write on directory ODCH_LOG_DIR to SCOTT;
@lab4.2_setup_DB_dir.sql
sqlplus scott/tiger
!cat lab4.3_ext_tab.sql
SQL创建外部表
preprocessor HDFS_BIN_PATH:hdfs_stream         //先预处理数据再读

PROMPT>sqlplus scott/tiger
select count(*) from odch_ext_table;
set autotrace trace exp     //设置追踪,来观察执行计划
select count(*) from odch_ext_para_table;

CDN加速
Flume
    提供分布式,可靠和高可用的海量日志采集,聚合和传输的系统
Chukwa


//note14 扩展开发,与应用集成
UDF     (用户定义函数) (reg:Pig,Hive)
Thrift接口
Rhadoop

UDF
    写个自定义jar
    create temporary function strip as 'com.hadoopbook.hive.[ClassName]'
    % hive --auxpath /path/to/hive-example.jar
    select xxx('name') from student

    filter:
    jar
    register pig-examples.jar
    grunt>filter.... com.hadoopbook.pig.xxxx(xxx);
    DEFINE isGood com.hadoopbook.pig.xxxx();
    然后就可以用 isGood(xxx);

应用与Hbase的对接:通过Thrift
    Thrift是一个跨语言的服务部署框架
        通过一个中间语言(IDL,接口定义语言)来定义RPC的接口和数据类型


//note15     与应用层连接
并行计算框架
    MPI
    PVM
    Mesos
    Map-Reduce

YARN
    可以同时支持Map-Reduce,Storm,Spark,MPI等多种流行计算模型
Spark

YARN配置

//note16 hadoop源代码


//note17 hadoop与机器学习
Hadoop与机器学习
Mahout     (封装各种算法)(天生适合做离线数据分析)
Hadoop在互联网企业中的应用

spark基于内存来计算。成本比hadoop高
不要求实时得出结果,可以不选spark
spark不太好转,跟java几乎无关

Mahout     (在Data Mining上)
数据金字塔:从下往上
Making Decisions (决策层)
Data Presentations (数据展示层)
Data Mining     (数据挖掘,建立数学模型/算法,找到一种合适的算法)
Data Exploration     (对数据进行简单的查询)
Data Warehouses/Data Marts      ETL(数据仓库)
Data Sources (数据源)


回归
    样本这里叫学习集
    用来做预测
分类器
    决策树
    贝叶斯分类器(顾客流失)(自己设置阈值)(文本分类)(搜索引擎判断两篇文章是否一致,概率多高)
有学习集的进行特征提取就可以自动完成
聚类(没有学习集)
    (层次聚类法)

数据挖掘
数据分析
    SAS,R,SPSS
    SAS数据是经过检验的,R就不太可靠,SAS主要应用于金融
传统数据分析工具的困境
    处理数据受限于内存,因此无法处理海量数据(R处理上限可能是100万)(R跟SAS处理不能超过内存数,不然会机器异常)
    ...
    (聚类,推荐系统就无法使用抽样)
    解决方向:hadoop集群和Map-Reduce并行计算

常见算法的Map-Reduce化
    样本独立性比较强的就可以map-reduce

Lucene 早期搜索引擎的项目

Mahout的特点:

下载和解压Mahout
wget http://mirrors.cnnic.cn/apache/mahout/0.6/mahout-distribution-0.6.tar.gz
tar xzf ./mahout-distribution-0.6.tar.gz
配置环境变量
export HADOOP_HOME=/home/huang/hadoop-1.1.2
export HADOOP_CONF_DIR=/home/huang/hadoop-1.1.2/conf
export MAHOUT_HOME=/home/huang/hadoop-1.1.2/mahout-distribution-0.6
export MAHOUT_CONF_DIR=/home/huang/hadoop-1.1.2/mahout-distribution-0.6/conf 
export PATH=$PATH:$MAHOUT_HOME/conf:$MAHOUT_HOME/bin

几个重要环境变量
JAVA_HOME mahout运行需指定jdk的目录 MAHOUT_JAVA_HOME指定此变量可覆盖JAVA_HOME值
HADOOP_HOME 如果配置,则在hadoop分布式平台上运行,否则单机运行 HADOOP_CONF_DIR指定hadoop的配置文件目录
MAHOUT_LOCAL 如果此变量值不为空,则单机运行mahout。 
MAHOUT_CONF_DIR mahout配置文件的路径,默认值是$MAHOUT_HOME/src/conf MAHOUT_HEAPSIZE mahout运行时可用的最大heap大小    (堆大小)

验证安装成功
bin/mahout

源码和部分样本数据
    装的时候要装源代码包(即-src的包)
将测试数据copy到HDFS
hadoop/bin/hadoop fs -mkdir ./testdata
hadooop/bin/hadoop fs -put ./synthetic_control.data ./testdata
hadoop/bin/hadoop fs -ls ./testdata
做一个kmeans测试(聚类测试)
mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job
观察输出
用mahout输出
mahout vectordump --seqFile ./output/data/part-m-00000


20Newsgroups数据集
使用Mahout进行文本自动分类

上传并解压数据
20news-bydate-test 测试数据
20news-dydate-train 训练数据

建立训练集
mahout org.apache.mahout.classifier.bayes.PrepareTwentyNewsgroups \ 
-p /home/huang/data/20news-bydate-train \
-o /home/huang/data/bayes-test-input \     (结果输出到了本地。。)
-a org.apache.mahout.vectorizer.DefaultAnalyzer \
-c UTF-8
(作了分词什么的。。)

上传到HDFS
cd ../hadoop-1.1.2
bin/hadoop fs -mkdir ./20news
bin/hadoop fs -put ../data/bayes-train-input ./20news
bin/hadoop fs -put ../data/bayes-test-input ./20news

训练贝叶斯分类器
mahout trainclassifier \
-i /user/huang/20news/bayes-train-input \ 
-o /user/huang/20news/newsmodel \  (放输出的模型,即统计参数数据)
-type cbayes
-ng 2 \
-source hdfs

生成的模型
bin/hadoop fs -ls ./20news/newsmodel     (里面放了一堆模型数据)

测试贝叶斯分类器
mahout testclassifier \
-m /user/huang/20news/newsmodel \
-d /user/huang/20news/bayes-test-input \ 
-type cbayes
-ng 2 \
-source hdfs \
-method mapreduce

京东
部门结构
 运维团队(负责管理维护集群的正常运行)
 数据仓库团队(根据业务部门的要求进行数据统计和查询)
 成都研究院(负责底层,包括源代码修改和按上层部门要求开发 Map-Reduce程序,比如一些UDF)

淘宝
对Hadoop源码的修改
管理模式
准实时的流数据处理技术
 从Oracle, Mysql日志直接读取数据
 部分数据源来自应用消息系统
 以上数据经由Meta+Storm的流数据处理,写入HDFS,实现实时或准实时的数据分析 
 数据装载到Hive进行处理,结果写回Oracle和Mysql数据库

Oceanbase

百度
 日志的存储和统计;
 网页数据的分析和挖掘;
 商业分析,如用户的行为和广告关注度等;
 在线数据的反馈,及时得到在线广告的点击情况;
 用户网页的聚类,分析用户的推荐度及用户之间的关联度。
-------------本文结束期待您的评论-------------