转载

工作流程的微服务:使用F#DSL表达业务流程

我们在Jet上使用F#并且从一开始就是这样,这就是为什么在评估构建DSL( 领域特定语言 )的选项时,F#是一个领先者。当我们决定构建DSL时,我们需要确定DSL有哪些重要的特征:

  • 编译时间验证:由于开发人员主要是构建步骤和定义工作流程,因此我们希望工作流程享受F#提供的类型安全性和工具。
  • 可读性:工作流是业务逻辑的表示,应该由开发人员和业务用户轻松阅读。
  • 可扩展性:任何好的DSL都允许扩展或改进,而不会破坏或影响现有的实现。我们希望将来能够轻松添加新类型的步骤和组合。

我们最终确定的工作流DSL主要围绕链/组合步骤的能力。每一步都只是一个函数,它给出一个输入和状态产生一个输出/副作用(Input → State → Output/SideEffect)。这些步骤可以组合或链接在一起以表示复杂的业务流程。

流程的可视化表示是工程师和业务用户可以一起设计和理解,一旦确定了流的可视化表示,开发人员就可以使用DSL来定义工作流,使用以下函数表示:

workflow : (name : string) -> 
    (triggers : Trigger list) -> 
    (metadata : WorkflowMetadata) -> 
    (step : WorkflowStep) -> 
    Workflow

比如一个示例工作流程:创建订单,预留库存,发送订单,然后最终向客户收费。这可以使用上面的工作流程函数编写:

workflow <font>"SampleWorkflow"</font><font>
    [Trigger.Stream (</font><font>"kafka://jetkafka/mock-input"</font><font>, TaskIdType.FromPath(Path.JsonPath(</font><font>"$.orderId"</font><font>)), Path.JsonPath(</font><font>"$.orderId"</font><font>))]
    metadata
    (step(</font><font>"CreateOrder"</font><font>, </font><font>"CreateOrder"</font><font>) =>
        step(</font><font>"ReserveInventory"</font><font>, </font><font>"ReserveInventory"</font><font>) => 
            step(</font><font>"ShipOrder"</font><font>, </font><font>"ShipOrder"</font><font>) =>>
                [
                    step(</font><font>"ChargeCustomer"</font><font>, </font><font>"ChargeCustomer"</font><font>) =?>
                        [
                            cond(</font><font>"WriteChargeSuccess"</font><font>, </font><font>"WriteChargeSuccess"</font><font>, Condition.Simple(Qualifier.State, </font><font>"$.transactionSuccess"</font><font>, </font><font>"true"</font><font>))
                            cond(</font><font>"WriteChargeFailure"</font><font>, </font><font>"WriteChargeFailure"</font><font>, Condition.Simple(Qualifier.State, </font><font>"$.transactionSuccess"</font><font>, </font><font>"false"</font><font>))
                        ]
                    step(</font><font>"UpdateOrderHistory"</font><font>, </font><font>"UpdateOrderHistory"</font><font>)
                ]
    )
</font>

我将在下面更详细地讨论这个DSL的一些元素。但是,我想强调该workflow功能的主要特点:

  1. 工作流名称(workflow "Sample Workflow"):工作流都需要一个名称,这些名称负责许多标识元素以及控制元素,如执行通道控制,启用/禁用键名等。
  2.  触发:使用Discriminated Union指定触发器,其中每种情况对应于特定的输入类型/源。常见的触发器是一个Stream有三个参数string * TaskIdType * PrimaryKeyPath的字符串:字符串是StreamDefinition,而TaskIdType和PrimaryKeyPath是触发器消息的JsonPath。注意:工作流可以有多个触发器,并且具有各种类型。
  3. 工作流元数据:工作流元数据定义了与工作流执行器,副作用执行器以及失败的工作流/副作用应该在哪里进行通信的通道,以及各种配置元素,如工作流的并发设置。(有关更多架构详细信息,请参阅我之前发布的微服务 到工作流程 )
  4. 步骤:步骤表示需要执行的操作或副作用。这些操作是显示名称和实现模块名称的元组。注意:在此示例中,显示名称和步骤实现是相同的,但是在工作流程中多次重复使用相同步骤实现的情况将导致不同的显示名称。
  5. 步骤组成:通过一系列组合功能组合步骤以形成工作流程。更多关于后者的内容。

触发器

工作流可以有一个或多个不同的触发器。这些触发器用于配置WorkflowTriggers服务从哪里使用触发器消息。这种消费与我们的微服务中使用的消费机制相同,这在以前的帖子 F#Microservice Patterns @ Jet.com中 有详细 介绍 。的触发的类型是可区分联合(代数型):

type Trigger =
| Simple of string * TaskIdType
| Stream of string * TaskIdType * PrimaryKeyPath
...

注意:我不打算在这篇文章中详细介绍所有这些不同类型的触发器,我只讨论Simple和Stream。我们目前支持其他几个在处理包含多种不同消息类型的触发器流时非常有用的方法。

Stream上面示例工作流程中使用的触发器有三个参数:

  • string :string这里表示流定义,它是外部系统的表示(即 Kafka , EventStore ,API等)。之前的文章 在F#中抽象IO 有关这个概念以及我们如何在整个系统中建模和使用它们的详细信息。
  • TaskIdType :每次执行工作流都有唯一的任务ID。该TaskIdType指定如何的taskid获得。目前,我们支持多种类型的任务ID。我们支持随机生成的这些,当您不希望进行任何形式的重复数据删除检查并始终希望运行工作流时,可以使用这些。我们还支持从输入有效负载中提取任务ID,以确保每个消息只执行一次工作流。
  • PrimaryKeyPath :是一个JSON路径,用于从哪里提取要用作日记帐ID的值。Journal是所有工作流程的真实来源,它是工作流程所采取的每个操作的事件源日志。

Simple触发类型是消费为每个输入一个新的密钥指定的流定义的触发器。工作流输入的工作流实例ID设置为新GUID。当传入数据没有与之关联的唯一标识符时,将使用此触发器。

步骤

步骤表示工作流需要采取的操作。步骤是任何工作流程的基础,也是定义其功能的基础。目前有三种不同类型的步骤:

  • Simple Steps(step):只表示一个动作,将始终执行,并且所有业务逻辑在步骤本身中都是自包含的。使用该step函数表示简单步骤。
  • 条件步骤(cond):包含a的步骤Condition,这些条件允许将行为信息编码到工作流DSL中,以根据特定逻辑决定执行哪些步骤。
  • 可选步骤(option):步骤包含条件但不需要具有负面情况的路径。条件步骤总是需要有正面和负面的路径。

谓词逻辑

谓词逻辑又名条件,可以在任一使用cond或option步骤或在触发流用于条件过滤。通过提取数据的工作条件规定的Qualifier基础上一些JSON路径,然后他们比较这与提取基于该预期值数据Comparison和OperandType 。支持的不同类型的条件是:

type Condition =
| True
| False
| Simple of Qualifier * path:string * expected:string
| Match of Qualifier * path:string * expectedRegx:string
| Compare of qualifier:Qualifier * typ:OperandsType * path:string * comparison:Comparison * expected:string
| Count of qualifier:Qualifier * path:string * comparison:Comparison * expected:string
| Exists of qualifier:Qualifier * path:string
| Not of Condition
| And of Condition List
| OR of Condition List

这些基本条件解释如下:

  • True | False:始终评估(True)或永不评估(False)
  • Simple: 从指定的数据中提取数据与预期值进行比较。
  • Match: 根据路径从指定的限定符中提取数据,并使用正则表达式匹配预期值。
  • Compare: 提取基于路径上的指定的数据,并与预期值比较,基于Comparison cast 到在OperandsType,Comparison类型目前支持以下比较:
    type Comparison =
    | GreaterThan
    | GreaterThanOrEqual
    | LesserThan
    | LesserThanOrEqual
    | NotEqual
    | Equal

OperandsType是常见的数据类型,目前支持的类型是:

type OperandsType =
| Int
| Long
| Double
| Decimal
| Boolean
| DateTime
  • Count: 据路径(通常是数组)从指定的限定符中提取数据,获取元素的计数,并与期望值(字符串化的int值)进行比较。
  • Exists: 根据路径从指定的限定符中提取数据,检测是否存在
  • Not | And | Or: 布尔运算符是否用于组成条件,即
let cond = Condition.Count(Qualifier.Input, <font>"$.a"</font><font>, Comparison.Equal, </font><font>"10"</font><font>)
let cond2 = Condition.Simple (Qualifier.Aggregate, </font><font>"$.bs"</font><font>, </font><font>"sting"</font><font>)
let cond3 = Condition.Simple (Qualifier.State, </font><font>"$.a"</font><font>, </font><font>"20"</font><font>)
let condORAnd = (cond OR (cond2 AND cond3)) |> Condition.Not
</font>

步骤组合

step(“CreateOrder”, “CreateOrder”) => 
     step(“PreDealOrder”, “PreDealOrder”)

我们使用合成运算符组合步骤:

  • chain(=>):步骤可以组合(=>)在一起形成一个链。因此a => b,意味着a首先执行步骤然后执行步骤b。
  • any(=?>):步骤可以由一组条件步骤组成,其中只执行满足条件的第一步。因此a =?> [b;c;d],暗示步骤a首先执行。然后,假设步b不符合条件,但步骤c和d满足条件,则由于c是第一位的,只有c得到执行。
  • every(=??>):步骤可以由一组条件组成,其中执行满足条件的每个步骤。所以a =??> [b;c;d]意味着一步a最先被执行,那么,假设步a不符合条件,但步骤c和d满足条件,那么这两个步骤c,并d得到执行。
  • all(=>>):可以在执行所有步骤的组中组合步骤。因此a =>> [b;c;d],意味着a首先执行步骤然后执行所有步骤b,c然后d执行下一步。

运行

F#DSL允许我们轻松设计和实现工作流程,但为了便于跨服务执行DSL,我们创建了一个不同的内部工作流表示,以便在我们的后端服务中使用。我们将DSL转换为DAG(有向无 环图 ),以将我们的规范DSL与运行时环境分离。我们的服务使用此图表来实际执行工作流程。此图表很容易表示为F#类型:

type Name = string
    
    type EdgeData = string
    
    type VertexData<'V> = Name * 'V <font><i>//'V can be the representation of a step</i></font><font>
    
    type Vertex<'V> = 
        { data : VertexData<'V>
          outEdges : Edge<'V> List }
    
    and Edge<'V> = 
        { data : EdgeData
          tail : Vertex<'V> }
    
    type Graph<'V> = 
        { name : Name
          vertices : Vertex<'V> List }
</font>

我们的运行时环境使用evaluate函数将我们的F#DSL转换为Graph : evaluate : workflow:Workflow -> WorkflowEvaluation. 工作流评估只是我们图形的一个类型包装器,它允许我们轻松查找用于哪个工作流的图形:

type WorkflowEvaluation = 
{
   name : string 
   model : Graph<WorkflowStep>  
}

该图允许我们的服务轻松遍历和理解任何给定工作流的路径,以及将我们的服务运行时与DSL表示分离,这使我们可能拥有可以编译到运行时的多个不同的DSL。DAG允许我们今天使用DSL,然后适应任何其他可能的DSL或语言,而无需更改我们的后端。这也使我们能够开发出一种设计语言,然后可以将其映射回DAG。可以通过图形UI指定此设计语言,以允许业务用户使用预定义的块轻松开发工作流。

除了允许我们的后端服务具有自己的运行时表示之外,它还允许我们通过将图形转换为 点图形 来轻松地在UI中可视化我们的工作流程, 点图形 是用于可视化图形的通常可接受的格式。

结论

所有这些不同元素的组合构成了我们工作流程方案的基础。虽然工作流DSL( Netflix Conductor , Apache Airflow 等)允许在运行时定义工作流,但我们发现在编译时定义工作流并通过源控制流程时,开发人员的实践会得到改进。

上述F#工作流模型允许通过F#签名检查进行工作流验证,以及能够运行和预提交工作流步骤和DSL的测试,以确保在部署之前的正确性。通过将我们的工作流DSL与后端执行层分离,我们已经能够演示以其他语言(javascript)和原始文本表达的工作流定义。

原文  https://www.jdon.com/51953
正文到此结束
Loading...