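Big data and cloud native have long been two hot topics in infrastructure, and more and more big-data infrastructure is moving toward cloud native: Pulsar is a cloud-native message queue, and Snowflake offers a cloud-native data warehouse. I wanted to explore how the two combine and came across a very interesting project, Openlake. It lives under the minio organization and builds a data lake on Kubernetes out of minio, spark, kafka, dremio and iceberg, which makes it a great learning project. This post records the setup process and what I learned along the way.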
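0. Prepare the Kubernetes environment
If you already have a cluster you can skip this section. I wanted to experiment quickly, so I ran k3s on Linux with docker-compose.
Install docker compose (see the guide) by running the following commands: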
sudo curl -L "https://github.com/docker/compose/releases/download/1.25.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
Create a local directory to hold the k3s configuration and data:
mkdir -p /home/xiowang/k3s
mkdir -p /home/xiowang/k3s/data
Create docker-compose.yaml to start k3s:
# vim /home/xiowang/k3s/docker-compose.yaml
version: '3'
services:
  server:
    image: rancher/k3s:v1.20.2-k3s1
    container_name: k3s_server
    hostname: xiowang.dev
    command: server --tls-san=xiowang.dev
    tmpfs:
      - /run
      - /var/run
    privileged: true
    restart: always
    ports:
      - 6443:6443
      - 443:30443
      - 80:30080
      - 50050:30050
      - 50051:30051
    environment:
      - K3S_TOKEN=16963276443662
      - K3S_KUBECONFIG_OUTPUT=/root/.kube/config
      - K3S_KUBECONFIG_MODE=600
    volumes:
      - /var/lib/rancher/k3s:/var/lib/rancher/k3s
      - /etc/rancher:/etc/rancher
      - /home/xiowang/k3s/.kube:/root/.kube
      - /home/xiowang/k3s/data:/data:shared,rw
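The configuration above does the following:
- exposes port 6443 on the host for the Kubernetes apiserver, so kubectl can manage the cluster;
- maps host ports 443 and 80 to the cluster NodePorts 30443 and 30080 respectively;
- saves the kubeconfig to /home/xiowang/k3s/.kube;
- mounts the cluster's /data directory at /home/xiowang/k3s/data;
- maps host ports 50050 and 50051 to NodePorts 30050 and 30051, reserved for exposing minio later.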
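To double-check which host port reaches which NodePort, the `ports` entries from the compose file can be parsed with a few lines of Python. This is only a sketch: the helper names `parse_port_mappings` and `host_port_for` are my own, and the list is copied by hand from the compose file rather than read from it.

```python
# Port mappings copied by hand from docker-compose.yaml ("host:container").
PORTS = ["6443:6443", "443:30443", "80:30080", "50050:30050", "50051:30051"]


def parse_port_mappings(entries):
    """Return a dict of host port -> port inside the k3s container."""
    mapping = {}
    for entry in entries:
        host, container = entry.split(":")
        mapping[int(host)] = int(container)
    return mapping


def host_port_for(node_port, entries=PORTS):
    """Find which host port reaches a given NodePort, or None if it is not mapped."""
    for host, container in parse_port_mappings(entries).items():
        if container == node_port:
            return host
    return None


if __name__ == "__main__":
    for host, container in sorted(parse_port_mappings(PORTS).items()):
        print(f"localhost:{host} -> k3s:{container}")
```

A NodePort that is missing from the list (for example 32000) returns None, which is a quick way to spot a service that will not be reachable from the host.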
Start k3s:
cd /home/xiowang/k3s
docker-compose up -d
Because host ports 80 and 443 map to the cluster NodePorts 30080 and 30443, edit the traefik service so that its ports 80 and 443 use NodePorts 30080 and 30443:
# kubectl -n kube-system edit svc traefik
apiVersion: v1
kind: Service
metadata:
  annotations:
    meta.helm.sh/release-name: traefik
    meta.helm.sh/release-namespace: kube-system
  creationTimestamp: "2023-05-17T09:29:42Z"
  labels:
    app: traefik
    app.kubernetes.io/managed-by: Helm
    chart: traefik-1.81.0
    heritage: Helm
    release: traefik
  name: traefik
  namespace: kube-system
  resourceVersion: "368985"
  uid: 7bbb1758-ca01-4e84-b166-dae950613adf
spec:
  clusterIP: 10.43.115.234
  clusterIPs:
  - 10.43.115.234
  externalTrafficPolicy: Cluster
  ports:
  - name: http
    nodePort: 30080
    port: 80
    protocol: TCP
    targetPort: http
  - name: https
    nodePort: 30443
    port: 443
    protocol: TCP
    targetPort: https
  selector:
    app: traefik
    release: traefik
  sessionAffinity: None
  type: LoadBalancer
Copy /home/xiowang/k3s/.kube/config to ~/.kube/config on the host (kubecm is recommended for managing kubeconfigs):
cp /home/xiowang/k3s/.kube/config ~/.kube/config
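Now we can start the openlake journey.
1. Install and configure minio
There are two recommended ways to install minio on Kubernetes: the operator or the helm chart.
I have not had time to learn the operator's CRDs yet, so I installed minio with helm as follows: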
helm repo add minio https://charts.min.io/
# Set the password and disk size; TLS is off by default (adjust as needed, I used 20Gi for testing)
helm upgrade --install --namespace minio --set rootUser=rootuser,rootPassword=rootpass123,persistence.size=20Gi,resources.requests.memory=100Mi,resources.limits.memory=2Gi,replicas=3,service.type=NodePort,consoleService.type=NodePort one --create-namespace minio/minio
Output of a successful deployment:
NAME: one
LAST DEPLOYED: Mon May 22 12:22:31 2023
NAMESPACE: minio
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
MinIO can be accessed via port 9000 on the following DNS name from within your cluster:
one-minio.minio.svc.cluster.local
To access MinIO from localhost, run the below commands:
1. export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")
2. kubectl port-forward $POD_NAME 9000 --namespace minio
Read more about port forwarding here: http://kubernetes.io/docs/user-guide/kubectl/kubectl_port-forward/
You can now access MinIO server on http://localhost:9000. Follow the below steps to connect to MinIO server with mc client:
1. Download the MinIO mc client - https://min.io/docs/minio/linux/reference/minio-mc.html#quickstart
2. export MC_HOST_one-minio-local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000
3. mc ls one-minio-local
Note that the variable name MC_HOST_one-minio-local in the output above is not a valid shell identifier (hyphens are not allowed), so I used MC_HOST_one_minio_local instead:
export MC_HOST_one_minio_local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000
mc ls one_minio_local
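The naming rule and the URL layout can be sketched in Python. This is an illustration under my own assumptions: the helper names are hypothetical, the regular expression encodes the usual POSIX rule for shell variable names, and the base64 inputs stand in for the `rootUser` and `rootPassword` fields that kubectl returns.

```python
import base64
import re

# POSIX shell variable names: letters, digits and underscores, not starting with a digit.
_VALID_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def is_valid_env_name(name):
    """True if the shell would accept `name` as a variable name."""
    return bool(_VALID_NAME.match(name))


def mc_host_var(alias):
    """Build a shell-safe MC_HOST_<alias> name by replacing invalid characters with '_'."""
    return "MC_HOST_" + re.sub(r"[^A-Za-z0-9_]", "_", alias)


def mc_host_url(user_b64, password_b64, endpoint="localhost:9000"):
    """Decode base64 Secret fields and assemble the URL that the variable holds."""
    user = base64.b64decode(user_b64).decode()
    password = base64.b64decode(password_b64).decode()
    return f"http://{user}:{password}@{endpoint}"


if __name__ == "__main__":
    print(is_valid_env_name("MC_HOST_one-minio-local"))
    print(mc_host_var("one-minio-local"))
```

Since the part after MC_HOST_ becomes the mc alias, renaming the variable also changes the alias to one_minio_local.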
To reach the minio console, forward port 9001 and log in at localhost:9001 with the root account:
export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")
kubectl port-forward $POD_NAME 9001 --namespace minio
Buckets are accessed through port 9000:
export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")
kubectl port-forward $POD_NAME 9000 --namespace minio
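2. Set up spark on k8s
spark on k8s is an open-source project started by Google (not an official Google product). It uses an operator to schedule Kubernetes resources, which makes it easier to run spark on Kubernetes.
spark on k8s is installed with helm: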
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator \
--namespace spark-operator \
--set webhook.enable=true \
--set image.repository=openlake/spark-operator \
--set image.tag=3.3.2 \
--create-namespace
Verify the deployment:
kubectl get pods -n spark-operator
You should see the my-release pod:
NAME READY STATUS RESTARTS AGE
my-release-spark-operator-6547984586-xzw4p 1/1 Running 2 4d20h
Following the example, deploy a spark application that computes pi and save it as spark-pi.yaml. Note that helm already set up RBAC for serviceAccount: my-release-spark in the spark-operator namespace, so the spark apps here all live in that namespace and run with serviceAccount: my-release-spark:
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: pyspark-pi
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "openlake/spark-py:3.3.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
  sparkVersion: "3.3.2"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.3.2
    serviceAccount: my-release-spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.3.2
kubectl apply -f spark-pi.yaml
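If you would rather generate the manifest than edit YAML by hand, the same SparkApplication can be built as a Python dict and printed as JSON, which kubectl also accepts. This is a sketch, not part of the openlake guide: the function name `spark_pi_manifest` and its parameters are my own, and the field values are copied from spark-pi.yaml.

```python
import json


def spark_pi_manifest(name="pyspark-pi", namespace="spark-operator",
                      service_account="my-release-spark", executors=1):
    """Build the pi SparkApplication from this section as a plain dict."""
    version = "3.3.2"
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "type": "Python",
            "pythonVersion": "3",
            "mode": "cluster",
            "image": f"openlake/spark-py:{version}",
            "imagePullPolicy": "Always",
            "mainApplicationFile": "local:///opt/spark/examples/src/main/python/pi.py",
            "sparkVersion": version,
            "restartPolicy": {
                "type": "OnFailure",
                "onFailureRetries": 3,
                "onFailureRetryInterval": 10,
                "onSubmissionFailureRetries": 5,
                "onSubmissionFailureRetryInterval": 20,
            },
            "driver": {
                "cores": 1,
                "coreLimit": "1200m",
                "memory": "512m",
                "labels": {"version": version},
                # The service account that helm granted RBAC to.
                "serviceAccount": service_account,
            },
            "executor": {
                "cores": 1,
                "instances": executors,
                "memory": "512m",
                "labels": {"version": version},
            },
        },
    }


if __name__ == "__main__":
    print(json.dumps(spark_pi_manifest(), indent=2))
```

Saved under a name of your choice (say gen_spark_pi.py, a hypothetical filename), the output can be piped straight into `kubectl apply -f -`.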
List the sparkapp and pods:
# kubectl -n spark-operator get sparkapp,pod
NAME STATUS ATTEMPTS START FINISH AGE
sparkapplication.sparkoperator.k8s.io/pyspark-pi COMPLETED 1 2023-05-22T06:43:24Z 2023-05-22T06:44:37Z 4m50s
NAME READY STATUS RESTARTS AGE
pod/my-release-spark-operator-webhook-init-xzx9c 0/1 Completed 0 4d20h
pod/my-release-spark-operator-6547984586-xzw4p 1/1 Running 2 4d20h
pod/pyspark-pi-driver 0/1 Completed 0 4m46s
Look at the last 20 lines of the log; you can see the DAGScheduler report the job as finished:
# kubectl logs pyspark-pi-driver -n spark-operator --tail 20
23/05/22 06:44:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
23/05/22 06:44:35 INFO DAGScheduler: ResultStage 0 (reduce at /opt/spark/examples/src/main/python/pi.py:42) finished in 1.571 s
23/05/22 06:44:35 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
23/05/22 06:44:35 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
23/05/22 06:44:35 INFO DAGScheduler: Job 0 finished: reduce at /opt/spark/examples/src/main/python/pi.py:42, took 1.615592 s
Pi is roughly 3.145160
23/05/22 06:44:35 INFO SparkUI: Stopped Spark web UI at http://pyspark-pi-60e9d1884232c31f-driver-svc.spark-operator.svc:4040
23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
23/05/22 06:44:35 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
23/05/22 06:44:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/05/22 06:44:36 INFO MemoryStore: MemoryStore cleared
23/05/22 06:44:36 INFO BlockManager: BlockManager stopped
23/05/22 06:44:36 INFO BlockManagerMaster: BlockManagerMaster stopped
23/05/22 06:44:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/05/22 06:44:36 INFO SparkContext: Successfully stopped SparkContext
23/05/22 06:44:36 INFO ShutdownHookManager: Shutdown hook called
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-00ddbc21-98da-4ad8-9905-fcf0e8d64129
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66/pyspark-10201045-5307-40dc-b6b2-d7e5447763c4
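The log only shows a `reduce` in pi.py and the result "Pi is roughly 3.145160". Assuming the bundled example uses the usual Monte Carlo approach (sample random points in a square and count how many land inside the unit circle), the computation looks roughly like this in plain Python, without spark. The function name `estimate_pi` is my own.

```python
import random


def estimate_pi(samples, seed=None):
    """Estimate pi by sampling points in the square [-1, 1] x [-1, 1]."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(samples):
        x = rng.uniform(-1.0, 1.0)
        y = rng.uniform(-1.0, 1.0)
        # The point falls inside the unit circle when x^2 + y^2 <= 1.
        if x * x + y * y <= 1.0:
            inside += 1
    # Area of circle / area of square = pi / 4.
    return 4.0 * inside / samples


if __name__ == "__main__":
    print(f"Pi is roughly {estimate_pi(200_000):f}")
```

In the spark version the sampling loop is what gets spread over the executors, and the per-sample hits are summed by the `reduce` step.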
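3. Analyze data on minio with spark
Prepare a Dockerfile and the build commands, because we will be editing the py files and pushing images often.
I use the image from the original openlake guide as the base image, since it ships with the spark dependencies preinstalled: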
FROM openlake/sparkjob-demo:3.3.2
WORKDIR /app
COPY *.py .
Build and push the docker image with the commands below (dockerhub is often unreachable for me, so I set up an image registry on Huawei Cloud; adjust to your own situation):
docker build -t xiowang/spark-minio:3.3.2 .
docker tag xiowang/spark-minio:3.3.2 swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
docker push swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
Download the New York taxi trip data (~112M rows, ~10GB):
wget https://data.cityofnewyork.us/api/views/t29m-gskq/rows.csv
Create the bucket in minio:
mc mb one_minio_local/openlake/spark/sample-data
Copy the taxi data to minio:
mc cp rows.csv one_minio_local/openlake/spark/sample-data/
Install pyspark inside jupyter:
pip3 install pyspark
In minio, create a user with read/write access to one_minio_local/openlake and generate an access key and secret for it, for example:
export AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR
export AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G
export ENDPOINT=one-minio.minio:9000
export OUTPUT_PATH=s3a://openlake/spark/result/taxi
export INPUT_PATH=s3a://openlake/spark/sample-data/rows.csv
Create a k8s secret to hold this information:
kubectl create secret generic minio-secret \
--from-literal=AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR \
--from-literal=AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G \
--from-literal=ENDPOINT=http://one-minio.minio:9000 \
--from-literal=AWS_REGION=us-east-1 \
--namespace spark-operator
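`kubectl create secret generic` stores each literal base64-encoded under the Secret's `data` field. The sketch below builds an equivalent manifest in Python to make that encoding visible. The function name `secret_manifest` is my own, and the key values are placeholders rather than real credentials.

```python
import base64
import json


def secret_manifest(name, namespace, literals):
    """Build an Opaque Secret whose values are base64-encoded, as kubectl does."""
    data = {
        key: base64.b64encode(value.encode()).decode()
        for key, value in literals.items()
    }
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "type": "Opaque",
        "metadata": {"name": name, "namespace": namespace},
        "data": data,
    }


if __name__ == "__main__":
    manifest = secret_manifest(
        "minio-secret",
        "spark-operator",
        {
            # Placeholder values; substitute the key and secret created in minio.
            "AWS_ACCESS_KEY_ID": "example-access-key",
            "AWS_SECRET_ACCESS_KEY": "example-secret-key",
            "ENDPOINT": "http://one-minio.minio:9000",
            "AWS_REGION": "us-east-1",
        },
    )
    print(json.dumps(manifest, indent=2))
```

Keep in mind that base64 is an encoding, not encryption: anyone who can read the Secret can recover the values.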
Deploy the sparkapp:
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-minio
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/main.py
  sparkVersion: "3.3.2"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    memory: "1024m"
    labels:
      version: 3.3.2
    serviceAccount: my-release-spark
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/rows.csv"
      - name: OUTPUT_PATH
        value: "s3a://openlake/spark/result/taxi"
      - name: SSL_ENABLED
        value: "true"
      - name: AWS_REGION
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_REGION
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_SECRET_ACCESS_KEY
      - name: ENDPOINT
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: ENDPOINT
  executor:
    cores: 1
    instances: 3
    memory: "1024m"
    labels:
      version: 3.3.2
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/rows.csv"
      - name: OUTPUT_PATH
        value: "s3a://openlake/spark/result/taxi"
      - name: SSL_ENABLED
        value: "true"
      - name: AWS_REGION
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_REGION
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_SECRET_ACCESS_KEY
      - name: ENDPOINT
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: ENDPOINT
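The python script run by this sparkapp is shown below. It counts all rows, filters the trips with more than 6 passengers, and writes those trips to the output path: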
import logging
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")
spark = SparkSession.builder.getOrCreate()
def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                 os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    # spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", os.getenv("SSL_ENABLED", "true"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
    # spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")
load_config(spark.sparkContext)
# Define schema for NYC Taxi Data
schema = StructType([
StructField('VendorID', LongType(), True),
StructField('tpep_pickup_datetime', StringType(), True),
StructField('tpep_dropoff_datetime', StringType(), True),
StructField('passenger_count', DoubleType(), True),
StructField('trip_distance', DoubleType(), True),
StructField('RatecodeID', DoubleType(), True),
StructField('store_and_fwd_flag', StringType(), True),
StructField('PULocationID', LongType(), True),
StructField('DOLocationID', LongType(), True),
StructField('payment_type', LongType(), True),
StructField('fare_amount', DoubleType(), True),
StructField('extra', DoubleType(), True),
StructField('mta_tax', DoubleType(), True),
StructField('tip_amount', DoubleType(), True),
StructField('tolls_amount', DoubleType(), True),
StructField('improvement_surcharge', DoubleType(), True),
StructField('total_amount', DoubleType(), True)])
# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
# Filter dataframe based on passenger_count greater than 6
large_passengers_df = df.filter(df.passenger_count > 6)
total_rows_count = df.count()
filtered_rows_count = large_passengers_df.count()
# File Output Committer is used to write the output to the destination (Not recommended for Production)
large_passengers_df.write.format("csv").option("header", "true").save(
os.getenv("OUTPUT_PATH", "s3a://openlake-tmp/spark/nyc/taxis_small"))
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
logger.info(f"Total Rows for Passenger Count > 6: {filtered_rows_count}")
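To try the filter logic without a cluster, here is a plain-Python sketch of the same idea on a tiny in-memory CSV. It is not the spark job itself: the sample rows are made up, the function name `count_large_parties` is my own, and only a few of the real columns are included.

```python
import csv
import io

# Made-up sample with a subset of the real columns.
SAMPLE = """VendorID,passenger_count,trip_distance
1,1,2.5
2,7,0.8
1,,3.1
2,9,1.2
"""


def count_large_parties(csv_text, threshold=6):
    """Return (total row count, rows whose passenger_count exceeds threshold)."""
    total = 0
    large = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        total += 1
        raw = row["passenger_count"]
        # An empty field is treated as missing and never passes the filter,
        # mirroring how a null comparison drops the row in spark.
        if raw and float(raw) > threshold:
            large.append(row)
    return total, large


if __name__ == "__main__":
    total, large = count_large_parties(SAMPLE)
    print(f"Total rows: {total}")
    print(f"Rows with passenger_count > 6: {len(large)}")
```

The spark job does the same two counts, only over roughly 112 million rows read from minio instead of four rows in a string.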
If the code above does not run correctly, create the debug-pod below and exec into it to debug:
apiVersion: v1
kind: Pod
metadata:
  name: debug-pod
  namespace: spark-operator
spec:
  containers:
    - name: spark-minio
      image: swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
      command: ["sleep"]
      args: ["infinity"]
      env:
        - name: INPUT_PATH
          value: "s3a://openlake/spark/sample-data/rows.csv"
        - name: OUTPUT_PATH
          value: "s3a://openlake/spark/result/taxi"
        - name: SSL_ENABLED
          value: "true"
        - name: AWS_REGION
          valueFrom:
            secretKeyRef:
              name: minio-secret
              key: AWS_REGION
        - name: AWS_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              name: minio-secret
              key: AWS_ACCESS_KEY_ID
        - name: AWS_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: minio-secret
              key: AWS_SECRET_ACCESS_KEY
        - name: ENDPOINT
          valueFrom:
            secretKeyRef:
              name: minio-secret
              key: ENDPOINT
Finally, check the driver log for the printed results:
# kubectl -n spark-operator logs spark-minio-driver
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for NYC Taxi Data: 112234626
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for Passenger Count > 6: 1066
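To put the two numbers in proportion, the counts can be pulled out of the log lines and divided. The sketch below does that with a regular expression; the helper name `parse_counts` is my own, and the log text is pasted from the driver output above.

```python
import re

LOG = """\
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for NYC Taxi Data: 112234626
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for Passenger Count > 6: 1066
"""


def parse_counts(log_text):
    """Map each 'Total Rows ...' label to the integer at the end of its log line."""
    counts = {}
    for line in log_text.splitlines():
        match = re.search(r"INFO - (Total Rows .*): (\d+)$", line)
        if match:
            counts[match.group(1)] = int(match.group(2))
    return counts


if __name__ == "__main__":
    counts = parse_counts(LOG)
    total = counts["Total Rows for NYC Taxi Data"]
    large = counts["Total Rows for Passenger Count > 6"]
    print(f"{large} of {total} trips ({large / total:.6%}) carried more than 6 passengers")
```

Trips with more than 6 passengers turn out to be a tiny fraction of the dataset, on the order of one in a hundred thousand.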
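4. Analyze data on minio with iceberg in spark
iceberg is a table format open-sourced by Netflix. Put simply, it lets big-data engineers use SQL to work with data stored as files such as parquet, and it supports snapshots; see the official site for details.
Some addresses in the original guide are hard-coded, so I changed them and saved the script as main-iceberg.py: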
import logging
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")
# adding iceberg configs
conf = (
SparkConf()
.set("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") # Use Iceberg with Spark
.set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.set("spark.sql.catalog.demo.warehouse", os.getenv("WAREHOUSE", "s3a://openlake/warehouse/"))
.set("spark.sql.catalog.demo.s3.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
.set("spark.sql.defaultCatalog", "demo") # Name of the Iceberg catalog
.set("spark.sql.catalogImplementation", "in-memory")
.set("spark.sql.catalog.demo.type", "hadoop") # Iceberg catalog type
.set("spark.executor.heartbeatInterval", "300000")
.set("spark.network.timeout", "400000")
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Disable below line to see INFO logs
spark.sparkContext.setLogLevel("ERROR")
def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                 os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")
load_config(spark.sparkContext)
# Define schema for NYC Taxi Data
schema = StructType([
StructField('VendorID', LongType(), True),
StructField('tpep_pickup_datetime', StringType(), True),
StructField('tpep_dropoff_datetime', StringType(), True),
StructField('passenger_count', DoubleType(), True),
StructField('trip_distance', DoubleType(), True),
StructField('RatecodeID', DoubleType(), True),
StructField('store_and_fwd_flag', StringType(), True),
StructField('PULocationID', LongType(), True),
StructField('DOLocationID', LongType(), True),
StructField('payment_type', LongType(), True),
StructField('fare_amount', DoubleType(), True),
StructField('extra', DoubleType(), True),
StructField('mta_tax', DoubleType(), True),
StructField('tip_amount', DoubleType(), True),
StructField('tolls_amount', DoubleType(), True),
StructField('improvement_surcharge', DoubleType(), True),
StructField('total_amount', DoubleType(), True)])
# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
# Create Iceberg table "nyc.taxis_large" from the DataFrame
df.write.mode("overwrite").saveAsTable("nyc.taxis_large")
# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
# Rename column "fare_amount" in nyc.taxis_large to "fare"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")
# Rename column "trip_distance" in nyc.taxis_large to "distance"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")
# Add description to the new column "distance"
spark.sql(
"ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")
# Move "distance" next to "fare" column
spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")
# Add new column "fare_per_distance" of type float
spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")
# Check the snapshots available
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show() # prints all the available snapshots (1 till now)
# Populate the new column "fare_per_distance"
logger.info("Populating fare_per_distance column...")
spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")
# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show() # prints all the available snapshots (2 now) since previous operation will create a new snapshot
# Query the table to see the results
res_df = spark.sql("""SELECT VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance
FROM nyc.taxis_large LIMIT 15""")
res_df.show()
# Delete rows from "fare_per_distance" based on criteria
logger.info("Deleting rows from fare_per_distance column...")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")
# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show() # prints all the available snapshots (4 now) since previous operations will create 2 new snapshots
# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after delete operations: {total_rows_count}")
# Partition table based on "VendorID" column
logger.info("Partitioning table based on VendorID column...")
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")
# Query Metadata tables like snapshot, files, history
logger.info("Querying Snapshot table...")
snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
snapshots_df.show() # shows all the snapshots in ascending order of committed_at column
logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files for NYC Taxi Data: {total_files_count}")
spark.sql("""SELECT file_path,
file_format,
record_count,
null_value_counts,
lower_bounds,
upper_bounds
FROM nyc.taxis_large.files LIMIT 1""").show()
# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()
# Time travel to initial snapshot
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")
# Query the table to see the results
res_df = spark.sql("""SELECT VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance
FROM nyc.taxis_large LIMIT 15""")
res_df.show()
# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show() # 1 new row
# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")
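The script leans on snapshots: each write adds one, and the rollback makes an older one current again without deleting anything. The toy model below illustrates that behavior in plain Python. It is my own simplification, not how iceberg is implemented (iceberg tracks metadata and data files rather than copying rows), and the `ToyTable` class and its methods are hypothetical names.

```python
class ToyTable:
    """Toy copy-on-write table: every commit stores a full new snapshot."""

    def __init__(self, rows):
        self._snapshots = {}  # snapshot_id -> tuple of rows
        self.history = []     # snapshot ids in the order they became current
        self._next_id = 1
        self._commit(rows)

    def _commit(self, rows):
        snapshot_id = self._next_id
        self._next_id += 1
        self._snapshots[snapshot_id] = tuple(rows)
        self.history.append(snapshot_id)
        return snapshot_id

    @property
    def current(self):
        """Rows of the snapshot that is current right now."""
        return self._snapshots[self.history[-1]]

    def snapshot_count(self):
        return len(self._snapshots)

    def delete_where(self, predicate):
        """Write a new snapshot without the matching rows; old snapshots stay intact."""
        return self._commit(row for row in self.current if not predicate(row))

    def rollback_to(self, snapshot_id):
        """Make an existing snapshot current again; this adds a history entry only."""
        if snapshot_id not in self._snapshots:
            raise KeyError(snapshot_id)
        self.history.append(snapshot_id)


if __name__ == "__main__":
    table = ToyTable([1.5, 2.5, 4.5])
    first = table.history[0]
    table.delete_where(lambda fare_per_distance: fare_per_distance > 4.0)
    print(table.current, table.snapshot_count())
    table.rollback_to(first)
    print(table.current, table.snapshot_count(), table.history)
```

This matches what the script's comments describe: the deletes add snapshots, while the rollback adds one row to the history and leaves the set of snapshots unchanged.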
Create the sparkapp and save it as spark-iceberg-minio.yaml:
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-iceberg-minio
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/main-iceberg.py
  sparkVersion: "3.3.2"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    memory: "1024m"
    labels:
      version: 3.3.2
    serviceAccount: my-release-spark
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/rows.csv"
      - name: OUTPUT_PATH
        value: "s3a://openlake/spark/result/taxi"
      - name: SSL_ENABLED
        value: "true"
      - name: WAREHOUSE
        value: "s3a://openlake/warehouse"
      - name: AWS_REGION
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_REGION
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_SECRET_ACCESS_KEY
      - name: ENDPOINT
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: ENDPOINT
  executor:
    cores: 1
    instances: 3
    memory: "1024m"
    labels:
      version: 3.3.2
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/rows.csv"
      - name: OUTPUT_PATH
        value: "s3a://openlake/spark/result/taxi"
      - name: SSL_ENABLED
        value: "true"
      - name: WAREHOUSE
        value: "s3a://openlake/warehouse"
      - name: AWS_REGION
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_REGION
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_SECRET_ACCESS_KEY
      - name: ENDPOINT
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: ENDPOINT
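Summary
Following the openlake guide, this post tried out how spark processes data in minio on k8s. The spark ecosystem feels mature, and handling structured and semi-structured data is very convenient. There are also things I am not used to: components like iceberg are pulled in as jar packages rather than run as services, which means upgrading them requires rebuilding the container image.
The original openlake guide also covers dremio as the query layer and spark stream processing with kafka. Give them a try if you are interested.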