使用spring boot和axon實(shí)現(xiàn)saga模式-- 中

在這個系統(tǒng)之前的文章我們介紹了什么是Saga模式。也描述了Saga的幾種類型。最后,我們還描述了我們將會在一些場景中使用Saga模式來實(shí)現(xiàn)。

下面是我們關(guān)于Saga模式實(shí)現(xiàn)的本系列的提綱:
第一部分: 我們學(xué)習(xí)了《Saga》的基本內(nèi)容。如果你不確定什么是Saga模式,我強(qiáng)烈建議你去看看那篇文章,然后再回到這篇文章。

第二部分(本文):我們將首先為一個常見場景用Saga模式 解決。

第三部分: 我們將繼續(xù)未完成的工作,并用Axon Server來串聯(lián)起各個服務(wù)。

第四部分:我們將測試我們的Saga應(yīng)用程序。

這整個應(yīng)用程序可以在Github上找到。

現(xiàn)在讓我們開始動手吧。

Saga模式實(shí)現(xiàn)類型問題

正如我們前面所討論的,我們將使用基于編排方式的Saga方法來實(shí)現(xiàn)Saga模式。當(dāng)然基本協(xié)調(diào)方式的Saga也可以實(shí)現(xiàn)。

換句話說,編排器就是一個管理器,它編排參與的服務(wù)執(zhí)行一個或多個本地事務(wù)。

下圖顯示了這種方法(基于編排)的一個典型示例:


圖片.png

在這里,我們著眼于基于編排的Saga來解決訂單管理的問題。這個訂單管理問題可以應(yīng)用于任何電子商務(wù)商店、食品配送應(yīng)用程序或任何其他類似的用例。

當(dāng)然,出于演示目的,我們將這個問題簡化了很多,它比實(shí)際生產(chǎn)級別的復(fù)雜性要簡單得多。

頂層組件

關(guān)于我們的Saga模式實(shí)現(xiàn),這個應(yīng)用程序有5個主要部分。各部分內(nèi)容如下:

  • 訂單服務(wù):

該服務(wù)暴露api,幫助在系統(tǒng)中創(chuàng)建訂單。此外,該服務(wù)還管理訂單聚合。訂單聚合只是一個維護(hù)訂單相關(guān)信息的實(shí)體。然而,訂單服務(wù)還是訂單管理Saga的一個子服務(wù)(維護(hù)訂單內(nèi)部的事務(wù))。

  • 支付服務(wù):

支付服務(wù)根據(jù)訂單管理Saga發(fā)出的創(chuàng)建發(fā)票命令進(jìn)行操作。一旦它完成了它的工作,它就發(fā)布一個事件。這一事件將這個Saga推向了下一步流程。

  • 運(yùn)輸服務(wù):

該服務(wù)負(fù)責(zé)在系統(tǒng)中創(chuàng)建與訂單相關(guān)的發(fā)貨。它根據(jù)Saga管理器發(fā)出的命令執(zhí)行對應(yīng)的動作。一旦它完成了它的任務(wù),它還會發(fā)布一個推動Saga推向下一步流程。

  • 核心API( Core-APIs) :

這不是一個服務(wù)。然而,核心api充當(dāng)了各種服務(wù)之間的集成粘合劑,這些服務(wù)構(gòu)成了這個Saga的一部分。在我們的示例中,核心api將包含Saga實(shí)現(xiàn)運(yùn)行所需的各種命令和事件定義。

  • Axon Server

Axon服務(wù)器是Axon平臺的一部分。我們將使用Axon框架來管理我們的聚合,如訂單、支付、發(fā)貨。此外,我們將使用Axon服務(wù)器來處理這三個服務(wù)之間的通信。如果你想了解更多關(guān)于Axon服務(wù)器的細(xì)節(jié),可以查看我的文章

讓我們從具體的細(xì)節(jié)開始吧

整個應(yīng)用架構(gòu)

下面是我們的應(yīng)用程序的總體工程結(jié)構(gòu)圖。

.
├── core-apis
├── order-service
├── payment-service
├── pom.xml
├── saga-axon-server-spring-boot.iml
└── shipping-service

如您所見,它是一個多maven模塊結(jié)構(gòu)。每個服務(wù)都是maven子模塊,是整個項(xiàng)目的一部分。

這個pom.xml文件將他們合并在一起。

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.progressivecoder.saga-pattern</groupId>
    <artifactId>saga-axon-server-spring-boot</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>order-service</module>
        <module>payment-service</module>
        <module>core-apis</module>
        <module>shipping-service</module>
    </modules>

</project>

訂單服務(wù)的實(shí)現(xiàn)

Order Service(以及所有其他服務(wù)),我們將用Spring Boot來構(gòu)建應(yīng)用程序。如果您不知道Spring Boot或想要更新,請參閱我關(guān)于Spring Boot微服務(wù)的文章

下面是這個應(yīng)用的依賴:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Axon -->
        <dependency>
            <groupId>org.axonframework</groupId>
            <artifactId>axon-spring-boot-starter</artifactId>
            <version>4.0.3</version>
        </dependency>

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!-- Swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>

        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
            <version>1</version>
        </dependency>

        <dependency>
            <groupId>com.progressivecoder.saga-pattern</groupId>
            <artifactId>core-apis</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
</dependencies>

以下是一些需要考慮的重要問題:

  • 我們使用的是Axon Spring Boot starter(版本4.0.3)。他對對Axon框架和Axon服務(wù)器提供了比較好的接入。

  • 為了方便測試我們的應(yīng)用程序,我們還包括使用了Swagger。

  • 我們使用Spring Boot Starter Data JPA和基于內(nèi)存實(shí)現(xiàn)的H2數(shù)據(jù)庫作為持久層。

  • 此外,我們還有另一個稱為core-api的模塊。我們一會再回來講。

訂單聚合根

訂單聚合是Saga模式實(shí)現(xiàn)中最重要的部分之一。它是訂單管理Saga工作的基礎(chǔ)。讓我們看看它是怎樣的:

@Aggregate
public class OrderAggregate {

    @AggregateIdentifier
    private String orderId;

    private ItemType itemType;

    private BigDecimal price;

    private String currency;

    private OrderStatus orderStatus;

    public OrderAggregate() {
    }

    @CommandHandler
    public OrderAggregate(CreateOrderCommand createOrderCommand){
        AggregateLifecycle.apply(new OrderCreatedEvent(createOrderCommand.orderId, createOrderCommand.itemType,
                createOrderCommand.price, createOrderCommand.currency, createOrderCommand.orderStatus));
    }

    @EventSourcingHandler
    protected void on(OrderCreatedEvent orderCreatedEvent){
        this.orderId = orderCreatedEvent.orderId;
        this.itemType = ItemType.valueOf(orderCreatedEvent.itemType);
        this.price = orderCreatedEvent.price;
        this.currency = orderCreatedEvent.currency;
        this.orderStatus = OrderStatus.valueOf(orderCreatedEvent.orderStatus);
    }

    @CommandHandler
    protected void on(UpdateOrderStatusCommand updateOrderStatusCommand){
        AggregateLifecycle.apply(new OrderUpdatedEvent(updateOrderStatusCommand.orderId, updateOrderStatusCommand.orderStatus));
    }

    @EventSourcingHandler
    protected void on(OrderUpdatedEvent orderUpdatedEvent){
        this.orderId = orderId;
        this.orderStatus = OrderStatus.valueOf(orderUpdatedEvent.orderStatus);
    }
}

如您所見,這是一個典型的實(shí)體類。這里需要注意的主要事項(xiàng)是這里有axon自己的的注解@Aggregate和@AggregateIdentifier。這些注解允許Axon框架管理訂單聚合實(shí)例。

此外,我們正在使用事件溯源來存儲在此聚合上發(fā)生的事件。事件溯源是另一種微服務(wù)體系結(jié)構(gòu)模式,它們存儲領(lǐng)域事件然后構(gòu)建聚合信息。如果你想了解更多的話,我有一個關(guān)于事件來源實(shí)現(xiàn)的詳細(xì)文章

訂單Service層

為了方便創(chuàng)建訂單,我們還定義了一個訂單服務(wù)接口及其相應(yīng)的實(shí)現(xiàn)。

public interface OrderCommandService {

    public CompletableFuture<String> createOrder(OrderCreateDTO orderCreateDTO);

}
@Service
public class OrderCommandServiceImpl implements OrderCommandService {

    private final CommandGateway commandGateway;

    public OrderCommandServiceImpl(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    @Override
    public CompletableFuture<String> createOrder(OrderCreateDTO orderCreateDTO) {
        return commandGateway.send(new CreateOrderCommand(UUID.randomUUID().toString(), orderCreateDTO.getItemType(),
                orderCreateDTO.getPrice(), orderCreateDTO.getCurrency(), String.valueOf(OrderStatus.CREATED)));
    }
}

該service服務(wù)層實(shí)現(xiàn)使用Axon框架的命令網(wǎng)關(guān)向聚合發(fā)出命令。該命令在我們前面聲明的聚合類中處理。

訂單Controller層

Order Controller類是我們創(chuàng)建API端點(diǎn)的地方。此時,出于演示的目的,我們只有一個端點(diǎn)(一個api接口)。

@RestController
@RequestMapping(value = "/api/orders")
@Api(value = "Order Commands", description = "Order Commands Related Endpoints", tags = "Order Commands")
public class OrderCommandController {

    private OrderCommandService orderCommandService;

    public OrderCommandController(OrderCommandService orderCommandService) {
        this.orderCommandService = orderCommandService;
    }

    @PostMapping
    public CompletableFuture<String> createOrder(@RequestBody OrderCreateDTO orderCreateDTO){
        return orderCommandService.createOrder(orderCreateDTO);
    }
}

為了幫助Swagger找到這些端點(diǎn)并暴露swagger api,我們還得配置Swagger。

@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket apiDocket(){
        return new Docket(DocumentationType.SWAGGER_2)
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.progressivecoder.ordermanagement"))
                .paths(PathSelectors.any())
                .build()
                .apiInfo(getApiInfo());
    }

    private ApiInfo getApiInfo(){
        return new ApiInfo(
                "Saga Pattern Implementation using Axon and Spring Boot",
                "App to demonstrate Saga Pattern using Axon and Spring Boot",
                "1.0.0",
                "Terms of Service",
                new Contact("Saurabh Dashora", "progressivecoder.com", "coder.progressive@gmail.com"),
                "",
                "",
                Collections.emptyList());
    }

}

訂單管理Saga

Saga模式實(shí)現(xiàn)的核心是訂單管理Saga。簡而言之,這也是一個典型的Java類,它就是一個流程管理器,用于處理業(yè)務(wù)的流程。

@Saga
public class OrderManagementSaga {

    @Inject
    private transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderCreatedEvent orderCreatedEvent){
        String paymentId = UUID.randomUUID().toString();
        System.out.println("Saga invoked");

        //associate Saga
        SagaLifecycle.associateWith("paymentId", paymentId);

        System.out.println("order id" + orderCreatedEvent.orderId);

        //send the commands
        commandGateway.send(new CreateInvoiceCommand(paymentId, orderCreatedEvent.orderId));
    }

    @SagaEventHandler(associationProperty = "paymentId")
    public void handle(InvoiceCreatedEvent invoiceCreatedEvent){
        String shippingId = UUID.randomUUID().toString();

        System.out.println("Saga continued");

        //associate Saga with shipping
        SagaLifecycle.associateWith("shipping", shippingId);

        //send the create shipping command
        commandGateway.send(new CreateShippingCommand(shippingId, invoiceCreatedEvent.orderId, invoiceCreatedEvent.paymentId));
    }

    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderShippedEvent orderShippedEvent){
        commandGateway.send(new UpdateOrderStatusCommand(orderShippedEvent.orderId, String.valueOf(OrderStatus.SHIPPED)));
    }

    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderUpdatedEvent orderUpdatedEvent){
        SagaLifecycle.end();
    }
}
  • @StartSaga注解表示Saga的開始。它告訴Axon去創(chuàng)造一個新的Saga。Saga還與聚合的一個特定實(shí)例相關(guān)聯(lián)。這是通過使用@SagaEventHandler指定的associationProperty來完成的。在本例中,我們使用屬性orderId將Saga與訂單聚合的實(shí)例關(guān)聯(lián)起來。我們還指定了應(yīng)該調(diào)用該方法的事件。在我們的例子中,它是Order Created Event。

  • 被@SagaEventHandler注解的其他方法表示屬于Saga的其他事務(wù)(處理另一種業(yè)務(wù))。

  • 我們還使用sagalifcycle .associate with()方法將Saga與其他業(yè)務(wù)關(guān)聯(lián)起來,比如支付和運(yùn)輸。通過允許客戶機(jī)生成唯一標(biāo)識符,我們不必遵循請求-響應(yīng)模型。這使我們能夠輕松地將標(biāo)識符與Saga關(guān)聯(lián)起來。

  • 我們使用 SagaLifecycle.end()來結(jié)束這個Saga。

Core-APIs

現(xiàn)在是了解core - api模塊的好時機(jī)(command,event沒有setter方法)。

核心api只是一堆命令和事件的類,這些命令和事件將成為Saga模式實(shí)現(xiàn)的一部分。

命令(Commands)

在我們的應(yīng)用程序中創(chuàng)建新訂單時,將觸發(fā)Create Order Command。此命令由訂單聚合處理。

public class CreateOrderCommand {

    @TargetAggregateIdentifier
    public final String orderId;

    public final String itemType;

    public final BigDecimal price;

    public final String currency;

    public final String orderStatus;

    public CreateOrderCommand(String orderId, String itemType, BigDecimal price, String currency, String orderStatus) {
        this.orderId = orderId;
        this.itemType = itemType;
        this.price = price;
        this.currency = currency;
        this.orderStatus = orderStatus;
    }
}

接下來,創(chuàng)建訂單時,訂單管理Saga將觸發(fā)Create Invoice Command。

public class CreateInvoiceCommand{

    @TargetAggregateIdentifier
    public final String paymentId;

    public final String orderId;

    public CreateInvoiceCommand(String paymentId, String orderId) {
        this.paymentId = paymentId;
        this.orderId = orderId;
    }
}

當(dāng)發(fā)票創(chuàng)建和付款處理完成時,訂單管理Saga也會觸發(fā)Create Shipping Command。

public class CreateShippingCommand {

    @TargetAggregateIdentifier
    public final String shippingId;

    public final String orderId;

    public final String paymentId;

    public CreateShippingCommand(String shippingId, String orderId, String paymentId) {
        this.shippingId = shippingId;
        this.orderId = orderId;
        this.paymentId = paymentId;
    }
}

最后,我們有Update Order Status Command。當(dāng)運(yùn)輸完成后,將觸發(fā)此命令。

public class UpdateOrderStatusCommand {

    @TargetAggregateIdentifier
    public final String orderId;

    public final String orderStatus;

    public UpdateOrderStatusCommand(String orderId, String orderStatus) {
        this.orderId = orderId;
        this.orderStatus = orderStatus;
    }
}

事件(Events)

整個過程中的第一個事件是訂單創(chuàng)建事件(Order Created Event.)。就像我們之前看到的,這個事件也是開始這個Saga的原因。

public class OrderCreatedEvent {

    public final String orderId;

    public final String itemType;

    public final BigDecimal price;

    public final String currency;

    public final String orderStatus;

    public OrderCreatedEvent(String orderId, String itemType, BigDecimal price, String currency, String orderStatus) {
        this.orderId = orderId;
        this.itemType = itemType;
        this.price = price;
        this.currency = currency;
        this.orderStatus = orderStatus;
    }
}

下一個事件是Invoice Created Event。Invoice服務(wù)發(fā)布此事件。我們將在下一篇文章中看到它的實(shí)現(xiàn)。

public class InvoiceCreatedEvent  {

    public final String paymentId;

    public final String orderId;

    public InvoiceCreatedEvent(String paymentId, String orderId) {
        this.paymentId = paymentId;
        this.orderId = orderId;
    }
}

在那之后,我們有Order Shipped Event。該事件由Shipping服務(wù)在完成必要的操作后發(fā)布。

public class OrderShippedEvent {

    public final String shippingId;

    public final String orderId;

    public final String paymentId;

    public OrderShippedEvent(String shippingId, String orderId, String paymentId) {
        this.shippingId = shippingId;
        this.orderId = orderId;
        this.paymentId = paymentId;
    }
}

最后,我們有Order Updated Event。此事件由訂單聚合在更新訂單狀態(tài)后發(fā)布。

public class OrderUpdatedEvent {

    public final String orderId;

    public final String orderStatus;

    public OrderUpdatedEvent(String orderId, String orderStatus) {
        this.orderId = orderId;
        this.orderStatus = orderStatus;
    }
}

結(jié)論:

至此,我們已經(jīng)成功實(shí)現(xiàn)了應(yīng)用程序的兩個主要部分——訂單服務(wù)和核心api。

Order服務(wù)包含了我們的主要Saga模式實(shí)現(xiàn)代碼。另一方面,核心api是我們訂單管理Saga的支柱。

我們將在這里結(jié)束這篇文章,因?yàn)樗呀?jīng)變得相當(dāng)長。然而,整個Saga實(shí)現(xiàn)的代碼可以在Github上獲得以供參考。

在下一篇文章中,我們將開始實(shí)現(xiàn)其他服務(wù)并將它們連接到Axon服務(wù)器。所以請繼續(xù)關(guān)注。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容