Spark DataSource API 的提出使得各個數(shù)據(jù)源按規(guī)范實現(xiàn)適配,那么就可以高效的利用Spark 的計算能力。典型如Parquet,CarbonData,Postgrep(JDBC類的都OK)等實現(xiàn)。本文則介紹如何利用Spark DataSource 對標(biāo)準(zhǔn)Rest接口實現(xiàn)讀取
引子
先說下這個需求的來源。通常在一個流式計算的主流程里,會用到很多映射數(shù)據(jù),譬如某某對照關(guān)系,而這些映射數(shù)據(jù)通常是通過HTTP接口暴露出來的,尤其是外部系統(tǒng),你基本沒有辦法直接通過JDBC去讀庫啥的。
上面是一個點,其次是從HTTP讀到的JSON數(shù)據(jù),我其實需要做扁平化處理的?,F(xiàn)在如果SQL作用于JSON數(shù)據(jù)可以解決簡單的嵌套問題,但是更復(fù)雜的方式是沒有太大辦法的。
比如下面格式的:
{
"status":"200",
"data":[
"id":1,
"userid":2,
"service":{
"3":{"a":1,"b":2},
"2":{"a":3,"b":2},
.....
}
]
}
最好能展開成這種格式才能夠被主流程直接join使用:
{id:1,userid:2,service:3,a:1,b:2}
{id:1,userid:2,service:2,a:3,b:2}
所以為了實現(xiàn)同事的需求,我需要第一將Rest接口的獲取方式用標(biāo)準(zhǔn)的DataSource API 來實現(xiàn),其次提供一個能夠做如上合并規(guī)則的模塊,并且允許配置。
最后實現(xiàn)的效果參看: Rest DataSource
實現(xiàn)代碼可以參看:RestJSONDataSource
實現(xiàn)目標(biāo)
先看看DataSource API 的樣子:
val df = SQLContext.getOrCreate(sc).
read.
format("driver class").//驅(qū)動程序,類似JDBC的 driver class
options(Map(....)). //你需要額外傳遞給驅(qū)動的參數(shù)
load("url")//資源路徑
如果做成配置化則是:
{
"name": "streaming.core.compositor.spark.source.SQLSourceCompositor",
"params": [
{
"format": "org.apache.spark.sql.execution.datasources.rest.json",
"url": "http://[your dns]/path",
"xPath": "$.data"
}
]
}
DefaultSource的實現(xiàn)
定義
org.apache.spark.sql.execution.datasources.rest.json.DefaultSource
extends RelationProvider
with DataSourceRegister
這是比較典型的命名規(guī)范。rest 代表支持的是rest作為接口,json則代表rest接口的數(shù)據(jù)是json格式的,包的命名讓人一目了然。
先看看DefaultSource繼承的兩個接口
- DataSourceRegister
該接口只有一個shortName 方法。我們看到上面的包名是很長的,你可以給一個更簡短的名字:
org.apache.spark.sql.execution.datasources.rest.json
==>
restJSON
所以具體實現(xiàn)就變成了:
override def shortName(): String = "restJSON"
- RelationProvider
這個接口也只有一個方法:
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
其返回值BaseRelation對象描述了數(shù)據(jù)源和Spark SQL交互。createRelation方法允許你根據(jù)用戶定義的參數(shù)parameters 創(chuàng)建一個合適的BaseRelation的實現(xiàn)類。
其實除了這個,還有一些攜帶更多信息的繼承自RelationProvider的類,譬如:
SchemaRelationProvider 允許你直接傳遞Schema信息給BaseRelation實現(xiàn)。
HadoopFsRelationProvider 除了參數(shù)幫你加了path等,返回值也幫你約定成HadoopFsRelation. HadoopFsRelation 提供了和HDFS交互的大部分實現(xiàn)
在我們的實現(xiàn)里,只要實現(xiàn)基礎(chǔ)的RelationProvider就好。
我們來看下DefaultSource.createRelation的具體代碼:
override def createRelation(
sqlContext: SQLContext,
//還記的DataSource的options方法么,parameters就是
//用戶通過options傳遞過來的
parameters: Map[String, String]
): BaseRelation = {
//因為我們并需要用戶提供schema
//而是從JSON格式數(shù)據(jù)自己自己推導(dǎo)出來的
// 所以這里有個采樣率的概念
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
// 還記得DataSource的 path么? 理論上是應(yīng)該通過那個傳遞過來的,然而
//這里是直接通過potions傳遞過來的。
val url = parameters.getOrElse("url", "")
// 我們需要能夠?qū)νㄟ^XPATH語法抽取我們要的數(shù)據(jù),比如
//前面的例子,我們需要能夠抽取出data那個數(shù)組
val xPath = parameters.getOrElse("xPath", "$")
//這里是核心
new RestJSONRelation(None, url, xPath, samplingRatio, None)(sqlContext)
}
源碼中已經(jīng)做了說明。這里RestJSONRelation是整個核心,它實現(xiàn)了Spark SQL 和數(shù)據(jù)源的交互。RestJSONRelation繼承自BaseRelation,TableScan等基類
RestJSONRelation
先看看RestJSONRelation 的簽名:
private[sql] class RestJSONRelation(
val inputRDD: Option[RDD[String]],
val url: String,
val xPath: String,
val samplingRatio: Double,
val maybeDataSchema: Option[StructType]
)(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan {
這些參數(shù)是你隨便定義的。當(dāng)然,url,xPath,smaplingRatio具體的含義在上一章節(jié)都提到了。
和數(shù)據(jù)源進(jìn)行交互有兩個必要的信息需要獲?。?/p>
-
Schema 信息。只有兩種方法:用戶告知你,或者程序自己根據(jù)數(shù)據(jù)推導(dǎo)。關(guān)于schema信息這塊,BaseRelation還提供了幾個基礎(chǔ)的約定:
- needConversion,是否需類型轉(zhuǎn)換,因為Spark SQL內(nèi)部的表示是Row,里面的數(shù)據(jù)需要特定的類型,比如String會被轉(zhuǎn)化成UTF8String。默認(rèn)為true,官方也是說不要管他就好。
- unhandledFilters, 返回一些數(shù)據(jù)源沒有辦法pushdown的filter。這樣解析器就知道可以在Spark內(nèi)部做filter了。否則Spark 會傻傻的以為你做了過濾,然后數(shù)據(jù)計算結(jié)果就錯了。
-
數(shù)據(jù)掃描的方法。 目前Spark SQL 提供了四種
- TableScan 全表掃描
- PrunedScan 可以指定列,其他的列數(shù)據(jù)源可以不用返回
- PrunedFilteredScan 指定列,并且還可以加一些過濾條件,只返回滿足條件的數(shù)據(jù)。這個也就是我們常說的數(shù)據(jù)源
下沉(pushdown)操作。 - CatalystScan 和PrunedFilteredScan類似,支持列過濾,數(shù)據(jù)過濾,但是接受的過濾條件是Spark 里的Expression。 理論上會更靈活些。話說在Spark源碼)里(1.6.1版本),我沒有看到這個類的具體實現(xiàn)案例。
這里我們只要實現(xiàn)一個簡單的TableScan就可以了,因為拿的是字典數(shù)據(jù),并不需要做過濾。
Schema推導(dǎo)
BaseRelation是需要你給出Schema的。這里我們會先定義一個dataSchema的lazy屬性,這樣防止schema方法被反復(fù)調(diào)用而反復(fù)推導(dǎo)。
override def schema: StructType = dataSchema
lazy val dataSchema = .....
因為我們是根據(jù)數(shù)據(jù)推導(dǎo)Schema,所以首先要獲取數(shù)據(jù)。我們定義一個方法:
private def createBaseRdd(inputPaths: Array[String]): RDD[String]
inputPaths 我沿用了文件系統(tǒng)的概念,其實在我們這里就是一個URL。我們知道,最終Spark SQL 的直接數(shù)據(jù)源都是RDD的。所以這里我們返回的也是RDD[String]類型。具體實現(xiàn)很簡單,就是通過HttpClient根據(jù)inputPaths拿到數(shù)據(jù)之后makeRDD一下就可以了。
//應(yīng)該要再加個重試機(jī)制就更好了
private def createBaseRdd(inputPaths: Array[String]): RDD[String] = {
val url = inputPaths.head
val res = Request.Get(new URL(url).toURI).execute()
val response = res.returnResponse()
val content = EntityUtils.toString(response.getEntity)
if (response != null && response.getStatusLine.getStatusCode == 200) {
//這里是做數(shù)據(jù)抽取的,把data的數(shù)組給抽取出來
import scala.collection.JavaConversions._
val extractContent = JSONArray.fromObject(JSONPath.read(content, xPath)).
map(f => JSONObject.fromObject(f).toString).toSeq
sqlContext.sparkContext.makeRDD(extractContent)
} else {
sqlContext.sparkContext.makeRDD(Seq())
}
}
有了這個類就能獲取到數(shù)據(jù),就可以做Schema推導(dǎo)了:
lazy val dataSchema = {
//我們也允許用戶傳遞給我們Schema,如果沒有就自己推導(dǎo)
val jsonSchema = maybeDataSchema.getOrElse {
InferSchema(
//拿到數(shù)據(jù)
inputRDD.getOrElse(createBaseRdd(Array(url))),
//采樣率,其實就是拿sc.sample方法
samplingRatio,
sqlContext.conf.columnNameOfCorruptRecord)
}
checkConstraints(jsonSchema)
jsonSchema
}
InferSchema的實現(xiàn)邏輯比較復(fù)雜,但最終就是為了返回StructType(fields: Array[StructField]) 這么個東西。我是直接拷貝的spark JSON DataSource的實現(xiàn)。有興趣的可以自己參看。StructType其實也很簡單了,無非就是一個描述Schema的結(jié)構(gòu),類似你定義一張表,你需要告訴系統(tǒng)字段名稱,類型,是否為Null等一些列信息。
現(xiàn)在我們終于搞定了數(shù)據(jù)表結(jié)構(gòu)了。
數(shù)據(jù)獲取
剛才我們說了數(shù)據(jù)獲取的四種類型,我們這里使用的是TableScan,繼承自該接口只要實現(xiàn)一個buildScan方法就好:
def buildScan(): RDD[Row] = {
JacksonParser(
inputRDD.getOrElse(createBaseRdd(Array(url))),
dataSchema, sqlContext.conf.columnNameOfCorruptRecord).asInstanceOf[RDD[Row]]
}
其本質(zhì)工作就是把JSON格式的String根據(jù)我們前面已經(jīng)拿到的Schema轉(zhuǎn)化為Row格式。
具體做法如下:
//這個是createBaseRDD返回的RDD[String]
//對應(yīng)的String 其實是JSON格式
//針對每個分區(qū)做處理
json.mapPartitions { iter =>
val factory = new JsonFactory()
iter.flatMap { record =>
try {
//JSON的解析器
val parser = factory.createParser(record)
parser.nextToken()
//這里開始做類型轉(zhuǎn)換了
convertField(factory, parser, schema) match {
case null => failedRecord(record)
case row: InternalRow => row :: Nil
case array: ArrayData =>
if (array.numElements() == 0) {
Nil
} else {
array.toArray[InternalRow](schema)
}
case _ =>
sys.error(
s"Failed to parse record $record. Please make sure that each line of the file " +
"(or each string in the RDD) is a valid JSON object or an array of JSON objects.")
}
} catch {
case _: JsonProcessingException =>
failedRecord(record)
}
}
}
這里的代碼還是比較清晰易懂的。但是 convertField(factory, parser, schema) 直接match 到 InternalRow 還是比較讓人困惑的,一個字段轉(zhuǎn)換咋就變成了InternalRow了呢?這里確實也有乾坤的。我們進(jìn)去看看convertField方法:
private[sql] def convertField(
factory: JsonFactory,
parser: JsonParser,
schema: DataType): Any = {
import com.fasterxml.jackson.core.JsonToken._
(parser.getCurrentToken, schema) match {
case (null | VALUE_NULL, _) =>
null
case (FIELD_NAME, _) =>
parser.nextToken()
convertField(factory, parser, schema)
.....
case (START_OBJECT, st: StructType) =>
convertObject(factory, parser, st)
如果你的JSON是個Map,經(jīng)過N次匹配case后會進(jìn)入最后一個case 情況。這里的st:StructType 就是我們之前自己推導(dǎo)出來的dataSchema. convertObject 方法如下:
while (nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.getCurrentName) match {
case Some(index) =>
row.update(index, convertField(factory, parser, schema(index).dataType))
case None =>
parser.skipChildren()
}
}
到這里就真相大白了。為了能夠拿到一條完整的數(shù)據(jù),他會while循環(huán)直到遇到END_OBJECT 。所謂END_OBJECT 其實就是一個Map 結(jié)束了。 在每一次循環(huán)里,拿到一個字段,然后通過名字去schema里獲取類型信息,然后再回調(diào)convertField方法將這個字段轉(zhuǎn)化為row需要的類型,比如字符串類型的就通過UTF8String進(jìn)行轉(zhuǎn)換。
case (VALUE_STRING, StringType) => UTF8String.fromString(parser.getText)
得到的值通過Row的函數(shù)進(jìn)行更新,這里是 row.update 方法。到END_OBJECT后,就完成了將一個JSON Map 轉(zhuǎn)化為一條Row的功能了。
收工
到目前為止,我們已經(jīng)完成了具體的工作了。現(xiàn)在你已經(jīng)可以按如下的方式使用:
val df = SQLContext.getOrCreate(sc).
read.
format("org.apache.spark.sql.execution.datasources.rest.json").//驅(qū)動程序,類似JDBC的 driver class
options(Map(
"url"->"http://[your dns]/path"
"xPath" -> "$.data"
)). //你需要額外傳遞給驅(qū)動的參數(shù)
load("url")//資源路徑
獲取到的Dataframe 你可以做任意的操作。
總結(jié)
Spark DataSource API的提出,給Spark 構(gòu)建生態(tài)帶來了巨大的好處。各個存儲系統(tǒng)可以實現(xiàn)統(tǒng)一標(biāo)準(zhǔn)的接口去對接Spark。學(xué)會使用自己實現(xiàn)一個DataSoure是的你的存儲可以更好的和生態(tài)結(jié)合,也能得到更好的性能優(yōu)化。