From 403efc252844d5b9cb222ea68ccb14aab0107440 Mon Sep 17 00:00:00 2001 From: Terry Hardie Date: Fri, 13 Apr 2018 16:01:51 -0700 Subject: [PATCH 1/2] Added support for LastPass Authenticator and trusting endpoint --- lastpass/exceptions.py | 10 +++++ lastpass/fetcher.py | 95 ++++++++++++++++++++++++++++++++++++------ lastpass/session.py | 6 ++- lastpass/vault.py | 19 +++++---- tests/test_fetcher.py | 62 +++++++++++++++++++++++---- 5 files changed, 160 insertions(+), 32 deletions(-) diff --git a/lastpass/exceptions.py b/lastpass/exceptions.py index 0e6f11a..8ab8e9d 100644 --- a/lastpass/exceptions.py +++ b/lastpass/exceptions.py @@ -49,6 +49,16 @@ class LastPassIncorrectYubikeyPasswordError(Error): pass +class LastPassIncorrectOutOfBandRequiredError(Error): + """LastPass error: need to provide out of band authentication (e.g, LastPass Authenticator)""" + pass + + +class LastPassIncorrectMultiFactorResponseError(Error): + """LastPass error: Multifactor response failed (wrong code or denied)""" + pass + + class LastPassUnknownError(Error): """LastPass error we don't know about""" pass diff --git a/lastpass/fetcher.py b/lastpass/fetcher.py index cfd6d04..7c588b7 100644 --- a/lastpass/fetcher.py +++ b/lastpass/fetcher.py @@ -1,5 +1,7 @@ # coding: utf-8 import hashlib +import random +import string from base64 import b64decode from binascii import hexlify import requests @@ -13,6 +15,8 @@ LastPassInvalidPasswordError, LastPassIncorrectGoogleAuthenticatorCodeError, LastPassIncorrectYubikeyPasswordError, + LastPassIncorrectOutOfBandRequiredError, + LastPassIncorrectMultiFactorResponseError, LastPassUnknownError ) from .session import Session @@ -21,9 +25,9 @@ http = requests -def login(username, password, multifactor_password=None, client_id=None): +def login(username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False): key_iteration_count = request_iteration_count(username) - return request_login(username, password, key_iteration_count, multifactor_password, client_id) + return request_login(username, password, key_iteration_count, multifactor_password, client_id, trust_id=trust_id, trust_me=trust_me) def logout(session, web_client=http): @@ -63,21 +67,28 @@ def request_iteration_count(username, web_client=http): raise InvalidResponseError('Key iteration count is not positive') -def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http): +def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http, trust_id=None, trust_me=False): body = { - 'method': 'mobile', - 'web': 1, - 'xml': 1, + 'method': 'cli', + 'xml': 2, 'username': username, 'hash': make_hash(username, password, key_iteration_count), 'iterations': key_iteration_count, + 'includeprivatekeyenc': 1, + 'outofbandsupported': 1 } if multifactor_password: body['otp'] = multifactor_password + if trust_me and not trust_id: + trust_id = generate_trust_id() + + if trust_id: + body['uuid'] = trust_id + if client_id: - body['imei'] = client_id + body['trustlabel'] = client_id response = web_client.post('https://lastpass.com/login.php', data=body) @@ -93,17 +104,75 @@ def request_login(username, password, key_iteration_count, multifactor_password= if parsed_response is None: raise InvalidResponseError() - session = create_session(parsed_response, key_iteration_count) + session = create_session(parsed_response, key_iteration_count, trust_id) if not session: - raise login_error(parsed_response) + try: + raise login_error(parsed_response) + except LastPassIncorrectOutOfBandRequiredError: + (session, parsed_response) = oob_login(web_client, parsed_response, body, key_iteration_count, trust_id) + if not session: + raise login_error(parsed_response) + if trust_me: + response = web_client.post('https://lastpass.com/trust.php', cookies={'PHPSESSID': session.id}, data={"token": session.token, "uuid": trust_id, "trustlabel": client_id}) + return session -def create_session(parsed_response, key_iteration_count): +def oob_login(web_client, parsed_response, body, key_iteration_count, trust_id): + error = None if parsed_response.tag != 'response' else parsed_response.find( + 'error') + if 'outofbandname' not in error.attrib or 'capabilities' not in error.attrib: + return (None, parsed_response) + oob_name = error.attrib['outofbandname'] + oob_capabilities = error.attrib['capabilities'].split(',') + can_do_passcode = 'passcode' in oob_capabilities + if can_do_passcode and 'outofband' not in oob_capabilities: + return (None, parsed_response) + body['outofbandrequest'] = '1' + retries = 0 + # loop waiting for out of band approval, or failure + while retries < 5: + retries += 1 + response = web_client.post("https://lastpass.com/login.php", data=body) + if response.status_code != requests.codes.ok: + raise NetworkError() + + try: + parsed_response = etree.fromstring(response.content) + except etree.ParseError: + parsed_response = None + + if parsed_response is None: + raise InvalidResponseError() + + session = create_session(parsed_response, key_iteration_count, trust_id) + if session: + return (session, parsed_response) + error = None if parsed_response.tag != 'response' else parsed_response.find( + 'error') + if 'cause' in error.attrib and error.attrib['cause'] == 'outofbandrequired': + if 'retryid' in error.attrib: + body['outofbandretryid'] = error.attrib['retryid'] + body['outofbandretry'] = "1" + continue + return (None, parsed_response) + return (None, parsed_response) + + +def generate_trust_id(): + return ''.join(random.choice(string.ascii_uppercase + string.digits + string.ascii_lowercase + "!@#$") for _ in range(32)) + + +def create_session(parsed_response, key_iteration_count, trust_id): if parsed_response.tag == 'ok': - session_id = parsed_response.attrib.get('sessionid') + ok_response = parsed_response + else: + ok_response = parsed_response.find("ok") + if ok_response is not None: + session_id = ok_response.attrib.get('sessionid') + token = ok_response.attrib.get('token') if isinstance(session_id, str): - return Session(session_id, key_iteration_count) + return Session(session_id, key_iteration_count, token, trust_id) def login_error(parsed_response): @@ -117,6 +186,8 @@ def login_error(parsed_response): "googleauthrequired": LastPassIncorrectGoogleAuthenticatorCodeError, "googleauthfailed": LastPassIncorrectGoogleAuthenticatorCodeError, "yubikeyrestricted": LastPassIncorrectYubikeyPasswordError, + "outofbandrequired": LastPassIncorrectOutOfBandRequiredError, + "multifactorresponsefailed": LastPassIncorrectMultiFactorResponseError, } cause = error.attrib.get('cause') diff --git a/lastpass/session.py b/lastpass/session.py index f68a0f4..f1dd14b 100644 --- a/lastpass/session.py +++ b/lastpass/session.py @@ -1,8 +1,10 @@ # coding: utf-8 class Session(object): - def __init__(self, id, key_iteration_count): + def __init__(self, id, key_iteration_count, token=None, trust_id=None): self.id = id self.key_iteration_count = key_iteration_count + self.token = token + self.trust_id = trust_id def __eq__(self, other): - return self.id == other.id and self.key_iteration_count == other.key_iteration_count + return self.id == other.id and self.key_iteration_count == other.key_iteration_count and self.token == other.token diff --git a/lastpass/vault.py b/lastpass/vault.py index 1653eff..85866e6 100644 --- a/lastpass/vault.py +++ b/lastpass/vault.py @@ -6,10 +6,10 @@ class Vault(object): @classmethod - def open_remote(cls, username, password, multifactor_password=None, client_id=None): + def open_remote(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False): """Fetches a blob from the server and creates a vault""" - blob = cls.fetch_blob(username, password, multifactor_password, client_id) - return cls.open(blob, username, password) + (blob, trust_id) = cls.fetch_blob(username, password, multifactor_password, client_id, trust_id, trust_me) + return cls.open(blob, username, password, trust_id) @classmethod def open_local(cls, blob_filename, username, password): @@ -18,20 +18,20 @@ def open_local(cls, blob_filename, username, password): raise NotImplementedError() @classmethod - def open(cls, blob, username, password): + def open(cls, blob, username, password, trust_id=None): """Creates a vault from a blob object""" - return cls(blob, blob.encryption_key(username, password)) + return cls(blob, blob.encryption_key(username, password), trust_id) @classmethod - def fetch_blob(cls, username, password, multifactor_password=None, client_id=None): + def fetch_blob(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False): """Just fetches the blob, could be used to store it locally""" - session = fetcher.login(username, password, multifactor_password, client_id) + session = fetcher.login(username, password, multifactor_password, client_id, trust_id=trust_id, trust_me=trust_me) blob = fetcher.fetch(session) fetcher.logout(session) - return blob + return (blob, session.trust_id) - def __init__(self, blob, encryption_key): + def __init__(self, blob, encryption_key, trust_id=None): """This more of an internal method, use one of the static constructors instead""" chunks = parser.extract_chunks(blob) @@ -39,6 +39,7 @@ def __init__(self, blob, encryption_key): raise InvalidResponseError('Blob is truncated') self.accounts = self.parse_accounts(chunks, encryption_key) + self.trust_id = trust_id def is_complete(self, chunks): return len(chunks) > 0 and chunks[-1].id == b'ENDM' and chunks[-1].payload == b'OK' diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index 63ee05e..b81ae9b 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -16,22 +16,37 @@ def setUp(self): self.hash = b'7880a04588cfab954aa1a2da98fd9c0d2c6eba4c53e36a94510e6dbf30759256' self.session_id = '53ru,Hb713QnEVM5zWZ16jMvxS0' - self.session = Session(self.session_id, self.key_iteration_count) + self.token = '54aa1a2da98fd9c0d2c6eba4c5' + self.session = Session(self.session_id, self.key_iteration_count, token=self.token) self.blob_response = 'TFBBVgAAAAMxMjJQUkVNAAAACjE0MTQ5' self.blob_bytes = b64decode(self.blob_response) self.blob = Blob(self.blob_bytes, self.key_iteration_count) - self.login_post_data = {'method': 'mobile', - 'web': 1, - 'xml': 1, + self.login_post_data = {'method': 'cli', + 'xml': 2, + 'outofbandsupported': 1, + 'includeprivatekeyenc': 1, 'username': self.username, 'hash': self.hash, 'iterations': self.key_iteration_count} self.device_id = '492378378052455' self.login_post_data_with_device_id = self.login_post_data.copy() - self.login_post_data_with_device_id.update({'imei': self.device_id}) + self.login_post_data_with_device_id.update({'trustlabel': self.device_id}) + self.login_post_data_with_device_id.update({'uuid': self.device_id}) + + self.trust_id = '@2ykJ0Tp#dVi06qh6g6kvzOqjQGAWfKv' + + self.request_trust_data = { + 'token': self.token, + 'trustlabel': self.device_id, + 'uuid': self.trust_id + } + + self.request_trust_cookies = { + 'PHPSESSID': self.session_id + } self.google_authenticator_code = '12345' self.yubikey_password = 'emdbwzemyisymdnevznyqhqnklaqheaxszzvtnxjrmkb' @@ -93,6 +108,9 @@ def test_request_login_makes_a_post_request(self): def test_request_login_makes_a_post_request_with_device_id(self): self._verify_request_login_post_request(None, self.device_id, self.login_post_data_with_device_id) + def test_request_login_makes_a_post_request_with_trust_requested(self): + self._verify_request_trust(None, self.device_id, self.request_trust_data, cookies=self.request_trust_cookies, trust_id=self.trust_id, trust_me=True) + def test_request_login_makes_a_post_request_with_google_authenticator_code(self): self._verify_request_login_post_request(self.google_authenticator_code, None, @@ -104,7 +122,8 @@ def test_request_login_makes_a_post_request_with_yubikey_password(self): self.login_post_data_with_yubikey_password) def test_request_login_returns_a_session(self): - self.assertEqual(self._request_login_with_xml(''.format(self.session_id)), self.session) + tested_session = self._request_login_with_xml(''.format(self.session_id, self.token)) + self.assertEqual(tested_session, self.session) def test_request_login_raises_an_exception_on_http_error(self): self.assertRaises(lastpass.NetworkError, self._request_login_with_error) @@ -147,6 +166,14 @@ def test_request_login_raises_an_exception_on_missing_or_incorrect_yubikey_passw self.assertRaises(lastpass.LastPassIncorrectYubikeyPasswordError, self._request_login_with_lastpass_error, 'yubikeyrestricted', message) + def test_request_login_raises_an_exception_on_lastpass_authenticator(self): + message = 'Multifactor authentication required! ' \ + 'Upgrade your browser extension so you can enter it.' + self.assertRaises(lastpass.LastPassIncorrectOutOfBandRequiredError, + self._request_login_with_lastpass_multifactor_required, 'outofbandrequired', message) + self.assertRaises(lastpass.LastPassIncorrectMultiFactorResponseError, + self._request_login_with_lastpass_multifactor_required, 'multifactorresponsefailed', message) + def test_request_login_raises_an_exception_on_unknown_lastpass_error_without_a_message(self): cause = 'Unknown cause' self.assertRaises(lastpass.LastPassUnknownError, @@ -162,7 +189,8 @@ def test_fetch_makes_a_get_request(self): def test_fetch_returns_a_blob(self): m = mock.Mock() m.get.return_value = self._http_ok(self.blob_response) - self.assertEqual(fetcher.fetch(self.session, m), self.blob) + returned_blob = fetcher.fetch(self.session, m) + self.assertEqual(returned_blob, self.blob) def test_fetch_raises_exception_on_http_error(self): m = mock.Mock() @@ -197,12 +225,18 @@ def test_make_hash(self): for iterations, hash in hashes: self.assertEqual(hash, fetcher.make_hash('postlass@gmail.com', 'pl1234567890', iterations)) - def _verify_request_login_post_request(self, multifactor_password, device_id, post_data): + def _verify_request_login_post_request(self, multifactor_password, device_id, post_data, trust_me=False): m = mock.Mock() m.post.return_value = self._http_ok(''.format(self.session_id)) - fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m) + fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m, trust_id=device_id, trust_me=trust_me) m.post.assert_called_with('https://lastpass.com/login.php', data=post_data) + def _verify_request_trust(self, multifactor_password, device_id, post_data, cookies, trust_id, trust_me=False): + m = mock.Mock() + m.post.return_value = self._http_ok(''.format(self.session_id, self.token)) + fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m, trust_id=trust_id, trust_me=trust_me) + m.post.assert_called_with('https://lastpass.com/trust.php', data=post_data, cookies=cookies) + @staticmethod def _mock_response(code, body): m = mock.Mock() @@ -222,9 +256,19 @@ def _lastpass_error(cause, message): return ''.format(cause, message) return ''.format(cause) + @staticmethod + def _lastpass_multifactor_required(cause, message): + if message: + return ''.format(message, cause) + return ''.format(cause) + def _request_login_with_lastpass_error(self, cause, message=None): return self._request_login_with_xml(self._lastpass_error(cause, message)) + def _request_login_with_lastpass_multifactor_required(self, cause, message=None): + return self._request_login_with_xml( + self._lastpass_multifactor_required(cause, message)) + def _request_login_with_xml(self, text): return self._request_login_with_ok(text) From cb4cf24aa2b73b1ab72e2c14c1a61abeeff10597 Mon Sep 17 00:00:00 2001 From: Karthik Kumar Viswanathan Date: Tue, 28 Jul 2020 22:48:45 -0700 Subject: [PATCH 2/2] Import Local Blob fetching code, with changes from erikson1970/lastpass-python. Import SecureNote parsing code, with changes from nadavshalev/lastpass-python. Pylint pass and uncrustify code. Fix pycodestyle issues. --- .gitignore | 2 + example/example.py | 2 +- lastpass/account.py | 41 +++++++++- lastpass/blob.py | 4 +- lastpass/chunk.py | 4 +- lastpass/fetcher.py | 31 ++++---- lastpass/parser.py | 189 +++++++++++++++++++++++++++++++++----------- lastpass/session.py | 4 +- lastpass/vault.py | 48 +++++++++-- 9 files changed, 250 insertions(+), 75 deletions(-) diff --git a/.gitignore b/.gitignore index 70161f7..c9b71e2 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,9 @@ example/credentials.json *.egg-info/ dist/ +build .tox +.eggs .python-version .idea/ diff --git a/example/example.py b/example/example.py index 34652e8..3481725 100755 --- a/example/example.py +++ b/example/example.py @@ -36,4 +36,4 @@ for index, i in enumerate(vault.accounts): - print("{} {} {} {} {} {} {} {}".format(index + 1, i.id, i.name, i.username, i.password, i.url, i.group, i.notes)) + print("{} {}".format(index + 1, str(i))) diff --git a/lastpass/account.py b/lastpass/account.py index 3e5701b..d16746f 100644 --- a/lastpass/account.py +++ b/lastpass/account.py @@ -1,10 +1,47 @@ # coding: utf-8 +import types + + class Account(object): - def __init__(self, id, name, username, password, url, group, notes=None): - self.id = id + """ + Lastpass Password Account + """ + def __init__(self, _id, name, username, password, url, group, notes=None): + self.id = _id self.name = name self.username = username self.password = password self.url = url self.group = group self.notes = notes + + def notes_string(self): + if type(self.notes) == bytes: + note_str = '{}'.format(self.notes.decode()) + else: + note_str = '{}'.format(str(self.notes)) + return note_str + + def fields(self): + result_fields = [] + for field in dir(self): + if not field.startswith('_') and not callable(getattr(self, field)): + result_fields.append(field) + return result_fields + + def __str__(self): + return "name: {}\n\tusername: {}\n\tpassword: {}\n\turl: {}\n\tgroup: {}\n\tnotes: {}".format(self.name, self.username, self.password, self.url, self.group, self.notes_string()) + + +class SecureNote(Account): + """ + Lastpass Secure Note + """ + def __init__(self): + pass + + def __str__(self): + try: + return getattr(self, 'unparsed_notes_0').decode() + except AttributeError: + return '\n'.join(['\t\t{}: {}'.format(field, getattr(self, field).decode()) for field in self.fields()]) diff --git a/lastpass/blob.py b/lastpass/blob.py index 5be27d5..6b1f7b9 100644 --- a/lastpass/blob.py +++ b/lastpass/blob.py @@ -1,7 +1,7 @@ # coding: utf-8 class Blob(object): - def __init__(self, bytes, key_iteration_count): - self.bytes = bytes + def __init__(self, bytes_, key_iteration_count): + self.bytes = bytes_ self.key_iteration_count = key_iteration_count def encryption_key(self, username, password): diff --git a/lastpass/chunk.py b/lastpass/chunk.py index a003319..16fb7ca 100644 --- a/lastpass/chunk.py +++ b/lastpass/chunk.py @@ -1,7 +1,7 @@ # coding: utf-8 class Chunk(object): - def __init__(self, id, payload): - self.id = id + def __init__(self, id_, payload): + self.id = id_ self.payload = payload def __eq__(self, other): diff --git a/lastpass/fetcher.py b/lastpass/fetcher.py index 7c588b7..859b7f1 100644 --- a/lastpass/fetcher.py +++ b/lastpass/fetcher.py @@ -4,8 +4,9 @@ import string from base64 import b64decode from binascii import hexlify -import requests from xml.etree import ElementTree as etree +import requests + from . import blob from .exceptions import ( NetworkError, @@ -59,7 +60,7 @@ def request_iteration_count(username, web_client=http): try: count = int(response.content) - except: + except Exception: raise InvalidResponseError('Key iteration count is invalid') if count > 0: @@ -123,7 +124,6 @@ def oob_login(web_client, parsed_response, body, key_iteration_count, trust_id): 'error') if 'outofbandname' not in error.attrib or 'capabilities' not in error.attrib: return (None, parsed_response) - oob_name = error.attrib['outofbandname'] oob_capabilities = error.attrib['capabilities'].split(',') can_do_passcode = 'passcode' in oob_capabilities if can_do_passcode and 'outofband' not in oob_capabilities: @@ -173,11 +173,12 @@ def create_session(parsed_response, key_iteration_count, trust_id): token = ok_response.attrib.get('token') if isinstance(session_id, str): return Session(session_id, key_iteration_count, token, trust_id) + return None def login_error(parsed_response): error = None if parsed_response.tag != 'response' else parsed_response.find('error') - if error is None or len(error.attrib) == 0: + if error is None or not error.attrib: raise UnknownResponseSchemaError() exceptions = { @@ -198,27 +199,25 @@ def login_error(parsed_response): return InvalidResponseError(message) -def decode_blob(blob): - return b64decode(blob) +def decode_blob(blob_): + return b64decode(blob_) def make_key(username, password, key_iteration_count): # type: (str, str, int) -> bytes if key_iteration_count == 1: return hashlib.sha256(username.encode('utf-8') + password.encode('utf-8')).digest() - else: - return hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), username.encode('utf-8'), key_iteration_count, 32) + return hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), username.encode('utf-8'), key_iteration_count, 32) def make_hash(username, password, key_iteration_count): # type: (str, str, int) -> bytes if key_iteration_count == 1: return bytearray(hashlib.sha256(hexlify(make_key(username, password, 1)) + password.encode('utf-8')).hexdigest(), 'ascii') - else: - return hexlify(hashlib.pbkdf2_hmac( - 'sha256', - make_key(username, password, key_iteration_count), - password.encode('utf-8'), - 1, - 32 - )) + return hexlify(hashlib.pbkdf2_hmac( + 'sha256', + make_key(username, password, key_iteration_count), + password.encode('utf-8'), + 1, + 32 + )) diff --git a/lastpass/parser.py b/lastpass/parser.py index b4261a8..c4e9f1d 100644 --- a/lastpass/parser.py +++ b/lastpass/parser.py @@ -1,27 +1,48 @@ # coding: utf-8 -from base64 import b64decode +from base64 import b64decode, b64encode import binascii import codecs from io import BytesIO +import os import struct import re +import zlib from Crypto.Cipher import AES, PKCS1_OAEP from Crypto.Util import number from Crypto.PublicKey import RSA -from .account import Account +from .account import Account, SecureNote from .chunk import Chunk # Secure note types that contain account-like information ALLOWED_SECURE_NOTE_TYPES = [ + None, b"Server", - b"Email Account", + b"SSH Key", b"Database", + b"Stripe Key", + b"Passport", + b"Membership", + b"Wi-Fi Password", + b"Software License", + b"Social Security", + b"Address", + b"Bank Account", + b"Credit Card", + b"Email Account", + b"Health Insurance", + b"Insurance", b"Instant Messenger", + b"Generic", + b"Custom", ] +random = os.urandom +compress = zlib.compress +decompress = zlib.decompress + def extract_chunks(blob): """Splits the blob into chucks grouped by kind.""" @@ -60,12 +81,13 @@ def parse_ACCT(chunk, encryption_key): # Parse secure note if secure_note == b'1': + secure_notes = SecureNote() parsed = parse_secure_note_server(notes) - - if parsed.get('type') in ALLOWED_SECURE_NOTE_TYPES: - url = parsed.get('url', url) - username = parsed.get('username', username) - password = parsed.get('password', password) + parsed_type = parsed.get('type') + if parsed_type in ALLOWED_SECURE_NOTE_TYPES: + for key in parsed: + setattr(secure_notes, key, parsed[key]) + notes = secure_notes return Account(id, name, username, password, url, group, notes) @@ -113,17 +135,28 @@ def parse_SHAR(chunk, encryption_key, rsa_key): def parse_secure_note_server(notes): info = {} - - for i in notes.split(b'\n'): - if not i: # blank line - continue - - if b':' not in i: # there is no `:` if generic note + last_field = None + unparsed_counter = 0 + + for line in notes.split(b'\n'): + + if not line: + if not last_field: + continue + + if b':' not in line: # there is no `:` if generic note + if not last_field: + last_field = 'unparsed_notes_{}'.format(unparsed_counter) + unparsed_counter = unparsed_counter + 1 + info[last_field] = b'' + if last_field: + old_bytes = info[last_field] + info[last_field] = (old_bytes.decode() + '\n' + line.decode()).encode() continue # Split only once so that strings like "Hostname:host.example.com:80" # get interpreted correctly - key, value = i.split(b':', 1) + key, value = line.split(b':', 1) if key == b'NoteType': info['type'] = value elif key == b'Hostname': @@ -132,6 +165,9 @@ def parse_secure_note_server(notes): info['username'] = value elif key == b'Password': info['password'] = value + else: + last_field = key.decode().strip() + info[last_field] = value return info @@ -163,7 +199,7 @@ def read_item(stream): def skip_item(stream, times=1): """Skips an item in a stream.""" - for i in range(times): + for _ in range(times): read_item(stream) @@ -195,9 +231,17 @@ def decode_hex(data): raise TypeError() -def decode_base64(data): +def decode_base64(b64data): """Decodes a base64 encoded string into raw bytes.""" - return b64decode(data) + # see http://passingcuriosity.com/2009/aes-encryption-in-python-with-m2crypto/ + data = b64decode(b64data) + return data + + +def encode_base64(data): + """Encodes raw bytes into a base64 encoded string.""" + b64data = b64encode(data) + return b64data def decode_aes256_plain_auto(data, encryption_key): @@ -207,10 +251,9 @@ def decode_aes256_plain_auto(data, encryption_key): if length == 0: return b'' - elif data[0] == b'!'[0] and length % 16 == 1 and length > 32: + if data[0] == b'!'[0] and length % 16 == 1 and length > 32: return decode_aes256_cbc_plain(data, encryption_key) - else: - return decode_aes256_ecb_plain(data, encryption_key) + return decode_aes256_ecb_plain(data, encryption_key) def decode_aes256_base64_auto(data, encryption_key): @@ -220,18 +263,16 @@ def decode_aes256_base64_auto(data, encryption_key): if length == 0: return b'' - elif data[0] == b'!'[0]: + if data[0] == b'!'[0]: return decode_aes256_cbc_base64(data, encryption_key) - else: - return decode_aes256_ecb_base64(data, encryption_key) + return decode_aes256_ecb_base64(data, encryption_key) def decode_aes256_ecb_plain(data, encryption_key): """Decrypts AES-256 ECB bytes.""" if not data: return b'' - else: - return decode_aes256('ecb', '', data, encryption_key) + return decode_aes256('ecb', '', data, encryption_key) def decode_aes256_ecb_base64(data, encryption_key): @@ -243,27 +284,73 @@ def decode_aes256_cbc_plain(data, encryption_key): """Decrypts AES-256 CBC bytes.""" if not data: return b'' - else: - # LastPass AES-256/CBC encryted string starts with an "!". - # Next 16 bytes are the IV for the cipher. - # And the rest is the encrypted payload. - return decode_aes256('cbc', data[1:17], data[17:], encryption_key) + # LastPass AES-256/CBC encryted string starts with an "!". + # Next 16 bytes are the IV for the cipher. + # And the rest is the encrypted payload. + return decode_aes256('cbc', data[1:17], data[17:], encryption_key) def decode_aes256_cbc_base64(data, encryption_key): """Decrypts base64 encoded AES-256 CBC bytes.""" if not data: return b'' - else: - # LastPass AES-256/CBC/base64 encryted string starts with an "!". - # Next 24 bytes are the base64 encoded IV for the cipher. - # Then comes the "|". - # And the rest is the base64 encoded encrypted payload. - return decode_aes256( - 'cbc', - decode_base64(data[1:25]), - decode_base64(data[26:]), - encryption_key) + # LastPass AES-256/CBC/base64 encryted string starts with an "!". + # Next 24 bytes are the base64 encoded IV for the cipher. + # Then comes the "|". + # And the rest is the base64 encoded encrypted payload. + return decode_aes256( + 'cbc', + decode_base64(data[1:25]), + decode_base64(data[26:]), + encryption_key) + + +def encode_aes256_cbc_base64(cleartext, encryption_key, iv): + """Encrypts base64 encoded AES-256 CBC bytes.""" + if not cleartext: + return b'' + # LastPass AES-256/CBC/base64 encryted string starts with an "!". + # Next 24 bytes are the base64 encoded IV for the cipher. + # Then comes the "|". + # And the rest is the base64 encoded encrypted payload. + return b'!' + b"%24s" % encode_base64(iv) + b'|' + encode_base64(encode_aes256('cbc', iv, cleartext, encryption_key)) + + +def pad(data): + """ + Pad Data to PKCS 5 Encoding. + """ + BS = 16 + # see http://passingcuriosity.com/2009/aes-encryption-in-python-with-m2crypto/ + padded = (BS - len(data) % BS) * chr(BS - len(data) % BS) + if isinstance(data, str): + try: + data = str.encode(data, 'latin1') + except Exception: + data = bytes(data) + if isinstance(padded, str): + try: + padded = str.encode(padded, 'latin1') + except Exception: + padded = bytes(data) + try: + result = bytes(data + padded) + except Exception: + result = data + padded + return result + + +def unpad(data): + """ + Unpad Data from PKCS 5 Encoding. + """ + # see http://passingcuriosity.com/2009/aes-encryption-in-python-with-m2crypto/ + if isinstance(data, str): + try: + data = str.encode(data, 'latin1') + except Exception: + data = bytes(data) + return data[0:-ord(data[-1:])] def decode_aes256(cipher, iv, data, encryption_key): @@ -278,7 +365,19 @@ def decode_aes256(cipher, iv, data, encryption_key): aes = AES.new(encryption_key, AES.MODE_ECB) else: raise ValueError('Unknown AES mode') - d = aes.decrypt(data) - # http://passingcuriosity.com/2009/aes-encryption-in-python-with-m2crypto/ - unpad = lambda s: s[0:-ord(d[-1:])] - return unpad(d) + return unpad(aes.decrypt(data)) + + +def encode_aes256(cipher, iv, data, encryption_key): + """ + Encrypt AES-256 bytes. + Allowed ciphers are: :ecb, :cbc. + If for :ecb iv is not used and should be set to "". + """ + if cipher == 'cbc': + aes = AES.new(encryption_key, AES.MODE_CBC, iv) + elif cipher == 'ecb': + aes = AES.new(encryption_key, AES.MODE_ECB) + else: + raise ValueError('Unknown AES mode') + return aes.encrypt(pad(data)) diff --git a/lastpass/session.py b/lastpass/session.py index f1dd14b..261c348 100644 --- a/lastpass/session.py +++ b/lastpass/session.py @@ -1,7 +1,7 @@ # coding: utf-8 class Session(object): - def __init__(self, id, key_iteration_count, token=None, trust_id=None): - self.id = id + def __init__(self, id_, key_iteration_count, token=None, trust_id=None): + self.id = id_ self.key_iteration_count = key_iteration_count self.token = token self.trust_id = trust_id diff --git a/lastpass/vault.py b/lastpass/vault.py index 85866e6..df88a9d 100644 --- a/lastpass/vault.py +++ b/lastpass/vault.py @@ -5,17 +5,20 @@ class Vault(object): + """ + Lastpass Vault + """ @classmethod - def open_remote(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False): + def open_remote(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False, blob_filename=None): """Fetches a blob from the server and creates a vault""" - (blob, trust_id) = cls.fetch_blob(username, password, multifactor_password, client_id, trust_id, trust_me) + (blob, trust_id) = cls.fetch_blob(username, password, multifactor_password, client_id, trust_id, trust_me, blob_filename) return cls.open(blob, username, password, trust_id) @classmethod def open_local(cls, blob_filename, username, password): """Creates a vault from a locally stored blob""" - # TODO: read the blob here - raise NotImplementedError() + blob = cls.read_blob_local(username, password, blob_filename) + return cls.open(blob, username, password) @classmethod def open(cls, blob, username, password, trust_id=None): @@ -23,14 +26,45 @@ def open(cls, blob, username, password, trust_id=None): return cls(blob, blob.encryption_key(username, password), trust_id) @classmethod - def fetch_blob(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False): + def fetch_blob(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False, blob_filename=None): """Just fetches the blob, could be used to store it locally""" session = fetcher.login(username, password, multifactor_password, client_id, trust_id=trust_id, trust_me=trust_me) blob = fetcher.fetch(session) fetcher.logout(session) + if blob_filename: + cls.write_blob_local(blob, username, password, blob_filename) return (blob, session.trust_id) + @classmethod + def read_blob_local(cls, username, password, blob_filename): + """Read and decode a blob from a local file """ + with open(blob_filename, 'r') as fp: + file_data = fp.read().replace('\n', '').strip() + username_read = file_data[4:104].strip() + assert username == username_read + key_iteration_count = 10 + decoding_key = fetcher.make_key(username, password, key_iteration_count) + inner_decoded = parser.decode_aes256_cbc_base64(file_data[105:], decoding_key) + key_iteration_count = int(inner_decoded[1:17]) + decoding_key = fetcher.make_key(username, password, key_iteration_count) + decoded = parser.decompress(parser.decode_aes256_cbc_base64(inner_decoded[17:], decoding_key)) + blob = fetcher.blob.Blob(decoded, key_iteration_count) + return blob + + @classmethod + def write_blob_local(cls, blob, username, password, blob_filename): + """write a blob to a local file""" + key = fetcher.make_key(username, password, blob.key_iteration_count) + iv = b"\x00" + parser.random(14) + b"\x00" + inner_encoded = "#" + "%16d" % blob.key_iteration_count + parser.encode_aes256_cbc_base64(parser.compress(blob.bytes), key, iv).decode() + iv = b"\x00" + parser.random(14) + b"\x00" + key_iteration_count = 10 + key = fetcher.make_key(username, password, key_iteration_count) + file_data = "BLOB" + "%100s" % username + "#" + parser.encode_aes256_cbc_base64(inner_encoded, key, iv).decode() + with open(blob_filename, 'w') as fp: + fp.write(file_data) + def __init__(self, blob, encryption_key, trust_id=None): """This more of an internal method, use one of the static constructors instead""" chunks = parser.extract_chunks(blob) @@ -41,10 +75,14 @@ def __init__(self, blob, encryption_key, trust_id=None): self.accounts = self.parse_accounts(chunks, encryption_key) self.trust_id = trust_id + @classmethod def is_complete(self, chunks): + "Is Chunk Complete" return len(chunks) > 0 and chunks[-1].id == b'ENDM' and chunks[-1].payload == b'OK' + @classmethod def parse_accounts(self, chunks, encryption_key): + "Parse Account" accounts = [] key = encryption_key