,可以通过以下步骤实现:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder \
.appName("Write DataFrame to Postgres") \
.getOrCreate()
postgres_url = "jdbc:postgresql://<host>:<port>/<database>"
postgres_properties = {
"user": "<username>",
"password": "<password>",
"driver": "org.postgresql.Driver"
}
请将<host>
、<port>
、<database>
、<username>
和<password>
替换为实际的数据库连接信息。
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
schema = StructType([
StructField("name", StringType(), nullable=False),
StructField("age", IntegerType(), nullable=True)
])
df = spark.createDataFrame(data, schema)
在上述示例中,我们创建了一个包含"name"和"age"两列的dataframe,其中"name"列被标记为非空,"age"列可为空。
df.write \
.format("jdbc") \
.option("url", postgres_url) \
.option("dbtable", "<table_name>") \
.mode("overwrite") \
.options(**postgres_properties) \
.save()
请将<table_name>
替换为实际的目标表名。
通过以上步骤,我们可以将pyspark dataframe写入PostgreSQL数据库,而不将列标记为非空。在这个过程中,我们使用了SparkSession对象创建dataframe,并通过JDBC连接器将数据写入PostgreSQL数据库。
没有搜到相关的文章