Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tests/email_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,9 @@ def test_verify_email_body(self, mailinator_helper):
"testautomation", "purchase is confirmed"
)
assert "Thank you for your purchase" in message.parts[0].body

@allure.title("Get OTP code from email")
def test_verify_otp_code(self, mailinator_helper):
otp_code = mailinator_helper.get_otp_code("testautomation")
assert otp_code.isdigit(), f"OTP code '{otp_code}' is not a digit string"
assert len(otp_code) == 6, f"OTP code '{otp_code}' is not 6 digits long"
64 changes: 43 additions & 21 deletions utilities/mailinator_helper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import re
from collections import Counter

from mailinator import GetInboxRequest, GetMessageRequest, Mailinator, Message
from tenacity import retry, retry_if_result, stop_after_attempt, wait_fixed
from tenacity import (
retry,
retry_if_exception_type,
retry_if_result,
stop_after_attempt,
stop_after_delay,
wait_fixed,
)


class MailinatorHelper:
Expand All @@ -28,26 +36,10 @@ def __init__(self, mailinator: Mailinator, mailinator_domain: str):
self.mailinator = mailinator
self.mailinator_domain = mailinator_domain

@staticmethod
def is_none(value):
"""Return True if value is None.

You can use the `tenacity` library to handle retrying operations.
Check the `tenacity` documentation for more details:
`tenacity Documentation <https://tenacity.readthedocs.io/en/latest/>`_

Additionally, here is a helpful article on using `tenacity` in Python:
`Retry Flaky Task in Python using Tenacity <https://paragkamble.medium.com/retry-flaky-task-in-python-using-tenacity-c6fabcf9a3be>`_

:param value: The value to check for None.
:type value: Any
:return: True if the value is None, False otherwise.
:rtype: bool
"""
return value is None

@retry(
retry=retry_if_result(is_none), stop=(stop_after_attempt(3)), wait=wait_fixed(4)
retry=retry_if_result(lambda x: x is None),
stop=(stop_after_attempt(3)),
wait=wait_fixed(4),
)
def __get_message_id(self, user_email: str, email_subject: str) -> str:
"""Wait for an email to arrive with a specific subject in a user's
Expand Down Expand Up @@ -79,7 +71,7 @@ def __get_message_id(self, user_email: str, email_subject: str) -> str:
message
for message in messages
if message.to == user_email.split("@")[0]
and message.subject == email_subject
and message.subject.casefold() == email_subject.casefold()
]
return filtered_messages[0].id if filtered_messages else None

Expand Down Expand Up @@ -108,6 +100,36 @@ def get_message(self, user_email: str, email_subject: str) -> Message:
)
)

@retry(
stop=stop_after_delay(30),
wait=wait_fixed(1),
retry=retry_if_result(lambda x: x is None) | retry_if_exception_type(Exception),
)
def get_otp_code(self, user_email: str) -> str | None:
"""Retrieves a 6-digit OTP code from an email in the Mailinator inbox.

This method:
1. Waits for up to 30 seconds, polling every second, for an email to arrive.
2. Retrieves the email message from Mailinator.
3. Extracts the first 6-digit OTP code found in the email body.

Args:
user_email (str): The email address to check for an OTP.

Returns:
str: The extracted 6-digit OTP code.

Raises:
RuntimeError: If no OTP is found in the email message.
"""
message: Message = self.get_message(user_email, "Verify your email address")
if not message.parts:
return None
email_body = message.parts[0].body
if match := re.search(r"\b(\d{6})\b", email_body):
return match[1]
raise RuntimeError("OTP not found in email message")

def count_messages_by_subject(self, user_email: str) -> dict[str, int]:
"""Count the occurrences of email subjects in a user's inbox.

Expand Down