整合 Tachyon 运行 Spark
兼容性
如果你计划运行Spark 结合使用 Tachyon,以下版本的搭配将共同开箱即用。如果你计划使用不同版本而不是默认支持版本。请重新编译Spark 的正确版本,通过在 spark/core/pom.xml 中改变 tachyon-client 的版本。
Spark Version | Tachyon Version |
---|---|
1.0.x and Below | v0.4.1 |
1.1.x | v0.5.0 |
1.2.x | v0.5.0 |
1.3.x | v0.5.0 |
1.4.x | v0.6.4 |
1.5.x and Above | v0.7.1 |
Tachyon的输入/输出数据
这些额外的先决条件是Spark(0.6 或者更高版本)。我们还猜测到用户运行 Tachyon 0.7.1 或更高版本,按照这些指导 或 并已建立的Tachyon和Hadoop。
We also assume that the user is running on Tachyon 0.7.1 or later and has set up Tachyon and Hadoop in accordance to these guides or .
如果你运行Spark 版本低于 1.0.0, 请增加以下内容给 spark/conf/spark-env.sh:
export SPARK_CLASSPATH=/pathToTachyon/client/target/tachyon-client-0.7.1-jar-with-dependencies.jar:$SPARK_CLASSPATH
如果运行一个Hadoop 1.x 集群,创建新文件 spark/conf/core-site.xml 增加以下内容:
fs.tachyon.impl tachyon.hadoop.TFS
把一个文件X 放入HDFS运行 Spark shell:
$ ./spark-shell$ val s = sc.textFile("tachyon://localhost:19998/X")$ s.count()$ s.saveAsTextFile("tachyon://localhost:19998/Y")
浏览器中看看 . 应该有一个输出文件Y包含 x 文件中一定数量单词 。放文件 X 放置到HDFS运行Spark shell:
如果你使用sbt 或者从别的框架类似使用sbt 方式触发Spark Job:
val conf = new SparkConf()val sc = new SparkContext(conf)sc.hadoopConfiguration.set("fs.tachyon.impl", "tachyon.hadoop.TFS")
如果你将 tachyon 结合 zookeeper 容错模式运行,Hadoop 集群是 1.x 集群,另外添加新条目前创建 spark/conf/core-site.xml :
fs.tachyon-ft.impl tachyon.hadoop.TFSFT
添加以下内容在 spark/conf/spark-env.sh
:
export SPARK_JAVA_OPTS=" -Dtachyon.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181 -Dtachyon.usezookeeper=true $SPARK_JAVA_OPTS"
将文件放入 HDFS中,当运行一个Spark Shell,你现在可以指向任何一个Tachyon master:
$ ./spark-shell$ val s = sc.textFile("tachyon-ft://stanbyHost:19998/X")$ s.count()$ s.saveAsTextFile("tachyon-ft://activeHost:19998/Y")
持久化Spark RDDs 到 Tachyon
这个特性,你需要运行Spark(1.0 或以上)和 Tachyon(0.4.1 或以上)。请参考 对这个功能的特性。你的Spark程序需要设置两个参数,spark.exernalBlockStore.url 和 spark.externalBlockStore.baseDir.externalBlockStore.url (默认情况下:tachyon://localhost:19998 ) 是Tachyon的文件系统中的TachyonStore的 URL地址。spark.externalBlockStore.baseDir(默认 java.io.tmpdir) 是Tachyon文件系统存储RDDs的基本目录。在Tachyon中可以以逗号分隔多个目录。为了持久化 RDD 到 Tachyon,你需要使用 StorageLevel.OFF_HEAP 参数,下面是以 Spark shell 的示例:
$ ./spark-shell$ val rdd = sc.textFile(inputPath)$ rdd.persist(StorageLevel.OFF_HEAP)
当Spark 应用正在运行,在Tachyon's WebUI 中看看 spark.externalBlockStore.baseDir
(默认URI 是 )。应该有一堆文件;他们是RDD 数据块。 目前,当Spark应用运行完成这些文件将被清除掉。
你也可以使用Tachyon 作为Spark 应用的 输入 和 输出 源。上面的部分已经显示了指令。