解耦合的 HDP Spark 和 Hive 与 MinIO
1. 云原生架构
Kubernetes 在计算节点上弹性地管理无状态的 Spark 和 Hive 容器。Spark 与 Kubernetes 具有原生调度程序集成。Hive 由于遗留原因,在 Kubernetes 之上使用 YARN 调度程序。
所有对 MinIO 对象存储的访问均通过 S3/SQL SELECT API 进行。除了计算节点外,MinIO 容器还由 Kubernetes 作为有状态容器进行管理,本地存储 (JBOD/JBOF) 映射为持久本地卷。这种架构使多租户 MinIO 成为可能,允许在客户之间隔离数据。
MinIO 还支持类似于 AWS 区域和层的跨集群、跨站点联合。使用 MinIO 信息生命周期管理 (ILM),您可以配置数据在基于 NVMe 的热存储和基于 HDD 的温存储之间分层。所有数据都使用每个对象密钥进行加密。租户之间的访问控制和身份管理由 MinIO 使用 OpenID Connect 或 Kerberos/LDAP/AD 进行管理。
2. 先决条件
使用以下 指南 安装 Hortonworks 发行版。
使用以下指南之一安装 MinIO 分布式服务器。
3. 配置 Hadoop、Spark 和 Hive 以使用 MinIO
安装成功后,导航到 Ambari UI http://<ambari-server>:8080/
并使用默认凭据登录: [用户名:admin,密码:admin]
3.1 配置 Hadoop
导航到 **服务** -> **HDFS** -> **配置** -> **高级**,如下所示
导航到 **自定义 core-site** 以配置 MinIO 参数,用于 _s3a_
连接器
sudo pip install yq
alias kv-pairify='yq ".configuration[]" | jq ".[]" | jq -r ".name + \"=\" + .value"'
例如,假设有一组 12 个计算节点,总内存为 *1.2TiB*,我们需要对以下设置进行优化才能获得最佳结果。添加以下用于 *core-site.xml* 的最佳条目以使用 **MinIO** 配置 *s3a*。这里最重要的选项是
cat ${HADOOP_CONF_DIR}/core-site.xml | kv-pairify | grep "mapred"
mapred.maxthreads.generate.mapoutput=2 # Num threads to write map outputs
mapred.maxthreads.partition.closer=0 # Asynchronous map flushers
mapreduce.fileoutputcommitter.algorithm.version=2 # Use the latest committer version
mapreduce.job.reduce.slowstart.completedmaps=0.99 # 99% map, then reduce
mapreduce.reduce.shuffle.input.buffer.percent=0.9 # Min % buffer in RAM
mapreduce.reduce.shuffle.merge.percent=0.9 # Minimum % merges in RAM
mapreduce.reduce.speculative=false # Disable speculation for reducing
mapreduce.task.io.sort.factor=999 # Threshold before writing to drive
mapreduce.task.sort.spill.percent=0.9 # Minimum % before spilling to drive
S3A 是用于使用 S3 和其他与 S3 兼容的对象存储(如 MinIO)的连接器。MapReduce 工作负载通常与对象存储的交互方式与与 HDFS 的交互方式相同。这些工作负载依赖于 HDFS 原子重命名功能来完成将数据写入数据存储。对象存储操作本质上是原子的,它们不需要/不实现重命名 API。默认的 S3A 提交器通过复制和删除 API 模拟重命名。这种交互模式会导致由于写入放大而导致性能严重下降。例如,Netflix 开发了两个新的暂存提交器 - 目录暂存提交器和分区暂存提交器 - 以充分利用原生对象存储操作。这些提交器不需要重命名操作。对这两个暂存提交器以及另一个名为 Magic 提交器的新的添加进行了评估,以进行基准测试。
发现目录暂存提交器在三个提交器中速度最快,应使用以下参数配置 S3A 连接器以获得最佳结果
cat ${HADOOP_CONF_DIR}/core-site.xml | kv-pairify | grep "s3a"
fs.s3a.access.key=minio
fs.s3a.secret.key=minio123
fs.s3a.path.style.access=true
fs.s3a.block.size=512M
fs.s3a.buffer.dir=${hadoop.tmp.dir}/s3a
fs.s3a.committer.magic.enabled=false
fs.s3a.committer.name=directory
fs.s3a.committer.staging.abort.pending.uploads=true
fs.s3a.committer.staging.conflict-mode=append
fs.s3a.committer.staging.tmp.path=/tmp/staging
fs.s3a.committer.staging.unique-filenames=true
fs.s3a.connection.establish.timeout=5000
fs.s3a.connection.ssl.enabled=false
fs.s3a.connection.timeout=200000
fs.s3a.endpoint=http://minio:9000
fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
fs.s3a.committer.threads=2048 # Number of threads writing to MinIO
fs.s3a.connection.maximum=8192 # Maximum number of concurrent conns
fs.s3a.fast.upload.active.blocks=2048 # Number of parallel uploads
fs.s3a.fast.upload.buffer=disk # Use drive as the buffer for uploads
fs.s3a.fast.upload=true # Turn on fast upload mode
fs.s3a.max.total.tasks=2048 # Maximum number of parallel tasks
fs.s3a.multipart.size=512M # Size of each multipart chunk
fs.s3a.multipart.threshold=512M # Size before using multipart uploads
fs.s3a.socket.recv.buffer=65536 # Read socket buffer hint
fs.s3a.socket.send.buffer=65536 # Write socket buffer hint
fs.s3a.threads.max=2048 # Maximum number of threads for S3A
其他优化选项将在以下链接中进行讨论
https://hadoop.apache.ac.cn/docs/current/hadoop-aws/tools/hadoop-aws/index.html
https://hadoop.apache.ac.cn/docs/r3.1.1/hadoop-aws/tools/hadoop-aws/committers.html
应用配置更改后,继续重新启动 **Hadoop** 服务。
3.2 配置 Spark2
导航到 **服务** -> **Spark2** -> **配置**,如下所示
导航到“自定义 spark-defaults” 以配置 MinIO 参数,用于 _s3a_
连接器
添加以下用于 *spark-defaults.conf* 的最佳条目以使用 **MinIO** 配置 Spark。
spark.hadoop.fs.s3a.access.key minio
spark.hadoop.fs.s3a.secret.key minio123
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.block.size 512M
spark.hadoop.fs.s3a.buffer.dir ${hadoop.tmp.dir}/s3a
spark.hadoop.fs.s3a.committer.magic.enabled false
spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads true
spark.hadoop.fs.s3a.committer.staging.conflict-mode append
spark.hadoop.fs.s3a.committer.staging.tmp.path /tmp/staging
spark.hadoop.fs.s3a.committer.staging.unique-filenames true
spark.hadoop.fs.s3a.committer.threads 2048 # number of threads writing to MinIO
spark.hadoop.fs.s3a.connection.establish.timeout 5000
spark.hadoop.fs.s3a.connection.maximum 8192 # maximum number of concurrent conns
spark.hadoop.fs.s3a.connection.ssl.enabled false
spark.hadoop.fs.s3a.connection.timeout 200000
spark.hadoop.fs.s3a.endpoint http://minio:9000
spark.hadoop.fs.s3a.fast.upload.active.blocks 2048 # number of parallel uploads
spark.hadoop.fs.s3a.fast.upload.buffer disk # use disk as the buffer for uploads
spark.hadoop.fs.s3a.fast.upload true # turn on fast upload mode
spark.hadoop.fs.s3a.impl org.apache.hadoop.spark.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.max.total.tasks 2048 # maximum number of parallel tasks
spark.hadoop.fs.s3a.multipart.size 512M # size of each multipart chunk
spark.hadoop.fs.s3a.multipart.threshold 512M # size before using multipart uploads
spark.hadoop.fs.s3a.socket.recv.buffer 65536 # read socket buffer hint
spark.hadoop.fs.s3a.socket.send.buffer 65536 # write socket buffer hint
spark.hadoop.fs.s3a.threads.max 2048 # maximum number of threads for S3A
应用配置更改后,继续重新启动 **Spark** 服务。
3.3 配置 Hive
导航到 **服务** -> **Hive** -> **配置** -> **高级**,如下所示
导航到“Custom hive-site”以配置_s3a_
连接器的 MinIO 参数
添加以下hive-site.xml
的最佳条目,以使用MinIO配置 Hive。
hive.blobstore.use.blobstore.as.scratchdir=true
hive.exec.input.listing.max.threads=50
hive.load.dynamic.partitions.thread=25
hive.metastore.fshandler.threads=50
hive.mv.files.threads=40
mapreduce.input.fileinputformat.list-status.num-threads=50
有关这些选项的更多信息,请访问 https://www.cloudera.com/documentation/enterprise/5-11-x/topics/admin_hive_on_s3_tuning.html
应用配置更改后,继续重新启动所有 Hive 服务。
4. 运行示例应用程序
在成功安装 Hive、Hadoop 和 Spark 后,我们现在可以继续运行一些示例应用程序,以查看它们是否配置正确。我们可以使用 Spark Pi 和 Spark WordCount 程序来验证我们的 Spark 安装。我们还可以探索如何从命令行和 Spark shell 运行 Spark 作业。
4.1 Spark Pi
通过运行以下计算密集型示例测试 Spark 安装,该示例通过“向圆形投掷飞镖”来计算 pi。该程序在单位正方形((0,0) 到 (1,1))中生成点,并计算有多少点落在正方形内的单位圆内。结果近似于 pi。
按照以下步骤运行 Spark Pi 示例
以用户“spark”身份登录。
作业运行时,库现在可以在中间处理过程中使用MinIO。
导航到具有 Spark 客户端的节点并访问 spark2-client 目录
cd /usr/hdp/current/spark2-client
su spark
以 yarn-client 模式运行 Apache Spark Pi 作业,使用来自org.apache.spark的代码
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-client \
--num-executors 1 \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
examples/jars/spark-examples*.jar 10
该作业应生成如下所示的输出。注意输出中 pi 的值。
17/03/22 23:21:10 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.302805 s
Pi is roughly 3.1445191445191445
还可以通过导航到 YARN ResourceManager Web UI 并单击作业历史记录服务器信息,在浏览器中查看作业状态。
4.2 WordCount
WordCount 是一个简单的程序,它计算文本文件中每个单词出现的频率。该代码构建了一个名为 counts 的 (String, Int) 对数据集,并将该数据集保存到文件中。
以下示例将 WordCount 代码提交到 Scala shell。为 Spark WordCount 示例选择一个输入文件。我们可以使用任何文本文件作为输入。
以用户“spark”身份登录。
作业运行时,库现在可以在中间处理过程中使用MinIO。
导航到具有 Spark 客户端的节点并访问 spark2-client 目录
cd /usr/hdp/current/spark2-client
su spark
以下示例使用log4j.properties作为输入文件
4.2.1 将输入文件上传到 HDFS:
hadoop fs -copyFromLocal /etc/hadoop/conf/log4j.properties
s3a://testbucket/testdata
4.2.2 运行 Spark shell:
./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m
该命令应生成如下所示的输出。(带有其他状态消息)
Spark context Web UI available at http://172.26.236.247:4041
Spark context available as 'sc' (master = yarn, app id = application_1490217230866_0002).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0.2.6.0.0-598
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
在scala>提示符下,通过键入以下命令提交作业,将节点名称、文件名和文件位置替换为您的值
scala> val file = sc.textFile("s3a://testbucket/testdata")
file: org.apache.spark.rdd.RDD[String] = s3a://testbucket/testdata MapPartitionsRDD[1] at textFile at <console>:24
scala> val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:25
scala> counts.saveAsTextFile("s3a://testbucket/wordcount")
使用以下方法之一查看作业输出
在 Scala shell 中查看输出
scala> counts.count()
364
要查看来自 MinIO 的输出,请退出 Scala shell。查看 WordCount 作业状态
hadoop fs -ls s3a://testbucket/wordcount
输出应类似于以下内容
Found 3 items
-rw-rw-rw- 1 spark spark 0 2019-05-04 01:36 s3a://testbucket/wordcount/_SUCCESS
-rw-rw-rw- 1 spark spark 4956 2019-05-04 01:36 s3a://testbucket/wordcount/part-00000
-rw-rw-rw- 1 spark spark 5616 2019-05-04 01:36 s3a://testbucket/wordcount/part-00001