Openlake: Building a Cloud-Native Data Lake

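Big data and cloud native have long been two hot topics in infrastructure, and more and more big-data infrastructure is moving toward cloud native: Pulsar is a cloud-native message queue, and Snowflake offers a cloud-native data warehouse. I wanted to explore how the two combine, and came across a very interesting project, Openlake. It lives under the minio organization and builds a data lake on kubernetes using minio, spark, kafka, dremio, and iceberg, which makes it a great learning resource. This post records the setup process and what I learned along the way.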

0. Prepare the kubernetes environment

If you already have a cluster, you can skip this section. I wanted to experiment quickly, so I set up k3s on linux with docker-compose.

Install docker compose by following the guide and running these commands:

sudo curl -L "https://github.com/docker/compose/releases/download/1.25.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose

Create a local directory to hold the k3s configuration and data:

mkdir -p /home/xiowang/k3s
mkdir -p /home/xiowang/k3s/data

Create a docker-compose.yaml to start k3s:

#vim /home/xiowang/k3s/docker-compose.yaml
version: '3'
services:
  server:
    image: rancher/k3s:v1.20.2-k3s1
    container_name: k3s_server
    hostname: xiowang.dev
    command: server --tls-san=xiowang.dev
    tmpfs:
    - /run
    - /var/run
    privileged: true
    restart: always
    ports:
    - 6443:6443
    - 443:30443
    - 80:30080
    - 50050:30050
    - 50051:30051
    environment:
    - K3S_TOKEN=16963276443662
    - K3S_KUBECONFIG_OUTPUT=/root/.kube/config
    - K3S_KUBECONFIG_MODE=600
    volumes:
    - /var/lib/rancher/k3s:/var/lib/rancher/k3s
    - /etc/rancher:/etc/rancher
    - /home/xiowang/k3s/.kube:/root/.kube
    - /home/xiowang/k3s/data:/data:shared,rw

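The configuration above does the following:

  1. Exposes the kubernetes apiserver on local port 6443, so kubectl can manage the cluster;
  2. Maps local ports 443 and 80 to the cluster NodePorts 30443 and 30080 respectively;
  3. Saves the kubeconfig to /home/xiowang/k3s/.kube;
  4. Mounts the host directory /home/xiowang/k3s/data as /data in the cluster;
  5. Maps local ports 50050 and 50051 to NodePorts 30050 and 30051, reserved for exposing minio later;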
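
To make the port wiring easier to check, here is a small sketch that parses the "host:container" entries from the ports section of the compose file above. The function name parse_ports is hypothetical and exists only for illustration; the port values are copied from the compose file.

```python
# Port mappings copied from the docker-compose.yaml above, in "host:container" form.
PORTS = ["6443:6443", "443:30443", "80:30080", "50050:30050", "50051:30051"]


def parse_ports(entries):
    """Return a dict mapping each host port to the port inside the k3s container."""
    mapping = {}
    for entry in entries:
        host, container = entry.split(":")
        mapping[int(host)] = int(container)
    return mapping


if __name__ == "__main__":
    for host, node_port in sorted(parse_ports(PORTS).items()):
        print(f"localhost:{host} -> k3s container port {node_port}")
```

Reading the mapping this way makes it clear which NodePort a service has to use in order to be reachable from a given local port.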

Start k3s:

cd /home/xiowang/k3s
docker-compose up -d

Because local ports 80 and 443 map to the cluster NodePorts 30080 and 30443, edit the traefik service so that its ports 80 and 443 use NodePorts 30080 and 30443 respectively:

#kubectl -n kube-system edit svc traefik
apiVersion: v1
kind: Service
metadata:
  annotations:
    meta.helm.sh/release-name: traefik
    meta.helm.sh/release-namespace: kube-system
  creationTimestamp: "2023-05-17T09:29:42Z"
  labels:
    app: traefik
    app.kubernetes.io/managed-by: Helm
    chart: traefik-1.81.0
    heritage: Helm
    release: traefik
  name: traefik
  namespace: kube-system
  resourceVersion: "368985"
  uid: 7bbb1758-ca01-4e84-b166-dae950613adf
spec:
  clusterIP: 10.43.115.234
  clusterIPs:
  - 10.43.115.234
  externalTrafficPolicy: Cluster
  ports:
  - name: http
    nodePort: 30080
    port: 80
    protocol: TCP
    targetPort: http
  - name: https
    nodePort: 30443
    port: 443
    protocol: TCP
    targetPort: https
  selector:
    app: traefik
    release: traefik
  sessionAffinity: None
  type: LoadBalancer

Copy /home/xiowang/k3s/.kube/config to ~/.kube/config on the local machine (kubecm is a convenient way to manage multiple kubeconfigs):

cp /home/xiowang/k3s/.kube/config ~/.kube/config

Now we can start the openlake journey.

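1. Install and configure minio

There are two recommended ways to install minio on kubernetes:

  1. Install minio with the operator, see the guide
  2. Install minio with helm, see the guide

I have not had time to learn the operator's CRDs yet, so I install minio with helm as follows: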

helm repo add minio https://charts.min.io/
# Set the root password and disk size; TLS is off by default (adjust to your needs, I use 20Gi for testing)
helm upgrade --install --namespace minio --set rootUser=rootuser,rootPassword=rootpass123,persistence.size=20Gi,resources.requests.memory=100Mi,resources.limits.memory=2Gi,replicas=3,service.type=NodePort,consoleService.type=NodePort one --create-namespace minio/minio
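
The --set argument above packs eight values into one comma-separated string, which is easy to mistype. As a minimal sketch, the same command can be assembled from a dict. The helper names helm_set_arg and helm_command are hypothetical, the values are the ones used above, and values that themselves contain commas would need escaping, which this sketch does not handle.

```python
# Values copied from the helm command above; the dict layout is only illustrative.
VALUES = {
    "rootUser": "rootuser",
    "rootPassword": "rootpass123",
    "persistence.size": "20Gi",
    "resources.requests.memory": "100Mi",
    "resources.limits.memory": "2Gi",
    "replicas": "3",
    "service.type": "NodePort",
    "consoleService.type": "NodePort",
}


def helm_set_arg(values):
    """Join key=value pairs into the single string passed to --set."""
    return ",".join(f"{key}={value}" for key, value in values.items())


def helm_command(release, chart, namespace, values):
    """Build the argument list for 'helm upgrade --install' (nothing is executed here)."""
    return [
        "helm", "upgrade", "--install",
        "--namespace", namespace,
        "--set", helm_set_arg(values),
        release, "--create-namespace", chart,
    ]


if __name__ == "__main__":
    print(" ".join(helm_command("one", "minio/minio", "minio", VALUES)))
```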

Output of a successful deployment:

NAME: one
LAST DEPLOYED: Mon May 22 12:22:31 2023
NAMESPACE: minio
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
MinIO can be accessed via port 9000 on the following DNS name from within your cluster:
one-minio.minio.svc.cluster.local

To access MinIO from localhost, run the below commands:

  1. export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")

  2. kubectl port-forward $POD_NAME 9000 --namespace minio

Read more about port forwarding here: http://kubernetes.io/docs/user-guide/kubectl/kubectl_port-forward/

You can now access MinIO server on http://localhost:9000. Follow the below steps to connect to MinIO server with mc client:

  1. Download the MinIO mc client - https://min.io/docs/minio/linux/reference/minio-mc.html#quickstart

  2. export MC_HOST_one-minio-local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000

  3. mc ls one-minio-local

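Note that the environment variable name MC_HOST_one-minio-local in the output above is not a valid shell variable name (hyphens are not allowed), so I changed it to MC_HOST_one_minio_local: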

  2. export MC_HOST_one_minio_local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000

  3. mc ls one_minio_local
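
The rename above follows the usual shell rule that a variable name may contain only letters, digits, and underscores, and may not start with a digit. A minimal sketch of that check, with hypothetical helper names is_valid_shell_name and to_shell_name:

```python
import re

# Letters, digits and underscores only, and the first character is not a digit.
_SHELL_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_shell_name(name):
    """Return True if name can be used as a shell variable name."""
    return _SHELL_NAME.fullmatch(name) is not None


def to_shell_name(name):
    """Replace every disallowed character with an underscore."""
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name)
    if cleaned and cleaned[0].isdigit():
        cleaned = "_" + cleaned
    return cleaned


if __name__ == "__main__":
    original = "MC_HOST_one-minio-local"
    print(original, is_valid_shell_name(original))
    print(to_shell_name(original), is_valid_shell_name(to_shell_name(original)))
```

The alias that mc sees is whatever follows the MC_HOST_ prefix, which is why the later commands use one_minio_local.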

To reach the minio console, forward port 9001 and log in at localhost:9001 with the root account:

export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")

kubectl port-forward $POD_NAME 9001 --namespace minio

Bucket access goes through port 9000:

export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")
kubectl port-forward $POD_NAME 9000 --namespace minio

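2. Set up spark on k8s

spark on k8s is an open-source project started by Google (not an official Google product). It uses an operator to schedule k8s resources, which makes it easy to run spark on k8s.

spark on k8s is installed with helm, using the following commands: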

helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator \
--namespace spark-operator \
--set webhook.enable=true \
--set image.repository=openlake/spark-operator \
--set image.tag=3.3.2 \
--create-namespace

Verify the deployment:

kubectl get pods -n spark-operator

You should see the my-release pod:

NAME                                           READY   STATUS      RESTARTS   AGE
my-release-spark-operator-6547984586-xzw4p     1/1     Running     2          4d20h

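Following the example, deploy a spark application that computes pi and save it as spark-pi.yaml. Note that helm already created RBAC rules for serviceAccount: my-release-spark in the spark-operator namespace, so every spark app in this post lives in the spark-operator namespace and runs as serviceAccount: my-release-spark: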

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
    name: pyspark-pi
    namespace: spark-operator
spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: "openlake/spark-py:3.3.2"
    imagePullPolicy: Always
    mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
    sparkVersion: "3.3.2"
    restartPolicy:
        type: OnFailure
        onFailureRetries: 3
        onFailureRetryInterval: 10
        onSubmissionFailureRetries: 5
        onSubmissionFailureRetryInterval: 20
    driver:
        cores: 1
        coreLimit: "1200m"
        memory: "512m"
        labels:
            version: 3.3.2
        serviceAccount: my-release-spark
    executor:
        cores: 1
        instances: 1
        memory: "512m"
        labels:
            version: 3.3.2

Apply the manifest:

kubectl apply -f spark-pi.yaml

Check the sparkapp and pods:

#kubectl -n spark-operator get sparkapp,pod
NAME                                               STATUS      ATTEMPTS   START                  FINISH                 AGE
sparkapplication.sparkoperator.k8s.io/pyspark-pi   COMPLETED   1          2023-05-22T06:43:24Z   2023-05-22T06:44:37Z   4m50s

NAME                                               READY   STATUS      RESTARTS   AGE
pod/my-release-spark-operator-webhook-init-xzx9c   0/1     Completed   0          4d20h
pod/my-release-spark-operator-6547984586-xzw4p     1/1     Running     2          4d20h
pod/pyspark-pi-driver                              0/1     Completed   0          4m46s

Look at the last 20 lines of the log, where the DAGScheduler reports that the job finished:

#kubectl logs pyspark-pi-driver -n spark-operator --tail 20
23/05/22 06:44:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
23/05/22 06:44:35 INFO DAGScheduler: ResultStage 0 (reduce at /opt/spark/examples/src/main/python/pi.py:42) finished in 1.571 s
23/05/22 06:44:35 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
23/05/22 06:44:35 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
23/05/22 06:44:35 INFO DAGScheduler: Job 0 finished: reduce at /opt/spark/examples/src/main/python/pi.py:42, took 1.615592 s
Pi is roughly 3.145160
23/05/22 06:44:35 INFO SparkUI: Stopped Spark web UI at http://pyspark-pi-60e9d1884232c31f-driver-svc.spark-operator.svc:4040
23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
23/05/22 06:44:35 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
23/05/22 06:44:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/05/22 06:44:36 INFO MemoryStore: MemoryStore cleared
23/05/22 06:44:36 INFO BlockManager: BlockManager stopped
23/05/22 06:44:36 INFO BlockManagerMaster: BlockManagerMaster stopped
23/05/22 06:44:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/05/22 06:44:36 INFO SparkContext: Successfully stopped SparkContext
23/05/22 06:44:36 INFO ShutdownHookManager: Shutdown hook called
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-00ddbc21-98da-4ad8-9905-fcf0e8d64129
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66/pyspark-10201045-5307-40dc-b6b2-d7e5447763c4
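
The "Pi is roughly 3.145160" line comes from a Monte Carlo estimate: as far as I know, Spark's pi.py example samples random points in a square and counts how many fall inside the unit circle. Here is a plain-Python sketch of the same idea, without Spark. The function name estimate_pi, the sample count, and the seed are assumptions made for illustration.

```python
import random


def estimate_pi(samples, seed=0):
    """Estimate pi from the share of random points that land inside the unit circle."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(samples):
        x = rng.uniform(-1.0, 1.0)
        y = rng.uniform(-1.0, 1.0)
        if x * x + y * y <= 1.0:
            inside += 1
    # The circle covers pi/4 of the square, so scale the hit ratio by 4.
    return 4.0 * inside / samples


if __name__ == "__main__":
    print(f"Pi is roughly {estimate_pi(200_000):f}")
```

In the Spark version the sampling loop is spread over executors and the hit counts are combined with a reduce, which is the "reduce at pi.py:42" step visible in the log.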

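3. Analyze data on minio with spark

Prepare a dockerfile and the build commands first, because we will be editing the py files and pushing images often.
I use the image from the original openlake guide as the base image, since it already has spark's dependencies installed.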

FROM openlake/sparkjob-demo:3.3.2

WORKDIR /app

COPY *.py .

Build and push the docker image as follows (dockerhub is often unreachable for me, so I set up an image registry on Huawei Cloud; adjust this to your own situation):

docker build -t xiowang/spark-minio:3.3.2 .
docker tag xiowang/spark-minio:3.3.2 swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
docker push swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2

Download the New York taxi data (~112M rows and ~10GB in size) into the current directory:

wget https://data.cityofnewyork.us/api/views/t29m-gskq/rows.csv

Create the bucket in minio:

mc mb one_minio_local/openlake/spark/sample-data

Copy the taxi data to minio:

mc cp rows.csv one_minio_local/openlake/spark/sample-data/

In jupyter, install pyspark:

pip3 install pyspark

In minio, create a user with read/write permission on one_minio_local/openlake and generate an access key and secret for it, then export them as follows:

export AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR
export AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G
export ENDPOINT=one-minio.minio:9000
export OUTPUT_PATH=s3a://openlake/spark/result/taxi
export INPUT_PATH=s3a://openlake/spark/sample-data/rows.csv

Create a k8s secret that stores the values above:

kubectl create secret generic minio-secret \
    --from-literal=AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR \
    --from-literal=AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G \
    --from-literal=ENDPOINT=http://one-minio.minio:9000 \
    --from-literal=AWS_REGION=us-east-1 \
    --namespace spark-operator
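
Kubernetes stores each value of a generic secret base64-encoded in the Secret's data field, which is why the helm notes earlier piped the values through base64 --decode. A minimal sketch of that encoding, using only the non-sensitive literals from the command above; the helper names encode_secret_data and decode_secret_data are hypothetical.

```python
import base64


def encode_secret_data(literals):
    """Base64-encode each literal, as it would appear in the Secret's data field."""
    return {
        key: base64.b64encode(value.encode("utf-8")).decode("ascii")
        for key, value in literals.items()
    }


def decode_secret_data(data):
    """Reverse of encode_secret_data, equivalent to piping through base64 --decode."""
    return {
        key: base64.b64decode(value).decode("utf-8")
        for key, value in data.items()
    }


if __name__ == "__main__":
    literals = {
        "ENDPOINT": "http://one-minio.minio:9000",
        "AWS_REGION": "us-east-1",
    }
    encoded = encode_secret_data(literals)
    for key, value in encoded.items():
        print(f"{key}: {value}")
    # Decoding gives back the original literals.
    print(decode_secret_data(encoded) == literals)
```

Base64 is an encoding, not encryption, so anyone who can read the secret can recover the keys.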

Deploy the sparkapp:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
    name: spark-minio
    namespace: spark-operator
spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
    imagePullPolicy: Always
    mainApplicationFile: local:///app/main.py
    sparkVersion: "3.3.2"
    restartPolicy:
        type: OnFailure
        onFailureRetries: 3
        onFailureRetryInterval: 10
        onSubmissionFailureRetries: 5
        onSubmissionFailureRetryInterval: 20
    driver:
        cores: 1
        memory: "1024m"
        labels:
            version: 3.3.2
        serviceAccount: my-release-spark
        env:
            - name: INPUT_PATH
              value: "s3a://openlake/spark/sample-data/rows.csv"
            - name: OUTPUT_PATH
              value: "s3a://openlake/spark/result/taxi"
            - name: SSL_ENABLED
              value: "true"
            - name: AWS_REGION
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_REGION
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_SECRET_ACCESS_KEY
            - name: ENDPOINT
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: ENDPOINT
    executor:
        cores: 1
        instances: 3
        memory: "1024m"
        labels:
            version: 3.3.2
        env:
            - name: INPUT_PATH
              value: "s3a://openlake/spark/sample-data/rows.csv"
            - name: OUTPUT_PATH
              value: "s3a://openlake/spark/result/taxi"
            - name: SSL_ENABLED
              value: "true"
            - name: AWS_REGION
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_REGION
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_SECRET_ACCESS_KEY
            - name: ENDPOINT
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: ENDPOINT
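
The driver and executor env blocks in the manifest above are identical, so they are easy to let drift apart when editing by hand. One option, shown here only as a sketch, is to build the list once and reuse it. The helper name build_env is hypothetical, the names and values are taken from the manifest, and the output is JSON, which is also valid YAML.

```python
import json

SECRET_NAME = "minio-secret"

# Plain values and secret-backed keys, copied from the manifest above.
PLAIN_ENV = {
    "INPUT_PATH": "s3a://openlake/spark/sample-data/rows.csv",
    "OUTPUT_PATH": "s3a://openlake/spark/result/taxi",
    "SSL_ENABLED": "true",
}
SECRET_KEYS = ["AWS_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "ENDPOINT"]


def build_env(plain, secret_keys, secret_name):
    """Return the env list shared by the driver and executor specs."""
    env = [{"name": name, "value": value} for name, value in plain.items()]
    for key in secret_keys:
        env.append({
            "name": key,
            "valueFrom": {"secretKeyRef": {"name": secret_name, "key": key}},
        })
    return env


if __name__ == "__main__":
    shared = build_env(PLAIN_ENV, SECRET_KEYS, SECRET_NAME)
    print(json.dumps({"driver": {"env": shared}, "executor": {"env": shared}}, indent=2))
```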

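The python script for this sparkapp (main.py, as referenced by mainApplicationFile) is shown below. It filters the trips that carried more than 6 passengers, counts them, and writes them back to minio: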

import logging
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")

spark = SparkSession.builder.getOrCreate()


def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                 os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    # spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", os.getenv("SSL_ENABLED", "true"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
    # spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")


load_config(spark.sparkContext)


# Define schema for NYC Taxi Data
schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', StringType(), True),
    StructField('tpep_dropoff_datetime', StringType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
# Filter dataframe based on passenger_count greater than 6
large_passengers_df = df.filter(df.passenger_count > 6)

total_rows_count = df.count()
filtered_rows_count = large_passengers_df.count()
# File Output Committer is used to write the output to the destination (Not recommended for Production)
large_passengers_df.write.format("csv").option("header", "true").save(
    os.getenv("OUTPUT_PATH", "s3a://openlake-tmp/spark/nyc/taxis_small"))

logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
logger.info(f"Total Rows for Passenger Count > 6: {filtered_rows_count}")
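
To see what the filter does without a cluster, here is a plain-Python sketch of the same counting logic on a few made-up rows. The sample data is invented for illustration and is not taken from the real dataset, and count_large_parties is a hypothetical helper, not part of the openlake code.

```python
import csv
import io

# Made-up sample rows, for illustration only.
SAMPLE_CSV = """VendorID,passenger_count,trip_distance
1,1.0,2.5
2,7.0,1.2
1,6.0,0.8
2,9.0,3.1
"""


def count_large_parties(csv_text, threshold=6.0):
    """Return (total rows, rows whose passenger_count is strictly above threshold)."""
    total = 0
    large = 0
    for row in csv.DictReader(io.StringIO(csv_text)):
        total += 1
        try:
            passengers = float(row.get("passenger_count") or "")
        except ValueError:
            # A missing or malformed count cannot pass the filter.
            continue
        if passengers > threshold:
            large += 1
    return total, large


if __name__ == "__main__":
    total, large = count_large_parties(SAMPLE_CSV)
    print(f"Total rows: {total}, rows with passenger_count > 6: {large}")
```

The comparison is strict, so a trip with exactly 6 passengers is not counted, matching df.passenger_count > 6 in the script.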

If the code above does not run correctly, you can create a debug-pod like the one below and exec into it to debug:

apiVersion: v1
kind: Pod
metadata:
  name: debug-pod
  namespace: spark-operator
spec:
  containers:
    - name: spark-minio
      image: swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
      command: ["sleep"]
      args: ["infinity"]
      env:
          - name: INPUT_PATH
            value: "s3a://openlake/spark/sample-data/rows.csv"
          - name: OUTPUT_PATH
            value: "s3a://openlake/spark/result/taxi"
          - name: SSL_ENABLED
            value: "true"
          - name: AWS_REGION
            valueFrom:
                secretKeyRef:
                    name: minio-secret
                    key: AWS_REGION
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
                secretKeyRef:
                    name: minio-secret
                    key: AWS_ACCESS_KEY_ID
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
                secretKeyRef:
                    name: minio-secret
                    key: AWS_SECRET_ACCESS_KEY
          - name: ENDPOINT
            valueFrom:
                secretKeyRef:
                    name: minio-secret
                    key: ENDPOINT

Finally, check the driver log to see the counts printed by the script:

#kubectl -n spark-operator logs spark-minio-driver
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for NYC Taxi Data: 112234626
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for Passenger Count > 6: 1066
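
If you want to pull the two counts out of the log programmatically, a small sketch like the following works against the log format configured in the script. The log lines are copied from the output above; parse_counts is a hypothetical helper.

```python
import re

# Log lines copied from the driver output above.
LOG_LINES = [
    "2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for NYC Taxi Data: 112234626",
    "2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for Passenger Count > 6: 1066",
]

# Matches "<label>: <integer>" at the end of an INFO line.
PATTERN = re.compile(r"INFO - (?P<label>.+): (?P<count>\d+)$")


def parse_counts(lines):
    """Return a dict mapping each log label to its integer count."""
    counts = {}
    for line in lines:
        match = PATTERN.search(line.strip())
        if match:
            counts[match.group("label")] = int(match.group("count"))
    return counts


if __name__ == "__main__":
    counts = parse_counts(LOG_LINES)
    total = counts["Total Rows for NYC Taxi Data"]
    large = counts["Total Rows for Passenger Count > 6"]
    print(f"{large} of {total} rows ({large / total:.6%}) have more than 6 passengers")
```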

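4. Analyze data on minio with iceberg in spark

iceberg is open-source software that originated at Netflix. Put simply, it lets big-data engineers work with data files such as parquet as SQL tables, and it supports snapshots; see the official site for details.
Some addresses are hard-coded in the original guide, so I changed them and saved the result as main-iceberg.py: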

import logging
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")


# adding iceberg configs
conf = (
    SparkConf()
    .set("spark.sql.extensions",
         "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") # Use Iceberg with Spark
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.demo.warehouse", os.getenv("WAREHOUSE", "s3a://openlake/warehouse/"))
    .set("spark.sql.catalog.demo.s3.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    .set("spark.sql.defaultCatalog", "demo") # Name of the Iceberg catalog
    .set("spark.sql.catalogImplementation", "in-memory")
    .set("spark.sql.catalog.demo.type", "hadoop") # Iceberg catalog type
    .set("spark.executor.heartbeatInterval", "300000")
    .set("spark.network.timeout", "400000")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Disable below line to see INFO logs
spark.sparkContext.setLogLevel("ERROR")


def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                 os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")


load_config(spark.sparkContext)

# Define schema for NYC Taxi Data
schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', StringType(), True),
    StructField('tpep_dropoff_datetime', StringType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

# Create Iceberg table "nyc.taxis_large" from RDD
df.write.mode("overwrite").saveAsTable("nyc.taxis_large")

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")

# Rename column "fare_amount" in nyc.taxis_large to "fare"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")

# Rename column "trip_distance" in nyc.taxis_large to "distance"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")

# Add description to the new column "distance"
spark.sql(
    "ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")

# Move "distance" next to "fare" column
spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")

# Add new column "fare_per_distance" of type float
spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")

# Check the snapshots available
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (1 till now)

# Populate the new column "fare_per_distance"
logger.info("Populating fare_per_distance column...")
spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (2 now) since previous operation will create a new snapshot

# Query the table to see the results
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                            FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Delete rows from "fare_per_distance" based on criteria
logger.info("Deleting rows from fare_per_distance column...")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (4 now) since previous operations will create 2 new snapshots

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after delete operations: {total_rows_count}")

# Partition table based on "VendorID" column
logger.info("Partitioning table based on VendorID column...")
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")

# Query Metadata tables like snapshot, files, history
logger.info("Querying Snapshot table...")
snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
snapshots_df.show()  # shows all the snapshots in ascending order of committed_at column

logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files for NYC Taxi Data: {total_files_count}")

spark.sql("""SELECT file_path,
                    file_format,
                    record_count,
                    null_value_counts,
                    lower_bounds,
                    upper_bounds
                    FROM nyc.taxis_large.files LIMIT 1""").show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()

# Time travel to initial snapshot
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")

# Query the table to see the results
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                            FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()  # 1 new row

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")
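
The snapshot and rollback steps in the script can be hard to picture, so here is a toy in-memory model of the idea: every write commits a new immutable snapshot, and a rollback only moves the "current" pointer back, leaving later snapshots in place. This is a simplified illustration under my own assumptions, not how Iceberg is implemented; the class ToyTable and its methods are hypothetical.

```python
class ToyTable:
    """Toy model of a table whose writes create immutable snapshots."""

    def __init__(self):
        self._snapshots = {}  # snapshot id -> tuple of rows
        self.history = []     # snapshot ids, in the order they became current
        self._next_id = 1

    def _commit(self, rows):
        snapshot_id = self._next_id
        self._next_id += 1
        self._snapshots[snapshot_id] = tuple(rows)
        self.history.append(snapshot_id)
        return snapshot_id

    @property
    def current_rows(self):
        if not self.history:
            return ()
        return self._snapshots[self.history[-1]]

    def overwrite(self, rows):
        """Replace the table contents; returns the new snapshot id."""
        return self._commit(rows)

    def delete_where(self, predicate):
        """Drop rows matching predicate by committing a new snapshot."""
        return self._commit(row for row in self.current_rows if not predicate(row))

    def rollback_to(self, snapshot_id):
        """Make an existing snapshot current again; adds one history entry."""
        if snapshot_id not in self._snapshots:
            raise KeyError(f"unknown snapshot {snapshot_id}")
        self.history.append(snapshot_id)

    def snapshot_ids(self):
        return sorted(self._snapshots)


if __name__ == "__main__":
    table = ToyTable()
    first = table.overwrite([1, 2, 3, 4])
    table.delete_where(lambda row: row > 2)
    print("after delete:", table.current_rows)
    table.rollback_to(first)
    print("after rollback:", table.current_rows)
    print("history:", table.history)
```

In the toy model the rollback appends one entry to the history, which mirrors the "1 new row" comment next to the history query in the script.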

Create the sparkapp and save it as spark-iceberg-minio.yaml:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
    name: spark-iceberg-minio
    namespace: spark-operator
spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
    imagePullPolicy: Always
    mainApplicationFile: local:///app/main-iceberg.py
    sparkVersion: "3.3.2"
    restartPolicy:
        type: OnFailure
        onFailureRetries: 3
        onFailureRetryInterval: 10
        onSubmissionFailureRetries: 5
        onSubmissionFailureRetryInterval: 20
    driver:
        cores: 1
        memory: "1024m"
        labels:
            version: 3.3.2
        serviceAccount: my-release-spark
        env:
            - name: INPUT_PATH
              value: "s3a://openlake/spark/sample-data/rows.csv"
            - name: OUTPUT_PATH
              value: "s3a://openlake/spark/result/taxi"
            - name: SSL_ENABLED
              value: "true"
            - name: WAREHOUSE
              value: "s3a://openlake/warehouse"
            - name: AWS_REGION
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_REGION
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_SECRET_ACCESS_KEY
            - name: ENDPOINT
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: ENDPOINT
    executor:
        cores: 1
        instances: 3
        memory: "1024m"
        labels:
            version: 3.3.2
        env:
            - name: INPUT_PATH
              value: "s3a://openlake/spark/sample-data/rows.csv"
            - name: OUTPUT_PATH
              value: "s3a://openlake/spark/result/taxi"
            - name: SSL_ENABLED
              value: "true"
            - name: WAREHOUSE
              value: "s3a://openlake/warehouse"
            - name: AWS_REGION
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_REGION
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_SECRET_ACCESS_KEY
            - name: ENDPOINT
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: ENDPOINT

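Summary

Following the openlake guide, this post tried out how spark processes data stored in minio on k8s. The spark ecosystem feels fairly mature, and it handles structured and semi-structured data conveniently. There are also a few things that take getting used to: components like iceberg are pulled in as jar packages rather than run as separate services, which means upgrading them requires rebuilding the container image.

The original openlake guide also covers using dremio as the query layer and using kafka for spark streaming. Interested readers can give those a try.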
