0302 Data Sources

轉載請注明出處,謝謝合作~

該篇中的示例暫時只有 Scala 版本~

數(shù)據(jù)源

Spark SQL 支持通過 DataFrame 接口操作多種數(shù)據(jù)源。一個 DataFrame 可以通過關系型轉換算子操作,還可以用來創(chuàng)建一個臨時視圖。將 DataFrame 注冊成一個臨時視圖之后就可以通過 SQL 語句進行查詢。本章節(jié)介紹了使用 Spark 數(shù)據(jù)源加載和存儲數(shù)據(jù)的常用方法,之后給出內(nèi)置數(shù)據(jù)源需要的不同的參數(shù)。

常用讀寫函數(shù)

最簡單的情況,默認的數(shù)據(jù)源(parquet 除非配置了參數(shù) spark.sql.sources.default)將會被使用。

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

手動指定參數(shù)

可以通過手動指定數(shù)據(jù)源參數(shù)和相關的額外選項來創(chuàng)建 DataFrame。數(shù)據(jù)源通過全限定類名來指定(比如 org.apache.spark.sql.parquet),但是對于內(nèi)置的數(shù)據(jù)源可以使用簡稱(json, parquet, jdbc, orc, libsvm, csv, text)。從任意數(shù)據(jù)源類型中加載的數(shù)據(jù)可以輕松的轉換成其他支持的格式。

可以通過 API 文檔了解內(nèi)置數(shù)據(jù)源的所有選項,例如 org.apache.spark.sql.DataFrameReaderorg.apache.spark.sql.DataFrameWriter。文檔中的選項對于非 Scala 語言的 Spark API(比如 PySpark)也適用。對于其他的格式,參見相應的 API 文檔。

加載 JSON 文件:

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

加載 CSV 文件:

val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

額外的選項對于寫出操作也適用。例如,對于 ORC 文件可以通過選項控制布隆過濾器和字典編碼。下面的 ORC 樣例將會創(chuàng)建布隆過濾器并對 favorite_color 列使用字典編碼。對于 Parquet 文件,也有一個 parquet.enable.dictionary 的選項。有關 ORC/Parquet 數(shù)據(jù)格式選項的更多詳情請移步 Apache ORC/Parquet 官方網(wǎng)站。

usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

直接通過 SQL 操作文件

除了使用 read API 加載文件成為一個 DataFrame,還可以通過 SQL 直接查詢。

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

存儲模式

存儲操作有一個可選的 SaveMode 類型的參數(shù),該參數(shù)指定如何處理存儲路徑中已經(jīng)存在的數(shù)據(jù)。需要注意的是存儲模式并不加鎖,也不是原子性的。此外,當存儲模式設置為 Overwrite 時,在寫入新數(shù)據(jù)之前會刪除舊數(shù)據(jù)。

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists (default) "error" or "errorifexists" (default) 如果輸出路徑中的數(shù)據(jù)已存在,會拋出異常。
SaveMode.Append "append" 如果輸出路徑中的數(shù)據(jù)已存在,寫出的數(shù)據(jù)會被追加到現(xiàn)有數(shù)據(jù)之后。
SaveMode.Overwrite "overwrite" 如果輸出路徑中的數(shù)據(jù)已存在,會被新寫出的數(shù)據(jù)替換。
SaveMode.Ignore "ignore" 如果輸出路徑中的數(shù)據(jù)已存在,新寫出的數(shù)據(jù)不會被存儲,現(xiàn)有的數(shù)據(jù)也不會被更新。類似于 SQL 中的 CREATE TABLE IF NOT EXISTS。

持久化數(shù)據(jù)表

DataFrames 還可以通過 saveAsTable 方法存儲到 Hive 中。注意,并不需要事先安裝好 Hive,Spark 會通過 Derby 創(chuàng)建一個默認的本地 Hive 元數(shù)據(jù)庫。跟 createOrReplaceTempView 方法不同的是,saveAsTable 方法會將 DataFrame 中的數(shù)據(jù)落盤并在 Hive 元數(shù)據(jù)庫中創(chuàng)建一條描述存儲信息的記錄。只要能夠訪問元數(shù)據(jù)庫,持久化后的數(shù)據(jù)表在 Spark 應用程序重啟之后依舊可以訪問。在持久化數(shù)據(jù)表時可以通過調用 SparkSessiontable 方法指定表名稱。

基于文件的數(shù)據(jù)源,比如 text,parquet,json 等等,可以通過 path 選項指定存儲路徑,例如 df.write.option("path", "/some/path").saveAsTable("t")。當數(shù)據(jù)表被刪除后,自定義存儲路徑中的數(shù)據(jù)不會被刪除。如果不指定存儲路徑,Spark 會將數(shù)據(jù)寫出到 warehouse 文件夾,這種情況下如果數(shù)據(jù)表被刪除,數(shù)據(jù)也會被刪除。

從 Spark 2.1 開始,持久化的數(shù)據(jù)表的每個分區(qū)都會在 Hive 元數(shù)據(jù)庫中有相關記錄,這種方式帶來一些優(yōu)化:

  • 對于查詢元數(shù)據(jù)庫可以只返回所需要分區(qū)的信息,在第一次查詢時不再需要加載所有的分區(qū)。
  • Datasource API 支持了像 ALTER TABLE PARTITION ... SET LOCATION 這樣的 Hive DDL 語句。

注意,在創(chuàng)建外部數(shù)據(jù)表(自定義了 path 選項)時,默認情況下分區(qū)信息并沒有被收集,同步元數(shù)據(jù)庫中的分區(qū)信息需要執(zhí)行 MSCK REPAIR TABLE 語句。

分桶,排序和分區(qū)

對于基于文件的數(shù)據(jù)源,還可以對輸出進行分桶,排序和分區(qū)。分桶和排序只適用于通過 saveAsTable 持久化數(shù)據(jù)表:

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

但是分區(qū)操作對于 savesaveAsTable 都適用:

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

可以同時在一張表中適用分桶和分區(qū):

usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

就像 Partition Discovery 章節(jié)描述的那樣,partitionBy 操作會創(chuàng)建一個目錄結構。所以,以很大基數(shù)的字段分區(qū)不會是一個明智的選擇。相反,bucketBy 操作將數(shù)據(jù)按照固定數(shù)量分桶,適用于數(shù)據(jù)技基數(shù)很大的場景。

常用的文件數(shù)據(jù)源選項

這些常用的選項只在適用文件類型的數(shù)據(jù)源時有效:parquet, orc, avro, json, csv, text。

注意在示例中使用的目錄結構如下:

dir1/
 ├── dir2/
 │    └── file2.parquet (schema: <file: string>, content: "file2.parquet")
 └── file1.parquet (schema: <file, string>, content: "file1.parquet")
 └── file3.json (schema: <file, string>, content: "{'file':'corrupt.json'}")

忽略損壞的文件

在從文件讀取數(shù)據(jù)的過程中可以通過設置參數(shù) spark.sql.files.ignoreCorruptFiles 忽略損壞的文件,當設置為 true 時,Spark 作業(yè)在遇到損壞的文件的時候會繼續(xù)運行,已經(jīng)被讀取的部分依舊有效。

// enable ignore corrupt files
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json is corrupt from parquet's view
val testCorruptDF = spark.read.parquet(
  "examples/src/main/resources/dir1/",
  "examples/src/main/resources/dir1/dir2/")
testCorruptDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

忽略丟失的文件

在從文件讀取數(shù)據(jù)的過程中可以通過設置參數(shù) spark.sql.files.ignoreMissingFiles 忽略丟失的文件。在這里丟失的文件是指在創(chuàng)建好 DataFrame 之后刪除了的文件。當設置為 true 時,Spark 作業(yè)在遇到丟失的文件的時候會繼續(xù)運行,已經(jīng)被讀取的部分依舊有效。

全局文件過濾器

選項 pathGlobFilter 用來只讀取匹配目標模式的文件,語法與 org.apache.hadoop.fs.GlobFilter 相同,過濾操作不會改變分區(qū)發(fā)現(xiàn)的行為。

val testGlobFilterDF = spark.read.format("parquet")
  .option("pathGlobFilter", "*.parquet") // json file should be filtered out
  .load("examples/src/main/resources/dir1")
testGlobFilterDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

遞歸文件搜索

選項 recursiveFileLookup 用來遞歸查找目標路徑下的匹配文件,但是會關閉分區(qū)推斷機制,默認值為 false。如果在選項 recursiveFileLookup 為 true 時顯示指定了 partitionSpec 選項,會拋出異常。

val recursiveLoadedDF = spark.read.format("parquet")
  .option("recursiveFileLookup", "true")
  .load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

Parquet 文件

Parquet 是一種被許多數(shù)據(jù)處理系統(tǒng)支持的列式存儲格式。Spark SQL 提供了對 Parquet 文件讀寫的支持,文件中會保存原始數(shù)據(jù)的模式。當讀取 Parquet 文件時,處于兼容性的考慮所有的列被自動轉換為 nullable 類型。

通過程序加載數(shù)據(jù)

采用上述示例中的數(shù)據(jù):

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

分區(qū)發(fā)現(xiàn)

數(shù)據(jù)分區(qū)是一種常見的優(yōu)化方式,比如在 Hive 中就是如此。對于分區(qū)表,數(shù)據(jù)通常存儲在不同的目錄,分區(qū)鍵的值會被編碼到分區(qū)目錄的名稱中。所有內(nèi)置的基于文件的數(shù)據(jù)源(Text/CSV/JSON/ORC/Parquet)都能夠自動的發(fā)現(xiàn)和推斷分區(qū)信息。例如,可以將之前用到的人口數(shù)據(jù)采用下面的目錄結構存儲到分區(qū)表中,分區(qū)鍵是 gendercountry 兩列:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

將目錄 path/to/table 傳遞給 SparkSession.read.parquet 或者 SparkSession.read.load 方法,Spark SQL 會從目錄中自動提取分區(qū)信息,于是 DataFrame 的模式變成了:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

注意,分區(qū)鍵的數(shù)據(jù)類型是自動推斷的,目前支持的有數(shù)字類型,日期類型,時間戳類型和字符串類型。有事用戶不希望自動推斷分區(qū)鍵的數(shù)據(jù)類型,對于此類需求,分區(qū)鍵類型推斷可以由參數(shù) spark.sql.sources.partitionColumnTypeInference.enabled 控制,默認值為 true。當類型推薦被禁用時,分區(qū)鍵會被指定為字符串類型。

從 Spark 1.6.0 開始,默認情況下分區(qū)發(fā)現(xiàn)功能只會尋找指定路徑下的分區(qū)。對于上述示例,如果用戶將路徑 path/to/table/gender=male 傳遞給 SparkSession.read.parquet 或者 SparkSession.read.load 方法,gender 不會被當做一個分區(qū)鍵。如果用戶需要指定發(fā)現(xiàn)分區(qū)的根目錄,可以設置 basePath 選項。例如,當給定路徑為 path/to/table/gender=male 而選項 basePath 設置為 path/to/table/ 時,gender 也會被當做一個分區(qū)鍵。

模式融合

像 Protocol Buffer,Avro 和 Thrift 一樣,Parquet 也支持模式演化。用戶可以初始定義一個簡單的模式,之后在需要的時候增加列。如此一來,可能最終會有多個模式不同但又相互兼容的 Parquet 文件。Parquet 數(shù)據(jù)源現(xiàn)在已經(jīng)能夠自動檢測這種情況,并在需要時融合多個文件的模式。

由于模式融合是一個開銷較大的操作,而且在大多數(shù)情況下是不需要的,自 Spark 1.5.0 以來該功能默認情況下時關閉的,可以通過以下兩種方式開啟:

  1. 在讀取 Parquet 文件時設置數(shù)據(jù)源選項 mergeSchematrue
  2. 設置全局的 SQL 參數(shù) spark.sql.parquet.mergeSchematrue。
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

Hive 元數(shù)據(jù) Parquet 表轉換

當從 Hive 中讀取 Parquet 格式的表和向 Hive 中寫入不分區(qū)的 Parquet 表時,處于性能的考量 Spark SQL 會嘗試使用自己的 Parquet 解析器而不是 Hive 的 SerDe 類庫。這個機制由參數(shù)spark.sql.hive.convertMetastoreParquet` 控制,默認開啟。

Hive/Parquet 模式調和

從模式解析的角度來看,Hive 和 Parquet 之間有兩個重要的不同之處。

  1. Hive 是大小寫不敏感的,而 Parquet 大小寫敏感。
  2. Hive 中的所有列都默認是 nullable,但是空值在 Parquet 中是有意義的。

因此,當把一個 Hive 中的 Parquet 表轉換為 Spark SQL 中的 Parquet 表時必須調和兩者模式之間的差異。規(guī)則如下:

  1. 無論是否可為空值,兩者模式中相同名稱的字段必須擁有相同的數(shù)據(jù)類型。為了滿足可為空值的條件,被調和的字段應該具有 Parquet 端的數(shù)據(jù)類型。
  2. 被調和的模式需要完全包含 Hive 模式中的字段。
    • 在被調和的模式中,只出現(xiàn)在 Parquet 端的字段會被刪除。Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
    • 在被調和的模式中,只出現(xiàn)在 Hive 模式中的字段會被添加為一個 nullable 字段。

刷新元數(shù)據(jù)

為了更好的性能,Spark SQL 會緩存 Parquet 表的元數(shù)據(jù)。開啟 Hive 元數(shù)據(jù) Parquet 表轉換之后,被轉換的的表的元數(shù)據(jù)也會被緩存。如果這些表被 Hive 或者外部工具更新,需要手動刷新來保持元數(shù)據(jù)的一致性。

// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")

配置項

Parquet 的配置項可以通過 SparkSessionsetConf 方法或者使用 SQL 語法 SET key=value 來設置。

Property Name Default Meaning Since Version
spark.sql.parquet.binaryAsString false 一些處理 Parquet 文件的計算引擎,尤其是 Impala,HIve 和早期版本的 Spark SQL,在寫出數(shù)據(jù)時并不區(qū)分二進制類型和自負串類型。該參數(shù)告訴 Spark SQL 將二進制數(shù)據(jù)解析為字符串來與那些系統(tǒng)保持兼容。 1.1.1
spark.sql.parquet.int96AsTimestamp true 一些處理 Parquet 文件的計算引擎,尤其是 Impala 和 HIve,把時間戳存儲為 INT96 類型。該參數(shù)告訴 Spark SQL 將 INT96 數(shù)據(jù)解析為時間戳來與那些系統(tǒng)保持兼容。 1.3.0
spark.sql.parquet.compression.codec snappy 寫出 Parquet 文件時使用的壓縮格式。如果在表屬性中同時設置了 compressionparquet.compression 屬性,則配置生效的優(yōu)先級從高到低依次為 compression, parquet.compression, spark.sql.parquet.compression.codec??蛇x的壓縮格式有:snappy, gzip, lzo, brotli, lz4, zstd。注意,在 Hadoop 2.9.0 之前 zstd 格式需要事先安裝 ZStandardCodec,brotli 格式需要事先安裝 BrotliCodec。 1.1.1
spark.sql.parquet.filterPushdown true 是否開啟 Parquet 謂詞下推的優(yōu)化機制。 1.2.0
spark.sql.hive.convertMetastoreParquet true 設置為 false 時,Spark SQL 會使用 Hive SerDe 解析 Parquet 文件而不是使用內(nèi)置的解析器。 1.1.1
spark.sql.parquet.mergeSchema false 設置為 true 時,Parquet 數(shù)據(jù)源會融合所有數(shù)據(jù)文件的模式,否則模式將會從概要文件中推斷,如果概要文件不存在,就從隨機數(shù)據(jù)文件中推斷。 1.5.0
spark.sql.parquet.writeLegacyFormat false 如果設置為 true,數(shù)據(jù)將會以 Spark 1.4 之前的格式寫出。例如,小數(shù)格式的數(shù)據(jù)將會以定長字節(jié)數(shù)組的格式寫出,也是 Hive 和 Impala 的方式。如果寫出失敗,那么將會使用新的數(shù)據(jù)格式。例如,小數(shù)格式的數(shù)據(jù)會以 int 類型的格式寫出。如果寫出的 Parquet 文件是給不支持新格式的系統(tǒng)所用,請將該參數(shù)設置為 true。 1.6.0

ORC 文件

從 Spark 2.3 開始,Spark 支持以向量化的方式讀取 ORC 文件,不過需要新增幾個配置項。對于原生 ORC 數(shù)據(jù)表(使用 USING ORC 語句創(chuàng)建的數(shù)據(jù)表),需要設置參數(shù) spark.sql.orc.implnative ,設置參數(shù) spark.sql.orc.enableVectorizedReadertrue 來開啟向量化讀取。對于 Hive ORC serde 數(shù)據(jù)表(使用 USING HIVE OPTIONS (fileFormat 'ORC') 語句創(chuàng)建的數(shù)據(jù)表),需要設置參數(shù) spark.sql.hive.convertMetastoreOrctrue 來開啟向量化讀取

Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. To do that, the following configurations are newly added. The vectorized reader is used for the native ORC tables (e.g., the ones created using the clause USING ORC) when spark.sql.orc.impl is set to native and spark.sql.orc.enableVectorizedReader is set to true. For the Hive ORC serde tables (e.g., the ones created using the clause USING HIVE OPTIONS (fileFormat 'ORC')), the vectorized reader is used when spark.sql.hive.convertMetastoreOrc is also set to true.

Property Name Default Meaning Since Version
spark.sql.orc.impl native ORC 的實現(xiàn)模式??梢允?nativehive。native 表示原生 ORC, hive 表示 Hive 中的 ORC 類庫。 2.3.0
spark.sql.orc.enableVectorizedReader true native 模式下開啟向量化讀取 ORC 文件。如果設置為 false,會使用非向量化的方式讀取 ORC 文件。對于 hive 模式,該配置被忽略。 2.3.0

JSON 文件

Spark SQL 可以自動推斷 JSON 文件的模式并將其加載為一個 Dataset[Row]??梢允褂梅椒ㄗx取一個 Dataset[String] 或是 JSON 文件。

注意所,讀取的 JSON 文件不是經(jīng)典的格式 JSON 文件。文件中每一行必須是一個獨立的、自包含的、有效的 JSON 對象。詳情參見 JSON Lines text format, also called newline-delimited JSON。

對于常見的多行 JSON 文件,請設置選項 multiLinetrue。

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

Hive 表

Spark SQL 也支持讀寫存儲在 Hive(Apache Hive)中的數(shù)據(jù)。然而,由于 Hive 有很多很多依賴,這些依賴默認沒有包含在 Spark 發(fā)布的版本中。如果 Hive 依賴可以再 classpath 中找到,Spark 會自動加載它們。注意這些 Hive 依賴需要同時在所有的的 worker 節(jié)點上,為了方位存儲在 Hive 中的數(shù)據(jù)需要用到 Hive 的序列化和反序列化類庫(SerDes)。

需要將 Hive 的配置文件 hive-site.xml, core-site.xml(其中的安全配置)和 hdfs-site.xml(HDFS 配置) 放到 conf/ 目錄下。

集成 Hive 功能,必須在初始化 SparkSession 時配置開啟 Hive 支持,包括一個跟 Hive metastore 的長連接,對 Hive serdes 的支持,和 Hive UDF。如果沒有安裝 Hive 也可以開啟 Hive 支持,若沒有配置 hive-site.xml,Spark 會在當前目錄自動創(chuàng)建一個 metastore_db 文件和一個由參數(shù) spark.sql.warehouse.dir 指定的目錄,默認值是當前目錄下的 spark-warehouse,這里所說的當前目錄是指 Spark 應用程序啟動的目錄。注意從 Spark 2.0.0 開始,hive-site.xml 文件中的 hive.metastore.warehouse.dir 配置項已經(jīng)被標記為棄用,目前使用的是參數(shù) spark.sql.warehouse.dir 指定數(shù)據(jù)倉庫的位置,啟動程序的用戶必須對該目錄擁有寫權限。

import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// |  0|
// |  1|
// |  2|
// ... Order may vary, as spark processes the partitions in parallel.

// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala」。

指定 Hive 表的存儲格式

創(chuàng)建一個 Hive 表時,需要定義這個表該如何從文件系統(tǒng)中讀取以及寫入,即「input format」和「output format」。還需要定義這個表應該如何進行正反序列化的操作,即「serde」。下面的選項可以用來指定存儲格式(「serde」,「input format」,「output format」),例如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。默認情況下,表數(shù)據(jù)將會以文本的格式讀取。注意,目前在創(chuàng)建表時還不支持 Hive storage handler,你可以在 Hive 中適用 Hive storage handler 創(chuàng)建表,然后通過 Spark SQL 來讀取。

Property Name Meaning
fileFormat 指定數(shù)據(jù)存儲的文件格式,包括「serde」,「input format」,「output format」。目前支持 6 種文件格式:'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'。
inputFormat, outputFormat 這兩個選項指定 InputFormatOutputFormat 相應的全限定類名,例如: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。這兩個選項必須成對出現(xiàn),而且不能在設置了 fileFormat 選項的情況下再設置。
serde 該選項指定 serde 的全限定類名。在選項 fileFormat 被指定的情況下,如果其中已經(jīng)包含了 serde 信息,那么請不要設置該選項。目前「sequencefile」,「textfile」,「rcfile」格式并不包含 serde 信息,所以可以在這 3 種數(shù)據(jù)格式的情況下使用該選項。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 這項選項只適用于「textfile」文件格式,它們定義了如何將文本分割成數(shù)據(jù)行。

所有其他的在 OPTIONS 中定義的屬性都會被當做 Hive serde 屬性。

與不同版本的 Hive Metastore 交互

Spark SQL 對 Hive 支持的一個很重要的點就是與 Hive metastore 進行交互,這可以讓 Spark SQL 能夠訪問 Hive 表的元信息。從 Spark 1.4.0 開始,一個單獨的 Spark SQL 構建版本可以用來跟不同版本的 Hive metastore 進行交互,請使用下面的配置。注意,與 Hive 的版本無關,這里說的的是 metastore 的版本,在 Spark SQL 內(nèi)部會編譯內(nèi)建的 Hive 并使用那些類來執(zhí)行內(nèi)部流程(serdes, UDFs, UDAFs 等等)。

下列選項用來配置 Hive 版本,以獲取元數(shù)據(jù):

Property Name Default Meaning Since Version
spark.sql.hive.metastore.version 2.3.7 Hive metastore 的版本,可選項從 0.12.02.3.7,從 3.0.03.1.2。 1.4.0
spark.sql.hive.metastore.jars builtin 初始化 HiveMetastoreClient 所需要的 Jar 包的位置。該選項有三種可選值:1、builtin 使用 Hive 2.3.7,跟 Spark 的集成構建綁定,編譯時使用 -Phive 參數(shù)生效,此時參數(shù) spark.sql.hive.metastore.version 的值必須是 2.3.7 或者未定義;2、maven 使用從 Maven 倉庫下載的指定版本的 Hive 依賴,這種配置在生產(chǎn)環(huán)境不建議使用。3、一個適用于 JVM 的標準的 classpath 路徑,該路徑中必須包含所有 Hive 相關的類庫及其依賴,包括正確版本的 Hadoop。只有 driver 程序需要這些依賴,但是如果使用 yarn cluster 模式啟動應用程序,就需要把這些依賴裝進你的應用程序 Jar 文件。 1.4.0
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc 以逗號分隔的全限定類名前綴列表,相應的類需要通過 Spark SQL 和指定版本的 Hive 共享的類加載器進行加載。例如其中一個需要被共享的依賴就是連接 metastore 需要用到的 JDBC 驅動包。其他需要被共享的依賴就是那些已經(jīng)被共享的依賴,例如 log4j。 1.4.0
spark.sql.hive.metastore.barrierPrefixes (empty) 以逗號分隔的全限定類名前綴列表,這些依賴需要被 Spark SQL 連接的 Hive 的相應版本重新加載。例如,定義了 Hive UDF 的依賴應該被共享(即 org.apache.spark.*)。 1.4.0

JDBC 連接其他數(shù)據(jù)庫

Spark SQL 還提供了一個可以通過 JDBC 連接其他數(shù)據(jù)庫的數(shù)據(jù)源,該功能需要通過 JdbcRDD 來實現(xiàn)。結果將會以 DataFrame 的形式返回,之后就可以通過 Spark SQL 進行處理或者與其他數(shù)據(jù)源進行連接。JDBC 數(shù)據(jù)源在 Java 和 Python 中用起來也很簡單,并不要求提供一個 ClassTag。(注意 JDBC 數(shù)據(jù)源和 Spark SQL JDBC server 不是一回事,后者可以讓應用程序通過 Spark SQL 執(zhí)行查詢)

要使用 JDBC 數(shù)據(jù)源,需要將相應的 JDBC 驅動包放到 spark 的 classpath 中。例如,如果想從 Spark Shell 中連接 PostGRE 數(shù)據(jù)庫需要執(zhí)行以下命令:

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

遠端數(shù)據(jù)庫中的表可以通過數(shù)據(jù)源 API 被加載成為一個 DataFrame 或者 Spark SQL 臨時視圖。用戶可以通過數(shù)據(jù)源選項指定 JDBC 連接的參數(shù)。userpassword 選項通常用來登錄數(shù)據(jù)庫,Spark 還支持下列大小寫不敏感的選項:

Property Name Meaning
url 連接 JDBC 所用的 URL。有些選項可以包含在 URL 中。例如,jdbc:postgresql://localhost/test?user=fred&password=secret。
dbtable 需要讀寫的表。當該選項被用作讀取時的參數(shù)時,任何 SQL 中有效的 FROM 語句都可以被使用。例如,可以不填寫完整的表名而是一個用小括號括起來的子查詢。不允許同時指定 dbtablequery 選項。
query 一個從數(shù)據(jù)庫中讀取數(shù)據(jù)的查詢語句。被指定的查詢會被小括號括起來作為 FROM 語句的子查詢,Spark 會為該子查詢分配一個別名。例如,Spark 會組織一個這樣的語句來訪問 JDBC:SELECT <columns> FROM (<user_specified_query>) spark_gen_alias。下面是一些使用該選項的限制。不允許同時指定 dbtablequery 選項。不允許同時指定 partitionColumnquery 選項;當需要指定 partitionColumn 選項時,請使用 dbtable 來指定子查詢,分區(qū)列可以通過子查詢中的別名來指定。示例:spark.read.format("jdbc").option("url", jdbcUrl).option("query", "select c1, c2 from t1").load()
driver JDBC 驅動的全限定類名。
partitionColumn, lowerBound, upperBound 這些選項必須同時指定,此外,選項 numPartitions 也必須指定。這些選項一同定義了如何從數(shù)據(jù)庫并行讀取數(shù)據(jù)。partitionColumn 選項必須是一個數(shù)字,日期或者時間戳類型。注意 lowerBoundupperBound 選項只影響分區(qū)步距,并不過濾表中的數(shù)據(jù)。所以表中的所有數(shù)據(jù)會被分區(qū)之后讀取,這些選項只在讀取時有效。
numPartitions 讀寫數(shù)據(jù)時可以被用到的最大分區(qū)數(shù),該選項也決定了最大并發(fā) JDBC 連接數(shù)。如果寫出時的分區(qū)數(shù)超過了這個限制,會在寫出之前調用 coalesce(numPartitions) 方法削減分區(qū)。
queryTimeout JDBC 查詢超時時間,單位為秒,零表示無限制。在寫出時,該選項取決于 JDBC 驅動實現(xiàn) setQueryTimeout API 的方式,例如,h2 JDBC 驅動每次查詢都會檢查超時時間,而不是對于整個 JDBC 批次檢查。默認值為 0。
fetchsize JDBC 拉取數(shù)據(jù)的數(shù)量,決定了一次拉取多少行數(shù)據(jù)。該選項可以幫助提升那些默認拉取值很小 JDBC 的性能(例如 Oracle 是一次 10 行)。該選項只在讀取數(shù)據(jù)時有效。
batchsize JDBC 批次數(shù)據(jù)的數(shù)量,決定了一次寫出多少數(shù)據(jù)。該選項可以幫助提升 JDBC 的性能。該選項只在寫出數(shù)據(jù)時有效。默認值為 1000。
isolationLevel 事務隔離級別,應用于當前連接??梢允?NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或者 SERIALIZABLE,與 JDBC 連接定義的標準事務隔離級別相對應,默認值是 READ_UNCOMMITTED。該選項只在寫出數(shù)據(jù)時有效。詳情參見 java.sql.Connection。
sessionInitStatement 在連接到遠端數(shù)據(jù)庫的會話開啟之后,在讀取數(shù)據(jù)之前,執(zhí)行該選項所定義的 SQL 語句(或是 PL/SQL 塊),可以通過該選項做一些會話初始化工作。示例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")。
truncate 當存儲模式設置為 SaveMode.Overwrite 時,該選項會在寫出時清空表而不是刪除后再創(chuàng)建。這樣會更高效一些,還避免了刪除表的元數(shù)據(jù)(比如說索引)。然而在某些情況下該選項不會生效,比如在新數(shù)據(jù)跟原始表的模式不同的時候。默認值為 false。該選項只在寫出數(shù)據(jù)時有效。
cascadeTruncate 這是一個 JDBC 寫出時選項。如果開啟,同時數(shù)據(jù)庫也支持(目前只有 PostgreSQL 和 Oracle),該選項在寫出前會執(zhí)行 TRUNCATE TABLE t CASCADE 語句(在 PostgreSQL 中是 TRUNCATE TABLE ONLY t CASCADE,來避免無意間清空了衍生表)。該選項會影響其他的表,需要謹慎使用。該選項只在寫出數(shù)據(jù)時有效。默認值為相應數(shù)據(jù)庫默認的 cascading truncate 行為,通過 JDBC 會話中的 isCascadeTruncate 參數(shù)指定。
createTableOptions 這是一個 JDBC 寫出時選項。該選項可以指定寫出數(shù)據(jù)時的建表語句(例如 CREATE TABLE t (name string) ENGINE=InnoDB.)。該選項只在寫出數(shù)據(jù)時有效。
createTableColumnTypes 在建表時指定每列的數(shù)據(jù)類型而不是采用默認值。數(shù)據(jù)類型信息應該和 CREATE TABLE 語句中指定的一致(例如:"name CHAR(64), comments VARCHAR(1024)"))。被指定的類型應該是有效的 Spark SQL 數(shù)據(jù)類型。該選項只在寫出數(shù)據(jù)時有效。
customSchema 從數(shù)據(jù)庫中讀取數(shù)據(jù)時自定義數(shù)據(jù)模式,例如:"id DECIMAL(38, 0), name STRING"??梢灾恢付ú糠至?,其他列使用默認類型映射,例如:"id DECIMAL(38, 0)"。列名應該和數(shù)據(jù)庫表中的字段名相同。用戶可以通過該選項指定 Spark SQL 中的相應類型而不是使用默認映射。該選項只在讀取數(shù)據(jù)時有效。
pushDownPredicate 是否開啟謂詞下推機制,默認值是 true,Spark 會盡力將過濾語句下推到數(shù)據(jù)庫執(zhí)行。否則,如果被設置為 false,不會有謂詞下推,所有的過濾操作都會由 Spark 來執(zhí)行。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

完整的示例代碼位于 Spark 安裝包的「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」。

Avro 文件

從 Spark 2.4 開始, Spark SQL 對 Avro 數(shù)據(jù)的讀寫提供了原生支持。

部署

spark-avro 是額外的模塊,默認情況下并不包含在 spark-submit 或者 spark-shell 的 classpath 中。

對于 Spark 應用程序來說, spark-submit 腳本用來啟動應用程序。spark-avro_2.12 和它的依賴可以通過添加到 spark-submit 腳本的參數(shù)中,就像:

./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.0.0 ...

對于 spark-shell 腳本,也可以通過 --packages 參數(shù)直接添加 org.apache.spark:spark-avro_2.12 和它的依賴:

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.0 ...

更多有關啟動應用程序添加額外依賴的詳情參見 Application Submission Guide。

讀寫函數(shù)

由于 spark-avro 模塊是外部的,在 DataFrameReader 或者 DataFrameWriter 中并不存在 .avro API。

讀寫 Avro 格式的數(shù)據(jù),需要指定數(shù)據(jù)源選項 formatavro(或者 org.apache.spark.sql.avro)。

val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

to_avro() 和 from_avro()

Avro 依賴提供了 to_avro() 函數(shù)來編碼一列數(shù)據(jù)為 Avro 格式,以及 from_avro() 函數(shù)來解碼 Avro 數(shù)據(jù)成為一列對象。兩個函數(shù)都將一列轉換為另外一列,輸入輸出的 SQL 數(shù)據(jù)類型可以是復雜類型或者基礎類型。

當從一個像 Kafka 一樣的流失數(shù)據(jù)源讀寫數(shù)據(jù)時,使用 Avro record 作為一列很方便。每一個 Kafka 鍵值對記錄會跟一些元信息一起被讀取進來,比如 Kafka 收錄這條記錄時的時間戳,記錄偏移量等等。

  • 如果字段中包含的數(shù)據(jù)是 Avro 格式,可以使用 from_avro() 函數(shù)來抽取數(shù)據(jù),改進,清洗,然后再輸出到 Kafka 中去 或者寫入到文件。
  • 函數(shù)可以用來將結構化數(shù)據(jù)轉換為 Avro record。對于需要將多列融合為一列寫出到 Kafka 的場景很適用。

兩個函數(shù)目前已支持 Scala 和 Java 語言。

import org.apache.spark.sql.avro.functions._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

數(shù)據(jù)源選項

Avro 數(shù)據(jù)源選項可以通過以下兩種方式設置:

  • DataFrameReader 或者 DataFrameWriter 中的 .option 方法。
  • 函數(shù) from_avro 中的 options 參數(shù)。
Property Name Default Meaning Scope
avroSchema None 可選,用戶提供的 JSON 格式的模式。在讀取 Avro 數(shù)據(jù)時,此選項可以用來定義一個演化的模式,可以和實際的 Avro 模式不同,但必須兼容。反序列化的模式會和演化模式一致。 例如,如果設置一個擁有一個額外字段(有默認值)的演化模式,Spark 讀取的結果中也會包含這個新字段。當寫出數(shù)據(jù)為 Avro 時,該選項可以調和目標模式與 Spark 轉換的模式不匹配的情況。例如,目標模式中有一個列式枚舉類型,而 Spark 生成的模式中該字段是字符串類型。 read, write and function from_avro
recordName topLevelRecord 寫出數(shù)據(jù)時的 record 名稱,Avro 需要指定。 write
recordNamespace "" 寫出數(shù)據(jù)時的 record 命名空間。 write
ignoreExtension true 該選項控制在讀取數(shù)據(jù)時是否忽略文件的 .avro 后綴。如果開啟,所有的文件都會被加載(有或者沒有 .avro 后綴)。該項選已被棄用,會在將來的版本中移除。請使用更通用的數(shù)據(jù)源選項 pathGlobFilter 來過濾文件。 read
compression snappy compression 選項指定在寫出數(shù)據(jù)時的壓縮格式,目前支持的選項有 uncompressed, snappy, deflate, bzip2xz。如果沒有指定,會使用配置中的 spark.sql.avro.compression.codec 參數(shù)。 write
mode FAILFAST mode 選項指定 from_avro 函數(shù)的解析模式,目前支持的模式有 FAILFAST:在處理損壞文件時拋出異常;PERMISSIVE:損壞的記錄被當做空值處理。所以,數(shù)據(jù)模式會被強制改變?yōu)榭梢詾榭罩?,這有可能與用戶提供的模式不同。 function from_avro

配置項

可以通過 SparkSession 對象的 setConf 方法或者執(zhí)行 SQL 中的 SET key=value 命令來改變的配置項。

Property Name Default Meaning Since Version
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true 如果設置為 true,為了向后兼容,數(shù)據(jù)源 provider com.databricks.spark.avro 會被作為外部數(shù)據(jù)源模塊映射為內(nèi)置模式。 2.4.0
spark.sql.avro.compression.codec snappy 寫出 Avro 文件時使用的壓縮格式。支持的壓縮格式有:uncompressed, deflate, snappy, bzip2 和 xz。默認值為 snappy。 2.4.0
spark.sql.avro.deflate.level -1 寫出 Avro 文件時石筍的壓縮級別,有效值必須在 1 到9 之間或者 -1。默認值為 -1,在目前的版本中相當于 6。 2.4.0

與 Databricks spark-avro 的兼容

該 Avro 數(shù)據(jù)源模塊衍生于 Databricks 的開源版本 spark-avro,并與之保持兼容。

默認情況下 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 配置是開啟的,數(shù)據(jù)源 provider com.databricks.spark.avro 會被映射為內(nèi)置的 Avro 模塊。對于通過 com.databricks.spark.avro 中的 Provider 創(chuàng)建的表,其元數(shù)據(jù)可以被內(nèi)置 Avro 模塊讀取。

注意,在 Databricks 的 spark-avro 中,隱式類 AvroDataFrameWriterAvroDataFrameReader 可以通過函數(shù) .avro() 創(chuàng)建。而在內(nèi)置的模塊中,兩個隱式類都被移除了。請在 DataFrameWriterDataFrameReader 對象中使用 .format("avro") 方法,足夠簡潔明了。

如果你更想使用自己構建的 spark-avro jar 文件,可以將配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 設置為 false,通過 --jars 參數(shù)來部署應用程序。詳情參見 Advanced Dependency Management。

支持的數(shù)據(jù)類型 Avro -> Spark SQL

目前 Spark 支持讀取 Avro record 中所有的基礎數(shù)據(jù)類型(primitive types)和復雜數(shù)據(jù)類型(complex types)。

Avro type Spark SQL type
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
fixed BinaryType
bytes BinaryType
record StructType
array ArrayType
map MapType
union See below

除了上面列出的類型之外,還支持讀取 union 類型,下面三種類型被解析為 union 類型:

  1. union(int, long) 會被映射為 LongType。
  2. union(float, double) 會被映射為 DoubleType。
  3. union(something, null),其中 something 是任意支持的 Avro 類型。該類型會被映射為與 something 相關的 Spark SQL 類型(nullable 設置為 true)。所有其他的類型會被當做復雜數(shù)據(jù)類型,會被映射為 StructType,其中的字段名稱分別為 member0,member1 等等,數(shù)量與 union 中的類型數(shù)量一致。在 Avro 和 Parquet 之間相互轉換時也遵循同樣的原則。

還支持讀取下列 Avro 邏輯類型(logical types):

Avro logical type Avro type Spark SQL type
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

目前,Spark SQL 會忽略 Avro 文件的 docs,aliases 和其他屬性。

支持的數(shù)據(jù)類型 Spark SQL -> Avro conversion

Spark 支持寫入 Spark SQL 數(shù)據(jù)類型的數(shù)據(jù)到 Avro。對于大多數(shù)數(shù)據(jù)類型,從 Spark SQL 數(shù)據(jù)類型到 Avro 類型的映射是很直接的(比如說 IntegerType 映射為 int);然而還有一些特殊的情況:

Spark SQL type Avro type Avro logical type
ByteType int
ShortType int
BinaryType bytes
DateType int date
TimestampType long timestamp-micros
DecimalType fixed decimal

還可以通過選項 avroSchema 指定完整的 Avro 模式,這樣 Spark SQL 數(shù)據(jù)類型會被轉換為另一種 Avro 數(shù)據(jù)類型。下列轉換不能通過默認映射執(zhí)行,需要用戶指定 Avro 模式:

Spark SQL type Avro type Avro logical type
BinaryType fixed
StringType enum
TimestampType long timestamp-millis
DecimalType bytes decimal

二進制文件

從 Spark 3.0 開始,Spark 支持二進制文件數(shù)據(jù)源,可以讀取二進制文件,將每個文件轉換為一條記錄,記錄中包含了完整的文件數(shù)據(jù)。生成的 DataFrame 會包含以下字段以及可能的分區(qū)字段:

  • path: StringType
  • modificationTime: TimestampType
  • length: LongType
  • content: BinaryType

讀取完整的二進制文件需要指定數(shù)據(jù)源選項 formatbinaryFile,可以通過數(shù)據(jù)源選項 pathGlobFilter 來過濾需要加載的文件同時不影響分區(qū)發(fā)現(xiàn)的功能。例如下面的代碼讀取指定路徑中所有的 PNG 文件:

spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data")

二進制文件數(shù)據(jù)源不支持將一個 DataFrame 寫回到原來的文件。

疑難解答

  • JDBC 驅動包必須可以被 client 和所有 executor 的 Application ClassLoader 加載。這是因為 Java 的 DriverManager 類會進行安全檢查,會在打開數(shù)據(jù)庫連接時忽略所有無法被 Application ClassLoader 加載的類。將相關的 Jar 文件加入到所有節(jié)點上的 compute_classpath.sh 腳本中是一種簡便的解決方案。
  • 某些數(shù)據(jù)庫,比如說 H2,將所有的名稱轉換為大寫,此時在 Spark SQL 中引用那些名稱時也需要用大寫形式。
  • 用戶可以在數(shù)據(jù)源選項中指定 JDBC 驅動包供應商提供的特殊屬性。例如 spark.read.format("jdbc").option("url", oracleJdbcUrl).option("oracle.jdbc.mapDateToTimestamp", "false")。oracle.jdbc.mapDateToTimestamp 默認為 true,用戶通常會關閉該選項來避免 Oracle 中的 date 類型轉換為 timestamp。
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容