diff --git a/lastpass/fetcher.py b/lastpass/fetcher.py index e8e33b5..d053502 100644 --- a/lastpass/fetcher.py +++ b/lastpass/fetcher.py @@ -18,20 +18,20 @@ ) from .session import Session - http = requests +default_url = 'https://lastpass.com' headers = {'user-agent': 'lastpass-python/{}'.format(__version__)} def login(username, password, multifactor_password=None, client_id=None): key_iteration_count = request_iteration_count(username) return request_login(username, password, key_iteration_count, multifactor_password, client_id) - + def logout(session, web_client=http): - # type: (Session, requests) -> None + # type: (Session, requests) -> None response = web_client.get( - 'https://lastpass.com/logout.php?mobile=1', + f'{session.url}/logout.php?mobile=1', cookies={'PHPSESSID': session.id} ) @@ -40,7 +40,7 @@ def logout(session, web_client=http): def fetch(session, web_client=http): - response = web_client.get('https://lastpass.com/getaccts.php?mobile=1&b64=1&hash=0.0&hasplugin=3.0.23&requestsrc=android', + response = web_client.get(f'{session.url}/getaccts.php?mobile=1&b64=1&hash=0.0&hasplugin=3.0.23&requestsrc=android', cookies={'PHPSESSID': session.id}) if response.status_code != requests.codes.ok: @@ -50,9 +50,9 @@ def fetch(session, web_client=http): def request_iteration_count(username, web_client=http): - response = web_client.post('https://lastpass.com/iterations.php', - data={'email': username}, - headers=headers) + response = web_client.get(f'{default_url}/iterations.php', + params={'email': username}, + headers=headers) if response.status_code != requests.codes.ok: raise NetworkError() @@ -66,7 +66,7 @@ def request_iteration_count(username, web_client=http): raise InvalidResponseError('Key iteration count is not positive') -def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http): +def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http, url=default_url): body = { 'method': 'mobile', 'web': 1, @@ -82,7 +82,7 @@ def request_login(username, password, key_iteration_count, multifactor_password= if client_id: body['imei'] = client_id - response = web_client.post('https://lastpass.com/login.php', + response = web_client.post(f'{url}/login.php', data=body, headers=headers) @@ -97,27 +97,42 @@ def request_login(username, password, key_iteration_count, multifactor_password= if parsed_response is None: raise InvalidResponseError() - session = create_session(parsed_response, key_iteration_count) + # Handle lastpass.eu and future other accounts + new_url = check_lastpass_url(parsed_response) + + if new_url is not None: + return request_login(username, password, key_iteration_count, multifactor_password, client_id, web_client, url=new_url) + + session = create_session(parsed_response, key_iteration_count, url) if not session: raise login_error(parsed_response) return session - -def create_session(parsed_response, key_iteration_count): +def create_session(parsed_response, key_iteration_count, url): if parsed_response.tag == 'ok': session_id = parsed_response.attrib.get('sessionid') if isinstance(session_id, str): - return Session(session_id, key_iteration_count) + return Session(session_id, key_iteration_count, url) +def check_lastpass_url(parsed_response): + error = None if parsed_response.tag != 'response' else parsed_response.find('error') + + if error is None or len(error.attrib) == 0: + return None + + if error.attrib.get('server') is not None and error.attrib.get('server') != default_url: + return f"https://{error.attrib.get('server')}" + + return None def login_error(parsed_response): error = None if parsed_response.tag != 'response' else parsed_response.find('error') if error is None or len(error.attrib) == 0: raise UnknownResponseSchemaError() - + exceptions = { - "unknownemail": LastPassUnknownUsernameError, - "unknownpassword": LastPassInvalidPasswordError, + "user_not_exists": LastPassUnknownUsernameError, + "password_invalid": LastPassInvalidPasswordError, "googleauthrequired": LastPassIncorrectGoogleAuthenticatorCodeError, "googleauthfailed": LastPassIncorrectGoogleAuthenticatorCodeError, "yubikeyrestricted": LastPassIncorrectYubikeyPasswordError, @@ -125,7 +140,7 @@ def login_error(parsed_response): cause = error.attrib.get('cause') message = error.attrib.get('message') - + if cause: return exceptions.get(cause, LastPassUnknownError)(message or cause) return InvalidResponseError(message) diff --git a/lastpass/session.py b/lastpass/session.py index f68a0f4..0ff4aff 100644 --- a/lastpass/session.py +++ b/lastpass/session.py @@ -1,8 +1,9 @@ # coding: utf-8 class Session(object): - def __init__(self, id, key_iteration_count): + def __init__(self, id, key_iteration_count, url): self.id = id self.key_iteration_count = key_iteration_count + self.url = url def __eq__(self, other): return self.id == other.id and self.key_iteration_count == other.key_iteration_count diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index d2f716f..98c3518 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -3,6 +3,7 @@ import unittest import mock import lastpass +from xml.etree import ElementTree as etree from lastpass.blob import Blob from lastpass import fetcher from lastpass.session import Session @@ -10,13 +11,16 @@ class FetcherTestCase(unittest.TestCase): def setUp(self): + self.default_url = 'https://lastpass.com' + self.eu_url = 'https://lastpass.eu' + self.username = 'username' self.password = 'password' self.key_iteration_count = 5000 self.hash = b'7880a04588cfab954aa1a2da98fd9c0d2c6eba4c53e36a94510e6dbf30759256' self.session_id = '53ru,Hb713QnEVM5zWZ16jMvxS0' - self.session = Session(self.session_id, self.key_iteration_count) + self.session = Session(self.session_id, self.key_iteration_count, self.default_url) self.blob_response = 'TFBBVgAAAAMxMjJQUkVNAAAACjE0MTQ5' self.blob_bytes = b64decode(self.blob_response) @@ -58,35 +62,35 @@ def test_logout_raises_an_exception_on_HTTP_error(self): def test_request_iteration_count_makes_a_post_request(self): m = mock.Mock() - m.post.return_value = self._http_ok(str(self.key_iteration_count)) + m.get.return_value = self._http_ok(str(self.key_iteration_count)) fetcher.request_iteration_count(self.username, m) - m.post.assert_called_with('https://lastpass.com/iterations.php', - data={'email': self.username}, - headers=fetcher.headers) + m.get.assert_called_with('https://lastpass.com/iterations.php', + params={'email': self.username}, + headers=fetcher.headers) def test_request_iteration_count_returns_key_iteration_count(self): m = mock.Mock() - m.post.return_value = self._http_ok(str(self.key_iteration_count)) + m.get.return_value = self._http_ok(str(self.key_iteration_count)) self.assertEqual(fetcher.request_iteration_count(self.username, m), self.key_iteration_count) def test_request_iteration_count_raises_an_exception_on_http_error(self): m = mock.Mock() - m.post.return_value = self._http_error() + m.get.return_value = self._http_error() self.assertRaises(lastpass.NetworkError, fetcher.request_iteration_count, self.username, m) def test_request_iteration_count_raises_an_exception_on_invalid_key_iteration_count(self): m = mock.Mock() - m.post.return_value = self._http_ok('not a number') + m.get.return_value = self._http_ok('not a number') self.assertRaises(lastpass.InvalidResponseError, fetcher.request_iteration_count, self.username, m) def test_request_iteration_count_raises_an_exception_on_zero_key_iteration_cont(self): m = mock.Mock() - m.post.return_value = self._http_ok('0') + m.get.return_value = self._http_ok('0') self.assertRaises(lastpass.InvalidResponseError, fetcher.request_iteration_count, self.username, m) def test_request_iteration_count_raises_an_exception_on_negative_key_iteration_cont(self): m = mock.Mock() - m.post.return_value = self._http_ok('-1') + m.get.return_value = self._http_ok('-1') self.assertRaises(lastpass.InvalidResponseError, fetcher.request_iteration_count, self.username, m) def test_request_login_makes_a_post_request(self): @@ -107,7 +111,7 @@ def test_request_login_makes_a_post_request_with_yubikey_password(self): def test_request_login_returns_a_session(self): self.assertEqual(self._request_login_with_xml(''.format(self.session_id)), self.session) - + def test_request_login_raises_an_exception_on_http_error(self): self.assertRaises(lastpass.NetworkError, self._request_login_with_error) @@ -126,11 +130,11 @@ def test_request_login_raises_an_exception_on_unknown_response_schema_3(self): def test_request_login_raises_an_exception_on_unknown_username(self): self.assertRaises(lastpass.LastPassUnknownUsernameError, - self._request_login_with_lastpass_error, 'unknownemail') + self._request_login_with_lastpass_error, 'user_not_exists') def test_request_login_raises_an_exception_on_invalid_password(self): self.assertRaises(lastpass.LastPassInvalidPasswordError, - self._request_login_with_lastpass_error, 'unknownpassword') + self._request_login_with_lastpass_error, 'password_invalid') def test_request_login_raises_an_exception_on_missing_google_authenticator_code(self): message = 'Google Authenticator authentication required! ' \ @@ -154,6 +158,21 @@ def test_request_login_raises_an_exception_on_unknown_lastpass_error_without_a_m self.assertRaises(lastpass.LastPassUnknownError, self._request_login_with_lastpass_error, cause) + def test_check_lastpass_url_returns_an_url(self): + server = 'lastpass.eu' + message = 'Please update to the latest version of LastPass.' + + parsed_response = etree.fromstring(self._lastpass_server_error(server, message)) + + self.assertEqual(fetcher.check_lastpass_url(parsed_response), self.eu_url) + + def test_check_lastpass_url_returns_None_if_server_attrb_not_present(self): + cause = 'Unknown cause' + + parsed_response = etree.fromstring(self._lastpass_error(cause, None)) + + self.assertEqual(fetcher.check_lastpass_url(parsed_response), None) + def test_fetch_makes_a_get_request(self): m = mock.Mock() m.get.return_value = self._http_ok(self.blob_response) @@ -225,7 +244,11 @@ def _lastpass_error(cause, message): if message: return ''.format(cause, message) return ''.format(cause) - + + @staticmethod + def _lastpass_server_error(server, message): + return ''.format(server, message) + def _request_login_with_lastpass_error(self, cause, message=None): return self._request_login_with_xml(self._lastpass_error(cause, message)) diff --git a/tests/test_session.py b/tests/test_session.py index db5e7b1..b424c08 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -7,11 +7,15 @@ class SessionTestCase(unittest.TestCase): def setUp(self): self.id = '53ru,Hb713QnEVM5zWZ16jMvxS0' self.key_iteration_count = 5000 + self.url = 'https://lastpass.com' - self.session = Session(self.id, self.key_iteration_count) + self.session = Session(self.id, self.key_iteration_count, self.url) def test_id_returns_the_correct_value(self): self.assertEqual(self.session.id, self.id) def test_key_iteration_count_returns_the_correct_value(self): self.assertEqual(self.session.key_iteration_count, self.key_iteration_count) + + def test_url_returns_the_correct_value(self): + self.assertEqual(self.session.url, self.url)