1616package jp .xet .sparwings .aws .sqs ;
1717
1818import java .nio .charset .StandardCharsets ;
19+ import java .util .Collections ;
1920import java .util .List ;
2021import java .util .concurrent .ExecutionException ;
2122import java .util .concurrent .ExecutorService ;
2223import java .util .concurrent .Executors ;
2324import java .util .concurrent .Future ;
2425import java .util .concurrent .TimeUnit ;
2526import java .util .concurrent .TimeoutException ;
27+ import java .util .function .Consumer ;
2628
2729import lombok .Getter ;
2830import lombok .RequiredArgsConstructor ;
2931import lombok .Setter ;
32+ import lombok .extern .slf4j .Slf4j ;
3033
3134import org .springframework .retry .support .RetryTemplate ;
3235import org .springframework .scheduling .annotation .Scheduled ;
3336import org .springframework .util .DigestUtils ;
3437
35- import org .slf4j .Logger ;
36- import org .slf4j .LoggerFactory ;
37-
3838import com .amazonaws .services .sqs .AmazonSQS ;
3939import com .amazonaws .services .sqs .model .ChangeMessageVisibilityRequest ;
4040import com .amazonaws .services .sqs .model .DeleteMessageRequest ;
5050 * @version $Id$
5151 * @author daisuke
5252 */
53+ @ Slf4j
5354@ RequiredArgsConstructor
5455public class SqsMessagePoller { // NOPMD - cc
5556
56- private static Logger logger = LoggerFactory .getLogger (SqsMessagePoller .class );
57-
5857 @ Getter
5958 private final AmazonSQS sqs ;
6059
@@ -65,15 +64,15 @@ public class SqsMessagePoller { // NOPMD - cc
6564 private final String workerQueueUrl ;
6665
6766 @ Getter
68- private final SqsMessageHandler messageHandler ;
67+ private final Consumer < Message > messageHandler ;
6968
7069 @ Getter
7170 @ Setter
7271 private ExecutorService executor = Executors .newCachedThreadPool (r -> {
7372 Thread thread = new Thread (r );
7473 thread .setUncaughtExceptionHandler ((t , e ) -> {
7574 synchronized (SqsMessagePoller .class ) {
76- logger .error ("Uncaught exception in thread '{}': {}" , t .getName (), e .getMessage ());
75+ log .error ("Uncaught exception in thread '{}': {}" , t .getName (), e .getMessage ());
7776 }
7877 });
7978 return thread ;
@@ -103,82 +102,97 @@ public class SqsMessagePoller { // NOPMD - cc
103102 */
104103 @ Scheduled (fixedDelay = 1 ) // SUPPRESS CHECKSTYLE bug?
105104 public void loop () { // NOPMD - cc
105+ List <Message > messages = reveiveMessages ();
106+ if (messages .isEmpty ()) {
107+ log .trace ("No SQS message received" );
108+ return ;
109+ }
110+ log .debug ("{} SQS messages are received" , messages .size ());
111+ messages .stream ().parallel ().forEach (this ::handleMessage );
112+ }
113+
114+ private List <Message > reveiveMessages () {
106115 ReceiveMessageResult receiveMessageResult ;
107116 try {
108- logger .trace ("Start SQS long polling" );
117+ log .trace ("Start SQS long polling" );
109118 receiveMessageResult = sqs .receiveMessage (new ReceiveMessageRequest (workerQueueUrl )
110119 .withWaitTimeSeconds (waitTimeSeconds )
111120 .withMaxNumberOfMessages (maxNumberOfMessages )
112121 .withVisibilityTimeout (visibilityTimeout )
113122 .withAttributeNames ("ApproximateReceiveCount" ));
123+ return receiveMessageResult .getMessages ();
114124 } catch (OverLimitException e ) {
115- logger .error ("SQS over limit" , e );
125+ log .error ("SQS over limit" , e );
116126 try {
117127 Thread .sleep (60000 );
118128 } catch (InterruptedException e1 ) {
119- logger .error ("interrupted" , e1 );
129+ log .error ("interrupted" , e1 );
120130 throw new AssertionError (e1 ); // NOPMD - lost OverLimitException's stacktrace
121131 }
122- return ;
123132 }
124-
125- List <Message > messages = receiveMessageResult .getMessages ();
126- if (messages .isEmpty ()) {
127- logger .trace ("No SQS message received" );
128- return ;
129- }
130- logger .debug ("{} SQS messages are received" , messages .size ());
131- messages .stream ().parallel ().forEach (message -> {
132- logger .info ("SQS message was recieved: {}" , message .getMessageId ());
133- logger .debug ("Receive SQS:{} C:{} RHD:{}" , new Object [] {
133+ return Collections .emptyList ();
134+ }
135+
136+ private void handleMessage (Message message ) {
137+ log .info ("SQS message was recieved: {}" , message .getMessageId ());
138+ log .debug ("Receive SQS:{} C:{} RHD:{}" ,
134139 message .getMessageId (),
135140 message .getAttributes ().get ("ApproximateReceiveCount" ),
136- DigestUtils .md5DigestAsHex (message .getReceiptHandle ().getBytes (StandardCharsets .UTF_8 ))
141+ computeReceptHandleDigest (message ));
142+
143+ Future <Message > future = executor .submit (() -> messageHandler .accept (message ), message );
144+ log .debug ("Main task for {} is submitted" , message .getMessageId ());
145+
146+ doFollowup (message , future );
147+ }
148+
149+ private void doFollowup (Message message , Future <Message > future ) {
150+ log .debug ("Start visibility timeout follow-up task for {}" , message .getMessageId ());
151+ try {
152+ retry .execute (context -> {
153+ try {
154+ future .get (changeVisibilityThreshold , TimeUnit .SECONDS );
155+ log .debug ("Job for SQS:{} was done" , message .getMessageId ());
156+ sqs .deleteMessage (new DeleteMessageRequest (workerQueueUrl , message .getReceiptHandle ()));
157+ log .info ("SQS:{} was deleted" , message .getMessageId ());
158+ } catch (InterruptedException e ) {
159+ Thread .currentThread ().interrupt ();
160+ log .warn ("Job for SQS:{} was interrupted" , message .getMessageId ());
161+ } catch (ExecutionException e ) { // handle e.getCause()
162+ log .error ("Job for SQS:{} was failed" , message .getMessageId (), e .getCause ());
163+ } catch (TimeoutException e ) { // we need more time
164+ extendTimeout (message );
165+ throw e ;
166+ }
167+ return null ;
137168 });
169+ } catch (Exception e ) { // NOPMD - cc
170+ log .error ("Retry attempt exceeded?" , e );
171+ }
172+ log .debug ("Visibility timeout follow-up task for {} was finished" , message .getMessageId ());
173+ }
174+
175+ private void extendTimeout (Message message ) {
176+ log .debug ("Job for SQS:{} was timeout RHD:{}" , message .getMessageId (), computeReceptHandleDigest (message ));
177+ sqs .changeMessageVisibility (new ChangeMessageVisibilityRequest (
178+ workerQueueUrl , message .getReceiptHandle (), visibilityTimeout ));
179+ if (log .isDebugEnabled ()) {
180+ log .debug ("Visibility for SQS:{} was updated VT:{}" , message .getMessageId (), visibilityTimeout );
181+ } else if (log .isTraceEnabled ()) {
182+ log .trace ("Visibility for SQS:{} was updated VT:{} RHD:{}" ,
183+ message .getMessageId (),
184+ visibilityTimeout ,
185+ computeReceptHandleDigest (message ));
186+ }
187+ }
188+
189+ private Object computeReceptHandleDigest (Message message ) {
190+ return new Object () {
138191
139- Future <Void > future = executor .submit (() -> messageHandler .handle (message ));
140- logger .debug ("Main task for {} is submitted" , message .getMessageId ());
141-
142- logger .debug ("Start visibility timeout follow-up task for {}" , message .getMessageId ());
143- try {
144- retry .execute (context -> {
145- try {
146- future .get (changeVisibilityThreshold , TimeUnit .SECONDS );
147- logger .debug ("Job for SQS:{} was done" , message .getMessageId ());
148- sqs .deleteMessage (new DeleteMessageRequest (workerQueueUrl , message .getReceiptHandle ()));
149- logger .info ("SQS:{} was deleted" , message .getMessageId ());
150- } catch (InterruptedException e ) {
151- Thread .currentThread ().interrupt ();
152- logger .warn ("Job for SQS:{} was interrupted" , message .getMessageId ());
153- } catch (ExecutionException e ) { // handle e.getCause()
154- logger .error ("Job for SQS:{} was failed" , message .getMessageId (), e .getCause ());
155- } catch (TimeoutException e ) { // we need more time
156- logger .debug ("Job for SQS:{} was timeout RHD:{}" , new Object [] {
157- message .getMessageId (),
158- DigestUtils .md5DigestAsHex (message .getReceiptHandle ().getBytes (StandardCharsets .UTF_8 ))
159- });
160- sqs .changeMessageVisibility (new ChangeMessageVisibilityRequest (
161- workerQueueUrl , message .getReceiptHandle (), visibilityTimeout ));
162- if (logger .isDebugEnabled ()) {
163- logger .debug ("Visibility for SQS:{} was updated" , new Object [] {
164- message .getMessageId (),
165- visibilityTimeout
166- });
167- } else if (logger .isTraceEnabled ()) {
168- logger .trace ("Visibility for SQS:{} was updated VT:{} RHD:{}" , new Object [] {
169- message .getMessageId (),
170- visibilityTimeout ,
171- DigestUtils .md5DigestAsHex (message .getReceiptHandle ().getBytes (StandardCharsets .UTF_8 ))
172- });
173- }
174- throw e ;
175- }
176- return null ;
177- });
178- } catch (Exception e ) { // NOPMD - cc
179- logger .error ("Retry attempt exceeded?" , e );
192+ @ Override
193+ public String toString () {
194+ return DigestUtils .md5DigestAsHex (message .getReceiptHandle ().getBytes (StandardCharsets .UTF_8 ));
180195 }
181- logger .debug ("Visibility timeout follow-up task for {} was finished" , message .getMessageId ());
182- });
196+ };
183197 }
184198}
0 commit comments