diff --git a/tests/email_test.py b/tests/email_test.py
index 4244ceace6..6df054eba4 100644
--- a/tests/email_test.py
+++ b/tests/email_test.py
@@ -19,3 +19,9 @@ def test_verify_email_body(self, mailinator_helper):
"testautomation", "purchase is confirmed"
)
assert "Thank you for your purchase" in message.parts[0].body
+
+ @allure.title("Get OTP code from email")
+ def test_verify_otp_code(self, mailinator_helper):
+ otp_code = mailinator_helper.get_otp_code("testautomation")
+ assert otp_code.isdigit(), f"OTP code '{otp_code}' is not a digit string"
+ assert len(otp_code) == 6, f"OTP code '{otp_code}' is not 6 digits long"
diff --git a/utilities/mailinator_helper.py b/utilities/mailinator_helper.py
index 591260eb29..6e6d8c347c 100644
--- a/utilities/mailinator_helper.py
+++ b/utilities/mailinator_helper.py
@@ -1,7 +1,15 @@
+import re
from collections import Counter
from mailinator import GetInboxRequest, GetMessageRequest, Mailinator, Message
-from tenacity import retry, retry_if_result, stop_after_attempt, wait_fixed
+from tenacity import (
+ retry,
+ retry_if_exception_type,
+ retry_if_result,
+ stop_after_attempt,
+ stop_after_delay,
+ wait_fixed,
+)
class MailinatorHelper:
@@ -28,26 +36,10 @@ def __init__(self, mailinator: Mailinator, mailinator_domain: str):
self.mailinator = mailinator
self.mailinator_domain = mailinator_domain
- @staticmethod
- def is_none(value):
- """Return True if value is None.
-
- You can use the `tenacity` library to handle retrying operations.
- Check the `tenacity` documentation for more details:
- `tenacity Documentation `_
-
- Additionally, here is a helpful article on using `tenacity` in Python:
- `Retry Flaky Task in Python using Tenacity `_
-
- :param value: The value to check for None.
- :type value: Any
- :return: True if the value is None, False otherwise.
- :rtype: bool
- """
- return value is None
-
@retry(
- retry=retry_if_result(is_none), stop=(stop_after_attempt(3)), wait=wait_fixed(4)
+ retry=retry_if_result(lambda x: x is None),
+ stop=(stop_after_attempt(3)),
+ wait=wait_fixed(4),
)
def __get_message_id(self, user_email: str, email_subject: str) -> str:
"""Wait for an email to arrive with a specific subject in a user's
@@ -79,7 +71,7 @@ def __get_message_id(self, user_email: str, email_subject: str) -> str:
message
for message in messages
if message.to == user_email.split("@")[0]
- and message.subject == email_subject
+ and message.subject.casefold() == email_subject.casefold()
]
return filtered_messages[0].id if filtered_messages else None
@@ -108,6 +100,36 @@ def get_message(self, user_email: str, email_subject: str) -> Message:
)
)
+ @retry(
+ stop=stop_after_delay(30),
+ wait=wait_fixed(1),
+ retry=retry_if_result(lambda x: x is None) | retry_if_exception_type(Exception),
+ )
+ def get_otp_code(self, user_email: str) -> str | None:
+ """Retrieves a 6-digit OTP code from an email in the Mailinator inbox.
+
+ This method:
+ 1. Waits for up to 30 seconds, polling every second, for an email to arrive.
+ 2. Retrieves the email message from Mailinator.
+ 3. Extracts the first 6-digit OTP code found in the email body.
+
+ Args:
+ user_email (str): The email address to check for an OTP.
+
+ Returns:
+ str: The extracted 6-digit OTP code.
+
+ Raises:
+ RuntimeError: If no OTP is found in the email message.
+ """
+ message: Message = self.get_message(user_email, "Verify your email address")
+ if not message.parts:
+ return None
+ email_body = message.parts[0].body
+ if match := re.search(r"\b(\d{6})\b", email_body):
+ return match[1]
+ raise RuntimeError("OTP not found in email message")
+
def count_messages_by_subject(self, user_email: str) -> dict[str, int]:
"""Count the occurrences of email subjects in a user's inbox.