Skip to content

Commit 1ad7964

Browse files
allow resubscriptions on client stream and channel calls (#50)
* fix channel interactions resubscriptions Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com>
1 parent 6d74f57 commit 1ad7964

File tree

2 files changed

+24
-4
lines changed

2 files changed

+24
-4
lines changed

.travis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ jobs:
2222
before_install:
2323
- rm -r "/c/Program Files (x86)/Microsoft Visual Studio 14.0"
2424
- export PATH=$PATH:"/c/Program Files (x86)/Microsoft Visual Studio/2017/BuildTools/VC/Tools/MSVC/14.16.27023/bin/HostX86/x64"
25-
- choco install -y adoptopenjdk8
26-
- export PATH=$PATH:"/c/Program Files/AdoptOpenJDK/jdk-8.0.222.10-hotspot/bin"
25+
- choco install -y adoptopenjdk8 --version 8.232.9
26+
- export PATH=$PATH:"/c/Program Files/AdoptOpenJDK/jdk-8.0.232.09-hotspot/bin"
2727
- wget https://github.com/google/protobuf/releases/download/v3.6.1/protobuf-cpp-3.6.1.tar.gz
2828
- tar -xzf protobuf-cpp-3.6.1.tar.gz
2929
- pushd protobuf-3.6.1/cmake

rsocket-rpc-protobuf/src/java_plugin/cpp/java_generator.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -676,15 +676,31 @@ static void PrintClient(const ServiceDescriptor* service,
676676
p->Indent();
677677
p->Print(
678678
*vars,
679-
"return rSocket.requestChannel($Flux$.$from$(messages).map(\n");
679+
"return rSocket.requestChannel(\n");
680+
p->Indent();
681+
p->Print(
682+
*vars,
683+
"$Flux$.defer(new $Supplier$<$Publisher$<$Payload$>>() {\n\n");
684+
p->Indent();
685+
686+
p->Print(
687+
*vars,
688+
"@$Override$\n"
689+
"public $Publisher$<$Payload$> get() {\n");
690+
p->Print(
691+
*vars,
692+
"final $AtomicBoolean$ once = new $AtomicBoolean$(false);\n\n");
693+
p->Indent();
694+
p->Print(
695+
*vars,
696+
"return $Flux$.$from$(messages).map(\n");
680697
p->Indent();
681698
p->Print(
682699
*vars,
683700
"new $Function$<$MessageLite$, $Payload$>() {\n");
684701
p->Indent();
685702
p->Print(
686703
*vars,
687-
"private final $AtomicBoolean$ once = new $AtomicBoolean$(false);\n\n"
688704
"@$Override$\n"
689705
"public $Payload$ apply($MessageLite$ message) {\n");
690706
p->Indent();
@@ -708,6 +724,10 @@ static void PrintClient(const ServiceDescriptor* service,
708724
p->Outdent();
709725
p->Print("}\n");
710726
p->Outdent();
727+
p->Print(" });\n");
728+
p->Outdent();
729+
p->Print(" }\n");
730+
711731
if (server_streaming) {
712732
p->Print(
713733
*vars,

0 commit comments

Comments
 (0)