在這個系統(tǒng)之前的文章我們介紹了什么是Saga模式。也描述了Saga的幾種類型。最后,我們還描述了我們將會在一些場景中使用Saga模式來實(shí)現(xiàn)。
下面是我們關(guān)于Saga模式實(shí)現(xiàn)的本系列的提綱:
第一部分: 我們學(xué)習(xí)了《Saga》的基本內(nèi)容。如果你不確定什么是Saga模式,我強(qiáng)烈建議你去看看那篇文章,然后再回到這篇文章。
第二部分(本文):我們將首先為一個常見場景用Saga模式 解決。
第三部分: 我們將繼續(xù)未完成的工作,并用Axon Server來串聯(lián)起各個服務(wù)。
第四部分:我們將測試我們的Saga應(yīng)用程序。
這整個應(yīng)用程序可以在Github上找到。
現(xiàn)在讓我們開始動手吧。
Saga模式實(shí)現(xiàn)類型問題
正如我們前面所討論的,我們將使用基于編排方式的Saga方法來實(shí)現(xiàn)Saga模式。當(dāng)然基本協(xié)調(diào)方式的Saga也可以實(shí)現(xiàn)。
換句話說,編排器就是一個管理器,它編排參與的服務(wù)執(zhí)行一個或多個本地事務(wù)。
下圖顯示了這種方法(基于編排)的一個典型示例:

在這里,我們著眼于基于編排的Saga來解決訂單管理的問題。這個訂單管理問題可以應(yīng)用于任何電子商務(wù)商店、食品配送應(yīng)用程序或任何其他類似的用例。
當(dāng)然,出于演示目的,我們將這個問題簡化了很多,它比實(shí)際生產(chǎn)級別的復(fù)雜性要簡單得多。
頂層組件
關(guān)于我們的Saga模式實(shí)現(xiàn),這個應(yīng)用程序有5個主要部分。各部分內(nèi)容如下:
-
訂單服務(wù):
該服務(wù)暴露api,幫助在系統(tǒng)中創(chuàng)建訂單。此外,該服務(wù)還管理訂單聚合。訂單聚合只是一個維護(hù)訂單相關(guān)信息的實(shí)體。然而,訂單服務(wù)還是訂單管理Saga的一個子服務(wù)(維護(hù)訂單內(nèi)部的事務(wù))。
-
支付服務(wù):
支付服務(wù)根據(jù)訂單管理Saga發(fā)出的創(chuàng)建發(fā)票命令進(jìn)行操作。一旦它完成了它的工作,它就發(fā)布一個事件。這一事件將這個Saga推向了下一步流程。
-
運(yùn)輸服務(wù):
該服務(wù)負(fù)責(zé)在系統(tǒng)中創(chuàng)建與訂單相關(guān)的發(fā)貨。它根據(jù)Saga管理器發(fā)出的命令執(zhí)行對應(yīng)的動作。一旦它完成了它的任務(wù),它還會發(fā)布一個推動Saga推向下一步流程。
-
核心API( Core-APIs) :
這不是一個服務(wù)。然而,核心api充當(dāng)了各種服務(wù)之間的集成粘合劑,這些服務(wù)構(gòu)成了這個Saga的一部分。在我們的示例中,核心api將包含Saga實(shí)現(xiàn)運(yùn)行所需的各種命令和事件定義。
-
Axon Server
Axon服務(wù)器是Axon平臺的一部分。我們將使用Axon框架來管理我們的聚合,如訂單、支付、發(fā)貨。此外,我們將使用Axon服務(wù)器來處理這三個服務(wù)之間的通信。如果你想了解更多關(guān)于Axon服務(wù)器的細(xì)節(jié),可以查看我的文章。
讓我們從具體的細(xì)節(jié)開始吧
整個應(yīng)用架構(gòu)
下面是我們的應(yīng)用程序的總體工程結(jié)構(gòu)圖。
.
├── core-apis
├── order-service
├── payment-service
├── pom.xml
├── saga-axon-server-spring-boot.iml
└── shipping-service
如您所見,它是一個多maven模塊結(jié)構(gòu)。每個服務(wù)都是maven子模塊,是整個項(xiàng)目的一部分。
這個pom.xml文件將他們合并在一起。
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.progressivecoder.saga-pattern</groupId>
<artifactId>saga-axon-server-spring-boot</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>order-service</module>
<module>payment-service</module>
<module>core-apis</module>
<module>shipping-service</module>
</modules>
</project>
訂單服務(wù)的實(shí)現(xiàn)
Order Service(以及所有其他服務(wù)),我們將用Spring Boot來構(gòu)建應(yīng)用程序。如果您不知道Spring Boot或想要更新,請參閱我關(guān)于Spring Boot微服務(wù)的文章。
下面是這個應(yīng)用的依賴:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Axon -->
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.0.3</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Swagger -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>com.progressivecoder.saga-pattern</groupId>
<artifactId>core-apis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
以下是一些需要考慮的重要問題:
我們使用的是Axon Spring Boot starter(版本4.0.3)。他對對Axon框架和Axon服務(wù)器提供了比較好的接入。
為了方便測試我們的應(yīng)用程序,我們還包括使用了Swagger。
我們使用Spring Boot Starter Data JPA和基于內(nèi)存實(shí)現(xiàn)的H2數(shù)據(jù)庫作為持久層。
此外,我們還有另一個稱為core-api的模塊。我們一會再回來講。
訂單聚合根
訂單聚合是Saga模式實(shí)現(xiàn)中最重要的部分之一。它是訂單管理Saga工作的基礎(chǔ)。讓我們看看它是怎樣的:
@Aggregate
public class OrderAggregate {
@AggregateIdentifier
private String orderId;
private ItemType itemType;
private BigDecimal price;
private String currency;
private OrderStatus orderStatus;
public OrderAggregate() {
}
@CommandHandler
public OrderAggregate(CreateOrderCommand createOrderCommand){
AggregateLifecycle.apply(new OrderCreatedEvent(createOrderCommand.orderId, createOrderCommand.itemType,
createOrderCommand.price, createOrderCommand.currency, createOrderCommand.orderStatus));
}
@EventSourcingHandler
protected void on(OrderCreatedEvent orderCreatedEvent){
this.orderId = orderCreatedEvent.orderId;
this.itemType = ItemType.valueOf(orderCreatedEvent.itemType);
this.price = orderCreatedEvent.price;
this.currency = orderCreatedEvent.currency;
this.orderStatus = OrderStatus.valueOf(orderCreatedEvent.orderStatus);
}
@CommandHandler
protected void on(UpdateOrderStatusCommand updateOrderStatusCommand){
AggregateLifecycle.apply(new OrderUpdatedEvent(updateOrderStatusCommand.orderId, updateOrderStatusCommand.orderStatus));
}
@EventSourcingHandler
protected void on(OrderUpdatedEvent orderUpdatedEvent){
this.orderId = orderId;
this.orderStatus = OrderStatus.valueOf(orderUpdatedEvent.orderStatus);
}
}
如您所見,這是一個典型的實(shí)體類。這里需要注意的主要事項(xiàng)是這里有axon自己的的注解@Aggregate和@AggregateIdentifier。這些注解允許Axon框架管理訂單聚合實(shí)例。
此外,我們正在使用事件溯源來存儲在此聚合上發(fā)生的事件。事件溯源是另一種微服務(wù)體系結(jié)構(gòu)模式,它們存儲領(lǐng)域事件然后構(gòu)建聚合信息。如果你想了解更多的話,我有一個關(guān)于事件來源實(shí)現(xiàn)的詳細(xì)文章
。
訂單Service層
為了方便創(chuàng)建訂單,我們還定義了一個訂單服務(wù)接口及其相應(yīng)的實(shí)現(xiàn)。
public interface OrderCommandService {
public CompletableFuture<String> createOrder(OrderCreateDTO orderCreateDTO);
}
@Service
public class OrderCommandServiceImpl implements OrderCommandService {
private final CommandGateway commandGateway;
public OrderCommandServiceImpl(CommandGateway commandGateway) {
this.commandGateway = commandGateway;
}
@Override
public CompletableFuture<String> createOrder(OrderCreateDTO orderCreateDTO) {
return commandGateway.send(new CreateOrderCommand(UUID.randomUUID().toString(), orderCreateDTO.getItemType(),
orderCreateDTO.getPrice(), orderCreateDTO.getCurrency(), String.valueOf(OrderStatus.CREATED)));
}
}
該service服務(wù)層實(shí)現(xiàn)使用Axon框架的命令網(wǎng)關(guān)向聚合發(fā)出命令。該命令在我們前面聲明的聚合類中處理。
訂單Controller層
Order Controller類是我們創(chuàng)建API端點(diǎn)的地方。此時,出于演示的目的,我們只有一個端點(diǎn)(一個api接口)。
@RestController
@RequestMapping(value = "/api/orders")
@Api(value = "Order Commands", description = "Order Commands Related Endpoints", tags = "Order Commands")
public class OrderCommandController {
private OrderCommandService orderCommandService;
public OrderCommandController(OrderCommandService orderCommandService) {
this.orderCommandService = orderCommandService;
}
@PostMapping
public CompletableFuture<String> createOrder(@RequestBody OrderCreateDTO orderCreateDTO){
return orderCommandService.createOrder(orderCreateDTO);
}
}
為了幫助Swagger找到這些端點(diǎn)并暴露swagger api,我們還得配置Swagger。
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket apiDocket(){
return new Docket(DocumentationType.SWAGGER_2)
.select()
.apis(RequestHandlerSelectors.basePackage("com.progressivecoder.ordermanagement"))
.paths(PathSelectors.any())
.build()
.apiInfo(getApiInfo());
}
private ApiInfo getApiInfo(){
return new ApiInfo(
"Saga Pattern Implementation using Axon and Spring Boot",
"App to demonstrate Saga Pattern using Axon and Spring Boot",
"1.0.0",
"Terms of Service",
new Contact("Saurabh Dashora", "progressivecoder.com", "coder.progressive@gmail.com"),
"",
"",
Collections.emptyList());
}
}
訂單管理Saga
Saga模式實(shí)現(xiàn)的核心是訂單管理Saga。簡而言之,這也是一個典型的Java類,它就是一個流程管理器,用于處理業(yè)務(wù)的流程。
@Saga
public class OrderManagementSaga {
@Inject
private transient CommandGateway commandGateway;
@StartSaga
@SagaEventHandler(associationProperty = "orderId")
public void handle(OrderCreatedEvent orderCreatedEvent){
String paymentId = UUID.randomUUID().toString();
System.out.println("Saga invoked");
//associate Saga
SagaLifecycle.associateWith("paymentId", paymentId);
System.out.println("order id" + orderCreatedEvent.orderId);
//send the commands
commandGateway.send(new CreateInvoiceCommand(paymentId, orderCreatedEvent.orderId));
}
@SagaEventHandler(associationProperty = "paymentId")
public void handle(InvoiceCreatedEvent invoiceCreatedEvent){
String shippingId = UUID.randomUUID().toString();
System.out.println("Saga continued");
//associate Saga with shipping
SagaLifecycle.associateWith("shipping", shippingId);
//send the create shipping command
commandGateway.send(new CreateShippingCommand(shippingId, invoiceCreatedEvent.orderId, invoiceCreatedEvent.paymentId));
}
@SagaEventHandler(associationProperty = "orderId")
public void handle(OrderShippedEvent orderShippedEvent){
commandGateway.send(new UpdateOrderStatusCommand(orderShippedEvent.orderId, String.valueOf(OrderStatus.SHIPPED)));
}
@SagaEventHandler(associationProperty = "orderId")
public void handle(OrderUpdatedEvent orderUpdatedEvent){
SagaLifecycle.end();
}
}
@StartSaga注解表示Saga的開始。它告訴Axon去創(chuàng)造一個新的Saga。Saga還與聚合的一個特定實(shí)例相關(guān)聯(lián)。這是通過使用@SagaEventHandler指定的associationProperty來完成的。在本例中,我們使用屬性orderId將Saga與訂單聚合的實(shí)例關(guān)聯(lián)起來。我們還指定了應(yīng)該調(diào)用該方法的事件。在我們的例子中,它是Order Created Event。
被@SagaEventHandler注解的其他方法表示屬于Saga的其他事務(wù)(處理另一種業(yè)務(wù))。
我們還使用sagalifcycle .associate with()方法將Saga與其他業(yè)務(wù)關(guān)聯(lián)起來,比如支付和運(yùn)輸。通過允許客戶機(jī)生成唯一標(biāo)識符,我們不必遵循請求-響應(yīng)模型。這使我們能夠輕松地將標(biāo)識符與Saga關(guān)聯(lián)起來。
我們使用 SagaLifecycle.end()來結(jié)束這個Saga。
Core-APIs
現(xiàn)在是了解core - api模塊的好時機(jī)(command,event沒有setter方法)。
核心api只是一堆命令和事件的類,這些命令和事件將成為Saga模式實(shí)現(xiàn)的一部分。
命令(Commands)
在我們的應(yīng)用程序中創(chuàng)建新訂單時,將觸發(fā)Create Order Command。此命令由訂單聚合處理。
public class CreateOrderCommand {
@TargetAggregateIdentifier
public final String orderId;
public final String itemType;
public final BigDecimal price;
public final String currency;
public final String orderStatus;
public CreateOrderCommand(String orderId, String itemType, BigDecimal price, String currency, String orderStatus) {
this.orderId = orderId;
this.itemType = itemType;
this.price = price;
this.currency = currency;
this.orderStatus = orderStatus;
}
}
接下來,創(chuàng)建訂單時,訂單管理Saga將觸發(fā)Create Invoice Command。
public class CreateInvoiceCommand{
@TargetAggregateIdentifier
public final String paymentId;
public final String orderId;
public CreateInvoiceCommand(String paymentId, String orderId) {
this.paymentId = paymentId;
this.orderId = orderId;
}
}
當(dāng)發(fā)票創(chuàng)建和付款處理完成時,訂單管理Saga也會觸發(fā)Create Shipping Command。
public class CreateShippingCommand {
@TargetAggregateIdentifier
public final String shippingId;
public final String orderId;
public final String paymentId;
public CreateShippingCommand(String shippingId, String orderId, String paymentId) {
this.shippingId = shippingId;
this.orderId = orderId;
this.paymentId = paymentId;
}
}
最后,我們有Update Order Status Command。當(dāng)運(yùn)輸完成后,將觸發(fā)此命令。
public class UpdateOrderStatusCommand {
@TargetAggregateIdentifier
public final String orderId;
public final String orderStatus;
public UpdateOrderStatusCommand(String orderId, String orderStatus) {
this.orderId = orderId;
this.orderStatus = orderStatus;
}
}
事件(Events)
整個過程中的第一個事件是訂單創(chuàng)建事件(Order Created Event.)。就像我們之前看到的,這個事件也是開始這個Saga的原因。
public class OrderCreatedEvent {
public final String orderId;
public final String itemType;
public final BigDecimal price;
public final String currency;
public final String orderStatus;
public OrderCreatedEvent(String orderId, String itemType, BigDecimal price, String currency, String orderStatus) {
this.orderId = orderId;
this.itemType = itemType;
this.price = price;
this.currency = currency;
this.orderStatus = orderStatus;
}
}
下一個事件是Invoice Created Event。Invoice服務(wù)發(fā)布此事件。我們將在下一篇文章中看到它的實(shí)現(xiàn)。
public class InvoiceCreatedEvent {
public final String paymentId;
public final String orderId;
public InvoiceCreatedEvent(String paymentId, String orderId) {
this.paymentId = paymentId;
this.orderId = orderId;
}
}
在那之后,我們有Order Shipped Event。該事件由Shipping服務(wù)在完成必要的操作后發(fā)布。
public class OrderShippedEvent {
public final String shippingId;
public final String orderId;
public final String paymentId;
public OrderShippedEvent(String shippingId, String orderId, String paymentId) {
this.shippingId = shippingId;
this.orderId = orderId;
this.paymentId = paymentId;
}
}
最后,我們有Order Updated Event。此事件由訂單聚合在更新訂單狀態(tài)后發(fā)布。
public class OrderUpdatedEvent {
public final String orderId;
public final String orderStatus;
public OrderUpdatedEvent(String orderId, String orderStatus) {
this.orderId = orderId;
this.orderStatus = orderStatus;
}
}
結(jié)論:
至此,我們已經(jīng)成功實(shí)現(xiàn)了應(yīng)用程序的兩個主要部分——訂單服務(wù)和核心api。
Order服務(wù)包含了我們的主要Saga模式實(shí)現(xiàn)代碼。另一方面,核心api是我們訂單管理Saga的支柱。
我們將在這里結(jié)束這篇文章,因?yàn)樗呀?jīng)變得相當(dāng)長。然而,整個Saga實(shí)現(xiàn)的代碼可以在Github上獲得以供參考。
在下一篇文章中,我們將開始實(shí)現(xiàn)其他服務(wù)并將它們連接到Axon服務(wù)器。所以請繼續(xù)關(guān)注。