1- <?php
1+ <?php declare (strict_types= 1 );
22
33namespace mikemadisonweb \rabbitmq \components ;
44
55use mikemadisonweb \rabbitmq \exceptions \RuntimeException ;
6+ use PhpAmqpLib \Channel \AMQPChannel ;
67use PhpAmqpLib \Connection \AbstractConnection ;
78use PhpAmqpLib \Exception \AMQPProtocolChannelException ;
89use yii \helpers \ArrayHelper ;
@@ -16,11 +17,17 @@ class Routing
1617 private $ exchangesDeclared = [];
1718 private $ queuesDeclared = [];
1819 private $ isDeclared = false ;
20+
1921 /**
2022 * @var $conn AbstractConnection
2123 */
2224 private $ conn ;
2325
26+ /**
27+ * @var $conn AMQPChannel
28+ */
29+ private $ ch ;
30+
2431 /**
2532 * @param AbstractConnection $conn
2633 */
@@ -85,10 +92,12 @@ public function declareQueue(string $queueName)
8592 if (!isset ($ this ->queues [$ queueName ])) {
8693 throw new RuntimeException ("Queue ` {$ queueName }` is not configured. " );
8794 }
95+
96+ $ channel =
8897 $ queue = $ this ->queues [$ queueName ];
8998 if (!isset ($ this ->queuesDeclared [$ queueName ])) {
9099 if (ArrayHelper::isAssociative ($ queue )) {
91- $ this ->conn -> channel ()->queue_declare (
100+ $ this ->getChannel ()->queue_declare (
92101 $ queue ['name ' ],
93102 $ queue ['passive ' ],
94103 $ queue ['durable ' ],
@@ -100,7 +109,7 @@ public function declareQueue(string $queueName)
100109 );
101110 } else {
102111 foreach ($ queue as $ q ) {
103- $ this ->conn -> channel ()->queue_declare (
112+ $ this ->getChannel ()->queue_declare (
104113 $ q ['name ' ],
105114 $ q ['passive ' ],
106115 $ q ['durable ' ],
@@ -140,13 +149,13 @@ public function bindExchangeToQueue(array $binding)
140149 foreach ($ binding ['routing_keys ' ] as $ routingKey ) {
141150 // queue binding is not permitted on the default exchange
142151 if ('' !== $ binding ['exchange ' ]) {
143- $ this ->conn -> channel ()->queue_bind ($ binding ['queue ' ], $ binding ['exchange ' ], $ routingKey );
152+ $ this ->getChannel ()->queue_bind ($ binding ['queue ' ], $ binding ['exchange ' ], $ routingKey );
144153 }
145154 }
146155 } else {
147156 // queue binding is not permitted on the default exchange
148157 if ('' !== $ binding ['exchange ' ]) {
149- $ this ->conn -> channel ()->queue_bind ($ binding ['queue ' ], $ binding ['exchange ' ]);
158+ $ this ->getChannel ()->queue_bind ($ binding ['queue ' ], $ binding ['exchange ' ]);
150159 }
151160 }
152161 }
@@ -161,13 +170,13 @@ public function bindExchangeToExchange(array $binding)
161170 foreach ($ binding ['routing_keys ' ] as $ routingKey ) {
162171 // queue binding is not permitted on the default exchange
163172 if ('' !== $ binding ['exchange ' ]) {
164- $ this ->conn -> channel ()->exchange_bind ($ binding ['to_exchange ' ], $ binding ['exchange ' ], $ routingKey );
173+ $ this ->getChannel ()->exchange_bind ($ binding ['to_exchange ' ], $ binding ['exchange ' ], $ routingKey );
165174 }
166175 }
167176 } else {
168177 // queue binding is not permitted on the default exchange
169178 if ('' !== $ binding ['exchange ' ]) {
170- $ this ->conn -> channel ()->exchange_bind ($ binding ['to_exchange ' ], $ binding ['exchange ' ]);
179+ $ this ->getChannel ()->exchange_bind ($ binding ['to_exchange ' ], $ binding ['exchange ' ]);
171180 }
172181 }
173182 }
@@ -183,7 +192,7 @@ public function declareExchange(string $exchangeName)
183192 }
184193 $ exchange = $ this ->exchanges [$ exchangeName ];
185194 if (!isset ($ this ->exchangesDeclared [$ exchangeName ])) {
186- $ this ->conn -> channel ()->exchange_declare (
195+ $ this ->getChannel ()->exchange_declare (
187196 $ exchange ['name ' ],
188197 $ exchange ['type ' ],
189198 $ exchange ['passive ' ],
@@ -208,7 +217,7 @@ public function purgeQueue(string $queueName)
208217 if (!isset ($ this ->queues [$ queueName ])) {
209218 throw new RuntimeException ("Queue {$ queueName } is not configured. Purge is aborted. " );
210219 }
211- $ this ->conn -> channel ()->queue_purge ($ queueName , true );
220+ $ this ->getChannel ()->queue_purge ($ queueName , true );
212221 }
213222
214223 /**
@@ -235,20 +244,20 @@ public function deleteQueue(string $queueName)
235244 if (!isset ($ this ->queues [$ queueName ])) {
236245 throw new RuntimeException ("Queue {$ queueName } is not configured. Delete is aborted. " );
237246 }
238- $ this ->conn -> channel ()->queue_delete ($ queueName );
247+ $ this ->getChannel ()->queue_delete ($ queueName );
239248 }
240249
241250 /**
242- * Delete the exchange
251+ * Delete the queue
243252 * @param string $exchangeName
244253 * @throws RuntimeException
245254 */
246255 public function deleteExchange (string $ exchangeName )
247256 {
248- if (!isset ($ this ->exchanges [$ exchangeName ])) {
249- throw new RuntimeException ("Exchange {$ exchangeName } is not configured. Delete is aborted. " );
257+ if (!isset ($ this ->queues [$ exchangeName ])) {
258+ throw new RuntimeException ("Queue {$ exchangeName } is not configured. Delete is aborted. " );
250259 }
251- $ this ->conn -> channel ()->exchange_delete ($ exchangeName );
260+ $ this ->getChannel ()->exchange_delete ($ exchangeName );
252261 }
253262
254263 /**
@@ -259,7 +268,7 @@ public function deleteExchange(string $exchangeName)
259268 public function isExchangeExists (string $ exchangeName ) : bool
260269 {
261270 try {
262- $ this ->conn -> channel ()->exchange_declare ($ exchangeName , null , true );
271+ $ this ->getChannel ()->exchange_declare ($ exchangeName , null , true );
263272 } catch (AMQPProtocolChannelException $ e ) {
264273 return false ;
265274 }
@@ -275,7 +284,7 @@ public function isExchangeExists(string $exchangeName) : bool
275284 public function isQueueExists (string $ queueName ) : bool
276285 {
277286 try {
278- $ this ->conn -> channel ()->queue_declare ($ queueName , true );
287+ $ this ->getChannel ()->queue_declare ($ queueName , true );
279288 } catch (AMQPProtocolChannelException $ e ) {
280289 return false ;
281290 }
@@ -300,4 +309,16 @@ private function arrangeByName(array $unnamedArr) : array
300309
301310 return $ namedArr ;
302311 }
312+
313+ /**
314+ * @return AMQPChannel
315+ */
316+ private function getChannel ()
317+ {
318+ if (empty ($ this ->ch ) || null === $ this ->ch ->getChannelId ()) {
319+ $ this ->ch = $ this ->conn ->channel ();
320+ }
321+
322+ return $ this ->ch ;
323+ }
303324}
0 commit comments