博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce的手机流量统计的案例
阅读量:6863 次
发布时间:2019-06-26

本文共 8394 字,大约阅读时间需要 27 分钟。

程序:(另外一个关于单词计数的总结:)
1 import java.io.IOException;  2   3 import mapreduce.WordCountApp.WordCountMapper.WordCountReducer;  4   5 import org.apache.hadoop.conf.Configuration;  6 import org.apache.hadoop.fs.Path;  7 import org.apache.hadoop.io.LongWritable;  8 import org.apache.hadoop.io.Text;  9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14  15 /** 16  * 以文本 17  * hello    you 18  * hello    me 19  * 为例子. 20  * map方法调用了两次,因为有两行 21  * k2 v2 键值对的数量有几个? 22  * 有4个.有四个单词. 23  *  24  * 会产生几个分组? 25  * 产生3个分组. 26  * 有3个不同的单词. 27  * 28  */ 29 public class WordCountApp { 30     public static void main(String[] args) throws Exception { 31         //程序在这里运行,要有驱动. 32         Configuration conf = new Configuration(); 33         Job job = Job.getInstance(conf,WordCountApp.class.getSimpleName()); 34          35         //我们运行此程序通过运行jar包来执行.一定要有这句话. 36         job.setJarByClass(WordCountApp.class); 37          38         FileInputFormat.setInputPaths(job,args[0]); 39          40         job.setMapperClass(WordCountMapper.class);//设置Map类 41         job.setMapOutputKeyClass(Text.class);//设置Map的key 42         job.setMapOutputValueClass(LongWritable.class);//设置Map的value 43         job.setReducerClass(WordCountReducer.class);//设置Reduce的类 44         job.setOutputKeyClass(Text.class);//设置Reduce的key Reduce这个地方只有输出的参数可以设置. 方法名字也没有Reduce关键字区别于Map 45         job.setOutputValueClass(LongWritable.class);//设置Reduce的value. 46          47         FileOutputFormat.setOutputPath(job, new Path(args[1])); 48         job.waitForCompletion(true);//表示结束了才退出,不结束不退出 49     } 50     /** 51      * 4个泛型的意识 52      * 第一个是LongWritable,固定就是这个类型,表示每一行单词的起始位置(单位是字节) 53      * 第二个是Text,表示每一行的文本内容. 54      * 第三个是Text,表示单词 55      * 第四个是LongWritable,表示单词的出现次数 56      */ 57     public static class WordCountMapper extends Mapper
{ 58 Text k2 = new Text(); 59 LongWritable v2 = new LongWritable(); 60 //增加一个计数器,这个Map调用几次就输出对应的次数. 61 int counter = 0; 62 63 64 /** 65 * key和value表示输入的信息 66 * 每一行文本调用一次map函数 67 */ 68 @Override 69 protected void map(LongWritable key, Text value,Mapper
.Context context) 70 throws IOException, InterruptedException { 71 counter = counter + 1; 72 System.out.println("mapper 调用的次数:" + counter); 73 //这个map方法中的Mapper的各个泛型和上面的意识是一样的,分别代表的是k1,v1,k2,v2 74 String line = value.toString(); 75 System.out.println(String.format("
的值<"+key.get()+","+line+">")); 76 String[] splited = line.split("\t"); 77 for (String word : splited) { 78 k2.set(word); 79 v2.set(1); 80 System.out.println(String.format("
的值<"+k2.toString()+","+v2.get()+">")); 81 context.write(k2, v2);//通过context对象写出去. 82 } 83 } 84 /** 85 * 这个地方的四个泛型的意思 86 * 前两个泛型是对应的Map方法的后两个泛型. 87 * Map的输出对应的是Reduce的输入. 88 * 第一个Text是单词 89 * 第二个LongWritable是单词对应的次数 90 * 我们想输出的也是单词 和 次数 91 * 所以第三个和第四个的类型和第一和第二个的一样 92 * 93 * 分组指的是把相同key2的value2放到一个集合中 94 * 95 */ 96 public static class WordCountReducer extends Reducer
{ 97 LongWritable v3 = new LongWritable(); 98 //增加一个计数器,这个Reduce调用几次就输出对应的次数. 99 int counter = 0;100 101 /**102 * 每一个分组调用一次reduce函数103 * 过来的k2 分别是hello you me104 * 105 */106 @Override107 protected void reduce(Text key2, Iterable
value2Iterable,Reducer
.Context context)109 throws IOException, InterruptedException {110 counter = counter + 1;111 System.out.println("reducer 调用的次数:" + counter);112 //第一个参数是单词,第二个是可迭代的集合. 为什么上面的LongWritable类型的对象value2变成了一个可以迭代的结合参数?113 //因为分组指的是把相同key2的value2放到一个集合中114 long sum = 0L;115 for (LongWritable value2 : value2Iterable) {116 System.out.println(String.format("
的值<"+key2.toString()+","+value2.toString()+">"));117 sum += value2.get(); //这个value2是LongWritable类型的,不能进行+= 操作,要用get()得到其对应的java基本类型.118 //sum表示单词k2 在整个文本中的出现次数.119 }120 v3.set(sum);121 context.write(key2, v3);122 System.out.println(String.format("
的值<"+key2.toString()+","+v3.get()+">"));123 }124 }125 }126 }

三:查看结果

打包上传到Hadoop集群,然后执行命令运行.详细运行过程不再写了.........

 

//==============================================================================================

程序二:

1 /*  2  * 一个hello文件内容如下:  3  *   hello        you  4  *   hello        me  5  */  6 import java.io.IOException;  7   8 import org.apache.hadoop.conf.Configuration;  9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.LongWritable; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17  18 public class WordCountApp { 19     public static void main(String[] args) throws Exception { 20         // 在main方法写驱动程序,把Map函数和Reduce函数组织在一起. 21         // 搞一个对象把Map对象和Reduce对象都放在这个对象中,我们把这个对象称作Job 22         // 两个形参,一个是Configuration对象,一个是Job的名称,这样获得了一个Job对象; 23         Job job = Job.getInstance(new Configuration(), 24                 WordCountApp.class.getSimpleName()); 25         // 对这个job进行设置 26         job.setJarByClass(WordCountApp.class);// 通过这个设置可以让框架识别你写的代码 27          28         job.setMapperClass(MyMapper.class);// 把自定义的Map类放到job中 29         job.setMapOutputKeyClass(Text.class);// 定义Map的key的输出类型,Map的输出是
30 job.setMapOutputValueClass(LongWritable.class);// 定义Map的value的输出类型 31 32 job.setReducerClass(MyReducer.class);// 把自定义的Reducer类放到job中 33 job.setOutputKeyClass(Text.class);// 因为Reduce的输出是最终的数据,Reduce的输出是
34 // 所以这个方法名中没有像Map对应的放发一样带有Reduce,直接就是setOutputKeyClass 35 job.setOutputValueClass(LongWritable.class);// 定义reduce的value输出 36 37 FileInputFormat.setInputPaths(job, args[0]);// 输入指定:传入一个job地址. 38 // 这个args[0] 就是新地址,"hdfs://192.168.0.170/hello" 39 FileOutputFormat.setOutputPath(job, new Path(args[1])); 40 // 输出指定 41 // 指定输入和输出路径可以通过在这里写死的方式,也可以通过main函数参数的形式 42 // 分别是args[0]和args[1] 43 44 // 把job上传到yarn平台上. 45 job.waitForCompletion(true); 46 } 47 48 /* 49 * 对于
而言,每一行产生一个
对,
表示
<行的起始位置,行的文本内容>
50 * 就本例而言map函数总共调用两次,因为总共只有两行. 51 * 正对要统计的文本内容可以知道总共两行,总共会调用两次Map函数对应产生的
分别是<0,hello you> 52 * 和第二个
是<10,hello me> 53 */ 54 private static class MyMapper extends 55 Mapper
{ 56 // 这个Mapper的泛型参数是
分别对应的是k1,v1,k2,v2 57 // 我们如下讲的k1,v1的类型是固定的. 58 // 就本例而言,map函数会被调用2次,因为总共文本文件就只有两行. 59 60 //要定义输出的k2和v2.本案例中可以分析出
是对文本内容的统计
61 //而且
的内容是和
中的内容是一样的. 62 Text k2 = new Text(); 63 LongWritable v2 = new LongWritable(); 64 //重写父类Mapper中的map方法 65 @Override 66 protected void map(LongWritable key, Text value, 67 Mapper
.Context context) 68 throws IOException, InterruptedException { 69 //通过代码或者案例分析就可以知道k1其实没有什么用出的. 70 String line = value.toString(); 71 String[] splited = line.split("\t");//根据制表分隔符机进行拆分.hello和me,you之间是一个制表分隔符. 72 for (String word : splited) { 73 k2.set(word); 74 v2.set(1); 75 context.write(k2, v2); 76 //用context把k2,v2写出去,框架会写,不用我们去管. 77 } 78 } 79 } 80 81 private static class MyReducer extends 82 Reducer
{ 83 //这个例子中的
中的k是一样的,所以这里,k2当做k3了. 84 LongWritable v3 = new LongWritable(); 85 @Override 86 protected void reduce(Text k2, Iterable
v2s, 87 Reducer
.Context context) 88 throws IOException, InterruptedException { 89 //Reduce是对上面Map中的结果进行汇总的. 90 //上面拆分出来的
Reduce方法中就要对其进行汇总. 91 long sum = 0L; 92 for(LongWritable v2:v2s){ 93 sum = sum +v2.get();//sum是long类型,v2是LongWritable类型 94 //LongWritable类型转换成long类型用get()方法. 95 //sum的值表示单词在整个文件中出现的中次数. 96 } 97 v3.set(sum); 98 context.write(k2,v3); 99 }100 }101 }

 

转载于:https://www.cnblogs.com/DreamDrive/p/5494866.html

你可能感兴趣的文章
基于koajs的web项目构建-心得篇
查看>>
通过minify将项目中js和css文件的打包
查看>>
提取Windows用户密钥文件cachedump
查看>>
自定义grains_module pillar
查看>>
log file sycn 概述
查看>>
Javascript对于不同浏览器的兼容性
查看>>
开源在线阅读技术资源
查看>>
慎用jQuery中的submit()方法
查看>>
淘宝样式初始化代码
查看>>
Gson解析json数据 亲自测试可用
查看>>
我与监控宝之间的点点滴滴
查看>>
delphi 数据库显示的TDBGrid配置
查看>>
对51CTO的看法
查看>>
userenv和sys_context函数
查看>>
是否会回到起点.回忆只能是回忆
查看>>
原创数据结构算法Flash动画演示课件-Action Script(AS)脚本实现
查看>>
基于Mysql主从同步的读写分离
查看>>
容灾备份技术的分类概述
查看>>
用乘法求圆周率
查看>>
自学使用sort他命令使用
查看>>