SpringBatch批處理框架+mysql倉(cāng)庫(kù)+web監(jiān)控實(shí)錄

1、概念

Spring Batch 是一款輕量級(jí)地適合企業(yè)級(jí)應(yīng)用的批處理框架,值得注意的是,不同于其他調(diào)度框架,Spring Batch不提供調(diào)度功能。


2、批處理過程

批處理可以分為以下幾個(gè)步驟:

  1. 讀取數(shù)據(jù)
  2. 按照業(yè)務(wù)處理數(shù)據(jù)
  3. 歸檔數(shù)據(jù)的過程

3、Spring Batch給我們提供了什么?

  1. 統(tǒng)一的讀寫接口
  2. 豐富的任務(wù)處理方式
  3. 靈活的事務(wù)管理及并發(fā)處理
  4. 日志、監(jiān)控、任務(wù)重啟與跳過等特性

4、基礎(chǔ)組件

名稱 用途
JobRepository 用于注冊(cè)和存儲(chǔ)Job的容器
JobLauncher 用于啟動(dòng)Job
Job 實(shí)際要執(zhí)行的作業(yè),包含一個(gè)或多個(gè)step
step 步驟,批處理的步驟一般包含ItemReader, ItemProcessor, ItemWriter
ItemReader 從給定的數(shù)據(jù)源讀取item
ItemProcessor 在item寫入數(shù)據(jù)源之前進(jìn)行數(shù)據(jù)整理
ItemWriter 把Chunk中包含的item寫入數(shù)據(jù)源。
Chunk 數(shù)據(jù)塊,給定數(shù)量的item集合,讓item進(jìn)行多次讀和處理,當(dāng)滿足一定數(shù)量的時(shí)候再一次寫入。
TaskLet 子任務(wù)表, step的一個(gè)事務(wù)過程,包含重復(fù)執(zhí)行,同步/異步規(guī)則等。

5、job, step, tasklet 和 chunk 關(guān)系

一個(gè)job對(duì)應(yīng)至少一個(gè)step,一個(gè)step對(duì)應(yīng)0或者1個(gè)TaskLet,一個(gè)taskLet對(duì)應(yīng)0或者1個(gè)Chunk


我的頭像

6、實(shí)戰(zhàn):批處理excel插入數(shù)據(jù)庫(kù)

6.1:定義數(shù)據(jù)倉(cāng)庫(kù)

  <!-- 內(nèi)存?zhèn)}庫(kù)  -->
    <!--<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>-->

    <!-- 數(shù)據(jù)庫(kù)倉(cāng)庫(kù)  -->
    <batch:job-repository id="jobRepository" data-source="dataRepDruidDataSource"
                          isolation-level-for-create="SERIALIZABLE" transaction-manager="transactionManager"
                          table-prefix="BATCH_" max-varchar-length="1000" />

6.2:定義啟動(dòng)器

    <!-- 作業(yè)調(diào)度器,用來啟動(dòng)job,引用作業(yè)倉(cāng)庫(kù) -->
    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

6.3:定義JOB

    <batch:job id="userBatchJobName" restartable="true">
        <batch:step id="userStep">
            <batch:tasklet allow-start-if-complete="false"
                           start-limit="1" task-executor="taskExecutor" throttle-limit="5">
                <batch:chunk reader="userReader" writer="userWriter"
                             processor="userProcessor" commit-interval="5" retry-limit="10">
                    <batch:retryable-exception-classes>
                        <batch:include class="org.springframework.dao.DuplicateKeyException"/>
                        <batch:include class="java.sql.BatchUpdateException"/>
                        <batch:include class="java.sql.SQLException"/>
                    </batch:retryable-exception-classes>
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="taskExecutor"
          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 線程池維護(hù)線程的最少數(shù)量 -->
        <property name="corePoolSize" value="100"/>
        <!-- 線程池維護(hù)線程所允許的空閑時(shí)間 -->
        <property name="keepAliveSeconds" value="30000"/>
        <!-- 線程池維護(hù)線程的最大數(shù)量 -->
        <property name="maxPoolSize" value="300"/>
        <!-- 線程池所使用的緩沖隊(duì)列 -->
        <property name="queueCapacity" value="100"/>
    </bean>

6.4:定義ItemReader

     <bean id="userReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="lineMapper" ref="lineMapper"/>
        <property name="resource" value="classpath:message/batch-data-source.csv"/>
    </bean>
 <!-- 將每行映射成對(duì)象 -->
    <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                <property name="delimiter" value=","/><!-- 根據(jù)某種分隔符分割 -->
                <property name="names" value="id,name" />
            </bean>
        </property>
        <property name="fieldSetMapper"><!-- 將拆分后的字段映射成對(duì)象 -->
            <bean class="com.hcw.core.batch.UserFieldSetMapper" />
        </property>
    </bean>

6.5:定義ItemWriter

     <bean id="userWriter" class="com.hcw.core.batch.MyBatchItemWriter" scope="step">
        <property name="statementId" value="com.hcw.core.batch.dao.UserToMapper.batchInsert"/>
        <property name="sqlSessionFactory" ref="sqlSessionFactoryTo"/>
    </bean>

6.6:定義ItemProcessor

    <bean id="userProcessor" class="com.hcw.core.batch.UserItemProcessor"/>

6.7: 定義jobRepository的數(shù)據(jù)源

   <bean id="dataRepDruidDataSource" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="url" value="${jdbc.mysql.rep.connection.url}" />
        <property name="username" value="${jdbc.mysql.rep.connection.username}" />
        <property name="password" value="${jdbc.mysql.rep.connection.password}" />
        <property name="filters" value="${jdbc.mysql.rep.connection.filters}" />
        <property name="maxActive" value="${jdbc.mysql.rep.connection.maxActive}" />
        <property name="initialSize" value="${jdbc.mysql.rep.connection.initialSize}" />
        <property name="maxWait" value="${jdbc.mysql.rep.connection.maxWait}" />
        <property name="minIdle" value="${jdbc.mysql.rep.connection.minIdle}" />
        <property name="timeBetweenEvictionRunsMillis"
                  value="${jdbc.mysql.rep.connection.timeBetweenEvictionRunsMillis}" />
        <property name="minEvictableIdleTimeMillis"
                  value="${jdbc.mysql.rep.connection.minEvictableIdleTimeMillis}" />
        <property name="validationQuery"
                  value="${jdbc.mysql.rep.connection.validationQuery}" />
        <property name="testWhileIdle"
                  value="${jdbc.mysql.rep.connection.testWhileIdle}" />
        <property name="testOnBorrow" value="${jdbc.mysql.rep.connection.testOnBorrow}" />
        <property name="testOnReturn" value="${jdbc.mysql.rep.connection.testOnReturn}" />
        <property name="poolPreparedStatements"
                  value="${jdbc.mysql.rep.connection.poolPreparedStatements}" />
        <property name="maxPoolPreparedStatementPerConnectionSize"
                  value="${jdbc.mysql.rep.connection.maxPoolPreparedStatementPerConnectionSize}" />
    </bean>

6.8: 啟動(dòng)JOB

啟動(dòng)tomcat,打開啟動(dòng)頁(yè)面
我的頭像

7、源碼地址

batch-framework


8、參考資料

SpringBatch 中文文檔

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容