转载

Spring Boot集成Java DSL

Spring Integration Java DSL已经融合到 Spring Integration Core 5.0 ,这是一个聪明而明显的举动,因为:

  • 基于Java Config启动新Spring项目的每个人都使用它
  • SI Java DSL使您可以使用Lambdas等新的强大Java 8功能
  • 您可以使用 基于 IntegrationFlowBuilder 的 Builder模式 构建流

让我们看看基于ActiveMQ JMS的示例如何使用它。

Maven依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jms</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-kahadb-store</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-java-dsl</artifactId>
        <version>1.2.3.RELEASE</version>
    </dependency>
</dependencies>

示例1:Jms入站网关

我们有以下 ServiceActivator :

@Service
public class ActiveMQEndpoint {
    @ServiceActivator(inputChannel = "inboundChannel")
    public void processMessage(final String inboundPayload) {
        System.out.println("Inbound message: "+inboundPayload);
    }
}

如果您想使用SI Java DSL 将inboundPayload从Jms队列发送到 Gateway 风格的激活器,那么请使用DSL  Jms 工厂:

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsInboundGateway dataEndpoint() {
    return Jms.inboundGateway(listenerContainer())
            .requestChannel(inboundChannel()).get();
}

通过dataEndpoint bean 返回 JmsInboundGatewaySpec ,您还可以向SI通道或Jms目标发送回复。查看文档。

示例2:Jms消息驱动的通道适配器

如果您正在寻找替换消息驱动通道适配器的XML JMS配置,那么 JmsMessageDrivenChannelAdapter 是一种适合您的方式:

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setExpectReply(false);
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
            JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
            );

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

与前面的示例一样,入站有效负载如样本1中一样发送给激活器。

示例3:使用JAXB的Jms消息驱动的通道适配器

在典型的场景中,您希望通过Jms接受XML作为文本消息,将其转换为JAXB存根并在服务激活器中处理它。我将向您展示如何使用SI Java DSL执行此操作,但首先让我们为xml处理添加两个依赖项:

<dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-xml</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
    </dependency>

我们将通过JMS接受shiporders ,所以首先XSD命名为shiporder.xsd:

<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">

    <xs:element name="shiporder">
        <xs:complexType>
            <xs:sequence>
                <xs:element name="orderperson" type="xs:string"/>
                <xs:element name="shipto">
                    <xs:complexType>
                        <xs:sequence>
                            <xs:element name="name" type="xs:string"/>
                            <xs:element name="address" type="xs:string"/>
                            <xs:element name="city" type="xs:string"/>
                            <xs:element name="country" type="xs:string"/>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
                <xs:element name="item" maxOccurs="unbounded">
                    <xs:complexType>
                        <xs:sequence>
                            <xs:element name="title" type="xs:string"/>
                            <xs:element name="note" type="xs:string" minOccurs="0"/>
                            <xs:element name="quantity" type="xs:positiveInteger"/>
                            <xs:element name="price" type="xs:decimal"/>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
            </xs:sequence>
            <xs:attribute name="orderid" type="xs:string" use="required"/>
        </xs:complexType>
    </xs:element>

</xs:schema>

新增JAXB maven plugin 生成JAXB存根:

  <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>jaxb2-maven-plugin</artifactId>
            <version>2.3.1</version>
            <executions>
                <execution>
                    <id>xjc-schema1</id>
                    <goals>
                        <goal>xjc</goal>
                    </goals>
                    <configuration>
                        <!-- Use all XSDs under the west directory for sources here. -->
                        <sources>
                            <source>src/main/resources/xsds/shiporder.xsd</source>
                        </sources>

                        <!-- Package name of the generated sources. -->
                        <packageName>com.example.stubs</packageName>
                        <outputDirectory>src/main/java</outputDirectory>
                        <clearOutputDir>false</clearOutputDir>
                    </configuration>
                </execution>
            </executions>
        </plugin>

我们已经准备好了存根类和一切,现在使用Jaxb magic的Java DSL JMS消息驱动适配器:

/**
 * Sample 3: Jms message driven adapter with JAXB
 */<font>
@Bean
<b>public</b> JmsMessageDrivenChannelAdapter dataEndpoint() {
    <b>final</b> ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            <b>new</b> ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setExpectReply(false);
    channelPublishingJmsMessageListener.setMessageConverter(<b>new</b> MarshallingMessageConverter(shipOrdersMarshaller()));
    <b>final</b> JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = <b>new</b>
            JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
    );

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    <b>return</b> messageDrivenChannelAdapter;
}

@Bean
<b>public</b> Jaxb2Marshaller shipOrdersMarshaller() {
    Jaxb2Marshaller marshaller = <b>new</b> Jaxb2Marshaller();
    marshaller.setContextPath(</font><font>"com.example.stubs"</font><font>);
    <b>return</b> marshaller;
}
</font>

XML配置在Java中使用它可以为您提供如此强大的功能和灵活性。要完成此示例,inboundChannel的服务激活器将如下所示:

<font><i>/**
 * Sample 3
 * @param shiporder
 */</i></font><font>
@ServiceActivator(inputChannel = </font><font>"inboundChannel"</font><font>)
<b>public</b> <b>void</b> processMessage(<b>final</b> Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}
</font>

要测试流,您可以使用以下XML通过JConsole发送到JMS队列:

 <?xml version=<font>"1.0"</font><font> encoding=</font><font>"UTF-8"</font><font>?>        
    <shiporder orderid=</font><font>"889923"</font><font>
        xmlns:xsi=</font><font>"http://www.w3.org/2001/XMLSchema-instance"</font><font>
        xsi:noNamespaceSchemaLocation=</font><font>"shiporder.xsd"</font><font>>
      <orderperson>John Smith</orderperson>
        <shipto>
          <name>Ola Nordmann</name>
          <address>Langgt 23</address>
          <city>4000 Stavanger</city>
          <country>Norway</country>
        </shipto>
        <item>
          <title>Empire Burlesque</title>
          <note>Special Edition</note>
          <quantity>1</quantity>
          <price>10.90</price>
          </item>
        <item>
          <title>Hide your heart</title>
          <quantity>1</quantity>
          <price>9.90</price>
        </item>
    </shiporder>
</font>

有关如何使用ActiveMQ和JConsole的快速概述,请查看本 教程

示例4:具有JAXB和有效负载根路由的Jms消息驱动的通道适配器

另一种典型情况是接受XML作为JMS文本消息,将其转换为JAXB存根并根据有效负载根类型将有效负载路由到某个服务激活器。当然SI Java DSL支持所有类型的路由,我将向您展示如何根据有效载荷类型进行路由。

首先,将以下XSD添加到shiporder.xsd所在的文件夹中,并将其命名为purchaseorder.xsd:

<xsd:schema xmlns:xsd=<font>"http://www.w3.org/2001/XMLSchema"</font><font>
            xmlns:tns=</font><font>"http://tempuri.org/PurchaseOrderSchema.xsd"</font><font>
            targetNamespace=</font><font>"http://tempuri.org/PurchaseOrderSchema.xsd"</font><font>
            elementFormDefault=</font><font>"qualified"</font><font>>
    <xsd:element name=</font><font>"PurchaseOrder"</font><font>>
        <xsd:complexType>
            <xsd:sequence>
                <xsd:element name=</font><font>"ShipTo"</font><font> type=</font><font>"tns:USAddress"</font><font> maxOccurs=</font><font>"2"</font><font>/>
                <xsd:element name=</font><font>"BillTo"</font><font> type=</font><font>"tns:USAddress"</font><font>/>
            </xsd:sequence>
            <xsd:attribute name=</font><font>"OrderDate"</font><font> type=</font><font>"xsd:date"</font><font>/>
        </xsd:complexType>
    </xsd:element>

    <xsd:complexType name=</font><font>"USAddress"</font><font>>
        <xsd:sequence>
            <xsd:element name=</font><font>"name"</font><font>   type=</font><font>"xsd:string"</font><font>/>
            <xsd:element name=</font><font>"street"</font><font> type=</font><font>"xsd:string"</font><font>/>
            <xsd:element name=</font><font>"city"</font><font>   type=</font><font>"xsd:string"</font><font>/>
            <xsd:element name=</font><font>"state"</font><font>  type=</font><font>"xsd:string"</font><font>/>
            <xsd:element name=</font><font>"zip"</font><font>    type=</font><font>"xsd:integer"</font><font>/>
        </xsd:sequence>
        <xsd:attribute name=</font><font>"country"</font><font> type=</font><font>"xsd:NMTOKEN"</font><font> fixed=</font><font>"US"</font><font>/>
    </xsd:complexType>
</xsd:schema>
</font>

然后添加到jaxb maven插件配置:

 <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>jaxb2-maven-plugin</artifactId>
            <version>2.3.1</version>
            <executions>
                <execution>
                    <id>xjc-schema1</id>
                    <goals>
                        <goal>xjc</goal>
                    </goals>
                    <configuration>
                        <!-- Use all XSDs under the west directory <b>for</b> sources here. -->
                        <sources>
                            <source>src/main/resources/xsds/shiporder.xsd</source>
                            <source>src/main/resources/xsds/purchaseorder.xsd</source>
                        </sources>

                        <!-- Package name of the generated sources. -->
                        <packageName>com.example.stubs</packageName>
                        <outputDirectory>src/main/java</outputDirectory>
                        <clearOutputDir>false</clearOutputDir>
                    </configuration>
                </execution>
            </executions>
        </plugin>

运行mvn clean install以生成新XSD的JAXB存根。现在承诺有效负载根映射:

@Bean
<b>public</b> Jaxb2Marshaller ordersMarshaller() {
    Jaxb2Marshaller marshaller = <b>new</b> Jaxb2Marshaller();
    marshaller.setContextPath(<font>"com.example.stubs"</font><font>);
    <b>return</b> marshaller;
}

</font><font><i>/**
 * Sample 4: Jms message driven adapter with Jaxb and Payload routing.
 * @return
 */</i></font><font>
@Bean
<b>public</b> JmsMessageDrivenChannelAdapter dataEndpoint() {
    <b>final</b> ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            <b>new</b> ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setMessageConverter(<b>new</b> MarshallingMessageConverter(ordersMarshaller()));
    <b>final</b> JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = <b>new</b>
            JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
    );

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    <b>return</b> messageDrivenChannelAdapter;
}

@Bean
<b>public</b> IntegrationFlow payloadRootMapping() {
    <b>return</b> IntegrationFlows.from(inboundChannel()).<Object, Class<?>>route(Object::getClass, m->m
            .subFlowMapping(Shiporder.<b>class</b>, sf->sf.handle((MessageHandler) message -> {
                <b>final</b> Shiporder shiporder = (Shiporder) message.getPayload();
                System.out.println(shiporder.getOrderperson());
                System.out.println(shiporder.getOrderid());
            }))
            .subFlowMapping(PurchaseOrder.<b>class</b>, sf->sf.handle((MessageHandler) message -> {
                <b>final</b> PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();
                System.out.println(purchaseOrderType.getBillTo().getName());
            }))
    ).get();
}
</font>

注意payloadRootMapping bean,让我们解释一下重要的部分:

  • <Object, Class<?>> route - 表示来自inboundChannel的输入将是Object,并且将根据Class <?>执行路由
  • subFlowMapping(Shiporder.class.. - ShipOders的处理。
  • subFlowMapping(PurchaseOrder.class ... - 处理PurchaseOrders。

要测试ShipOrder有效负载,请使用示例3中的XML,以测试PurchaseOrder有效负载,使用以下XML:

<?xml version=<font>"1.0"</font><font> encoding=</font><font>"utf-8"</font><font>?>  
<PurchaseOrder OrderDate=</font><font>"1900-01-01"</font><font> xmlns=</font><font>"http://tempuri.org/PurchaseOrderSchema.xsd"</font><font>>  
  <ShipTo country=</font><font>"US"</font><font>>  
    <name>name1</name>  
    <street>street1</street>  
    <city>city1</city>  
    <state>state1</state>  
    <zip>1</zip>  
  </ShipTo>  
  <ShipTo country=</font><font>"US"</font><font>>  
    <name>name2</name>  
    <street>street2</street>  
    <city>city2</city>  
    <state>state2</state>  
    <zip>-79228162514264337593543950335</zip>  
  </ShipTo>  
  <BillTo country=</font><font>"US"</font><font>>  
    <name>name1</name>  
    <street>street1</street>  
    <city>city1</city>  
    <state>state1</state>  
    <zip>1</zip>  
  </BillTo>  
</PurchaseOrder>
</font>

应根据subflow 子流Map路由两个有效载荷。

示例5:IntegrationFlowAdapter

除了企业集成模式的其他实现(check them out )),我需要提到IntegrationFlowAdapter。通过扩展此类并实现buildFlow方法,如:

[url=https:<font><i>//bitbucket.org/Component/]@Component[/url] </i></font><font>
<b>public</b> <b>class</b> MyFlowAdapter <b>extends</b> IntegrationFlowAdapter {

@Autowired
 <b>private</b> ConnectionFactory rabbitConnectionFactory;

 @Override
 <b>protected</b> IntegrationFlowDefinition<?> buildFlow() {
      <b>return</b> from(Amqp.inboundAdapter(<b>this</b>.rabbitConnectionFactory, </font><font>"myQueue"</font><font>))
               .<String, String>transform(String::toLowerCase)
               .channel(c -> c.queue(</font><font>"myFlowAdapterOutput"</font><font>));
 }
</font>

你可以将bean的重复声明包装成一个组件并给它们所需的流量。然后可以配置这样的组件并将其作为一个类实例提供给调用代码!

因此,让我们举例说明这个repo中的示例3更短一些,并为所有JmsEndpoints定义基类,并在其中定义重复bean:

<b>public</b> <b>class</b> JmsEndpoint <b>extends</b> IntegrationFlowAdapter {

    <b>private</b> String queueName;

    <b>private</b> String channelName;

    <b>private</b> String contextPath;

    <font><i>/**
     * @param queueName
     * @param channelName
     * @param contextPath
     */</i></font><font>
    <b>public</b> JmsEndpoint(String queueName, String channelName, String contextPath) {
        <b>this</b>.queueName = queueName;
        <b>this</b>.channelName = channelName;
        <b>this</b>.contextPath = contextPath;
    }

    @Override
    <b>protected</b> IntegrationFlowDefinition<?> buildFlow() {
        <b>return</b> from(Jms.messageDrivenChannelAdapter(listenerContainer())
            .jmsMessageConverter(<b>new</b> MarshallingMessageConverter(shipOrdersMarshaller()))
        ).channel(channelName);
    }

    @Bean
    <b>public</b> Jaxb2Marshaller shipOrdersMarshaller() {
        Jaxb2Marshaller marshaller = <b>new</b> Jaxb2Marshaller();
        marshaller.setContextPath(contextPath);
        <b>return</b> marshaller;
    }

    @Bean
    <b>public</b> DynamicDestinationResolver dynamicDestinationResolver() {
        <b>return</b> <b>new</b> DynamicDestinationResolver();
    }

    @Bean
    <b>public</b> ActiveMQConnectionFactory connectionFactory() {
        <b>return</b> <b>new</b> ActiveMQConnectionFactory();
    }

    @Bean
    <b>public</b> DefaultMessageListenerContainer listenerContainer() {
        <b>final</b> DefaultMessageListenerContainer defaultMessageListenerContainer = <b>new</b> DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
        defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
        defaultMessageListenerContainer.setDestinationName(queueName);
        <b>return</b> defaultMessageListenerContainer;
    }

    @Bean
    <b>public</b> MessageChannel inboundChannel() {
        <b>return</b> MessageChannels.direct(channelName).get();
    }
}
</font>

现在声明特定队列的Jms端点很容易:

@Bean
<b>public</b> JmsEndpoint jmsEndpoint() {
    <b>return</b> <b>new</b> JmsEndpoint(<font>"jms.activeMQ.Test"</font><font>, </font><font>"inboundChannel"</font><font>, </font><font>"com.example.stubs"</font><font>);
}
</font>

inboundChannel的服务激活器:

<font><i>/**
 * Sample 3, 5
 * @param shiporder
 */</i></font><font>
@ServiceActivator(inputChannel = </font><font>"inboundChannel"</font><font>)
<b>public</b> <b>void</b> processMessage(<b>final</b> Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}
</font>

您不应该错过在项目中使用IntegrationFlowAdapter。我喜欢它的概念。

我最近在 Embedit 的新的基于Spring Boot的项目中开始使用Spring Integration Java DSL 。即使有一些配置,我发现它非常有用。

  • 它很容易调试。不添加像wiretap这样的配置。
  • 阅读起来要容易得多。是的,即使是lambdas!
  • 它很强大。在Java配置中,您现在有很多选择。

点击标题见原文, 源码

原文  https://www.jdon.com/51378
正文到此结束
Loading...