转载

使用Activiti和Spring集成实现BPEL和BPM

使用Activiti和Spring集成实现BPEL和BPMN

BPEL流程自动管理和BPM人工工作流是两种流程,前者主要将现有的服务按照流程定义规则进行调度组合协同,是纯粹的机器之间的协同工作,而BPM代表的工作流是有人工参与的协同工作。

BPEL是一种基于XML的流程规范语言,主要关注自动化;BPMN 最初 是一个纯粹的图形化业务流程表示法,是BPM的符号表示法。BPMN和BPEL经常结合使用BPMN用于以业务用户为中心的视角,BPEL用于机器之间的技术规范。

到了BPMN 版本2.0 ,BPMN标准中添加了自己的XML格式。因此,BPEL在BPMN环境中变得不那么重要了,因为BPMN现在满足了大部分业务和IT需求,目前,BPMN 2.0 XML格式可以说是在系统之间传输过程模型的最流行的标准,包括了BPEL的一部分。

Activiti是一个流程引擎核心,能够接受流程定义,运行时记录流程状态,可以作为整个流程的记录器,或状态机,Spring-Intergration是Spring的集成框架,能够实现服务之间的集成调度,这两者结合可以实现流程的人机交互协同工作。

本文翻译自Spring 官方文档 :

什么是Activiti

Activiti是一个 业务流程引擎 。它基本上是一个有节点(状态)的有向图,用于模拟复杂业务流程的状态。它跟踪业务流程中描述的工作进度。它描述了系统中的自动和基于人的角色。它还支持询问业务流程引擎,询问有关正在进行的流程实例的问题:有多少存在,哪些存在停滞等。业务流程管理系统(BPMS)提供了许多优势,其中一些优点是:

  • 协作过程,人类和服务被用于推动更大的业务需求(想象贷款批准,法律合规,文档修订等)
  • 它们支持组织中重要业务流程的审计和日志记录。这在监管环境中非常宝贵。
  • BPM引擎 旨在 处理长时间运行的进程状态,这意味着您的域模型不再需要充满特定于进程状态的特定字段,如is_enrolled或reset_password_date。
  • 易于建模的复杂业务流程

最后一点值得关注:像Activiti这样的优秀BPM引擎支持可视化建模业务流程。UML支持使用活动(步骤)和泳道(参与满足这些步骤所涉及的代理)直观地描述流程。当然,UML只是一种建模工具。它没有运行时语义。业务流程管理的圣杯一直是有一个可供业务分析师 应用程序开发人员使用的建模符号。 BPMN 2 就像我们实现这一目标一样接近。

例如,这是一个 非常 简单的业务流程的可视化模型。

使用Activiti和Spring集成实现BPEL和BPM

这是为支持该模型而创建的标准XML标记。这个XML具有执行语义,而 不仅仅是 建模语义。

<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
      xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
      xmlns:activiti="http://activiti.org/bpmn"
      typeLanguage="http://www.w3.org/2001/XMLSchema"
      expressionLanguage="http://www.w3.org/1999/XPath"
      targetNamespace="http://www.activiti.org/bpmn2.0">

<process id="asyncProcess">

<startEvent id="start"/>

<sequenceFlow
            id="flow1"
               sourceRef="start"
               targetRef="sigw"/>

<serviceTask 
            id="sigw"
               name="Delegate to Spring Integration"
               activiti:delegateExpression="#{gateway}"/>

<sequenceFlow
            id="scriptFlow"
               sourceRef="sigw"
               targetRef="confirmMovementTask"/>

<scriptTask 
            id="confirmMovementTask"
               name="Execute script" scriptFormat="groovy">
<script>
                println 'Moving on..'
</script>
</scriptTask>

<sequenceFlow
            id="flow2"
               sourceRef="confirmMovementTask"
               targetRef="theEnd"/>

<endEvent id="theEnd"/>

</process>

</definitions>

大多数分析师不会手工编写这个XML。相反,他们将使用像 Alfresco的Activiti Cloud 这样的建模工具。然而,XML工件是可循环访问的:它可以由开发人员注释,然后在建模工具中进行修改。

但是,在检查时,您会发现大部分内容并不复杂。该过程有四种状态:

  • startEvent  - 流程开始的地方
  • serviceTask - 调用Spring bean调用gateway以在Spring Integration中启动一些处理(稍后会详细介绍!)
  • scriptTask  - 使用Groovy打印出一个简单的确认消息
  • endEvent  - 完成

订单由sequenceFlow连接节点的各种元素明确说明。

Activiti是跟踪业务流程状态的好方法,但它不是一个特别强大的组件模型或集成技术。为此,我们需要一个像Spring Integration这样的集成框架。

什么是Spring Integration

Spring Integration支持跨多个不兼容的系统集成服务和数据。从概念上讲,编写集成流类似于在UNIX OS上使用stdin和组合管道和过滤器流stdout:

cat input.txt |  grep ERROR | wc -l > output.txt

在该示例中,我们从源(文件input.txt)获取数据,将其传递给grep命令以过滤结果并仅保留包含令牌的行,ERROR然后将其传送到wc实用程序,我们将计算它有多少行。最后,最终计数被写入输出文件,output.txt。这些组件 - cat,grep和wc- 彼此都不知道。它们并不是为了彼此而设计的。相反,他们只知道如何阅读stdin和写作stdout。这种数据标准化使得从简单原子组成复杂解决方案变得非常容易。在该示例中,cat文件的行为将数据转换为任何stdin可识别的进程可读取的数据。它 适应 入站数据为规范化格式,字符串行。最后,redirect(>)操作符将规范化数据(字符串行)转换为文件系统上的数据。它 适应 它。pipe(|)字符用于表示一个组件的输出应该流向另一个组件的输入。

Spring Integration流程的工作方式相同:数据被规范化为Message<T>实例。每个Message<T>都有一个有效负载和头 - 关于有效负载的元数据Map<K,V>- 它们是不同消息传递组件的输入和输出。这些消息传递组件通常由Spring Integration提供,但您可以轻松编写和使用自己的组件。各种消息传递组件都支持所有 企业应用程序集成模式 (过滤器,路由器,变换器,适配器,网关等)。Spring框架MessageChannel是一个命名管道,通过它Message<T>在消息传递组件之间流动。它们是管道,默认情况下,它们的工作方式类似于java.util.Queue。数据输入,数据输出。

Spring集成入站适配器 来自外部系统(如RabbitMQ,Apache Kafka和JMS,电子邮件,Twitter,文件系统挂载,物联网设备和其他许多系统的消息队列)的数据调整为Message<T>s。Spring Integration出站适配器以相反的方式执行相同的操作,Message<T>并将其写入外部系统(例如,作为电子邮件,Tweet或Apache Kafka消息)。

Spring Integration支持 事件驱动的体系结构, 因为它可以帮助检测并响应外部世界中的事件。例如,您可以使用Spring Integration每10秒轮询一次文件系统,并Message<T>在出现新文件时发布。您可以使用Spring Integration充当传递给Apache Kafka主题的消息的侦听器。适配器处理响应外部事件并使您免于担心发起消息,并让您在消息到达后专注于处理消息。它是依赖注入的集成等价物!

依赖注入使组件代码不再担心资源初始化和获取,并使其可以专注于编写具有这些依赖关系的代码。该javax.sql.DataSource领域来自哪里?谁在乎!Spring将它连接起来,它可能是从测试中的Mock,经典应用程序服务器中的JNDI或配置的Spring Boot bean获得的。组件代码仍然不知道这些细节。大约15年前,当我们第一次开始谈论依赖注入时,我们会谈到“好莱坞校长:”“不要打电话给我,我会打电话给你!”这更适用于Spring Integration!

入站网关接收来自外部系统的传入请求,将其作为Message<T>s处理,并发送回复。出站网关采用Message<T>s,将它们转发到外部系统,并等待来自该系统的响应。它们支持请求和回复交互。

Activiti和Spring集成网关

Activiti可用于描述记录,可审计和可预测状态的复杂,长期运行的过程,Spring Integration可用于 集成 !Spring Integration是我们保存有趣Java代码的地方,Activiti会跟踪总体状态。这个技巧在20年前很有用,今天它在大规模分布的微服务世界中也很有用,其中单个请求的处理可能跨越多个服务和节点。那么, Spring Boot,Spring Integration和Activiti可以很好地协同工作 !

一个常见的用例是使用Activiti启动BPM流程,然后在进入等待状态时,将该状态的处理委托给Spring Integration,当然,这可以将工作分散到其他系统。这是一个说明流程的简单图表。

使用Activiti和Spring集成实现BPEL和BPM

BPM流程状态通常可以涉及人工代理。例如,工作流引擎可能具有将文档发送给人员以供批准的状态,但是审阅者正在度假并且将不会回来数周。保持线程开放是一种浪费,更不用说危险了,期望继续处理所需的任何确认都将在几毫秒或几秒内恢复。

Activiti足够聪明,可以暂停执行,在等待状态期间将执行状态持久保存到数据库,并且仅在 发出 流程执行 信号 后才恢复。发信号通知流程引擎可重新水化流程并恢复执行。一个简单的示例可能是新的用户注册业务流程,该流程委托Spring Integration发送带有注册确认链接的电子邮件。用户可能需要数天才能单击确认链接,但在单击时,系统应继续进行注册业务流程。

在这篇文章中,我们将讨论如何启动进入等待状态的BPM流程,然后委托Spring Integration进行某种自定义处理,然后在遥远的未来,继续执行流程。

我们将设置两个Spring Integration流程:一个用于处理来自Activiti的请求到Spring Integration,另一个用于处理最终的回复并触发恢复流程。

我们需要一些东西来启动我们的流程,所以这里是一个简单的REST端点(http://localhost:8080/start),每次启动一个新流程:

@RestController
class ProcessStartingRestController {

   @Autowired
   private ProcessEngine processEngine;

   @RequestMapping(method = RequestMethod.GET, value = "/start")
   Map<String, String> launch() {
      ProcessInstance asyncProcess = this.processEngine.getRuntimeService()
            .startProcessInstanceByKey("asyncProcess");
      return Collections.singletonMap("executionId", asyncProcess.getId());
   }
}

消息将跨越MesssageChannel我们将在@Configuration类中创建的两个通道:requests和replies。

@Configuration
class MessageChannels {

   @Bean
   DirectChannel requests() {
      return new DirectChannel();
   }

   @Bean
   DirectChannel replies() {
      return new DirectChannel();
   }
}

这两个通道类似一种队列通道,可以使用消息系统的队列实现,request是用来实现前面图中Activiti到Spring集成的请求,Activit可以通过这个通道发送需要Spring干的事情,而replies则是Spring干完事情的反馈结果。

使用@Configuration类的好处是它本身就是一个Spring组件,可以注入到任何地方。我们可以通过@Bean直接调用提供者的方法来取消引用通道。另一种方法是@Qualifier每次我们为其中一个通道注入引用时使用,例如:

public static final String REPLIES_CHANNEL_NAME = "replies";

@Autowired
@Qualifier(REPLIES_CHANNEL_NAME)
private MessageChannel replies;

我们的BPMN非常简单,但我们将使用一个特定于Activiti的命名空间属性activiti:delegateExpression="#{gateway}"来告诉Activiti需要调用一个名为gateway,它是在Spring中注册的bean 。Activiti知道这样做是因为这个应用程序是使用Spring引导的自动配置,其中包括将Spring管理的bean暴露给Activiti表达式语言。这个gateway是一种基于Activiti的bean类型ReceiveTaskActivityBehavior。Spring Boot具有Spring Integration和Activiti的自动配置,因此90%的繁琐设置都会消失。

让我们看一下我们的简单gateway组件,一个Activiti ActivityBehavior接口的实现,它充当回调函数,我们可以用其发送Message<T>到requests通道并启动Spring Integration流程。这里重要的是我们已经捕获了executionId,我们稍后需要 恢复发出信号 的过程。这里等于输入stdin;

@Bean
ActivityBehavior gateway(MessageChannels channels) {
   return new ReceiveTaskActivityBehavior() {

      @Override
      public void execute(ActivityExecution execution) throws Exception {

         Message<?> executionMessage = MessageBuilder
               .withPayload(execution)
               .setHeader("executionId", execution.getId())
               .build();

         channels.requests().send(executionMessage);
      }
   };
}

该Message<T>会触发通道requests MessageChannel另一边的处理。在一个复杂的例子中,将请求转换为有意义的消息,例如,将其转发到其他系统(如电子邮件)是小case。在这里,我们只打印出标题,以便我们可以记录executionId并稍后使用它。这里等于输出stdout;这里是请求通道,Activiti将命令Spring干的事情通过这个通道发给Spring,下面代码是Spring接受指令后干的事情,只是打印而已。

@Bean
IntegrationFlow requestsFlow(MessageChannels channels) {
   return IntegrationFlows.from(channels.requests())
         .handle(msg -> msg.getHeaders().entrySet()
               .forEach(e -> log.info(e.getKey() + '=' + e.getValue())))
         .get();
}

此时,工作流定义将保持不变,并且没有活动的流程实例。我们需要以某种方式异步地发出信号。我们实现一个REST端点来向Activiti发出信号:http://localhost:8080/resume/{executionId}。REST很容易理解,但实际上我们可以使用Spring Integration发送到任何外部系统中的一个事件来实现这种效果。唯一要确保的是,无论外部事件如何,我们都能以某种方式发送executionId,就像我们在这里通过在URI中捕获它一样。下面代码就是Spring向Activiti发出处理结果。

@RestController
class ProcessResumingRestController {

   @Autowired
   private MessageChannels messageChannels;

   @RequestMapping(method = RequestMethod.GET, value = "/resume/{executionId}")
   void resume(@PathVariable String executionId) {

      Message<String> build = MessageBuilder.withPayload(executionId)
            .setHeader("executionId", executionId)
            .build();

      this.messageChannels.replies().send(build);
   }
}

当Message<T>流过replies MessageChannel,它会触发另一端处理,注意这里是响应通道,Spring干完事情后的结果通过这个通道返回给Activiti。在这里,我们将使用另一个Spring Integration流程来接收传入Message<T>并发出恢复流程的信号。执行此流程后,您将看到流程中的下一步scriptTask,评估和打印到控制台上的“Moving on!”字样。

@Bean
IntegrationFlow repliesFlow(MessageChannels channels,
                     ProcessEngine engine) {
   return IntegrationFlows.from(channels.replies())
         .handle(msg -> engine.getRuntimeService().signal(
               String.class.cast(msg.getHeaders().get("executionId"))))
         .get();
}

下一步

真正的强大功能是使用BPM来编排复杂的处理逻辑:想象一下在BPM流程中保存流程的状态,然后调用Spring Batch作业,或者RestTemplate在Spring Cloud中使用Ribbon负载平衡调用REST服务,或者将其转发Message<T>到Spring Cloud中数据流流程。Spring Cloud Data Flow是我最喜欢的数据处理方法之一,因为它建立在Spring Cloud Stream的基础之上,而Spring Cloud Stream又建立在Spring Integration上:它MessageChannel一直在最底层!

深入理解Activiti工作流

Spring Batch

Spring Cloud

Spring Boot

SOA

原文  https://www.jdon.com/springboot/spring-integration.html
正文到此结束
Loading...