@@ -615,124 +615,6 @@ def on_connect_op_complete(op, error):
615615 self .send_op_down (pipeline_ops_base .ConnectOperation (callback = on_connect_op_complete ))
616616
617617
618- class ConnectionLockStage (PipelineStage ):
619- """
620- This stage is responsible for serializing connect, disconnect, and reauthorize ops on
621- the pipeline, such that only a single one of these ops can go past this stage at a
622- time. This way, we don't have to worry about cases like "what happens if we try to
623- disconnect if we're in the middle of reauthorizing." This stage will wait for the
624- reauthorize to complete before letting the disconnect past.
625- """
626-
627- def __init__ (self ):
628- super ().__init__ ()
629- self .queue = queue .Queue ()
630- self .blocked = False
631-
632- @pipeline_thread .runs_on_pipeline_thread
633- def _run_op (self , op ):
634-
635- # If this stage is currently blocked (because we're waiting for a connection, etc,
636- # to complete), we queue up all operations until after the connect completes.
637- if self .blocked :
638- logger .debug (
639- "{}({}): pipeline is blocked waiting for a prior connect/disconnect/reauthorize to complete. queueing." .format (
640- self .name , op .name
641- )
642- )
643- self .queue .put_nowait (op )
644-
645- elif isinstance (op , pipeline_ops_base .ConnectOperation ) and self .nucleus .connected :
646- logger .info (
647- "{}({}): Transport is already connected. Completing." .format (self .name , op .name )
648- )
649- op .complete ()
650-
651- # NOTE: We ought not to be checking specific ConnectionStates if it can be helped.
652- # Unfortunately, here, it can't be - it's okay, this stage will be deleted very soon
653- # TODO: Move auto-completion logic to ConnectionStateStage and delete this stage.
654- elif (
655- isinstance (op , pipeline_ops_base .DisconnectOperation )
656- and self .nucleus .connection_state is ConnectionState .DISCONNECTED
657- ):
658- logger .info (
659- "{}({}): Transport is already disconnected. Completing." .format (self .name , op .name )
660- )
661- op .complete ()
662-
663- elif (
664- isinstance (op , pipeline_ops_base .DisconnectOperation )
665- or isinstance (op , pipeline_ops_base .ConnectOperation )
666- or isinstance (op , pipeline_ops_base .ReauthorizeConnectionOperation )
667- ):
668- self ._block (op )
669-
670- @pipeline_thread .runs_on_pipeline_thread
671- def on_operation_complete (op , error ):
672- if error :
673- logger .debug (
674- "{}({}): op failed. Unblocking queue with error: {}" .format (
675- self .name , op .name , error
676- )
677- )
678- else :
679- logger .debug (
680- "{}({}): op succeeded. Unblocking queue" .format (self .name , op .name )
681- )
682-
683- self ._unblock (op , error )
684-
685- op .add_callback (on_operation_complete )
686- self .send_op_down (op )
687-
688- else :
689- self .send_op_down (op )
690-
691- @pipeline_thread .runs_on_pipeline_thread
692- def _block (self , op ):
693- """
694- block this stage while we're waiting for the connect/disconnect/reauthorize operation to complete.
695- """
696- logger .debug ("{}({}): blocking" .format (self .name , op .name ))
697- self .blocked = True
698-
699- @pipeline_thread .runs_on_pipeline_thread
700- def _unblock (self , op , error ):
701- """
702- Unblock this stage after the connect/disconnect/reauthorize operation is complete. This also means
703- releasing all the operations that were queued up.
704- """
705- logger .debug ("{}({}): unblocking and releasing queued ops." .format (self .name , op .name ))
706- self .blocked = False
707- logger .debug (
708- "{}({}): processing {} items in queue for error={}" .format (
709- self .name , op .name , self .queue .qsize (), error
710- )
711- )
712- # Loop through our queue and release all the blocked operations
713- # Put a new Queue in self.queue because releasing ops might put them back in the
714- # queue, especially if there's a ConnectOperation in the list of ops to release
715- old_queue = self .queue
716- self .queue = queue .Queue ()
717- while not old_queue .empty ():
718- op_to_release = old_queue .get_nowait ()
719- if error :
720- # if we're unblocking the queue because something (like a connect operation) failed,
721- # then we fail all of the blocked operations with the same error.
722- logger .debug (
723- "{}({}): failing {} op because of error" .format (
724- self .name , op .name , op_to_release .name
725- )
726- )
727- op_to_release .complete (error = error )
728- else :
729- logger .debug (
730- "{}({}): releasing {} op." .format (self .name , op .name , op_to_release .name )
731- )
732- # call run_op directly here so operations go through this stage again (especially connect/disconnect ops)
733- self .run_op (op_to_release )
734-
735-
736618class CoordinateRequestAndResponseStage (PipelineStage ):
737619 """
738620 Pipeline stage which is responsible for coordinating RequestAndResponseOperation operations. For each
@@ -1065,11 +947,6 @@ class ConnectionStateStage(PipelineStage):
1065947 ConnectionState .DISCONNECTING ,
1066948 ConnectionState .REAUTHORIZING ,
1067949 ]
1068- connection_ops = [
1069- pipeline_ops_base .ConnectOperation ,
1070- pipeline_ops_base .DisconnectOperation ,
1071- pipeline_ops_base .ReauthorizeConnectionOperation ,
1072- ]
1073950 transient_connect_errors = [
1074951 pipeline_exceptions .OperationCancelled ,
1075952 pipeline_exceptions .OperationTimeout ,
@@ -1092,17 +969,9 @@ def __init__(self):
1092969 @pipeline_thread .runs_on_pipeline_thread
1093970 def _run_op (self , op ):
1094971
1095- # If receiving a connection op while one is already in progress, wait for the current
1096- # one to finish. This is kind of like a ConnectionLockStage, but inside this one.
1097- # It has to happen here because just relying on a ConnectionLockStage before or after
1098- # in the pipeline is insufficient, given that operations can spawn in this stage.
1099- # We need a way to wait ops without letting them pass through and affect the connection
1100- # state in order to address edge cases e.g. a user-initiated connect and a automatic
1101- # reconnect connect happen at approximately the same time.
1102- if (
1103- self .nucleus .connection_state in self .intermediate_states
1104- and type (op ) in self .connection_ops
1105- ):
972+ # If receiving an operation while the connection state is changing, wait for the
973+ # connection state to reach a stable state before continuing.
974+ if self .nucleus .connection_state in self .intermediate_states :
1106975 logger .debug (
1107976 "{}({}): State is {} - waiting for in-progress operation to finish" .format (
1108977 self .name , op .name , self .nucleus .connection_state
@@ -1114,14 +983,11 @@ def _run_op(self, op):
1114983 if isinstance (op , pipeline_ops_base .ConnectOperation ):
1115984 if self .nucleus .connection_state is ConnectionState .CONNECTED :
1116985 logger .debug (
1117- "{}({}): State is already CONNECTED. Sending op down " .format (
986+ "{}({}): State is already CONNECTED. Completing operation " .format (
1118987 self .name , op .name
1119988 )
1120989 )
1121- self ._add_connection_op_callback (op )
1122- # NOTE: This is the safest thing to do while the ConnectionLockStage is
1123- # doing auto-completes based on connection status. When it is revisited,
1124- # this logic may need to be updated.
990+ op .complete ()
1125991 elif self .nucleus .connection_state is ConnectionState .DISCONNECTED :
1126992 logger .debug (
1127993 "{}({}): State changes DISCONNECTED -> CONNECTING. Sending op down" .format (
@@ -1130,6 +996,7 @@ def _run_op(self, op):
1130996 )
1131997 self .nucleus .connection_state = ConnectionState .CONNECTING
1132998 self ._add_connection_op_callback (op )
999+ self .send_op_down (op )
11331000 else :
11341001 # This should be impossible to reach. If the state were intermediate, it
11351002 # would have been added to the waiting ops queue above.
@@ -1138,6 +1005,7 @@ def _run_op(self, op):
11381005 self .name , op .name , self .nucleus .connection_state
11391006 )
11401007 )
1008+ self .send_op_down (op )
11411009
11421010 elif isinstance (op , pipeline_ops_base .DisconnectOperation ):
11431011 # First, always clear any reconnect timer. Because a manual disconnection is
@@ -1152,16 +1020,14 @@ def _run_op(self, op):
11521020 )
11531021 self .nucleus .connection_state = ConnectionState .DISCONNECTING
11541022 self ._add_connection_op_callback (op )
1023+ self .send_op_down (op )
11551024 elif self .nucleus .connection_state is ConnectionState .DISCONNECTED :
11561025 logger .debug (
1157- "{}({}): State is already DISCONNECTED. Sending op down " .format (
1026+ "{}({}): State is already DISCONNECTED. Completing operation " .format (
11581027 self .name , op .name
11591028 )
11601029 )
1161- self ._add_connection_op_callback (op )
1162- # NOTE: This is the safest thing to do while the ConnectionLockStage is
1163- # doing auto-completes based on connection status. When it is revisited,
1164- # this logic may need to be updated.
1030+ op .complete ()
11651031 else :
11661032 # This should be impossible to reach. If the state were intermediate, it
11671033 # would have been added to the waiting ops queue above.
@@ -1170,6 +1036,7 @@ def _run_op(self, op):
11701036 self .name , op .name , self .nucleus .connection_state
11711037 )
11721038 )
1039+ self .send_op_down (op )
11731040
11741041 elif isinstance (op , pipeline_ops_base .ReauthorizeConnectionOperation ):
11751042 if self .nucleus .connection_state is ConnectionState .CONNECTED :
@@ -1180,6 +1047,7 @@ def _run_op(self, op):
11801047 )
11811048 self .nucleus .connection_state = ConnectionState .REAUTHORIZING
11821049 self ._add_connection_op_callback (op )
1050+ self .send_op_down (op )
11831051 elif self .nucleus .connection_state is ConnectionState .DISCONNECTED :
11841052 logger .debug (
11851053 "{}({}): State changes DISCONNECTED -> REAUTHORIZING. Sending op down" .format (
@@ -1188,6 +1056,7 @@ def _run_op(self, op):
11881056 )
11891057 self .nucleus .connection_state = ConnectionState .REAUTHORIZING
11901058 self ._add_connection_op_callback (op )
1059+ self .send_op_down (op )
11911060 else :
11921061 # This should be impossible to reach. If the state were intermediate, it
11931062 # would have been added to the waiting ops queue above.
@@ -1196,6 +1065,7 @@ def _run_op(self, op):
11961065 self .name , op .name , self .nucleus .connection_state
11971066 )
11981067 )
1068+ self .send_op_down (op )
11991069
12001070 elif isinstance (op , pipeline_ops_base .ShutdownPipelineOperation ):
12011071 self ._clear_reconnect_timer ()
@@ -1206,9 +1076,10 @@ def _run_op(self, op):
12061076 "Operation waiting in ConnectionStateStage cancelled by shutdown"
12071077 )
12081078 waiting_op .complete (error = cancel_error )
1079+ self .send_op_down (op )
12091080
1210- # In all cases the op gets sent down
1211- self .send_op_down (op )
1081+ else :
1082+ self .send_op_down (op )
12121083
12131084 @pipeline_thread .runs_on_pipeline_thread
12141085 def _handle_pipeline_event (self , event ):
@@ -1327,10 +1198,6 @@ def _add_connection_op_callback(self, op):
13271198 """Adds callback to a connection op passing through to do necessary stage upkeep"""
13281199 self_weakref = weakref .ref (self )
13291200
1330- # NOTE: we are currently protected from connect failing due to being already connected
1331- # by the ConnectionLockStage. If the ConnectionLockStage changes functionality,
1332- # we may need some logic changes to address an op that can fail while leaving us CONNECTED
1333-
13341201 @pipeline_thread .runs_on_pipeline_thread
13351202 def on_complete (op , error ):
13361203 this = self_weakref ()
0 commit comments