本文共 4268 字,大约阅读时间需要 14 分钟。
分组操作排序又称辅助排序,是在reducer操作之前对mapper传出的kv值进行shuffle操作并归并排序之后,对kv值进行的一系列操
作,比如对key中相同的一项或者两项相同的数据归为一组,以分组好的一个key值传入values。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WXkpGQTL-1605151279150)(https://s1.ax1x.com/2020/10/28/B8CrRJ.png)]
需求:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0yxeGDU1-1605151279156)(https://s1.ax1x.com/2020/10/28/B8Pul9.png)]
注意在这个实例中的"分组"并不是实际意义上把 id 相同的记录真正合并为一组:在归并排序好的 kv 中,id 相同的 kv 值其实仍然是彼此独立的,只是 GroupingComparator 的 compare 方法按业务逻辑判断下一个 key 是否"相同",相同则并入同一次 reduce 调用,从而选择性地跳过后续记录,找到每个订单中最大的订单金额。
key封装对象:
package com.order.bean;

//import org.apache.hadoop.hdfs.protocol.SnapshotInfo;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite map-output key carrying (order_id, price).
 *
 * <p>Sort order: ascending by {@code order_id}, and within the same order id
 * descending by {@code price}, so the most expensive record of each order
 * arrives first at the reducer.
 *
 * <p>NOTE(review): the published source was truncated by HTML extraction
 * (generics and everything after the second {@code else if} were lost);
 * the generic parameter, the descending-price branch, {@code write}/
 * {@code readFields}, accessors and {@code toString} are reconstructed
 * from the standard version of this example — verify against the original.
 */
public class orderBean implements WritableComparable<orderBean> {

    private int order_id;
    private double price;

    /** No-arg constructor required by Hadoop serialization. */
    public orderBean() {
        super();
    }

    public orderBean(int order_id, double price) {
        super();
        this.order_id = order_id;
        this.price = price;
    }

    /**
     * Primary sort: order_id ascending; secondary sort: price descending.
     *
     * @param bean the other key to compare against
     * @return negative / zero / positive per the ordering above
     */
    @Override
    public int compareTo(orderBean bean) {
        int res;
        if (order_id > bean.order_id) {
            res = 1;
        } else if (order_id < bean.order_id) {
            res = -1;
        } else {
            // Same order id: higher price sorts FIRST (note the inverted signs).
            if (price > bean.price) {
                res = -1;
            } else if (price < bean.price) {
                res = 1;
            } else {
                res = 0;
            }
        }
        return res;
    }

    /** Serializes fields in the exact order {@link #readFields} reads them. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readInt();
        price = in.readDouble();
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    /** Tab-separated form used when the reducer writes this key as output. */
    @Override
    public String toString() {
        return order_id + "\t" + price;
    }
}
mapper:
package com.order.bean;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses each input line into an {@link orderBean} composite key; the value
 * is always {@link NullWritable} because all payload lives in the key.
 *
 * <p>Generic parameters were stripped by HTML extraction in the published
 * source; they are restored here from the visible {@code map} signature and
 * the driver's output-class settings.
 */
public class orderMapper extends Mapper<LongWritable, Text, orderBean, NullWritable> {

    // Reused across map() calls to avoid allocating one bean per record.
    private final orderBean k = new orderBean();

    /**
     * Expected line layout (space-separated): field 0 = order id,
     * field 2 = price; field 1 (item id) is ignored.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] line = value.toString().split(" ");
        k.setOrder_id(Integer.parseInt(line[0]));
        k.setPrice(Double.parseDouble(line[2]));
        context.write(k, NullWritable.get());
    }
}
GroupingCompator
package com.order.bean;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class orderGroupingComparator extends WritableComparator { protected orderGroupingComparator() { super(orderBean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { int result; orderBean aBean = (orderBean)a; orderBean bBean = (orderBean)b; if (aBean.getOrder_id()>bBean.getOrder_id()) { result = 1; } else if (aBean.getOrder_id()
orderReducer:
package com.order.bean;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Emits one record per order group. Keys arrive sorted (order_id ascending,
 * price descending) and grouped by order_id, so the key of each group is the
 * order's maximum-price record — writing it once, without iterating
 * {@code values}, yields exactly the per-order maximum.
 *
 * <p>Generic parameters were stripped by HTML extraction in the published
 * source; they are restored here from the visible {@code reduce} signature
 * and the driver's output-class settings.
 */
public class orderReducer extends Reducer<orderBean, NullWritable, orderBean, NullWritable> {

    @Override
    protected void reduce(orderBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
驱动类(orderDriver):
package com.order.bean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Job driver: wires mapper, reducer, key/value classes and the grouping
 * comparator, then runs the job against hard-coded local paths.
 */
public class orderDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // NOTE: these hard-coded demo paths override any command-line args;
        // the output directory must not already exist or the job fails.
        args = new String[]{"D:/mapreduceinput/input1", "D:/mapreduceoutput/outputgroup"};

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(orderDriver.class);
        job.setMapperClass(orderMapper.class);
        job.setReducerClass(orderReducer.class);

        job.setMapOutputKeyClass(orderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(orderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Group reduce() invocations by order_id only (see orderGroupingComparator);
        // the sort comparator (orderBean.compareTo) still orders by price within a group.
        job.setGroupingComparatorClass(orderGroupingComparator.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
转载地址:http://xicki.baihongyu.com/