當(dāng)SparkSQL里內(nèi)置的函數(shù)無(wú)法滿足我們業(yè)務(wù)需求時(shí),我們可以通過(guò)自定義UDF來(lái)實(shí)現(xiàn)。
1、自定義ConcatLongStringUDF
/**
* 將兩個(gè)字段拼接起來(lái)(使用指定的分隔符)
*/
class ConcatLongStringUDF extends UDF3[Long, String, String, String] {
override def call(v1: Long, v2: String, spilt: String): String = {
v1.toString + spilt + v2
}
}
這里自定義UDF來(lái)使用指定的分隔符來(lái)拼接Long和String
2、使用
首先注冊(cè)自定義UDF并指定返回類型
spark.udf.register("concat_long_str", new ConcatLongStringUDF(), DataTypes.StringType)
使用自定義UDF
spark.sql("select concat_long_str(session_id,page_id,':') from temp_session_page").show()
完成代碼UDFTest
object UDFTest {
def main(args: Array[String]): Unit = {
//權(quán)限問(wèn)題
System.setProperty("HADOOP_USER_NAME", "hadoop")
val spark =
SparkSession.builder()
.appName("UDFTest")
.master("local[1]")
.getOrCreate()
spark.udf.register("concat_long_str", new ConcatLongStringUDF(), DataTypes.StringType)
val tempSchema: StructType = StructType(Seq(
StructField("session_id", LongType, false),
StructField("page_id", StringType, false)
))
val rowRDD =
spark.sparkContext.parallelize(Array("1,page1", "2,page2"))
.map(line => {
val fields = line.split(",")
Row(fields(0).toLong, fields(1))
})
val tempDF = spark.createDataFrame(rowRDD, tempSchema)
tempDF.createOrReplaceTempView("temp_session_page")
spark.sql("select * from temp_session_page").show()
spark.sql("select concat_long_str(session_id,page_id,':') from temp_session_page").show()
}
}
打印結(jié)果
+----------+-------+
|session_id|page_id|
+----------+-------+
| 1| page1|
| 2| page2|
+----------+-------+
+-------------------------------------------+
|UDF:concat_long_str(session_id, page_id, :)|
+-------------------------------------------+
| 1:page1|
| 2:page2|
+-------------------------------------------+
可以根據(jù)自己業(yè)務(wù)的需求定制更多UDF