1+ # -------------------------------------------------------------------------
2+ # Copyright (c) Microsoft Corporation. All rights reserved.
3+ # Licensed under the MIT License. See License.txt in the project root for
4+ # license information.
5+ # --------------------------------------------------------------------------
6+ import os
7+ import json
8+ import time
9+ from datetime import datetime
10+ from typing import Any , Dict , List
11+
12+ import requests
13+ from azure .servicebus import ServiceBusClient
14+ from devtools_testutils import is_live
15+
16+ from _shared .asynctestcase import AsyncCommunicationTestCase
17+
18+
19+ class CallAutomationAutomatedLiveTestBase (AsyncCommunicationTestCase ):
20+ def __init__ (self , method_name , * args , ** kwargs ):
21+ self .servicebus_connection_str = os .environ .get ('SERVICEBUS_STRING' , 'Endpoint=REDACTED=' )
22+ self .dispatcher_endpoint = os .environ .get ('DISPATCHER_ENDPOINT' , 'https://REDACTED.azurewebsites.net' )
23+ self .dispatcher_callback = self .dispatcher_endpoint + '/api/servicebuscallback/events'
24+ self .processor_store : Dict [str , Any ] = {}
25+ self .incoming_call_context_store : Dict [str , Any ] = {}
26+ self .event_store : Dict [str , Dict [str , Any ]] = {}
27+ self .events_to_persist : List [str ] = []
28+ super (CallAutomationAutomatedLiveTestBase , self ).__init__ (method_name , * args , ** kwargs )
29+
30+ def _format_string (self , s ) -> str :
31+ s1 = f"{ s [:12 ]} -{ s [12 :16 ]} -{ s [16 :20 ]} -{ s [20 :24 ]} -{ s [24 :36 ]} "
32+ s2 = f"{ s [36 :44 ]} -{ s [44 :48 ]} -{ s [48 :52 ]} -{ s [52 :56 ]} -{ s [56 :]} "
33+ return f"{ s1 } _{ s2 } "
34+
35+ def _parse_ids_from_identifier (self , identifier : str ) -> str :
36+ if identifier is None :
37+ raise ValueError ("Identifier cannot be None" )
38+ return self ._format_string ('' .join (filter (str .isalnum , identifier )))
39+
40+ def _message_handler (self , message : Any ) -> bool :
41+ if not message :
42+ raise ValueError ("Body cannot be empty" )
43+
44+ self .events_to_persist .append (message )
45+ mapper = json .loads (message )
46+
47+ if "incomingCallContext" in mapper :
48+ incoming_call_context = mapper ["incomingCallContext" ]
49+ from_id = mapper ["from" ]["rawId" ]
50+ to_id = mapper ["to" ]["rawId" ]
51+ unique_id = self ._parse_ids_from_identifier (from_id ) + self ._parse_ids_from_identifier (to_id )
52+ self .incoming_call_context_store [unique_id ] = incoming_call_context
53+ return True
54+ else :
55+ if isinstance (mapper , list ):
56+ mapper = mapper [0 ]
57+ if mapper ["type" ]:
58+ print ('MAPPER: ' + mapper ["type" ])
59+ self .event_store [mapper ["data" ]["callConnectionId" ]] = mapper ["type" ].split ("." )[- 1 ]
60+ return False
61+
62+ def service_bus_with_new_call (self , caller , receiver ) -> str :
63+ """Create new ServiceBus client.
64+ Creates a new queue in the ServiceBus and a client in order to interact with it.
65+
66+ :param caller: User initiating the call.
67+ :type caller: CommunicationUserIdentifier
68+ :param receiver: User receiving the call.
69+ :type receiver: CommunicationUserIdentifier
70+
71+ :return: a unique_id that can be used to identify the ServiceBus queue.
72+ :rtype: str
73+ """
74+ unique_id = self ._parse_ids_from_identifier (caller .raw_id ) + self ._parse_ids_from_identifier (receiver .raw_id )
75+ if is_live ():
76+ dispatcher_url = f"{ self .dispatcher_endpoint } /api/servicebuscallback/subscribe?q={ unique_id } "
77+ response = requests .post (dispatcher_url )
78+
79+ if response is None :
80+ raise ValueError ("Response cannot be None" )
81+
82+ print (f"Subscription to dispatcher of { unique_id } : { response .status_code } " )
83+ service_bus_client = ServiceBusClient .from_connection_string (self .servicebus_connection_str )
84+ self .processor_store [unique_id ] = service_bus_client
85+ return unique_id
86+
87+ def wait_for_messages (self , unique_id , time_out ) -> None :
88+ """Create new ServiceBus client.
89+ Checks the Service Bus queue specified by the unique_id for messages and stores them in the event_store.
90+
91+ :param unique_id: Identifier used to keep track of ServiceBus message queue.
92+ :type unique_id: str
93+ :param time_out: How long to wait for a response.
94+ :type time_out: timedelta
95+
96+ :return: None
97+ :rtype: None
98+ """
99+ if is_live ():
100+ service_bus_receiver = self .processor_store [unique_id ].get_queue_receiver (queue_name = unique_id )
101+ time_out_time = datetime .now () + time_out
102+
103+ while datetime .now () < time_out_time :
104+ received_messages = service_bus_receiver .receive_messages (max_wait_time = 20 )
105+ for msg in received_messages :
106+ print (msg )
107+ body = msg .body
108+ body_bytes = b'' .join (body )
109+ body_str = body_bytes .decode ('utf-8' )
110+ is_incoming_call_event = self ._message_handler (body_str )
111+ service_bus_receiver .complete_message (msg )
112+ if is_incoming_call_event :
113+ return
114+ if not received_messages :
115+ time .sleep (1 )
116+
117+ def check_for_event (self , event_type : str , call_connection_id : str ) -> bool :
118+ """Check for events.
119+ Checks the event_store for any events that have been received from the Service Bus queue with the specified event_type and call_connection_id.
120+
121+ :param event_type: Type of event to check for in the event store.
122+ :type event_type: Type
123+ :param call_connection_id: The call_connection_id for which to find events for.
124+ :type call_connection_id: str
125+
126+ :return: None if no events are found. The event object if an event is found.
127+ :rtype: Optional[Any]
128+ """
129+ if self .event_store [call_connection_id ] == event_type :
130+ return True
131+ return False
132+
133+ def load_persisted_events (self , test_name ) -> None :
134+ """
135+ Load persisted events.
136+
137+ If the test is running in playback mode, this function opens the <test_name>.txt file located in the recordings directory and loads the events into the event_store.
138+
139+ :param test_name: The name of the test currently running.
140+ :type test_name: str
141+
142+ :return: None
143+ :rtype: None
144+ """
145+ if not is_live ():
146+ data = ''
147+ script_dir = os .path .dirname (os .path .realpath (__file__ ))
148+ file_path = os .path .join (script_dir , 'recordings' , f'{ test_name } .txt' )
149+ try :
150+ with open (file_path , 'r' ) as file :
151+ data = file .read ()
152+ except FileNotFoundError :
153+ print (f"File '{ file_path } ' does not exist." )
154+ data = ''
155+ event_strings = data .split ("\n " )
156+ for event in event_strings :
157+ self ._message_handler (event )
158+
159+ def persist_events (self , test_name ) -> None :
160+ """
161+ Persist events.
162+
163+ If the test is running in record mode, this function creates a .txt file and writes the events to it.
164+
165+ :param test_name: The name of the test currently running to be used as the file name.
166+ :type test_name: str
167+
168+ :return: None
169+ :rtype: None
170+ """
171+ if is_live ():
172+ script_dir = os .path .dirname (os .path .realpath (__file__ ))
173+ file_path = os .path .join (script_dir , 'recordings' , f'{ test_name } .txt' )
174+ try :
175+ with open (file_path , 'w' ) as file :
176+ file .write ('\n ' .join (self .events_to_persist ))
177+ except IOError as e :
178+ raise SystemExit (f"File write operation failed: { e } " )
179+
180+ # clear list for next test to use
181+ self .events_to_persist .clear ()
182+
183+ def cleanup (self ) -> None :
184+ for _ , receiver in self .processor_store .items ():
185+ receiver .close ()
0 commit comments