This seems related to
How to change hdfs block size in pyspark?
I can successfully change the HDFS block size with rdd.saveAsTextFile, but not with the corresponding DataFrame.write.parquet; I am unable to save Parquet output with the block size I want.
Not sure whether this is a bug in the PySpark DataFrame API or whether I have not set the configuration correctly.
Here is my test code:
##########
# init
##########
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import hdfs
from hdfs import InsecureClient
import os
import numpy as np
import pandas as pd
import logging
os.environ['SPARK_HOME'] = '/opt/spark-2.2.1-bin-hadoop2.7'
block_size = 512 * 1024
conf = SparkConf().setAppName("myapp") \
    .setMaster("spark://spark1:7077") \
    .set('spark.cores.max', 20) \
    .set("spark.executor.cores", 10) \
    .set("spark.executor.memory", "10g") \
    .set("spark.hadoop.dfs.blocksize", str(block_size)) \
    .set("spark.hadoop.dfs.block.size", str(block_size))
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.blocksize", block_size)
spark.sparkContext._jsc.hadoopConfiguration().setInt("dfs.block.size", block_size)
##########
# main
##########
# create DataFrame
df_txt = spark.createDataFrame([{'temp': "hello"}, {'temp': "world"}, {'temp': "!"}])
# save using DataFrameWriter, resulting in a 128MB block size
df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')
# save using rdd, resulting in a 512KB block size
client = InsecureClient('http://spark1:50070')
client.delete('/tmp/temp_with_rrd', recursive=True)
df_txt.rdd.saveAsTextFile('hdfs://spark1/tmp/temp_with_rrd')

Posted on 2018-03-14 22:51:19
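As a side note, the block size that HDFS actually applied to each output file can be checked with the same hdfs client used in the question above; the following is a minimal sketch (it relies on the WebHDFS FileStatus response exposed by client.status(), whose blockSize field reports the per-file block size):

# Sketch: check the HDFS block size actually applied to the output files.
# Uses the WebHDFS endpoint and the output paths from the question above.
from hdfs import InsecureClient

client = InsecureClient('http://spark1:50070')
for out_dir in ['/tmp/temp_with_df', '/tmp/temp_with_rrd']:
    for name in client.list(out_dir):
        info = client.status(out_dir + '/' + name)
        if info['type'] == 'FILE':
            print(out_dir + '/' + name, 'blockSize =', info['blockSize'])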
Hadoop and Spark are two independent tools that have their own working strategies. Spark and Parquet work with data partitions; block size is not meaningful to them. Do what Spark says, and then do whatever you want with the result inside HDFS.
You can change the number of Parquet partitions like this:
df_txt.repartition(6).write.format("parquet").save("hdfs://...")

https://stackoverflow.com/questions/49279622
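One more angle worth noting, as an assumption rather than a confirmed fix for this thread: Parquet has its own block (row-group) size, controlled by the Parquet-MR property parquet.block.size, which is separate from the HDFS dfs.blocksize. A minimal sketch of setting both on the Hadoop configuration before writing:

# Sketch: set the Parquet row-group size and the HDFS block size before writing.
# Whether this yields 512KB HDFS blocks for Parquet output is an assumption,
# not something confirmed in this thread.
block_size = 512 * 1024
hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.setInt('parquet.block.size', block_size)  # Parquet row-group size
hconf.setInt('dfs.blocksize', block_size)       # HDFS block size
df_txt.write.mode('overwrite').format('parquet').save('hdfs://spark1/tmp/temp_with_df')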