Skip to content

Commit c10a1a0

Browse files
committed
Adding reactive demo using R2DBC and WebFlux
1 parent 7037e1e commit c10a1a0

File tree

14 files changed

+507
-0
lines changed

14 files changed

+507
-0
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
<module>roach-data-jpa-orders</module>
2828
<module>roach-data-json</module>
2929
<module>roach-data-mybatis</module>
30+
<module>roach-data-reactive</module>
3031
</modules>
3132

3233
<dependencyManagement>

roach-data-reactive/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Roach Demo Data :: Reactive
2+
3+
A CockroachDB Spring Boot Demo using [Spring Data R2DBC](https://spring.io/projects/spring-data-r2dbc)
4+
for data access combined with WebFlux.
5+

roach-data-reactive/pom.xml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xmlns="http://maven.apache.org/POM/4.0.0"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
<parent>
6+
<groupId>io.roach.data</groupId>
7+
<artifactId>roach-data-parent</artifactId>
8+
<version>1.0.0.BUILD-SNAPSHOT</version>
9+
</parent>
10+
11+
<artifactId>roach-data-reactive</artifactId>
12+
13+
<dependencies>
14+
<dependency>
15+
<groupId>org.springframework.boot</groupId>
16+
<artifactId>spring-boot-starter-webflux</artifactId>
17+
</dependency>
18+
<dependency>
19+
<groupId>org.springframework.boot</groupId>
20+
<artifactId>spring-boot-starter-jetty</artifactId>
21+
</dependency>
22+
<dependency>
23+
<groupId>org.springframework.boot</groupId>
24+
<artifactId>spring-boot-starter-hateoas</artifactId>
25+
</dependency>
26+
<dependency>
27+
<groupId>org.springframework.boot</groupId>
28+
<artifactId>spring-boot-starter-aop</artifactId>
29+
</dependency>
30+
<dependency>
31+
<groupId>org.springframework.boot</groupId>
32+
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
33+
<!-- <version>0.8.10.RELEASE</version>-->
34+
</dependency>
35+
<dependency>
36+
<groupId>org.springframework.data</groupId>
37+
<artifactId>spring-data-r2dbc</artifactId>
38+
<!-- <version>1.4.0</version>-->
39+
</dependency>
40+
<dependency>
41+
<groupId>io.r2dbc</groupId>
42+
<artifactId>r2dbc-postgresql</artifactId>
43+
<scope>runtime</scope>
44+
</dependency>
45+
</dependencies>
46+
47+
<build>
48+
<finalName>roach-data-reactive</finalName>
49+
<plugins>
50+
<plugin>
51+
<groupId>org.springframework.boot</groupId>
52+
<artifactId>spring-boot-maven-plugin</artifactId>
53+
</plugin>
54+
</plugins>
55+
</build>
56+
</project>
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.roach.data.reactive;
2+
3+
import java.math.BigDecimal;
4+
5+
import org.springframework.data.annotation.Id;
6+
import org.springframework.data.relational.core.mapping.Table;
7+
8+
@Table("account")
9+
public class Account {
10+
@Id
11+
private Long id;
12+
13+
private String name;
14+
15+
private AccountType type;
16+
17+
private BigDecimal balance;
18+
19+
public Long getId() {
20+
return id;
21+
}
22+
23+
public String getName() {
24+
return name;
25+
}
26+
27+
public AccountType getType() {
28+
return type;
29+
}
30+
31+
public BigDecimal getBalance() {
32+
return balance;
33+
}
34+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package io.roach.data.reactive;
2+
3+
import java.math.BigDecimal;
4+
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.data.domain.Page;
7+
import org.springframework.data.domain.PageImpl;
8+
import org.springframework.data.domain.PageRequest;
9+
import org.springframework.data.domain.Pageable;
10+
import org.springframework.data.domain.Sort;
11+
import org.springframework.data.web.PageableDefault;
12+
import org.springframework.hateoas.RepresentationModel;
13+
import org.springframework.http.HttpStatus;
14+
import org.springframework.http.ResponseEntity;
15+
import org.springframework.transaction.annotation.Transactional;
16+
import org.springframework.web.bind.annotation.GetMapping;
17+
import org.springframework.web.bind.annotation.PathVariable;
18+
import org.springframework.web.bind.annotation.PostMapping;
19+
import org.springframework.web.bind.annotation.RequestParam;
20+
import org.springframework.web.bind.annotation.RestController;
21+
22+
import reactor.core.publisher.Mono;
23+
24+
import static org.springframework.hateoas.server.mvc.WebMvcLinkBuilder.linkTo;
25+
import static org.springframework.hateoas.server.mvc.WebMvcLinkBuilder.methodOn;
26+
import static org.springframework.transaction.annotation.Propagation.REQUIRES_NEW;
27+
28+
@RestController
29+
public class AccountController {
30+
@Autowired
31+
private AccountRepository accountRepository;
32+
33+
@GetMapping
34+
public ResponseEntity<RepresentationModel> index() {
35+
RepresentationModel index = new RepresentationModel();
36+
37+
index.add(linkTo(methodOn(AccountController.class)
38+
.listAccounts(PageRequest.of(0, 5)))
39+
.withRel("accounts"));
40+
41+
index.add(linkTo(AccountController.class)
42+
.slash("transfer{?fromId,toId,amount}")
43+
.withRel("transfer"));
44+
45+
return new ResponseEntity<>(index, HttpStatus.OK);
46+
}
47+
48+
@GetMapping("/account")
49+
@Transactional(propagation = REQUIRES_NEW)
50+
public Mono<Page<Account>> listAccounts(@PageableDefault(size = 5, direction = Sort.Direction.ASC) Pageable page) {
51+
return getAccounts(PageRequest.of(page.getPageNumber(), page.getPageSize()));
52+
}
53+
54+
private Mono<Page<Account>> getAccounts(PageRequest pageRequest) {
55+
return this.accountRepository.findAllBy(pageRequest)
56+
.collectList()
57+
.zipWith(this.accountRepository.count())
58+
.map(t -> new PageImpl<>(t.getT1(), pageRequest, t.getT2()));
59+
}
60+
61+
@GetMapping(value = "/account/{id}")
62+
@Transactional(propagation = REQUIRES_NEW)
63+
public Mono<ResponseEntity<Account>> getAccount(@PathVariable("id") Long accountId) {
64+
return accountRepository.findById(accountId)
65+
.map(ResponseEntity::ok)
66+
.defaultIfEmpty(ResponseEntity.notFound().build());
67+
}
68+
69+
@PostMapping(value = "/transfer")
70+
@Transactional(propagation = REQUIRES_NEW)
71+
public Mono<Void> transfer(
72+
@RequestParam("fromId") Long fromId,
73+
@RequestParam("toId") Long toId,
74+
@RequestParam("amount") BigDecimal amount
75+
) {
76+
if (amount.compareTo(BigDecimal.ZERO) < 0) {
77+
throw new IllegalArgumentException("Negative amount");
78+
}
79+
if (fromId.equals(toId)) {
80+
throw new IllegalArgumentException("From and to accounts must be different");
81+
}
82+
83+
return accountRepository.getBalance(fromId).map(balance -> {
84+
if (balance.compareTo(BigDecimal.ZERO) < 0) {
85+
throw new NegativeBalanceException("Insufficient funds " + amount + " for account " + fromId);
86+
}
87+
return balance;
88+
}).flatMap(unused -> {
89+
return accountRepository.updateBalance(fromId, amount.negate());
90+
}).flatMap(unused -> {
91+
return accountRepository.updateBalance(toId, amount);
92+
});
93+
}
94+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.roach.data.reactive;
2+
3+
import java.math.BigDecimal;
4+
5+
import org.springframework.data.domain.Pageable;
6+
import org.springframework.data.r2dbc.repository.Modifying;
7+
import org.springframework.data.r2dbc.repository.Query;
8+
import org.springframework.data.repository.reactive.ReactiveSortingRepository;
9+
import org.springframework.stereotype.Repository;
10+
import org.springframework.transaction.annotation.Transactional;
11+
12+
import reactor.core.publisher.Flux;
13+
import reactor.core.publisher.Mono;
14+
15+
import static org.springframework.transaction.annotation.Propagation.MANDATORY;
16+
17+
@Repository
18+
@Transactional(propagation = MANDATORY)
19+
public interface AccountRepository extends ReactiveSortingRepository<Account, Long> {
20+
Flux<Account> findAllBy(Pageable pageable);
21+
22+
@Query(value = "select balance from Account where id=:id")
23+
Mono<BigDecimal> getBalance(Long id);
24+
25+
@Modifying
26+
@Query("update Account set balance = balance + :balance where id=:id")
27+
Mono<Void> updateBalance(Long id, BigDecimal balance);
28+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.roach.data.reactive;
2+
3+
public enum AccountType {
4+
asset,
5+
expense
6+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.roach.data.reactive;
2+
3+
import org.springframework.dao.DataIntegrityViolationException;
4+
import org.springframework.http.HttpStatus;
5+
import org.springframework.web.bind.annotation.ResponseStatus;
6+
7+
@ResponseStatus(value = HttpStatus.BAD_REQUEST, reason = "Negative balance")
8+
public class NegativeBalanceException extends DataIntegrityViolationException {
9+
public NegativeBalanceException(String message) {
10+
super(message);
11+
}
12+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package io.roach.data.reactive;
2+
3+
import java.math.BigDecimal;
4+
import java.util.ArrayDeque;
5+
import java.util.Deque;
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.ExecutionException;
10+
import java.util.concurrent.Executors;
11+
import java.util.concurrent.ScheduledExecutorService;
12+
import java.util.concurrent.ThreadLocalRandom;
13+
import java.util.stream.IntStream;
14+
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
import org.springframework.boot.CommandLineRunner;
18+
import org.springframework.boot.WebApplicationType;
19+
import org.springframework.boot.autoconfigure.SpringBootApplication;
20+
import org.springframework.boot.builder.SpringApplicationBuilder;
21+
import org.springframework.context.annotation.Bean;
22+
import org.springframework.context.annotation.EnableAspectJAutoProxy;
23+
import org.springframework.core.io.ClassPathResource;
24+
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
25+
import org.springframework.hateoas.Link;
26+
import org.springframework.hateoas.config.EnableHypermediaSupport;
27+
import org.springframework.r2dbc.connection.init.ConnectionFactoryInitializer;
28+
import org.springframework.r2dbc.connection.init.ResourceDatabasePopulator;
29+
import org.springframework.transaction.annotation.EnableTransactionManagement;
30+
import org.springframework.web.client.HttpStatusCodeException;
31+
import org.springframework.web.client.RestTemplate;
32+
33+
import io.r2dbc.spi.ConnectionFactory;
34+
35+
@EnableHypermediaSupport(type = EnableHypermediaSupport.HypermediaType.HAL)
36+
@EnableAspectJAutoProxy(proxyTargetClass = true)
37+
@EnableTransactionManagement
38+
@SpringBootApplication
39+
@EnableR2dbcRepositories
40+
public class ReactiveApplication implements CommandLineRunner {
41+
protected static final Logger logger = LoggerFactory.getLogger(ReactiveApplication.class);
42+
43+
public static void main(String[] args) {
44+
new SpringApplicationBuilder(ReactiveApplication.class).web(WebApplicationType.SERVLET).run(args);
45+
}
46+
47+
@Bean
48+
public ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) {
49+
ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();
50+
initializer.setConnectionFactory(connectionFactory);
51+
initializer.setDatabasePopulator(new ResourceDatabasePopulator(new ClassPathResource("db/create.sql")));
52+
return initializer;
53+
}
54+
55+
@Override
56+
public void run(String... args) {
57+
logger.info("Lets move some $$ around!");
58+
59+
final int concurrency = args.length > 0 ? Integer.parseInt(args[0]) : 10;
60+
61+
final Link transferLink = Link.of("http://localhost:9090/transfer/{?fromId,toId,amount}");
62+
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(concurrency);
63+
final Deque<CompletableFuture<Integer>> futures = new ArrayDeque<>();
64+
final ThreadLocalRandom random = ThreadLocalRandom.current();
65+
66+
IntStream.rangeClosed(1, concurrency).forEach(value -> {
67+
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {
68+
RestTemplate template = new RestTemplate();
69+
int errors = 0;
70+
for (int j = 0; j < 100; j++) {
71+
int fromId = 1 + random.nextInt(10000);
72+
int toId = fromId % 10000 + 1;
73+
74+
BigDecimal amount = new BigDecimal("10.00");
75+
76+
Map<String, Object> form = new HashMap<>();
77+
form.put("fromId", fromId);
78+
form.put("toId", toId);
79+
form.put("amount", amount);
80+
81+
String uri = transferLink.expand(form).getHref();
82+
83+
logger.debug("({}) Transfer {} from {} to {}", uri, amount, fromId, toId);
84+
85+
try {
86+
template.postForEntity(uri, null, String.class);
87+
} catch (HttpStatusCodeException e) {
88+
logger.warn(e.getResponseBodyAsString());
89+
errors++;
90+
}
91+
}
92+
return errors;
93+
});
94+
futures.add(f);
95+
});
96+
97+
int totalErrors = 0;
98+
99+
while (!futures.isEmpty()) {
100+
try {
101+
int errors = futures.pop().get();
102+
totalErrors += errors;
103+
logger.info("Worker finished with {} errors - {} remaining", errors, futures.size());
104+
} catch (InterruptedException e) {
105+
Thread.currentThread().interrupt();
106+
} catch (ExecutionException e) {
107+
logger.warn("Worker failed", e.getCause());
108+
}
109+
}
110+
111+
logger.info("All client workers finished with {} errors and server keeps running. Have a nice day!",
112+
totalErrors);
113+
114+
executorService.shutdownNow();
115+
}
116+
}
117+

0 commit comments

Comments
 (0)