diff --git a/lastpass/exceptions.py b/lastpass/exceptions.py index 0e6f11a..8ab8e9d 100644 --- a/lastpass/exceptions.py +++ b/lastpass/exceptions.py @@ -49,6 +49,16 @@ class LastPassIncorrectYubikeyPasswordError(Error): pass +class LastPassIncorrectOutOfBandRequiredError(Error): + """LastPass error: need to provide out of band authentication (e.g, LastPass Authenticator)""" + pass + + +class LastPassIncorrectMultiFactorResponseError(Error): + """LastPass error: Multifactor response failed (wrong code or denied)""" + pass + + class LastPassUnknownError(Error): """LastPass error we don't know about""" pass diff --git a/lastpass/fetcher.py b/lastpass/fetcher.py index cfd6d04..7c588b7 100644 --- a/lastpass/fetcher.py +++ b/lastpass/fetcher.py @@ -1,5 +1,7 @@ # coding: utf-8 import hashlib +import random +import string from base64 import b64decode from binascii import hexlify import requests @@ -13,6 +15,8 @@ LastPassInvalidPasswordError, LastPassIncorrectGoogleAuthenticatorCodeError, LastPassIncorrectYubikeyPasswordError, + LastPassIncorrectOutOfBandRequiredError, + LastPassIncorrectMultiFactorResponseError, LastPassUnknownError ) from .session import Session @@ -21,9 +25,9 @@ http = requests -def login(username, password, multifactor_password=None, client_id=None): +def login(username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False): key_iteration_count = request_iteration_count(username) - return request_login(username, password, key_iteration_count, multifactor_password, client_id) + return request_login(username, password, key_iteration_count, multifactor_password, client_id, trust_id=trust_id, trust_me=trust_me) def logout(session, web_client=http): @@ -63,21 +67,28 @@ def request_iteration_count(username, web_client=http): raise InvalidResponseError('Key iteration count is not positive') -def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http): +def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http, trust_id=None, trust_me=False): body = { - 'method': 'mobile', - 'web': 1, - 'xml': 1, + 'method': 'cli', + 'xml': 2, 'username': username, 'hash': make_hash(username, password, key_iteration_count), 'iterations': key_iteration_count, + 'includeprivatekeyenc': 1, + 'outofbandsupported': 1 } if multifactor_password: body['otp'] = multifactor_password + if trust_me and not trust_id: + trust_id = generate_trust_id() + + if trust_id: + body['uuid'] = trust_id + if client_id: - body['imei'] = client_id + body['trustlabel'] = client_id response = web_client.post('https://lastpass.com/login.php', data=body) @@ -93,17 +104,75 @@ def request_login(username, password, key_iteration_count, multifactor_password= if parsed_response is None: raise InvalidResponseError() - session = create_session(parsed_response, key_iteration_count) + session = create_session(parsed_response, key_iteration_count, trust_id) if not session: - raise login_error(parsed_response) + try: + raise login_error(parsed_response) + except LastPassIncorrectOutOfBandRequiredError: + (session, parsed_response) = oob_login(web_client, parsed_response, body, key_iteration_count, trust_id) + if not session: + raise login_error(parsed_response) + if trust_me: + response = web_client.post('https://lastpass.com/trust.php', cookies={'PHPSESSID': session.id}, data={"token": session.token, "uuid": trust_id, "trustlabel": client_id}) + return session -def create_session(parsed_response, key_iteration_count): +def oob_login(web_client, parsed_response, body, key_iteration_count, trust_id): + error = None if parsed_response.tag != 'response' else parsed_response.find( + 'error') + if 'outofbandname' not in error.attrib or 'capabilities' not in error.attrib: + return (None, parsed_response) + oob_name = error.attrib['outofbandname'] + oob_capabilities = error.attrib['capabilities'].split(',') + can_do_passcode = 'passcode' in oob_capabilities + if can_do_passcode and 'outofband' not in oob_capabilities: + return (None, parsed_response) + body['outofbandrequest'] = '1' + retries = 0 + # loop waiting for out of band approval, or failure + while retries < 5: + retries += 1 + response = web_client.post("https://lastpass.com/login.php", data=body) + if response.status_code != requests.codes.ok: + raise NetworkError() + + try: + parsed_response = etree.fromstring(response.content) + except etree.ParseError: + parsed_response = None + + if parsed_response is None: + raise InvalidResponseError() + + session = create_session(parsed_response, key_iteration_count, trust_id) + if session: + return (session, parsed_response) + error = None if parsed_response.tag != 'response' else parsed_response.find( + 'error') + if 'cause' in error.attrib and error.attrib['cause'] == 'outofbandrequired': + if 'retryid' in error.attrib: + body['outofbandretryid'] = error.attrib['retryid'] + body['outofbandretry'] = "1" + continue + return (None, parsed_response) + return (None, parsed_response) + + +def generate_trust_id(): + return ''.join(random.choice(string.ascii_uppercase + string.digits + string.ascii_lowercase + "!@#$") for _ in range(32)) + + +def create_session(parsed_response, key_iteration_count, trust_id): if parsed_response.tag == 'ok': - session_id = parsed_response.attrib.get('sessionid') + ok_response = parsed_response + else: + ok_response = parsed_response.find("ok") + if ok_response is not None: + session_id = ok_response.attrib.get('sessionid') + token = ok_response.attrib.get('token') if isinstance(session_id, str): - return Session(session_id, key_iteration_count) + return Session(session_id, key_iteration_count, token, trust_id) def login_error(parsed_response): @@ -117,6 +186,8 @@ def login_error(parsed_response): "googleauthrequired": LastPassIncorrectGoogleAuthenticatorCodeError, "googleauthfailed": LastPassIncorrectGoogleAuthenticatorCodeError, "yubikeyrestricted": LastPassIncorrectYubikeyPasswordError, + "outofbandrequired": LastPassIncorrectOutOfBandRequiredError, + "multifactorresponsefailed": LastPassIncorrectMultiFactorResponseError, } cause = error.attrib.get('cause') diff --git a/lastpass/session.py b/lastpass/session.py index f68a0f4..f1dd14b 100644 --- a/lastpass/session.py +++ b/lastpass/session.py @@ -1,8 +1,10 @@ # coding: utf-8 class Session(object): - def __init__(self, id, key_iteration_count): + def __init__(self, id, key_iteration_count, token=None, trust_id=None): self.id = id self.key_iteration_count = key_iteration_count + self.token = token + self.trust_id = trust_id def __eq__(self, other): - return self.id == other.id and self.key_iteration_count == other.key_iteration_count + return self.id == other.id and self.key_iteration_count == other.key_iteration_count and self.token == other.token diff --git a/lastpass/vault.py b/lastpass/vault.py index 1653eff..85866e6 100644 --- a/lastpass/vault.py +++ b/lastpass/vault.py @@ -6,10 +6,10 @@ class Vault(object): @classmethod - def open_remote(cls, username, password, multifactor_password=None, client_id=None): + def open_remote(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False): """Fetches a blob from the server and creates a vault""" - blob = cls.fetch_blob(username, password, multifactor_password, client_id) - return cls.open(blob, username, password) + (blob, trust_id) = cls.fetch_blob(username, password, multifactor_password, client_id, trust_id, trust_me) + return cls.open(blob, username, password, trust_id) @classmethod def open_local(cls, blob_filename, username, password): @@ -18,20 +18,20 @@ def open_local(cls, blob_filename, username, password): raise NotImplementedError() @classmethod - def open(cls, blob, username, password): + def open(cls, blob, username, password, trust_id=None): """Creates a vault from a blob object""" - return cls(blob, blob.encryption_key(username, password)) + return cls(blob, blob.encryption_key(username, password), trust_id) @classmethod - def fetch_blob(cls, username, password, multifactor_password=None, client_id=None): + def fetch_blob(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False): """Just fetches the blob, could be used to store it locally""" - session = fetcher.login(username, password, multifactor_password, client_id) + session = fetcher.login(username, password, multifactor_password, client_id, trust_id=trust_id, trust_me=trust_me) blob = fetcher.fetch(session) fetcher.logout(session) - return blob + return (blob, session.trust_id) - def __init__(self, blob, encryption_key): + def __init__(self, blob, encryption_key, trust_id=None): """This more of an internal method, use one of the static constructors instead""" chunks = parser.extract_chunks(blob) @@ -39,6 +39,7 @@ def __init__(self, blob, encryption_key): raise InvalidResponseError('Blob is truncated') self.accounts = self.parse_accounts(chunks, encryption_key) + self.trust_id = trust_id def is_complete(self, chunks): return len(chunks) > 0 and chunks[-1].id == b'ENDM' and chunks[-1].payload == b'OK' diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index 63ee05e..b81ae9b 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -16,22 +16,37 @@ def setUp(self): self.hash = b'7880a04588cfab954aa1a2da98fd9c0d2c6eba4c53e36a94510e6dbf30759256' self.session_id = '53ru,Hb713QnEVM5zWZ16jMvxS0' - self.session = Session(self.session_id, self.key_iteration_count) + self.token = '54aa1a2da98fd9c0d2c6eba4c5' + self.session = Session(self.session_id, self.key_iteration_count, token=self.token) self.blob_response = 'TFBBVgAAAAMxMjJQUkVNAAAACjE0MTQ5' self.blob_bytes = b64decode(self.blob_response) self.blob = Blob(self.blob_bytes, self.key_iteration_count) - self.login_post_data = {'method': 'mobile', - 'web': 1, - 'xml': 1, + self.login_post_data = {'method': 'cli', + 'xml': 2, + 'outofbandsupported': 1, + 'includeprivatekeyenc': 1, 'username': self.username, 'hash': self.hash, 'iterations': self.key_iteration_count} self.device_id = '492378378052455' self.login_post_data_with_device_id = self.login_post_data.copy() - self.login_post_data_with_device_id.update({'imei': self.device_id}) + self.login_post_data_with_device_id.update({'trustlabel': self.device_id}) + self.login_post_data_with_device_id.update({'uuid': self.device_id}) + + self.trust_id = '@2ykJ0Tp#dVi06qh6g6kvzOqjQGAWfKv' + + self.request_trust_data = { + 'token': self.token, + 'trustlabel': self.device_id, + 'uuid': self.trust_id + } + + self.request_trust_cookies = { + 'PHPSESSID': self.session_id + } self.google_authenticator_code = '12345' self.yubikey_password = 'emdbwzemyisymdnevznyqhqnklaqheaxszzvtnxjrmkb' @@ -93,6 +108,9 @@ def test_request_login_makes_a_post_request(self): def test_request_login_makes_a_post_request_with_device_id(self): self._verify_request_login_post_request(None, self.device_id, self.login_post_data_with_device_id) + def test_request_login_makes_a_post_request_with_trust_requested(self): + self._verify_request_trust(None, self.device_id, self.request_trust_data, cookies=self.request_trust_cookies, trust_id=self.trust_id, trust_me=True) + def test_request_login_makes_a_post_request_with_google_authenticator_code(self): self._verify_request_login_post_request(self.google_authenticator_code, None, @@ -104,7 +122,8 @@ def test_request_login_makes_a_post_request_with_yubikey_password(self): self.login_post_data_with_yubikey_password) def test_request_login_returns_a_session(self): - self.assertEqual(self._request_login_with_xml(''.format(self.session_id)), self.session) + tested_session = self._request_login_with_xml(''.format(self.session_id, self.token)) + self.assertEqual(tested_session, self.session) def test_request_login_raises_an_exception_on_http_error(self): self.assertRaises(lastpass.NetworkError, self._request_login_with_error) @@ -147,6 +166,14 @@ def test_request_login_raises_an_exception_on_missing_or_incorrect_yubikey_passw self.assertRaises(lastpass.LastPassIncorrectYubikeyPasswordError, self._request_login_with_lastpass_error, 'yubikeyrestricted', message) + def test_request_login_raises_an_exception_on_lastpass_authenticator(self): + message = 'Multifactor authentication required! ' \ + 'Upgrade your browser extension so you can enter it.' + self.assertRaises(lastpass.LastPassIncorrectOutOfBandRequiredError, + self._request_login_with_lastpass_multifactor_required, 'outofbandrequired', message) + self.assertRaises(lastpass.LastPassIncorrectMultiFactorResponseError, + self._request_login_with_lastpass_multifactor_required, 'multifactorresponsefailed', message) + def test_request_login_raises_an_exception_on_unknown_lastpass_error_without_a_message(self): cause = 'Unknown cause' self.assertRaises(lastpass.LastPassUnknownError, @@ -162,7 +189,8 @@ def test_fetch_makes_a_get_request(self): def test_fetch_returns_a_blob(self): m = mock.Mock() m.get.return_value = self._http_ok(self.blob_response) - self.assertEqual(fetcher.fetch(self.session, m), self.blob) + returned_blob = fetcher.fetch(self.session, m) + self.assertEqual(returned_blob, self.blob) def test_fetch_raises_exception_on_http_error(self): m = mock.Mock() @@ -197,12 +225,18 @@ def test_make_hash(self): for iterations, hash in hashes: self.assertEqual(hash, fetcher.make_hash('postlass@gmail.com', 'pl1234567890', iterations)) - def _verify_request_login_post_request(self, multifactor_password, device_id, post_data): + def _verify_request_login_post_request(self, multifactor_password, device_id, post_data, trust_me=False): m = mock.Mock() m.post.return_value = self._http_ok(''.format(self.session_id)) - fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m) + fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m, trust_id=device_id, trust_me=trust_me) m.post.assert_called_with('https://lastpass.com/login.php', data=post_data) + def _verify_request_trust(self, multifactor_password, device_id, post_data, cookies, trust_id, trust_me=False): + m = mock.Mock() + m.post.return_value = self._http_ok(''.format(self.session_id, self.token)) + fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m, trust_id=trust_id, trust_me=trust_me) + m.post.assert_called_with('https://lastpass.com/trust.php', data=post_data, cookies=cookies) + @staticmethod def _mock_response(code, body): m = mock.Mock() @@ -222,9 +256,19 @@ def _lastpass_error(cause, message): return ''.format(cause, message) return ''.format(cause) + @staticmethod + def _lastpass_multifactor_required(cause, message): + if message: + return ''.format(message, cause) + return ''.format(cause) + def _request_login_with_lastpass_error(self, cause, message=None): return self._request_login_with_xml(self._lastpass_error(cause, message)) + def _request_login_with_lastpass_multifactor_required(self, cause, message=None): + return self._request_login_with_xml( + self._lastpass_multifactor_required(cause, message)) + def _request_login_with_xml(self, text): return self._request_login_with_ok(text)