Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lastpass/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ class LastPassIncorrectYubikeyPasswordError(Error):
pass


class LastPassIncorrectOutOfBandRequiredError(Error):
"""LastPass error: need to provide out of band authentication (e.g, LastPass Authenticator)"""
pass


class LastPassIncorrectMultiFactorResponseError(Error):
"""LastPass error: Multifactor response failed (wrong code or denied)"""
pass


class LastPassUnknownError(Error):
"""LastPass error we don't know about"""
pass
95 changes: 83 additions & 12 deletions lastpass/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# coding: utf-8
import hashlib
import random
import string
from base64 import b64decode
from binascii import hexlify
import requests
Expand All @@ -13,6 +15,8 @@
LastPassInvalidPasswordError,
LastPassIncorrectGoogleAuthenticatorCodeError,
LastPassIncorrectYubikeyPasswordError,
LastPassIncorrectOutOfBandRequiredError,
LastPassIncorrectMultiFactorResponseError,
LastPassUnknownError
)
from .session import Session
Expand All @@ -21,9 +25,9 @@
http = requests


def login(username, password, multifactor_password=None, client_id=None):
def login(username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False):
key_iteration_count = request_iteration_count(username)
return request_login(username, password, key_iteration_count, multifactor_password, client_id)
return request_login(username, password, key_iteration_count, multifactor_password, client_id, trust_id=trust_id, trust_me=trust_me)


def logout(session, web_client=http):
Expand Down Expand Up @@ -63,21 +67,28 @@ def request_iteration_count(username, web_client=http):
raise InvalidResponseError('Key iteration count is not positive')


def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http):
def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http, trust_id=None, trust_me=False):
body = {
'method': 'mobile',
'web': 1,
'xml': 1,
'method': 'cli',
'xml': 2,
'username': username,
'hash': make_hash(username, password, key_iteration_count),
'iterations': key_iteration_count,
'includeprivatekeyenc': 1,
'outofbandsupported': 1
}

if multifactor_password:
body['otp'] = multifactor_password

if trust_me and not trust_id:
trust_id = generate_trust_id()

if trust_id:
body['uuid'] = trust_id

if client_id:
body['imei'] = client_id
body['trustlabel'] = client_id

response = web_client.post('https://lastpass.com/login.php',
data=body)
Expand All @@ -93,17 +104,75 @@ def request_login(username, password, key_iteration_count, multifactor_password=
if parsed_response is None:
raise InvalidResponseError()

session = create_session(parsed_response, key_iteration_count)
session = create_session(parsed_response, key_iteration_count, trust_id)
if not session:
raise login_error(parsed_response)
try:
raise login_error(parsed_response)
except LastPassIncorrectOutOfBandRequiredError:
(session, parsed_response) = oob_login(web_client, parsed_response, body, key_iteration_count, trust_id)
if not session:
raise login_error(parsed_response)
if trust_me:
response = web_client.post('https://lastpass.com/trust.php', cookies={'PHPSESSID': session.id}, data={"token": session.token, "uuid": trust_id, "trustlabel": client_id})

return session


def create_session(parsed_response, key_iteration_count):
def oob_login(web_client, parsed_response, body, key_iteration_count, trust_id):
error = None if parsed_response.tag != 'response' else parsed_response.find(
'error')
if 'outofbandname' not in error.attrib or 'capabilities' not in error.attrib:
return (None, parsed_response)
oob_name = error.attrib['outofbandname']
oob_capabilities = error.attrib['capabilities'].split(',')
can_do_passcode = 'passcode' in oob_capabilities
if can_do_passcode and 'outofband' not in oob_capabilities:
return (None, parsed_response)
body['outofbandrequest'] = '1'
retries = 0
# loop waiting for out of band approval, or failure
while retries < 5:
retries += 1
response = web_client.post("https://lastpass.com/login.php", data=body)
if response.status_code != requests.codes.ok:
raise NetworkError()

try:
parsed_response = etree.fromstring(response.content)
except etree.ParseError:
parsed_response = None

if parsed_response is None:
raise InvalidResponseError()

session = create_session(parsed_response, key_iteration_count, trust_id)
if session:
return (session, parsed_response)
error = None if parsed_response.tag != 'response' else parsed_response.find(
'error')
if 'cause' in error.attrib and error.attrib['cause'] == 'outofbandrequired':
if 'retryid' in error.attrib:
body['outofbandretryid'] = error.attrib['retryid']
body['outofbandretry'] = "1"
continue
return (None, parsed_response)
return (None, parsed_response)


def generate_trust_id():
return ''.join(random.choice(string.ascii_uppercase + string.digits + string.ascii_lowercase + "!@#$") for _ in range(32))


def create_session(parsed_response, key_iteration_count, trust_id):
if parsed_response.tag == 'ok':
session_id = parsed_response.attrib.get('sessionid')
ok_response = parsed_response
else:
ok_response = parsed_response.find("ok")
if ok_response is not None:
session_id = ok_response.attrib.get('sessionid')
token = ok_response.attrib.get('token')
if isinstance(session_id, str):
return Session(session_id, key_iteration_count)
return Session(session_id, key_iteration_count, token, trust_id)


def login_error(parsed_response):
Expand All @@ -117,6 +186,8 @@ def login_error(parsed_response):
"googleauthrequired": LastPassIncorrectGoogleAuthenticatorCodeError,
"googleauthfailed": LastPassIncorrectGoogleAuthenticatorCodeError,
"yubikeyrestricted": LastPassIncorrectYubikeyPasswordError,
"outofbandrequired": LastPassIncorrectOutOfBandRequiredError,
"multifactorresponsefailed": LastPassIncorrectMultiFactorResponseError,
}

cause = error.attrib.get('cause')
Expand Down
6 changes: 4 additions & 2 deletions lastpass/session.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# coding: utf-8
class Session(object):
def __init__(self, id, key_iteration_count):
def __init__(self, id, key_iteration_count, token=None, trust_id=None):
self.id = id
self.key_iteration_count = key_iteration_count
self.token = token
self.trust_id = trust_id

def __eq__(self, other):
return self.id == other.id and self.key_iteration_count == other.key_iteration_count
return self.id == other.id and self.key_iteration_count == other.key_iteration_count and self.token == other.token
19 changes: 10 additions & 9 deletions lastpass/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

class Vault(object):
@classmethod
def open_remote(cls, username, password, multifactor_password=None, client_id=None):
def open_remote(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False):
"""Fetches a blob from the server and creates a vault"""
blob = cls.fetch_blob(username, password, multifactor_password, client_id)
return cls.open(blob, username, password)
(blob, trust_id) = cls.fetch_blob(username, password, multifactor_password, client_id, trust_id, trust_me)
return cls.open(blob, username, password, trust_id)

@classmethod
def open_local(cls, blob_filename, username, password):
Expand All @@ -18,27 +18,28 @@ def open_local(cls, blob_filename, username, password):
raise NotImplementedError()

@classmethod
def open(cls, blob, username, password):
def open(cls, blob, username, password, trust_id=None):
"""Creates a vault from a blob object"""
return cls(blob, blob.encryption_key(username, password))
return cls(blob, blob.encryption_key(username, password), trust_id)

@classmethod
def fetch_blob(cls, username, password, multifactor_password=None, client_id=None):
def fetch_blob(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False):
"""Just fetches the blob, could be used to store it locally"""
session = fetcher.login(username, password, multifactor_password, client_id)
session = fetcher.login(username, password, multifactor_password, client_id, trust_id=trust_id, trust_me=trust_me)
blob = fetcher.fetch(session)
fetcher.logout(session)

return blob
return (blob, session.trust_id)

def __init__(self, blob, encryption_key):
def __init__(self, blob, encryption_key, trust_id=None):
"""This more of an internal method, use one of the static constructors instead"""
chunks = parser.extract_chunks(blob)

if not self.is_complete(chunks):
raise InvalidResponseError('Blob is truncated')

self.accounts = self.parse_accounts(chunks, encryption_key)
self.trust_id = trust_id

def is_complete(self, chunks):
return len(chunks) > 0 and chunks[-1].id == b'ENDM' and chunks[-1].payload == b'OK'
Expand Down
62 changes: 53 additions & 9 deletions tests/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,37 @@ def setUp(self):

self.hash = b'7880a04588cfab954aa1a2da98fd9c0d2c6eba4c53e36a94510e6dbf30759256'
self.session_id = '53ru,Hb713QnEVM5zWZ16jMvxS0'
self.session = Session(self.session_id, self.key_iteration_count)
self.token = '54aa1a2da98fd9c0d2c6eba4c5'
self.session = Session(self.session_id, self.key_iteration_count, token=self.token)

self.blob_response = 'TFBBVgAAAAMxMjJQUkVNAAAACjE0MTQ5'
self.blob_bytes = b64decode(self.blob_response)
self.blob = Blob(self.blob_bytes, self.key_iteration_count)

self.login_post_data = {'method': 'mobile',
'web': 1,
'xml': 1,
self.login_post_data = {'method': 'cli',
'xml': 2,
'outofbandsupported': 1,
'includeprivatekeyenc': 1,
'username': self.username,
'hash': self.hash,
'iterations': self.key_iteration_count}

self.device_id = '492378378052455'
self.login_post_data_with_device_id = self.login_post_data.copy()
self.login_post_data_with_device_id.update({'imei': self.device_id})
self.login_post_data_with_device_id.update({'trustlabel': self.device_id})
self.login_post_data_with_device_id.update({'uuid': self.device_id})

self.trust_id = '@2ykJ0Tp#dVi06qh6g6kvzOqjQGAWfKv'

self.request_trust_data = {
'token': self.token,
'trustlabel': self.device_id,
'uuid': self.trust_id
}

self.request_trust_cookies = {
'PHPSESSID': self.session_id
}

self.google_authenticator_code = '12345'
self.yubikey_password = 'emdbwzemyisymdnevznyqhqnklaqheaxszzvtnxjrmkb'
Expand Down Expand Up @@ -93,6 +108,9 @@ def test_request_login_makes_a_post_request(self):
def test_request_login_makes_a_post_request_with_device_id(self):
self._verify_request_login_post_request(None, self.device_id, self.login_post_data_with_device_id)

def test_request_login_makes_a_post_request_with_trust_requested(self):
self._verify_request_trust(None, self.device_id, self.request_trust_data, cookies=self.request_trust_cookies, trust_id=self.trust_id, trust_me=True)

def test_request_login_makes_a_post_request_with_google_authenticator_code(self):
self._verify_request_login_post_request(self.google_authenticator_code,
None,
Expand All @@ -104,7 +122,8 @@ def test_request_login_makes_a_post_request_with_yubikey_password(self):
self.login_post_data_with_yubikey_password)

def test_request_login_returns_a_session(self):
self.assertEqual(self._request_login_with_xml('<ok sessionid="{}" />'.format(self.session_id)), self.session)
tested_session = self._request_login_with_xml('<ok sessionid="{}" token="{}"/>'.format(self.session_id, self.token))
self.assertEqual(tested_session, self.session)

def test_request_login_raises_an_exception_on_http_error(self):
self.assertRaises(lastpass.NetworkError, self._request_login_with_error)
Expand Down Expand Up @@ -147,6 +166,14 @@ def test_request_login_raises_an_exception_on_missing_or_incorrect_yubikey_passw
self.assertRaises(lastpass.LastPassIncorrectYubikeyPasswordError,
self._request_login_with_lastpass_error, 'yubikeyrestricted', message)

def test_request_login_raises_an_exception_on_lastpass_authenticator(self):
message = 'Multifactor authentication required! ' \
'Upgrade your browser extension so you can enter it.'
self.assertRaises(lastpass.LastPassIncorrectOutOfBandRequiredError,
self._request_login_with_lastpass_multifactor_required, 'outofbandrequired', message)
self.assertRaises(lastpass.LastPassIncorrectMultiFactorResponseError,
self._request_login_with_lastpass_multifactor_required, 'multifactorresponsefailed', message)

def test_request_login_raises_an_exception_on_unknown_lastpass_error_without_a_message(self):
cause = 'Unknown cause'
self.assertRaises(lastpass.LastPassUnknownError,
Expand All @@ -162,7 +189,8 @@ def test_fetch_makes_a_get_request(self):
def test_fetch_returns_a_blob(self):
m = mock.Mock()
m.get.return_value = self._http_ok(self.blob_response)
self.assertEqual(fetcher.fetch(self.session, m), self.blob)
returned_blob = fetcher.fetch(self.session, m)
self.assertEqual(returned_blob, self.blob)

def test_fetch_raises_exception_on_http_error(self):
m = mock.Mock()
Expand Down Expand Up @@ -197,12 +225,18 @@ def test_make_hash(self):
for iterations, hash in hashes:
self.assertEqual(hash, fetcher.make_hash('postlass@gmail.com', 'pl1234567890', iterations))

def _verify_request_login_post_request(self, multifactor_password, device_id, post_data):
def _verify_request_login_post_request(self, multifactor_password, device_id, post_data, trust_me=False):
m = mock.Mock()
m.post.return_value = self._http_ok('<ok sessionid="{}" />'.format(self.session_id))
fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m)
fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m, trust_id=device_id, trust_me=trust_me)
m.post.assert_called_with('https://lastpass.com/login.php', data=post_data)

def _verify_request_trust(self, multifactor_password, device_id, post_data, cookies, trust_id, trust_me=False):
m = mock.Mock()
m.post.return_value = self._http_ok('<ok sessionid="{}" token="{}"/>'.format(self.session_id, self.token))
fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m, trust_id=trust_id, trust_me=trust_me)
m.post.assert_called_with('https://lastpass.com/trust.php', data=post_data, cookies=cookies)

@staticmethod
def _mock_response(code, body):
m = mock.Mock()
Expand All @@ -222,9 +256,19 @@ def _lastpass_error(cause, message):
return '<response><error cause="{}" message="{}" /></response>'.format(cause, message)
return '<response><error cause="{}" /></response>'.format(cause)

@staticmethod
def _lastpass_multifactor_required(cause, message):
if message:
return '<response><error message="{}" cause="{}" allowtrust="1" capabilities="push,totp,sms,outofband,outofbandauto,passcode" outofbandtype="lastpassauth" outofbandname="LastPass Authenticator" allowmultifactortrust="true" trustexpired="0" trustlabel="" hidedisable="false" /></response>'.format(message, cause)
return '<response><error cause="{}" allowtrust="1" capabilities="push,totp,sms,outofband,outofbandauto,passcode" outofbandtype="lastpassauth" outofbandname="LastPass Authenticator" allowmultifactortrust="true" trustexpired="0" trustlabel="" hidedisable="false" /></response>'.format(cause)

def _request_login_with_lastpass_error(self, cause, message=None):
return self._request_login_with_xml(self._lastpass_error(cause, message))

def _request_login_with_lastpass_multifactor_required(self, cause, message=None):
return self._request_login_with_xml(
self._lastpass_multifactor_required(cause, message))

def _request_login_with_xml(self, text):
return self._request_login_with_ok(text)

Expand Down