publicstaticclass CountMapper extends Mapper<Object, Text, Text, DoublePair> { private Text word = new Text(); private DoublePair pair = new DoublePair();
publicvoid map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); pair.setValue(1, 1); context.write(word, pair); } } }
// Reduce: compute the average heart rate
publicstaticclass CalulateReducer extends Reducer<Text, DoublePair, Text, DoublePair> { DoublePair pair = new DoublePair(); Text text = new Text();
// Register the key and value classes of the job's output.
job.setOutputKeyClass(Text.class);
// NOTE(review): the reducer declared in this file is parameterized with DoublePair
// as its output value type, while DoubleWritable is registered here; confirm which
// one is intended.
job.setOutputValueClass(DoubleWritable.class);
// Every argument except the last one is an input path.
for (int i = 0; i < otherArgs.length - 1; ++i) {
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
// The last argument is the output path.
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
// Submit the job and wait for it to finish.
job.waitForCompletion(true);
// Number of records to keep in the final ranking.
val topN = 10
// Configure the environment: application name "Mapreduce", master "local".
val conf = new SparkConf().setAppName("Mapreduce").setMaster("local")
val sc = new SparkContext(conf)
// Read the three input files (second argument 1 = requested minimum partition count)
// and merge them into a single RDD of lines.
val a = sc.textFile("file:///home/elliottqiann/Temp/a", 1)
val b = sc.textFile("file:///home/elliottqiann/Temp/b", 1)
val c = sc.textFile("file:///home/elliottqiann/Temp/c", 1)
val union = a.union(b).union(c)
// Turn each line into a (first field, second field as Int) pair, using a single
// space as the separator, then hand every partition's iterator together with topN
// to PartitionSort.handing (defined elsewhere; presumably a per-partition top-N
// selection, to be confirmed against its definition).
val union2 = union
  .map { line =>
    val fields = line.split(" ")
    (fields(0), fields(1).toInt)
  }
  .mapPartitions(records => PartitionSort.handing(records, topN))
// Bring every record of union2 back to the driver.
val xx = union2.collect()
// Print the collected records (the output of the per-partition step), one per line.
xx.foreach(println)
// Sort all records by their second component in descending order and collect them.
val tempResult = union2.sortBy(_._2, ascending = false).collect()
// Keep the first topN records (all of them when fewer than topN exist).
val result = tempResult.take(topN)
// Load the input file, one numeric reading per line, and collect the readings
// on the driver as an array.
val inFile = sc.textFile("/rate.txt")
// collect() is used instead of RDD.toArray, which was deprecated and later removed.
val rates = inFile.map(_.toDouble).collect()
// Number of consecutive (previous, current) pairs in `rates`.
val length = rates.length - 1
// Summed once and shared by both averages below.
val rateSum = rates.sum
// Mean of all readings except the first one.
val yavg = (rateSum - rates(0)) / length
// Mean of all readings except the last one.
val xavg = (rateSum - rates(length)) / length

/**
 * Accumulates half the squared, mean-adjusted difference (sd == 1) or sum (sd == 2)
 * of each pair of consecutive values and returns the square root of that total
 * divided by `length`.
 *
 * NOTE(review): this looks like the SD1 / SD2 descriptors of a Poincare plot;
 * confirm against the intended formula.
 *
 * @param iter values to scan; consumed by this call
 * @param sd   1 selects the difference form, 2 the sum form; any other value
 *             makes every pair contribute 0
 * @return sqrt(accumulated total / length)
 * @throws NoSuchElementException if `iter` is empty
 */
def myfunc(iter: Iterator[Double], sd: Int): Double = {
  var sumSq = 0.0
  var pre = iter.next()
  while (iter.hasNext) {
    val cur = iter.next()
    val deviation = sd match {
      case 1 => (pre - cur) + (yavg - xavg)
      case 2 => (pre + cur) - (yavg + xavg)
      case _ => 0.0
    }
    sumSq += math.pow(deviation, 2) / 2
    pre = cur
  }
  math.sqrt(sumSq / length)
}

// A fresh iterator is needed for each call because myfunc consumes it.
val SD1 = myfunc(rates.iterator, 1)
val SD2 = myfunc(rates.iterator, 2)