diff --git a/lastpass/exceptions.py b/lastpass/exceptions.py
index 0e6f11a..8ab8e9d 100644
--- a/lastpass/exceptions.py
+++ b/lastpass/exceptions.py
@@ -49,6 +49,16 @@ class LastPassIncorrectYubikeyPasswordError(Error):
pass
+class LastPassIncorrectOutOfBandRequiredError(Error):
+ """LastPass error: need to provide out of band authentication (e.g, LastPass Authenticator)"""
+ pass
+
+
+class LastPassIncorrectMultiFactorResponseError(Error):
+ """LastPass error: Multifactor response failed (wrong code or denied)"""
+ pass
+
+
class LastPassUnknownError(Error):
"""LastPass error we don't know about"""
pass
diff --git a/lastpass/fetcher.py b/lastpass/fetcher.py
index cfd6d04..7c588b7 100644
--- a/lastpass/fetcher.py
+++ b/lastpass/fetcher.py
@@ -1,5 +1,7 @@
# coding: utf-8
import hashlib
+import random
+import string
from base64 import b64decode
from binascii import hexlify
import requests
@@ -13,6 +15,8 @@
LastPassInvalidPasswordError,
LastPassIncorrectGoogleAuthenticatorCodeError,
LastPassIncorrectYubikeyPasswordError,
+ LastPassIncorrectOutOfBandRequiredError,
+ LastPassIncorrectMultiFactorResponseError,
LastPassUnknownError
)
from .session import Session
@@ -21,9 +25,9 @@
http = requests
-def login(username, password, multifactor_password=None, client_id=None):
+def login(username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False):
key_iteration_count = request_iteration_count(username)
- return request_login(username, password, key_iteration_count, multifactor_password, client_id)
+ return request_login(username, password, key_iteration_count, multifactor_password, client_id, trust_id=trust_id, trust_me=trust_me)
def logout(session, web_client=http):
@@ -63,21 +67,28 @@ def request_iteration_count(username, web_client=http):
raise InvalidResponseError('Key iteration count is not positive')
-def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http):
+def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http, trust_id=None, trust_me=False):
body = {
- 'method': 'mobile',
- 'web': 1,
- 'xml': 1,
+ 'method': 'cli',
+ 'xml': 2,
'username': username,
'hash': make_hash(username, password, key_iteration_count),
'iterations': key_iteration_count,
+ 'includeprivatekeyenc': 1,
+ 'outofbandsupported': 1
}
if multifactor_password:
body['otp'] = multifactor_password
+ if trust_me and not trust_id:
+ trust_id = generate_trust_id()
+
+ if trust_id:
+ body['uuid'] = trust_id
+
if client_id:
- body['imei'] = client_id
+ body['trustlabel'] = client_id
response = web_client.post('https://lastpass.com/login.php',
data=body)
@@ -93,17 +104,75 @@ def request_login(username, password, key_iteration_count, multifactor_password=
if parsed_response is None:
raise InvalidResponseError()
- session = create_session(parsed_response, key_iteration_count)
+ session = create_session(parsed_response, key_iteration_count, trust_id)
if not session:
- raise login_error(parsed_response)
+ try:
+ raise login_error(parsed_response)
+ except LastPassIncorrectOutOfBandRequiredError:
+ (session, parsed_response) = oob_login(web_client, parsed_response, body, key_iteration_count, trust_id)
+ if not session:
+ raise login_error(parsed_response)
+ if trust_me:
+ response = web_client.post('https://lastpass.com/trust.php', cookies={'PHPSESSID': session.id}, data={"token": session.token, "uuid": trust_id, "trustlabel": client_id})
+
return session
-def create_session(parsed_response, key_iteration_count):
+def oob_login(web_client, parsed_response, body, key_iteration_count, trust_id):
+ error = None if parsed_response.tag != 'response' else parsed_response.find(
+ 'error')
+ if 'outofbandname' not in error.attrib or 'capabilities' not in error.attrib:
+ return (None, parsed_response)
+ oob_name = error.attrib['outofbandname']
+ oob_capabilities = error.attrib['capabilities'].split(',')
+ can_do_passcode = 'passcode' in oob_capabilities
+ if can_do_passcode and 'outofband' not in oob_capabilities:
+ return (None, parsed_response)
+ body['outofbandrequest'] = '1'
+ retries = 0
+ # loop waiting for out of band approval, or failure
+ while retries < 5:
+ retries += 1
+ response = web_client.post("https://lastpass.com/login.php", data=body)
+ if response.status_code != requests.codes.ok:
+ raise NetworkError()
+
+ try:
+ parsed_response = etree.fromstring(response.content)
+ except etree.ParseError:
+ parsed_response = None
+
+ if parsed_response is None:
+ raise InvalidResponseError()
+
+ session = create_session(parsed_response, key_iteration_count, trust_id)
+ if session:
+ return (session, parsed_response)
+ error = None if parsed_response.tag != 'response' else parsed_response.find(
+ 'error')
+ if 'cause' in error.attrib and error.attrib['cause'] == 'outofbandrequired':
+ if 'retryid' in error.attrib:
+ body['outofbandretryid'] = error.attrib['retryid']
+ body['outofbandretry'] = "1"
+ continue
+ return (None, parsed_response)
+ return (None, parsed_response)
+
+
+def generate_trust_id():
+ return ''.join(random.choice(string.ascii_uppercase + string.digits + string.ascii_lowercase + "!@#$") for _ in range(32))
+
+
+def create_session(parsed_response, key_iteration_count, trust_id):
if parsed_response.tag == 'ok':
- session_id = parsed_response.attrib.get('sessionid')
+ ok_response = parsed_response
+ else:
+ ok_response = parsed_response.find("ok")
+ if ok_response is not None:
+ session_id = ok_response.attrib.get('sessionid')
+ token = ok_response.attrib.get('token')
if isinstance(session_id, str):
- return Session(session_id, key_iteration_count)
+ return Session(session_id, key_iteration_count, token, trust_id)
def login_error(parsed_response):
@@ -117,6 +186,8 @@ def login_error(parsed_response):
"googleauthrequired": LastPassIncorrectGoogleAuthenticatorCodeError,
"googleauthfailed": LastPassIncorrectGoogleAuthenticatorCodeError,
"yubikeyrestricted": LastPassIncorrectYubikeyPasswordError,
+ "outofbandrequired": LastPassIncorrectOutOfBandRequiredError,
+ "multifactorresponsefailed": LastPassIncorrectMultiFactorResponseError,
}
cause = error.attrib.get('cause')
diff --git a/lastpass/session.py b/lastpass/session.py
index f68a0f4..f1dd14b 100644
--- a/lastpass/session.py
+++ b/lastpass/session.py
@@ -1,8 +1,10 @@
# coding: utf-8
class Session(object):
- def __init__(self, id, key_iteration_count):
+ def __init__(self, id, key_iteration_count, token=None, trust_id=None):
self.id = id
self.key_iteration_count = key_iteration_count
+ self.token = token
+ self.trust_id = trust_id
def __eq__(self, other):
- return self.id == other.id and self.key_iteration_count == other.key_iteration_count
+ return self.id == other.id and self.key_iteration_count == other.key_iteration_count and self.token == other.token
diff --git a/lastpass/vault.py b/lastpass/vault.py
index 1653eff..85866e6 100644
--- a/lastpass/vault.py
+++ b/lastpass/vault.py
@@ -6,10 +6,10 @@
class Vault(object):
@classmethod
- def open_remote(cls, username, password, multifactor_password=None, client_id=None):
+ def open_remote(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False):
"""Fetches a blob from the server and creates a vault"""
- blob = cls.fetch_blob(username, password, multifactor_password, client_id)
- return cls.open(blob, username, password)
+ (blob, trust_id) = cls.fetch_blob(username, password, multifactor_password, client_id, trust_id, trust_me)
+ return cls.open(blob, username, password, trust_id)
@classmethod
def open_local(cls, blob_filename, username, password):
@@ -18,20 +18,20 @@ def open_local(cls, blob_filename, username, password):
raise NotImplementedError()
@classmethod
- def open(cls, blob, username, password):
+ def open(cls, blob, username, password, trust_id=None):
"""Creates a vault from a blob object"""
- return cls(blob, blob.encryption_key(username, password))
+ return cls(blob, blob.encryption_key(username, password), trust_id)
@classmethod
- def fetch_blob(cls, username, password, multifactor_password=None, client_id=None):
+ def fetch_blob(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False):
"""Just fetches the blob, could be used to store it locally"""
- session = fetcher.login(username, password, multifactor_password, client_id)
+ session = fetcher.login(username, password, multifactor_password, client_id, trust_id=trust_id, trust_me=trust_me)
blob = fetcher.fetch(session)
fetcher.logout(session)
- return blob
+ return (blob, session.trust_id)
- def __init__(self, blob, encryption_key):
+ def __init__(self, blob, encryption_key, trust_id=None):
"""This more of an internal method, use one of the static constructors instead"""
chunks = parser.extract_chunks(blob)
@@ -39,6 +39,7 @@ def __init__(self, blob, encryption_key):
raise InvalidResponseError('Blob is truncated')
self.accounts = self.parse_accounts(chunks, encryption_key)
+ self.trust_id = trust_id
def is_complete(self, chunks):
return len(chunks) > 0 and chunks[-1].id == b'ENDM' and chunks[-1].payload == b'OK'
diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py
index 63ee05e..b81ae9b 100644
--- a/tests/test_fetcher.py
+++ b/tests/test_fetcher.py
@@ -16,22 +16,37 @@ def setUp(self):
self.hash = b'7880a04588cfab954aa1a2da98fd9c0d2c6eba4c53e36a94510e6dbf30759256'
self.session_id = '53ru,Hb713QnEVM5zWZ16jMvxS0'
- self.session = Session(self.session_id, self.key_iteration_count)
+ self.token = '54aa1a2da98fd9c0d2c6eba4c5'
+ self.session = Session(self.session_id, self.key_iteration_count, token=self.token)
self.blob_response = 'TFBBVgAAAAMxMjJQUkVNAAAACjE0MTQ5'
self.blob_bytes = b64decode(self.blob_response)
self.blob = Blob(self.blob_bytes, self.key_iteration_count)
- self.login_post_data = {'method': 'mobile',
- 'web': 1,
- 'xml': 1,
+ self.login_post_data = {'method': 'cli',
+ 'xml': 2,
+ 'outofbandsupported': 1,
+ 'includeprivatekeyenc': 1,
'username': self.username,
'hash': self.hash,
'iterations': self.key_iteration_count}
self.device_id = '492378378052455'
self.login_post_data_with_device_id = self.login_post_data.copy()
- self.login_post_data_with_device_id.update({'imei': self.device_id})
+ self.login_post_data_with_device_id.update({'trustlabel': self.device_id})
+ self.login_post_data_with_device_id.update({'uuid': self.device_id})
+
+ self.trust_id = '@2ykJ0Tp#dVi06qh6g6kvzOqjQGAWfKv'
+
+ self.request_trust_data = {
+ 'token': self.token,
+ 'trustlabel': self.device_id,
+ 'uuid': self.trust_id
+ }
+
+ self.request_trust_cookies = {
+ 'PHPSESSID': self.session_id
+ }
self.google_authenticator_code = '12345'
self.yubikey_password = 'emdbwzemyisymdnevznyqhqnklaqheaxszzvtnxjrmkb'
@@ -93,6 +108,9 @@ def test_request_login_makes_a_post_request(self):
def test_request_login_makes_a_post_request_with_device_id(self):
self._verify_request_login_post_request(None, self.device_id, self.login_post_data_with_device_id)
+ def test_request_login_makes_a_post_request_with_trust_requested(self):
+ self._verify_request_trust(None, self.device_id, self.request_trust_data, cookies=self.request_trust_cookies, trust_id=self.trust_id, trust_me=True)
+
def test_request_login_makes_a_post_request_with_google_authenticator_code(self):
self._verify_request_login_post_request(self.google_authenticator_code,
None,
@@ -104,7 +122,8 @@ def test_request_login_makes_a_post_request_with_yubikey_password(self):
self.login_post_data_with_yubikey_password)
def test_request_login_returns_a_session(self):
- self.assertEqual(self._request_login_with_xml(''.format(self.session_id)), self.session)
+ tested_session = self._request_login_with_xml(''.format(self.session_id, self.token))
+ self.assertEqual(tested_session, self.session)
def test_request_login_raises_an_exception_on_http_error(self):
self.assertRaises(lastpass.NetworkError, self._request_login_with_error)
@@ -147,6 +166,14 @@ def test_request_login_raises_an_exception_on_missing_or_incorrect_yubikey_passw
self.assertRaises(lastpass.LastPassIncorrectYubikeyPasswordError,
self._request_login_with_lastpass_error, 'yubikeyrestricted', message)
+ def test_request_login_raises_an_exception_on_lastpass_authenticator(self):
+ message = 'Multifactor authentication required! ' \
+ 'Upgrade your browser extension so you can enter it.'
+ self.assertRaises(lastpass.LastPassIncorrectOutOfBandRequiredError,
+ self._request_login_with_lastpass_multifactor_required, 'outofbandrequired', message)
+ self.assertRaises(lastpass.LastPassIncorrectMultiFactorResponseError,
+ self._request_login_with_lastpass_multifactor_required, 'multifactorresponsefailed', message)
+
def test_request_login_raises_an_exception_on_unknown_lastpass_error_without_a_message(self):
cause = 'Unknown cause'
self.assertRaises(lastpass.LastPassUnknownError,
@@ -162,7 +189,8 @@ def test_fetch_makes_a_get_request(self):
def test_fetch_returns_a_blob(self):
m = mock.Mock()
m.get.return_value = self._http_ok(self.blob_response)
- self.assertEqual(fetcher.fetch(self.session, m), self.blob)
+ returned_blob = fetcher.fetch(self.session, m)
+ self.assertEqual(returned_blob, self.blob)
def test_fetch_raises_exception_on_http_error(self):
m = mock.Mock()
@@ -197,12 +225,18 @@ def test_make_hash(self):
for iterations, hash in hashes:
self.assertEqual(hash, fetcher.make_hash('postlass@gmail.com', 'pl1234567890', iterations))
- def _verify_request_login_post_request(self, multifactor_password, device_id, post_data):
+ def _verify_request_login_post_request(self, multifactor_password, device_id, post_data, trust_me=False):
m = mock.Mock()
m.post.return_value = self._http_ok(''.format(self.session_id))
- fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m)
+ fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m, trust_id=device_id, trust_me=trust_me)
m.post.assert_called_with('https://lastpass.com/login.php', data=post_data)
+ def _verify_request_trust(self, multifactor_password, device_id, post_data, cookies, trust_id, trust_me=False):
+ m = mock.Mock()
+ m.post.return_value = self._http_ok(''.format(self.session_id, self.token))
+ fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m, trust_id=trust_id, trust_me=trust_me)
+ m.post.assert_called_with('https://lastpass.com/trust.php', data=post_data, cookies=cookies)
+
@staticmethod
def _mock_response(code, body):
m = mock.Mock()
@@ -222,9 +256,19 @@ def _lastpass_error(cause, message):
return ''.format(cause, message)
return ''.format(cause)
+ @staticmethod
+ def _lastpass_multifactor_required(cause, message):
+ if message:
+ return ''.format(message, cause)
+ return ''.format(cause)
+
def _request_login_with_lastpass_error(self, cause, message=None):
return self._request_login_with_xml(self._lastpass_error(cause, message))
+ def _request_login_with_lastpass_multifactor_required(self, cause, message=None):
+ return self._request_login_with_xml(
+ self._lastpass_multifactor_required(cause, message))
+
def _request_login_with_xml(self, text):
return self._request_login_with_ok(text)