博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark移动平均:时间序列数据平均值
阅读量:2489 次
发布时间:2019-05-11

本文共 9794 字,大约阅读时间需要 32 分钟。

一、内存排序

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Simple moving average of stock prices per symbol, computed by grouping all
 * observations of a symbol and sorting them in memory.
 *
 * Input lines have the form `SYMBOL,yyyy-MM-dd,price`, in any order, e.g.
 * {{{
 * IBM,2013-09-30,185.18
 * IBM,2013-09-27,186.92
 * IBM,2013-09-26,190.22
 * IBM,2013-09-25,189.47
 * }}}
 * Each output line has the same form, with the price replaced by the mean of
 * the last `window` prices up to and including that date (fewer at the start
 * of a series). With `window = 3` the input above yields:
 * {{{
 * IBM,2013-09-25,189.47
 * IBM,2013-09-26,189.845
 * IBM,2013-09-27,188.87
 * IBM,2013-09-30,187.43999999999997
 * }}}
 * `groupByKey` materializes every observation of one symbol together, so this
 * approach suits series that fit in memory.
 */
object MovingAverageInMemory {

  // Defaults preserved from the original hard-coded configuration.
  private val DefaultWindow = 3
  private val DefaultInput = "file:///media/chenjie/0009418200012FF3/ubuntu/gupiao1.txt"
  private val DefaultOutput = "file:///media/chenjie/0009418200012FF3/ubuntu/gupiao1"

  /**
   * Runs the job.
   *
   * @param args optional positional arguments `[input] [output] [window]`;
   *             each missing one falls back to its default above. The Spark
   *             master defaults to `local` unless `spark.master` is already
   *             set (e.g. by spark-submit).
   */
  def main(args: Array[String]): Unit = {
    val input = args.lift(0).getOrElse(DefaultInput)
    val output = args.lift(1).getOrElse(DefaultOutput)
    val window = args.lift(2).map(_.toInt).getOrElse(DefaultWindow)
    // A window below 1 would average zero prices and produce NaN everywhere.
    require(window >= 1, s"window must be >= 1, got $window")

    val sparkConf = new SparkConf()
      .setAppName("MovingAverageInMemory")
      .setIfMissing("spark.master", "local")
    val sc = new SparkContext(sparkConf)
    try {
      val formattedResult = sc.textFile(input)
        .flatMap(line => parseLine(line).toList)
        .groupByKey()
        .flatMap { case (symbol, observations) =>
          // `window` is a plain Int captured by the closure; it is far too
          // small to benefit from a broadcast variable.
          movingAverage(observations, window).map { case (date, average) =>
            s"$symbol,$date,$average"
          }
        }
      formattedResult.saveAsTextFile(output)
    } finally {
      // Release the context even if the job fails.
      sc.stop()
    }
  }

  /**
   * Parses `SYMBOL,yyyy-MM-dd,price` into `(symbol, (date, price))`.
   *
   * Returns None for lines with fewer than three fields (e.g. blank lines) or
   * a non-numeric price (e.g. a header row), so one bad line does not fail
   * the whole job. Fields beyond the third are ignored.
   */
  private def parseLine(line: String): Option[(String, (String, Double))] =
    line.split(",") match {
      case Array(symbol, date, price, _*) =>
        scala.util.Try(price.toDouble).toOption.map(p => (symbol, (date, p)))
      case _ => None
    }

  /**
   * Sorts one symbol's observations by date and computes the trailing mean.
   *
   * @param observations `(yyyy-MM-dd, price)` pairs in any order
   * @param window       number of trailing prices averaged per point; must be >= 1
   * @return `(yyyy-MM-dd, average)` pairs in ascending date order, where each
   *         average covers the last `window` prices up to and including that date
   * @throws java.text.ParseException if a date cannot be parsed as yyyy-MM-dd
   */
  def movingAverage(observations: Iterable[(String, Double)], window: Int): Seq[(String, Double)] = {
    require(window >= 1, s"window must be >= 1, got $window")
    // SimpleDateFormat is not thread-safe, so each call uses its own instance.
    val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
    val sorted = observations.toVector
      .map { case (date, price) => (dateFormat.parse(date).getTime, price) }
      .sortBy(_._1)
    val prices = sorted.map(_._2)
    sorted.zipWithIndex.map { case ((millis, _), i) =>
      // Summed oldest-first, matching the summation order of a FIFO window.
      val trailing = prices.slice(math.max(0, i - window + 1), i + 1)
      (dateFormat.format(new java.util.Date(millis)), trailing.sum / trailing.size)
    }
  }
}

二、

自定义分区器CompositeKeyPartitioner

import MovingAverage.CompositeKey
import org.apache.spark.Partitioner

/**
 * Spark partitioner that routes a `CompositeKey` by its `stockSymbol` alone,
 * so every record of one symbol lands in the same partition regardless of
 * the other components of the key. Any other key type is routed by its own
 * `hashCode`; a `null` key goes to partition 0.
 *
 * @param partitions number of output partitions; must be non-negative
 */
class CompositeKeyPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  override def numPartitions: Int = partitions

  /**
   * Returns the partition index for `key`.
   *
   * `x % n` lies strictly between -n and n for n > 0, so taking `math.abs`
   * afterwards always yields a valid index in [0, n), even for Int.MinValue.
   */
  override def getPartition(key: Any): Int = key match {
    case k: CompositeKey => math.abs(k.stockSymbol.hashCode % numPartitions)
    case null            => 0
    case _               => math.abs(key.hashCode % numPartitions)
  }

  // Two instances with the same partition count route keys identically, so
  // they compare equal; hashCode is kept consistent with equals.
  override def equals(other: Any): Boolean = other match {
    case h: CompositeKeyPartitioner => h.numPartitions == numPartitions
    case _                          => false
  }

  override def hashCode: Int = numPartitions
}

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Simple moving average of stock prices per symbol, computed by grouping all
 * observations of a symbol and sorting them in memory.
 *
 * NOTE(review): this listing is a verbatim copy of the section-1
 * `MovingAverageInMemory`. The partitioner shown in this section imports
 * `MovingAverage.CompositeKey`, so the intended secondary-sort driver object
 * `MovingAverage` appears to be missing here. Two objects with this same name
 * cannot coexist in one compilation unit — confirm which listing is intended.
 *
 * Input lines have the form `SYMBOL,yyyy-MM-dd,price`, in any order, e.g.
 * {{{
 * IBM,2013-09-30,185.18
 * IBM,2013-09-27,186.92
 * IBM,2013-09-26,190.22
 * IBM,2013-09-25,189.47
 * }}}
 * Each output line has the same form, with the price replaced by the mean of
 * the last `window` prices up to and including that date (fewer at the start
 * of a series). With `window = 3` the input above yields:
 * {{{
 * IBM,2013-09-25,189.47
 * IBM,2013-09-26,189.845
 * IBM,2013-09-27,188.87
 * IBM,2013-09-30,187.43999999999997
 * }}}
 * `groupByKey` materializes every observation of one symbol together, so this
 * approach suits series that fit in memory.
 */
object MovingAverageInMemory {

  // Defaults preserved from the original hard-coded configuration.
  private val DefaultWindow = 3
  private val DefaultInput = "file:///media/chenjie/0009418200012FF3/ubuntu/gupiao1.txt"
  private val DefaultOutput = "file:///media/chenjie/0009418200012FF3/ubuntu/gupiao1"

  /**
   * Runs the job.
   *
   * @param args optional positional arguments `[input] [output] [window]`;
   *             each missing one falls back to its default above. The Spark
   *             master defaults to `local` unless `spark.master` is already
   *             set (e.g. by spark-submit).
   */
  def main(args: Array[String]): Unit = {
    val input = args.lift(0).getOrElse(DefaultInput)
    val output = args.lift(1).getOrElse(DefaultOutput)
    val window = args.lift(2).map(_.toInt).getOrElse(DefaultWindow)
    // A window below 1 would average zero prices and produce NaN everywhere.
    require(window >= 1, s"window must be >= 1, got $window")

    val sparkConf = new SparkConf()
      .setAppName("MovingAverageInMemory")
      .setIfMissing("spark.master", "local")
    val sc = new SparkContext(sparkConf)
    try {
      val formattedResult = sc.textFile(input)
        .flatMap(line => parseLine(line).toList)
        .groupByKey()
        .flatMap { case (symbol, observations) =>
          // `window` is a plain Int captured by the closure; it is far too
          // small to benefit from a broadcast variable.
          movingAverage(observations, window).map { case (date, average) =>
            s"$symbol,$date,$average"
          }
        }
      formattedResult.saveAsTextFile(output)
    } finally {
      // Release the context even if the job fails.
      sc.stop()
    }
  }

  /**
   * Parses `SYMBOL,yyyy-MM-dd,price` into `(symbol, (date, price))`.
   *
   * Returns None for lines with fewer than three fields (e.g. blank lines) or
   * a non-numeric price (e.g. a header row), so one bad line does not fail
   * the whole job. Fields beyond the third are ignored.
   */
  private def parseLine(line: String): Option[(String, (String, Double))] =
    line.split(",") match {
      case Array(symbol, date, price, _*) =>
        scala.util.Try(price.toDouble).toOption.map(p => (symbol, (date, p)))
      case _ => None
    }

  /**
   * Sorts one symbol's observations by date and computes the trailing mean.
   *
   * @param observations `(yyyy-MM-dd, price)` pairs in any order
   * @param window       number of trailing prices averaged per point; must be >= 1
   * @return `(yyyy-MM-dd, average)` pairs in ascending date order, where each
   *         average covers the last `window` prices up to and including that date
   * @throws java.text.ParseException if a date cannot be parsed as yyyy-MM-dd
   */
  def movingAverage(observations: Iterable[(String, Double)], window: Int): Seq[(String, Double)] = {
    require(window >= 1, s"window must be >= 1, got $window")
    // SimpleDateFormat is not thread-safe, so each call uses its own instance.
    val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
    val sorted = observations.toVector
      .map { case (date, price) => (dateFormat.parse(date).getTime, price) }
      .sortBy(_._1)
    val prices = sorted.map(_._2)
    sorted.zipWithIndex.map { case ((millis, _), i) =>
      // Summed oldest-first, matching the summation order of a FIFO window.
      val trailing = prices.slice(math.max(0, i - window + 1), i + 1)
      (dateFormat.format(new java.util.Date(millis)), trailing.sum / trailing.size)
    }
  }
}

转载地址:http://pkqrb.baihongyu.com/

你可能感兴趣的文章
log日志记录是什么
查看>>
<rich:modalPanel>标签的使用
查看>>
<h:commandLink>和<h:inputLink>的区别
查看>>
<a4j:keepAlive>的英文介绍
查看>>
关于list对象的转化问题
查看>>
VOPO对象介绍
查看>>
suse创建的虚拟机,修改ip地址
查看>>
Linux挂载的问题：重启后挂载就没有了
查看>>
docker原始镜像启动容器并创建Apache服务器实现反向代理
查看>>
docker容器秒死的解决办法
查看>>
管理网&业务网的一些笔记
查看>>
openstack报错解决一
查看>>
openstack报错解决二
查看>>
linux source命令
查看>>
openstack报错解决三
查看>>
乙未年年终总结
查看>>
子网掩码
查看>>
第一天上班没精神
查看>>
启动eclipse报错:Failed to load the JNI shared library
查看>>
Eclipse安装插件的两种方式：在线和离线
查看>>