I have some code that converts PySpark streaming data into a dataframe. I need to store this data in HBase. Please help me write the code for that.
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession


def getSparkSessionInstance(sparkConf):
    # Reuse a single SparkSession across micro-batches
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: sql_network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    host, port = sys.argv[1:]
    sc = SparkContext(appName="PythonSqlNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream(host, int(port))

    def process(time, rdd):
        print("========= %s =========" % str(time))
        try:
            words = rdd.map(lambda line: line.split(" ")).collect()
            spark = getSparkSessionInstance(rdd.context.getConf())
            linesDataFrame = spark.createDataFrame(words, schema=["lat", "lon"])
            linesDataFrame.show()
        except Exception:
            # Ignore empty or malformed batches
            pass

    lines.foreachRDD(process)
    ssc.start()
    ssc.awaitTermination()

Posted on 2018-11-29 08:12:02
You can use the Spark-HBase connector to access HBase from Spark. It provides an API at both the low-level RDD and the DataFrame level.
The connector requires you to define a catalog (schema) for the HBase table. Below is an example catalog for a table named table1, with row key key and a number of columns (col1-col8). Note that the row key must also be defined in detail as a column (col0), which has a specific column family (rowkey).
catalog = """{
    "table":{"namespace":"default", "name":"table1"},
    "rowkey":"key",
    "columns":{
        "col0":{"cf":"rowkey", "col":"key", "type":"string"},
        "col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
        "col2":{"cf":"cf1", "col":"col2", "type":"double"},
        "col3":{"cf":"cf1", "col":"col3", "type":"float"},
        "col4":{"cf":"cf1", "col":"col4", "type":"int"},
        "col5":{"cf":"cf2", "col":"col5", "type":"bigint"},
        "col6":{"cf":"cf2", "col":"col6", "type":"smallint"},
        "col7":{"cf":"cf2", "col":"col7", "type":"string"},
        "col8":{"cf":"cf2", "col":"col8", "type":"tinyint"}
    }
}"""
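One caveat: the target table (here table1, with column families cf1 and cf2) generally has to exist in HBase before you write to it, e.g. created with create 'table1', 'cf1', 'cf2' in the HBase shell; alternatively, shc can create it for you via the newtable option shown further below.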
Once the catalog is defined to match the schema of your dataframe, you can write the dataframe to HBase with:
df.write \
    .options(catalog=catalog) \
    .format("org.apache.spark.sql.execution.datasources.hbase") \
    .save()
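If the table does not exist yet, a variant of the write that asks the connector to create it is sketched below; the newtable option key is taken from the shc examples, so treat it as an assumption to verify against your connector version:
# Variant: let shc create the table (with 5 regions) if it does not exist.
# "newtable" comes from the shc examples; verify for your connector version.
df.write \
    .options(catalog=catalog, newtable="5") \
    .format("org.apache.spark.sql.execution.datasources.hbase") \
    .save()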
To read the data back from HBase:
df = spark.read \
    .format("org.apache.spark.sql.execution.datasources.hbase") \
    .options(catalog=catalog) \
    .load()
You need to include the Spark-HBase connector (shc) package as follows when submitting the Spark application:
pyspark --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/
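Tying this back to the streaming code in the question, here is a minimal sketch of writing each micro-batch to HBase from process(). The table name latlon_table, column family cf, the generated id row key, and the newtable option are illustrative assumptions, not part of the original question or answer:
import json
import uuid

# Hypothetical catalog for the lat/lon stream; "id" is a generated row key.
latlonCatalog = json.dumps({
    "table": {"namespace": "default", "name": "latlon_table"},
    "rowkey": "id",
    "columns": {
        "id":  {"cf": "rowkey", "col": "id",  "type": "string"},
        "lat": {"cf": "cf",     "col": "lat", "type": "string"},
        "lon": {"cf": "cf",     "col": "lon", "type": "string"}
    }
})

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        # Attach a unique row key to every record before building the DataFrame
        rows = rdd.map(lambda line: line.split(" ")) \
                  .map(lambda p: Row(id=str(uuid.uuid4()), lat=p[0], lon=p[1]))
        linesDataFrame = spark.createDataFrame(rows)
        # Append this micro-batch to HBase via the shc connector
        linesDataFrame.write \
            .options(catalog=latlonCatalog, newtable="5") \
            .format("org.apache.spark.sql.execution.datasources.hbase") \
            .save()
    except Exception:
        # Empty batches fail schema inference in createDataFrame; skip them
        pass
Once a few batches have run, the read snippet above (with latlonCatalog in place of catalog) can be used to verify that the rows landed in HBase.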
Source: https://stackoverflow.com/questions/53533459