44import java .io .InputStreamReader ;
55import java .util .LinkedList ;
66import java .util .List ;
7+ import java .util .concurrent .TimeUnit ;
78
89import io .reactivex .Observable ;
910import io .reactivex .ObservableEmitter ;
1011import io .reactivex .ObservableOnSubscribe ;
12+ import io .reactivex .disposables .Disposable ;
13+ import io .reactivex .functions .Consumer ;
1114import io .reactivex .schedulers .Schedulers ;
1215
1316/**
@@ -96,6 +99,8 @@ public class CommandTask {
9699 private Process mProcess = null ;
97100 private int mStatus = STATUS_WAITING ;
98101 private IListener mIListener = null ;
102+ private Disposable mTimer = null ;
103+ private String mCmd = null ;
99104
100105 private CommandTask (List <String > command , final IListener listener ) {
101106 mCommand = command ;
@@ -116,10 +121,12 @@ public void onProgress(String message) {
116121
117122 @ Override
118123 public void onError (Throwable t ) {
119- mStatus = STATUS_FINISHED ;
120- if (listener != null ) {
121- listener .onError (t );
124+ if (mStatus != STATUS_INTERRUPT ) {
125+ if (listener != null ) {
126+ listener .onError (t );
127+ }
122128 }
129+ mStatus = STATUS_FINISHED ;
123130 remove (CommandTask .this );
124131 }
125132
@@ -152,19 +159,24 @@ public void subscribe(ObservableEmitter<String> emitter) throws Exception {
152159 for (String item : mCommand ) {
153160 cmd .append (item ).append (" " );
154161 }
155- mIListener .onPre (cmd .toString ());
162+ mCmd = cmd .toString ();
163+ mIListener .onPre (mCmd );
164+ restartTimer ();
156165 mProcess = new ProcessBuilder (mCommand ).redirectErrorStream (true ).start ();
157166
158167 BufferedReader stdin = new BufferedReader (new InputStreamReader (mProcess .getInputStream ()));
159168 StringBuilder result = new StringBuilder ();
160169 String line = null ;
161- while ((line = stdin .readLine ()) != null && mStatus != STATUS_INTERRUPT ) {
162- mIListener .onProgress (line );
163- result .append (line );
170+ while (mStatus == STATUS_RUNNING && (line = stdin .readLine ()) != null ) {
171+ if (mStatus == STATUS_RUNNING ) {
172+ restartTimer ();
173+ mIListener .onProgress (line );
174+ result .append (line );
175+ }
164176 }
165177 stdin .close ();
166-
167- if (mStatus != STATUS_INTERRUPT ) {
178+ resetTimer ();
179+ if (mStatus == STATUS_RUNNING ) {
168180 mIListener .onSuccess (result .toString ());
169181 }
170182 }
@@ -174,23 +186,34 @@ public void subscribe(ObservableEmitter<String> emitter) throws Exception {
174186 }
175187 }
176188
189+ private void resetTimer () {
190+ if (mTimer != null && !mTimer .isDisposed ()) {
191+ mTimer .dispose ();
192+ }
193+ }
194+
195+ private void restartTimer () {
196+ resetTimer ();
197+ mTimer = Observable .timer (mTimeOut , TimeUnit .MILLISECONDS ).subscribe (new Consumer <Long >() {
198+ @ Override
199+ public void accept (Long aLong ) throws Exception {
200+ mIListener .onError (new Throwable ("Time out : " + mCmd ));
201+ discard ();
202+ }
203+ });
204+ }
205+
177206 private boolean isRunning () {
178207 return mStatus == STATUS_RUNNING ;
179208 }
180209
181210 private void cancel () {
182211 mStatus = STATUS_INTERRUPT ;
183- if (mProcess != null ) {
184- mProcess .destroy ();
185- }
186212 mTaskQueue .remove (this );
187213 }
188214
189215 public void discard () {
190216 mStatus = STATUS_INTERRUPT ;
191- if (mProcess != null ) {
192- mProcess .destroy ();
193- }
194217 remove (this );
195218 }
196219
0 commit comments