Spark 的 shell 作為一個強(qiáng)大的交互式數(shù)據(jù)分析工具,提供了一個簡單的方式來學(xué)習(xí) API。它可以使用 Scala(在 Java 虛擬機(jī)上運(yùn)行現(xiàn)有的 Java 庫的一個很好方式) 或 Python。在 Spark 目錄里使用下面的方式開始運(yùn)行:
./bin/spark-shell
Spark 最主要的抽象是叫Resilient Distributed Dataset(RDD) 的彈性分布式集合。RDDs 可以使用 Hadoop InputFormats(例如 HDFS 文件)創(chuàng)建,也可以從其他的 RDDs 轉(zhuǎn)換。讓我們在 Spark 源代碼目錄從 README 文本文件中創(chuàng)建一個新的 RDD。
scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
RDD 的 actions 從 RDD 中返回值,transformations 可以轉(zhuǎn)換成一個新 RDD 并返回它的引用。讓我們開始使用幾個操作:
scala> textFile.count() // RDD 的數(shù)據(jù)條數(shù)
res0: Long = 126
scala> textFile.first() // RDD 的第一行數(shù)據(jù)
res1: String = # Apache Spark
現(xiàn)在讓我們使用一個 transformation,我們將使用 filter 在這個文件里返回一個包含子數(shù)據(jù)集的新 RDD。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
我們可以把 actions 和 transformations 鏈接在一起:
scala> textFile.filter(line => line.contains("Spark")).count() // 有多少行包括 "Spark"?
res3: Long = 15
RDD actions 和 transformations 能被用在更多的復(fù)雜計算中。比方說,我們想要找到一行中最多的單詞數(shù)量:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
首先將行映射成一個整型數(shù)值產(chǎn)生一個新 RDD。 在這個新的 RDD 上調(diào)用 reduce
找到行中最大的個數(shù)。 map
和 reduce
的參數(shù)是 Scala 的函數(shù)串(閉包),并且可以使用任何語言特性或者 Scala/Java 類庫。例如,我們可以很方便地調(diào)用其他的函數(shù)聲明。 我們使用 Math.max()
函數(shù)讓代碼更容易理解:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
Hadoop 流行的一個通用的數(shù)據(jù)流模式是 MapReduce。Spark 能很容易地實(shí)現(xiàn) MapReduce:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
這里,我們結(jié)合 [flatMap](), [map]() 和 [reduceByKey]() 來計算文件里每個單詞出現(xiàn)的數(shù)量,它的結(jié)果是包含一組(String, Int) 鍵值對的 RDD。我們可以使用 [collect] 操作在我們的 shell 中收集單詞的數(shù)量:
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
Spark 支持把數(shù)據(jù)集拉到集群內(nèi)的內(nèi)存緩存中。當(dāng)要重復(fù)訪問時這是非常有用的,例如當(dāng)我們在一個小的熱(hot)數(shù)據(jù)集中查詢,或者運(yùn)行一個像網(wǎng)頁搜索排序這樣的重復(fù)算法。作為一個簡單的例子,讓我們把 linesWithSpark
數(shù)據(jù)集標(biāo)記在緩存中:
scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
緩存 100 行的文本文件來研究 Spark 這看起來很傻。真正讓人感興趣的部分是我們可以在非常大型的數(shù)據(jù)集中使用同樣的函數(shù),甚至在 10 個或者 100 個節(jié)點(diǎn)中交叉計算。你同樣可以使用 bin/spark-shell
連接到一個 cluster 來替換掉編程指南中的方法進(jìn)行交互操作。
更多建議: