- 浏览: 59789 次
- 性别:
- 来自: 北京
最新评论
-
scu_cxh:
您好,我在学习hadoop方面的东西,想做一个对task监控的 ...
JobClient应用概述 -
bennie19870116:
看不到图呢...
Eclipse下配置使用Hadoop插件
一、背景
排序对于MR来说是个核心内容,如何做好排序十分的重要,这几天写了一些,总结一下,以供以后读阅。
二、准备
1、hadoop版本是0.20.2
2、输入的数据格式(这个很重要,看清楚格式),名称是secondary.txt:
abc 123
acb 124
cbd 523
abc 234
nbc 563
fds 235
khi 234
cbd 675
fds 971
hka 862
ubd 621
khi 123
fds 321
仔细看下,数据文件第一列是字母,第二列是数字,我要做的就是结合这组数据进行一些排序的测试。
3、代码框架,因为接下来的测试改动都是针对部分代码的修改,框架的代码是不会改变的,所以先把主要代码贴在这里。
代码分为2部分:自定义的key和主框架代码(注意看下红色部分)。先贴上主框架代码:
MyGrouping.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import com.run.lenged.business.TextPair;
public class MyGrouping {
/**
* Map
*
* @author Administrator
*/
public static class MyGroupingMap extends Mapper<LongWritable, Text, TextPair, Text> {
protected void map(LongWritable key, Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, TextPair, Text>.Context context)
throws java.io.IOException, InterruptedException {
String arr[] = value.toString().split("\t");
if (arr.length != 2) {
return;
}
TextPair tp = new TextPair();
tp.set(new Text(arr[0]), new Text(arr[1]));
context.write(tp, new Text(arr[1]));
}
}
/**
* 按照Hashcode值来进行切分
*
* @author Administrator
*/
public static class MyGroupingPartition extends Partitioner<TextPair, Text> {
@Override
public int getPartition(TextPair key, Text value, int numPartitions) {
return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
/**
* group进行排序
*
* @author Administrator
*/
@SuppressWarnings("unchecked")
public static class MyGroupingGroup extends WritableComparator {
//代码变动部分
}
/**
* reduce
*
* @author Administrator
*/
public static class MyGroupingReduce extends Reducer<TextPair, Text, Text, Text> {
protected void reduce(TextPair key, java.lang.Iterable<Text> value,
org.apache.hadoop.mapreduce.Reducer<TextPair, Text, Text, Text>.Context context)
throws java.io.IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
while (value.iterator().hasNext()) {
sb.append(value.iterator().next().toString() + "_");
}
context.write(key.getFirst(), new Text(sb.toString().substring(0, sb.toString().length() - 1)));
}
}
public static void main(String args[]) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] otherArgs = parser.getRemainingArgs();
if (args.length != 2) {
System.err.println("Usage: NewlyJoin <inpath> <output>");
System.exit(2);
}
Job job = new Job(conf, "MyGrouping");
// 设置运行的job
job.setJarByClass(MyGrouping.class);
// 设置Map相关内容
job.setMapperClass(MyGroupingMap.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(MyGroupingPartition.class);
job.setGroupingComparatorClass(MyGroupingGroup.class);
// 设置reduce
job.setReducerClass(MyGroupingReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 设置输入和输出的目录
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 执行,直到结束就退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
TextPair.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public int compareTo(TextPair o) {
int cmp = first.compareTo(o.first);
if (cmp != 0) {
return cmp;
} else {
return second.compareTo(o.second);
}
}
}
三、测试前提
1、首先提一个需求,我们结合需求来测试,然后再扩散开。
需求内容是:如果第一列值相同,第二列值叠加,并对第二列值进行升序排序。最后输出的时候,按照第一列值的升序排序输出。
2、需求实现。
根据上面的需求,我们可以分析一下:
需要对第一个字段和第二个字段都进行排序,那么单纯的利用MR框架对key迭代输出,value累加是不行的。因为value是没有进行排序。
所以我们需要做一些改动,定义key为符合组建。TextPair.java类就是自定义的key。
一般来说如果要对key和value同时做排序,那么,自定义的组合key的格式第一个值是第一个字段,第二个值就是第二个字段。
3、那么我们就定义一个job.setGroupingComparatorClass(MyGroupingGroup.class);代码如下:
public static class MyGroupingGroup extends WritableComparator {
protected MyGroupingGroup() {
super(TextPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
TextPair mip1 = (TextPair) a;
TextPair mip2 = (TextPair) b;
return mip1.getFirst().compareTo(mip2.getFirst());
}
}
只对输出的复合组建第一项值进行排序。输出的结果如下:
abc 123_234
acb 124
cbd 523_675
fds 235_321_971
hka 862
khi 123_234
nbc 563
ubd 621
4、查看结果,我们可以看出,基本满足了上面的需求。那么接下来,我们就将做个测试,来实现一下MR的排序功能。
四、Group按第二个字段值进行排序测试
1、修改一下group的排序方式,针对第二个值进行合并排序,代码如下:
public static class MyGroupingGroup extends WritableComparator {
protected MyGroupingGroup() {
super(TextPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
TextPair mip1 = (TextPair) a;
TextPair mip2 = (TextPair) b;
return mip1.getSecond().compareTo(mip2.getSecond());
//return mip1.getFirst().compareTo(mip2.getFirst());
}
}
2、reduce的输出稍微改下,将第2个字段也输出,方便查看,代码如下:
context.write(key.getFirst(), new Text(sb.toString().substring(0, sb.toString().length() - 1)));
reduce输出的结果:
abc_123 123
abc_234 234
acb_124 124
cbd_523 523
cbd_675 675
fds_235 235
fds_321 321
fds_971 971
hka_862 862
khi_123 123
khi_234 234
nbc_563 563
ubd_621 621
3、看到结果,第一反应就是没有按照我的要求,按第二个值进行排序操作。
其实不是,这个结果确实是进行了group的排序,只是说遇到没有符合合并结果数据。所以,看起来没有进行排序。
在这里有个概念,就是group到底是在什么时候做的排序,原文是这样写的:
Job.setGroupingComparatorClass(Class<? extends RawComparator> cls)
Define the comparator that controls which keys are grouped together
for a single call to Reducer.reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
我尝试翻译了一下(英文水平实在是有限,不对的地方还望各位指出):
在一个reduce的调用过程中,定义一个comparator,对分组在一起的key进行排序。
通过上面这句话就可以理解,为什么khi_123 123和abc_123 123没有叠加在一起。
五、总结
1、这里只写了group的排序,没有写sort,后面将会写一个,说不定就是今天晚上吧!
2、过几天写个MR的执行流程,并画个图,贴出来大家看看。
3、对于这块的排序我也是接触不久,可能有写的不对的地方。还望朋友们跟贴指出来。
4、如果有疑问或是不好跟贴,可以发邮件交流:dajuezhao@gmail.com
发表评论
-
Hadoop的基准测试工具使用(部分转载)
2011-01-21 11:58 1565一、背景由于以前没有 ... -
分布式集群中的硬件选择
2011-01-21 11:58 1004一、背景最近2个月时间一直在一个阴暗的地下室的角落里工作,主要 ... -
Map/Reduce的内存使用设置
2011-01-21 11:57 1608一、背景今天采用10台 ... -
Hadoop开发常用的InputFormat和OutputFormat(转)
2011-01-21 11:55 1458Hadoop中的Map Reduce框架依 ... -
SecondaryNamenode应用摘记
2010-11-04 15:54 1030一、环境 Hadoop 0.20.2、JDK 1.6、 ... -
Zookeeper分布式安装手册
2010-10-27 09:41 1296一、安装准备1、下载zookeeper-3.3.1,地址:ht ... -
Hadoop分布式安装
2010-10-27 09:41 983一、安装准备1、下载hadoop 0.20.2,地址:http ... -
Map/Reduce使用杂记
2010-10-27 09:40 909一、硬件环境1、CPU:Intel(R) Core(TM)2 ... -
Hadoop中自定义计数器
2010-10-27 09:40 1497一、环境1、hadoop 0.20.22、操作系统Linux二 ... -
Map/Reduce中的Partiotioner使用
2010-10-27 09:39 886一、环境1、hadoop 0.20.22 ... -
Map/Reduce中的Combiner的使用
2010-10-27 09:38 1156一、作用1、combiner最基本是实现本地key的聚合,对m ... -
Hadoop中DBInputFormat和DBOutputFormat使用
2010-10-27 09:38 2401一、背景 为了方便MapReduce直接访问关系型数据 ... -
Hadoop的MultipleOutputFormat使用
2010-10-27 09:37 1650一、背景 Hadoop的MapReduce中多文件输出默 ... -
Map/Reduce中公平调度器配置
2010-10-27 09:37 1507一、背景一般来说,JOB ... -
无法启动Datanode的问题
2010-10-27 09:37 2367一、背景早上由于误删namenode上的hadoop文件夹,在 ... -
Map/Reduce中分区和分组的问题
2010-10-27 09:35 1103一、为什么写分区和分组在排序中的作用是不一样的,今天早上看书, ... -
关于Map和Reduce最大的并发数设置
2010-10-27 09:34 1216一、环境1、hadoop 0.20.22、操作系统 Linux ... -
关于集群数据负载均衡
2010-10-27 09:33 857一、环境1、hadoop 0.20.22、操作系统 Linux ... -
Map/Reduce执行流程简述
2010-10-27 09:33 957一、背景最近总在弄MR的东西,所以写点关于这个方面的内容,总结 ... -
Hadoop集群中关于SSH认证权限的问题
2010-10-27 09:32 868今天回北京了,想把在外地做的集群移植回来,需要修改ip地址和一 ...
相关推荐
Map/Reduce介绍。一些基本基础介绍。
Map/Reduce:大规模集群上的简化数据处理中文翻译,但也有一些语句翻译不到位,请谅解。希望能够对大家有帮助。
讲述了Windows平台的Hadoop安装... 最后,以最简单的求和为例,剖析Hadoop的Map/Reduce工作机制,对于初学Hadoop及Map/Reduce的读者有很大的帮助。相信通过最简单的求和为例,读者可步入Hadoop的Map/Reduce开发者行列。
hadoop中map/reduce自学资料合集
在solr文献检索中用map/reduce
【摘要】在对Map/Reduce算法进行分析的基础上,利用开源Hadoop软件设计出高容错高性能的分布式搜索引擎,以面对搜索引擎对海量数据的处理和存储问题。
win7_64eclispe插件 解决An internal error occurred during: "Map/Reduce location status updater". org/codehaus/jackson/map/JsonMappingException 重新编译包
云计算的三大技术——Bigtable,Map/Reduce等的研究文章。。。。。。。。。
基于Map/Reduce的改进选择算法在云计算的Web数据挖掘中的研究.pdf
现有student.txt和student_score.txt。将两个文件上传到hdfs上。使用Map/Reduce框架完成下面
在对Map/Reduce算法进行分析的基础上,利用开源Hadoop软件设计出高容错高性能的分布式搜索引擎,以面对搜索引擎对海量数据的处理和存储问题
针对海量流数据的在线处理需求,提出一种不同于传统Map/Reduce流数据处理的系统模型Flexible workflow.该模型对workflow处理单元进行在线Map/Reduce并行化,实现了SPATE系统;同时为该系统定义一组关于作业的建立、管理...
hadoop开发文档
NULL 博文链接:https://roserouge.iteye.com/blog/733149
不过本文的Skynet没这么恐怖,它是一个ruby版本的Google Map/Reduce框架的名字而已。 Google的Map/Reduce框架实在太有名气了,他可以把一个任务切分为很多份,交给n台计算机并行执行,返回的结果再并行的归并,最后...
本文在研究BIRCH算法、规则关联算法、Hadoop的map/reduce机制的基础上,提 出了一种基于map/reduce的应用于网络安全事件分析的并行关联方法。一方面,通过对BIRCH 算法的改进,在BIRCH的分层次思想中引入预定义的...
NULL 博文链接:https://sgq0085.iteye.com/blog/1879442
本项目为一个Hadoop课程设计,使用Java语言和map/reduce实现贝叶斯文本分类器。项目的具体内容如下:1:用MapReduce算法实现贝叶斯分类器的训练过程,并输出训练模型; 2:用输出的模型对测试集文档进行分类测试。...
如果你读过Google的那篇大名鼎鼎的论文“MapReduce: Simplified Data Processing on Large Clusters”,你就能大概明白map/reduce的概念。 我们先看map。map()函数接收两个参数,一个是函数,一个是序列,map将传入...