RDD创建方式
1)从Hadoop文件系统(如HDFS、Hive、HBase)输入创建。
2)从父RDD转换得到新RDD。
3)通过parallelize或makeRDD将单机数据创建为分布式RDD。
4)基于DB(Mysql)、NoSQL(HBase)、S3(SC3)、数据流创建。
从集合创建RDD
parallelize
def parallelizeT(implicit arg0: ClassTag[T]): RDD[T]
从一个Seq集合创建RDD。
参数1:Seq集合,必须。
参数2:分区数,默认为该Application分配到的资源的CPU核数
scala> var rdd = sc.parallelize(1 to 10)
makeRDD
def makeRDDT(implicit arg0: ClassTag[T]): RDD[T]
这种用法和parallelize完全相同
def makeRDDT(implicit arg0: ClassTag[T]): RDD[T]
该用法可以指定每一个分区的preferredLocations。
scala> var rdd=sc.makeRDD(Seq((1 to 10)))
从外部存储创建RDD
textFile
//从hdfs文件创建.
//从hdfs文件创建
scala> var rdd = sc.textFile(“hdfs:///tmp/lxw1234/1.txt”)
//从本地文件创建
scala> var rdd = sc.textFile(“file:///etc/hadoop/conf/core-site.xml”)
注意这里的本地文件路径需要在Driver和Executor端存在。
从其他HDFS文件格式创建
hadoopFile
sequenceFile
objectFile
newAPIHadoopFile
从Hadoop接口API创建
hadoopRDD
newAPIHadoopRDD
比如:从HBase创建RDD
scala> var rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at
scala> rdd.foreach(println(_))
3
8
1
9
10
4
5
2
6
7
1.RDD -> Dataset
scala> val ds = rdd.toDS()
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.foreach(println(_))
1
2
6
7
3
4
5
8
9
10
2.RDD -> DataFrame
val df = spark.read.json(rdd)
scala> val df=rdd.toDF()
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.foreach(println(_))
[1]
[3]
[2]
[6]
[7]
[4]
[8]
[9]
[10]
[5]
3.Dataset -> RDD
val rdd = ds.rdd
4.Dataset -> DataFrame
val df = ds.toDF()
5.DataFrame -> RDD
val rdd = df.toJSON.rdd
6.DataFrame -> Dataset
val ds = df.toJSON