使用R2DBC实现数据库的响应式访问

Reactive Programming可以看作是一种编程模型,它通过创建事件驱动的非阻塞功能管道来促进可扩展性和稳定性,这些管道对资源的可用性和可处理性做出反应。延迟执行, 并发和异步性
只是底层编程模型的结果。

只有当整个堆栈都是被动的并且所有参与的组件(应用程序代码,运行时容器,集成)都遵循延迟执行,非阻塞API数据流的流式特性时,响应式编程的全部好处才会生效 – 基本上遵循基本假设。

虽然可以将非反应性组件引入以函数响应式编写的应用程序,但最终结果是对可扩展性和稳定性影响,实际预期收益减少。在最坏的情况下,运行时行为很少或没有区别。但是,响应式编程有助于提高代码的可读性。

如果我们查看响应式生态系统,我们将发现几个框架,库和集成。他们每个人都有自己的特长。通过通用方法或在特定响应框架的上下文中,可以很好地涵盖许多功能领域。我们来讨论关系数据库集成。

最常见的问题是:我们什么时候可以使用API​​进行响应性关系数据库集成?

Java使用JDBC作为与关系数据库集成的主要技术。JDBC具有阻塞性 – 没有什么可以减轻JDBC的阻塞性质。关于如何使调用无阻塞的第一个想法是将JDBC调用卸载到Executor(通常是Thread池)。虽然这种方法有些作用,但它有几个缺点,忽略了响应式编程模型的好处。

响应运行时通常使用与CPU核心数匹配的有限数量的线程。额外的线程引入开销并减少线程限制的影响。此外,JDBC调用通常会堆积在队列中,一旦线程充满请求,池将再次阻塞。所以,JDBC现在不是唯一一个选择。

响应式数据库API

Oracle宣布推出 ADBA
,该计划旨在通过使用期货为Java中的异步数据库访问提供标准化API。Postgres正在研究可用于第一次实验的 Postgres ADBA驱动程序
。 PgNio
是Postgres的另一个异步驱动程序,它开始尝试使用ADBA。

ADBA的可用性未知。它绝对不会在Java 12的版本中可使用,ADBA计划首次亮相的Java版本目前尚不清楚。

以下代码段显示了使用INSERT和SELECT语句的ADBA :

DataSource ds = dataSource();
CompletableFuture<Long> t;

try (Session session = ds.getSession()) {

  Submission<Long> submit = session
  .<Long>rowCountOperation(
    "INSERT INTO legoset (id, name, manual) " +
    "VALUES($1, $2, $3)")
    .set("$1", 42055, AdbaType.INTEGER)
    .set("$2", "Description", AdbaType.VARCHAR)
    .set("$3", null, AdbaType.INTEGER)
    .apply(Result.RowCount::getCount)
    .submit();

  t = submit.getCompletionStage().toCompletableFuture();
}

t.join();

CompletableFuture<List<Map<String, Object>>> t;
try (Session session = ds.getSession()) {

  Submission<List<Map<String, Object>>> submit = session
    .<List<Map<String, Object>>> rowOperation(
      "SELECT id, name, manual FROM legoset")
    .collect(collectToMap()) // custom collector
    .submit();
  t = submit.getCompletionStage().toCompletableFuture();
}

t.join();

请注意,这collectToMap(…)是应用程序提供的函数的示例,该函数将结果提取到所需的返回类型。

R2DBC

由于缺乏标准API和驱动程序不可用, Pivotal
的团队开始研究反应性关系API的想法,该API非常适合用于反应式编程。他们提出了 R2DBC
,它代表了Reactive Relational Database Connectivity。截至目前,R2DBC是一个孵化器项目,用于评估可行性并开始讨论驱动程序供应商是否有兴趣支持反应/非阻塞/异步驱动程序。

截至目前,有三种驱动程序实现:

R2DBC附带API规范(r2dbc-spi)和客户端(r2dbc-client),使SPI可用于应用程序。

以下代码段显示R2DBC SPI使用INSERT和SELECT语句:

ConnectionFactory connectionFactory = null;

Mono<Integer> count = Mono.from(connectionFactory.create())
  .flatMapMany(it ->
    it.createStatement(
    "INSERT INTO legoset (id, name, manual) " +
    "VALUES($1, $2, $3)")
      .bind("$1", 42055)
      .bind("$2", "Description")
      .bindNull("$3", Integer.class)
      .execute())
  .flatMap(io.r2dbc.spi.Result::getRowsUpdated)
  .next();

Flux<Map<String, Object>> rows = Mono.from(connectionFactory.create())
  .flatMapMany(it -> it.createStatement(
    "SELECT id, name, manual FROM legoset").execute())
  .flatMap(it -> it.map((row, rowMetadata) -> collectToMap(row, rowMetadata)));

虽然上面的代码有点笨重,但R2DBC还附带了一个客户端库项目,用于更人性化的用户API。R2DBC SPI不用于直接使用,而是通过客户端库使用。

使用R2DBC客户端重写的相同代码将是:

R2dbc r2dbc = new R2dbc(connectionFactory);

Flux<Integer> count = r2dbc.inTransaction(handle ->
  handle.createQuery(
    "INSERT INTO legoset (id, name, manual) " +
    "VALUES($1, $2, $3)")
    .bind("$1", 42055)
    .bind("$2", "Description")
    .bindNull("$3", Integer.class)
    .mapResult(io.r2dbc.spi.Result::getRowsUpdated));

Flux<Map<String, Object>> rows = r2dbc
  .inTransaction(handle -> handle.select(
    "SELECT id, name, manual FROM legoset")
  .mapRow((row, rowMetadata) -> collectToMap(row, rowMetadata));

请注意,collectToMap(…)作为应用程序提供的函数的示例,该函数将结果提取到所需的返回类型。

Spring Data团队将 Spring Data R2DBC
作为孵化器启动,通过数据库客户端提供响应API并支持反应式存储库。使用Spring Data R2DBC重写的示例代码将是:

DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);

Mono<Integer> count = databaseClient.execute()
  .sql(
    "INSERT INTO legoset (id, name, manual) " +
    "VALUES($1, $2, $3)")
  .bind("$1", 42055)
  .bind("$2", "Description")
  .bindNull("$3", Integer.class)
  .fetch()
  .rowsUpdated();

Flux<Map<String, Object>> rows = databaseClient.execute()
  .sql("SELECT id, name, manual FROM legoset")
  .fetch()
  .all();

R2DBC及其生态系统仍然很年轻,并要求进行实验和反馈以收集用例,并查看反应性关系数据库集成是否有意义。

Fibers的JDBC

虽然JDBC和其他技术暴露了阻塞API(主要是由于等待I / O),但 Project Loom
引入Fibers了轻量级抽象,将阻塞API转变为非阻塞API。一旦调用遇到阻塞API,就可以通过堆栈切换实现这一点。因此Fiber,继续使用阻塞API的先前流程的基础尝试。

该Fiber执行模式大大减少了所需的原生线程数。结果是更好的可伸缩性和非阻塞行为 – 通过卸载对Fiber-backed的阻塞调用Executor。我们所需要的只是一个适当的API,允许在Fiber实现上使用非阻塞JDBC 。

结论

反应式编程和关系数据库的未来是什么?老实说,我不知道。如果我尝试了一个有根据的猜测,我可以看到Project Loom和它的基于Fiber的Executor与成熟的JDBC驱动程序相结合,成为业界潜在的游戏规则改变者。随着Java的加速发布节奏,这可能并不遥远。

ADBA的目标是包含在Java标准版运行时中,根据当前的时间表,我预计它将在早于2021年的Java 17中出现。

与现在可用的R2DBC形成对比。它配备了驱动程序和客户端,并允许实验使用。R2DBC的一个简洁的副作用是它暴露完全反应的API,同时独立于底层数据库引擎。随着已经发布的版本,没有必要猜测Project Loom,也不需要等待三年才能测试其驱动API。今天就可以使用R2DBC。

原文 

https://www.jdon.com/50943

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 使用R2DBC实现数据库的响应式访问

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址