diff --git a/.gitignore b/.gitignore
index e0f5d11..35bbfd5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,7 +4,9 @@ example/credentials.json
*.egg-info/
dist/
+build
.tox
+.eggs
.python-version
.idea/
diff --git a/example/example.py b/example/example.py
index 34652e8..3481725 100755
--- a/example/example.py
+++ b/example/example.py
@@ -36,4 +36,4 @@
for index, i in enumerate(vault.accounts):
- print("{} {} {} {} {} {} {} {}".format(index + 1, i.id, i.name, i.username, i.password, i.url, i.group, i.notes))
+ print("{} {}".format(index + 1, str(i)))
diff --git a/lastpass/account.py b/lastpass/account.py
index 3e5701b..d16746f 100644
--- a/lastpass/account.py
+++ b/lastpass/account.py
@@ -1,10 +1,47 @@
# coding: utf-8
+import types
+
+
class Account(object):
- def __init__(self, id, name, username, password, url, group, notes=None):
- self.id = id
+ """
+ Lastpass Password Account
+ """
+ def __init__(self, _id, name, username, password, url, group, notes=None):
+ self.id = _id
self.name = name
self.username = username
self.password = password
self.url = url
self.group = group
self.notes = notes
+
+ def notes_string(self):
+ if type(self.notes) == bytes:
+ note_str = '{}'.format(self.notes.decode())
+ else:
+ note_str = '{}'.format(str(self.notes))
+ return note_str
+
+ def fields(self):
+ result_fields = []
+ for field in dir(self):
+ if not field.startswith('_') and not callable(getattr(self, field)):
+ result_fields.append(field)
+ return result_fields
+
+ def __str__(self):
+ return "name: {}\n\tusername: {}\n\tpassword: {}\n\turl: {}\n\tgroup: {}\n\tnotes: {}".format(self.name, self.username, self.password, self.url, self.group, self.notes_string())
+
+
+class SecureNote(Account):
+ """
+ Lastpass Secure Note
+ """
+ def __init__(self):
+ pass
+
+ def __str__(self):
+ try:
+ return getattr(self, 'unparsed_notes_0').decode()
+ except AttributeError:
+ return '\n'.join(['\t\t{}: {}'.format(field, getattr(self, field).decode()) for field in self.fields()])
diff --git a/lastpass/blob.py b/lastpass/blob.py
index 5be27d5..6b1f7b9 100644
--- a/lastpass/blob.py
+++ b/lastpass/blob.py
@@ -1,7 +1,7 @@
# coding: utf-8
class Blob(object):
- def __init__(self, bytes, key_iteration_count):
- self.bytes = bytes
+ def __init__(self, bytes_, key_iteration_count):
+ self.bytes = bytes_
self.key_iteration_count = key_iteration_count
def encryption_key(self, username, password):
diff --git a/lastpass/chunk.py b/lastpass/chunk.py
index a003319..16fb7ca 100644
--- a/lastpass/chunk.py
+++ b/lastpass/chunk.py
@@ -1,7 +1,7 @@
# coding: utf-8
class Chunk(object):
- def __init__(self, id, payload):
- self.id = id
+ def __init__(self, id_, payload):
+ self.id = id_
self.payload = payload
def __eq__(self, other):
diff --git a/lastpass/exceptions.py b/lastpass/exceptions.py
index 0e6f11a..8ab8e9d 100644
--- a/lastpass/exceptions.py
+++ b/lastpass/exceptions.py
@@ -49,6 +49,16 @@ class LastPassIncorrectYubikeyPasswordError(Error):
pass
+class LastPassIncorrectOutOfBandRequiredError(Error):
+ """LastPass error: need to provide out of band authentication (e.g, LastPass Authenticator)"""
+ pass
+
+
+class LastPassIncorrectMultiFactorResponseError(Error):
+ """LastPass error: Multifactor response failed (wrong code or denied)"""
+ pass
+
+
class LastPassUnknownError(Error):
"""LastPass error we don't know about"""
pass
diff --git a/lastpass/fetcher.py b/lastpass/fetcher.py
index e8e33b5..5a8a8f8 100644
--- a/lastpass/fetcher.py
+++ b/lastpass/fetcher.py
@@ -1,9 +1,12 @@
# coding: utf-8
import hashlib
+import random
+import string
from base64 import b64decode
from binascii import hexlify
-import requests
from xml.etree import ElementTree as etree
+import requests
+
from . import blob
from .version import __version__
from .exceptions import (
@@ -14,6 +17,8 @@
LastPassInvalidPasswordError,
LastPassIncorrectGoogleAuthenticatorCodeError,
LastPassIncorrectYubikeyPasswordError,
+ LastPassIncorrectOutOfBandRequiredError,
+ LastPassIncorrectMultiFactorResponseError,
LastPassUnknownError
)
from .session import Session
@@ -23,9 +28,9 @@
headers = {'user-agent': 'lastpass-python/{}'.format(__version__)}
-def login(username, password, multifactor_password=None, client_id=None):
+def login(username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False):
key_iteration_count = request_iteration_count(username)
- return request_login(username, password, key_iteration_count, multifactor_password, client_id)
+ return request_login(username, password, key_iteration_count, multifactor_password, client_id, trust_id=trust_id, trust_me=trust_me)
def logout(session, web_client=http):
@@ -58,7 +63,7 @@ def request_iteration_count(username, web_client=http):
try:
count = int(response.content)
- except:
+ except Exception:
raise InvalidResponseError('Key iteration count is invalid')
if count > 0:
@@ -66,21 +71,28 @@ def request_iteration_count(username, web_client=http):
raise InvalidResponseError('Key iteration count is not positive')
-def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http):
+def request_login(username, password, key_iteration_count, multifactor_password=None, client_id=None, web_client=http, trust_id=None, trust_me=False):
body = {
- 'method': 'mobile',
- 'web': 1,
- 'xml': 1,
+ 'method': 'cli',
+ 'xml': 2,
'username': username,
'hash': make_hash(username, password, key_iteration_count),
'iterations': key_iteration_count,
+ 'includeprivatekeyenc': 1,
+ 'outofbandsupported': 1
}
if multifactor_password:
body['otp'] = multifactor_password
+ if trust_me and not trust_id:
+ trust_id = generate_trust_id()
+
+ if trust_id:
+ body['uuid'] = trust_id
+
if client_id:
- body['imei'] = client_id
+ body['trustlabel'] = client_id
response = web_client.post('https://lastpass.com/login.php',
data=body,
@@ -97,22 +109,80 @@ def request_login(username, password, key_iteration_count, multifactor_password=
if parsed_response is None:
raise InvalidResponseError()
- session = create_session(parsed_response, key_iteration_count)
+ session = create_session(parsed_response, key_iteration_count, trust_id)
if not session:
- raise login_error(parsed_response)
+ try:
+ raise login_error(parsed_response)
+ except LastPassIncorrectOutOfBandRequiredError:
+ (session, parsed_response) = oob_login(web_client, parsed_response, body, key_iteration_count, trust_id)
+ if not session:
+ raise login_error(parsed_response)
+ if trust_me:
+ response = web_client.post('https://lastpass.com/trust.php', cookies={'PHPSESSID': session.id}, data={"token": session.token, "uuid": trust_id, "trustlabel": client_id})
+
return session
-def create_session(parsed_response, key_iteration_count):
+def oob_login(web_client, parsed_response, body, key_iteration_count, trust_id):
+ error = None if parsed_response.tag != 'response' else parsed_response.find(
+ 'error')
+ if 'outofbandname' not in error.attrib or 'capabilities' not in error.attrib:
+ return (None, parsed_response)
+ oob_capabilities = error.attrib['capabilities'].split(',')
+ can_do_passcode = 'passcode' in oob_capabilities
+ if can_do_passcode and 'outofband' not in oob_capabilities:
+ return (None, parsed_response)
+ body['outofbandrequest'] = '1'
+ retries = 0
+ # loop waiting for out of band approval, or failure
+ while retries < 5:
+ retries += 1
+ response = web_client.post("https://lastpass.com/login.php", data=body)
+ if response.status_code != requests.codes.ok:
+ raise NetworkError()
+
+ try:
+ parsed_response = etree.fromstring(response.content)
+ except etree.ParseError:
+ parsed_response = None
+
+ if parsed_response is None:
+ raise InvalidResponseError()
+
+ session = create_session(parsed_response, key_iteration_count, trust_id)
+ if session:
+ return (session, parsed_response)
+ error = None if parsed_response.tag != 'response' else parsed_response.find(
+ 'error')
+ if 'cause' in error.attrib and error.attrib['cause'] == 'outofbandrequired':
+ if 'retryid' in error.attrib:
+ body['outofbandretryid'] = error.attrib['retryid']
+ body['outofbandretry'] = "1"
+ continue
+ return (None, parsed_response)
+ return (None, parsed_response)
+
+
+def generate_trust_id():
+ return ''.join(random.choice(string.ascii_uppercase + string.digits + string.ascii_lowercase + "!@#$") for _ in range(32))
+
+
+def create_session(parsed_response, key_iteration_count, trust_id):
if parsed_response.tag == 'ok':
- session_id = parsed_response.attrib.get('sessionid')
+ ok_response = parsed_response
+ else:
+ ok_response = parsed_response.find("ok")
+ if ok_response is not None:
+ session_id = ok_response.attrib.get('sessionid')
+ token = ok_response.attrib.get('token')
if isinstance(session_id, str):
- return Session(session_id, key_iteration_count)
+ return Session(session_id, key_iteration_count, token, trust_id)
+ return None
def login_error(parsed_response):
error = None if parsed_response.tag != 'response' else parsed_response.find('error')
- if error is None or len(error.attrib) == 0:
+ if error is None or not error.attrib:
raise UnknownResponseSchemaError()
exceptions = {
@@ -121,6 +191,8 @@ def login_error(parsed_response):
"googleauthrequired": LastPassIncorrectGoogleAuthenticatorCodeError,
"googleauthfailed": LastPassIncorrectGoogleAuthenticatorCodeError,
"yubikeyrestricted": LastPassIncorrectYubikeyPasswordError,
+ "outofbandrequired": LastPassIncorrectOutOfBandRequiredError,
+ "multifactorresponsefailed": LastPassIncorrectMultiFactorResponseError,
}
cause = error.attrib.get('cause')
@@ -131,27 +203,25 @@ def login_error(parsed_response):
return InvalidResponseError(message)
-def decode_blob(blob):
- return b64decode(blob)
+def decode_blob(blob_):
+ return b64decode(blob_)
def make_key(username, password, key_iteration_count):
# type: (str, str, int) -> bytes
if key_iteration_count == 1:
return hashlib.sha256(username.encode('utf-8') + password.encode('utf-8')).digest()
- else:
- return hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), username.encode('utf-8'), key_iteration_count, 32)
+ return hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), username.encode('utf-8'), key_iteration_count, 32)
def make_hash(username, password, key_iteration_count):
# type: (str, str, int) -> bytes
if key_iteration_count == 1:
return bytearray(hashlib.sha256(hexlify(make_key(username, password, 1)) + password.encode('utf-8')).hexdigest(), 'ascii')
- else:
- return hexlify(hashlib.pbkdf2_hmac(
- 'sha256',
- make_key(username, password, key_iteration_count),
- password.encode('utf-8'),
- 1,
- 32
- ))
+ return hexlify(hashlib.pbkdf2_hmac(
+ 'sha256',
+ make_key(username, password, key_iteration_count),
+ password.encode('utf-8'),
+ 1,
+ 32
+ ))
diff --git a/lastpass/parser.py b/lastpass/parser.py
index b4261a8..c4e9f1d 100644
--- a/lastpass/parser.py
+++ b/lastpass/parser.py
@@ -1,27 +1,48 @@
# coding: utf-8
-from base64 import b64decode
+from base64 import b64decode, b64encode
import binascii
import codecs
from io import BytesIO
+import os
import struct
import re
+import zlib
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Util import number
from Crypto.PublicKey import RSA
-from .account import Account
+from .account import Account, SecureNote
from .chunk import Chunk
# Secure note types that contain account-like information
ALLOWED_SECURE_NOTE_TYPES = [
+ None,
b"Server",
- b"Email Account",
+ b"SSH Key",
b"Database",
+ b"Stripe Key",
+ b"Passport",
+ b"Membership",
+ b"Wi-Fi Password",
+ b"Software License",
+ b"Social Security",
+ b"Address",
+ b"Bank Account",
+ b"Credit Card",
+ b"Email Account",
+ b"Health Insurance",
+ b"Insurance",
b"Instant Messenger",
+ b"Generic",
+ b"Custom",
]
+random = os.urandom
+compress = zlib.compress
+decompress = zlib.decompress
+
def extract_chunks(blob):
"""Splits the blob into chucks grouped by kind."""
@@ -60,12 +81,13 @@ def parse_ACCT(chunk, encryption_key):
# Parse secure note
if secure_note == b'1':
+ secure_notes = SecureNote()
parsed = parse_secure_note_server(notes)
-
- if parsed.get('type') in ALLOWED_SECURE_NOTE_TYPES:
- url = parsed.get('url', url)
- username = parsed.get('username', username)
- password = parsed.get('password', password)
+ parsed_type = parsed.get('type')
+ if parsed_type in ALLOWED_SECURE_NOTE_TYPES:
+ for key in parsed:
+ setattr(secure_notes, key, parsed[key])
+ notes = secure_notes
return Account(id, name, username, password, url, group, notes)
@@ -113,17 +135,28 @@ def parse_SHAR(chunk, encryption_key, rsa_key):
def parse_secure_note_server(notes):
info = {}
-
- for i in notes.split(b'\n'):
- if not i: # blank line
- continue
-
- if b':' not in i: # there is no `:` if generic note
+ last_field = None
+ unparsed_counter = 0
+
+ for line in notes.split(b'\n'):
+
+ if not line:
+ if not last_field:
+ continue
+
+ if b':' not in line: # there is no `:` if generic note
+ if not last_field:
+ last_field = 'unparsed_notes_{}'.format(unparsed_counter)
+ unparsed_counter = unparsed_counter + 1
+ info[last_field] = b''
+ if last_field:
+ old_bytes = info[last_field]
+ info[last_field] = (old_bytes.decode() + '\n' + line.decode()).encode()
continue
# Split only once so that strings like "Hostname:host.example.com:80"
# get interpreted correctly
- key, value = i.split(b':', 1)
+ key, value = line.split(b':', 1)
if key == b'NoteType':
info['type'] = value
elif key == b'Hostname':
@@ -132,6 +165,9 @@ def parse_secure_note_server(notes):
info['username'] = value
elif key == b'Password':
info['password'] = value
+ else:
+ last_field = key.decode().strip()
+ info[last_field] = value
return info
@@ -163,7 +199,7 @@ def read_item(stream):
def skip_item(stream, times=1):
"""Skips an item in a stream."""
- for i in range(times):
+ for _ in range(times):
read_item(stream)
@@ -195,9 +231,17 @@ def decode_hex(data):
raise TypeError()
-def decode_base64(data):
+def decode_base64(b64data):
"""Decodes a base64 encoded string into raw bytes."""
- return b64decode(data)
+ # see http://passingcuriosity.com/2009/aes-encryption-in-python-with-m2crypto/
+ data = b64decode(b64data)
+ return data
+
+
+def encode_base64(data):
+ """Encodes raw bytes into a base64 encoded string."""
+ b64data = b64encode(data)
+ return b64data
def decode_aes256_plain_auto(data, encryption_key):
@@ -207,10 +251,9 @@ def decode_aes256_plain_auto(data, encryption_key):
if length == 0:
return b''
- elif data[0] == b'!'[0] and length % 16 == 1 and length > 32:
+ if data[0] == b'!'[0] and length % 16 == 1 and length > 32:
return decode_aes256_cbc_plain(data, encryption_key)
- else:
- return decode_aes256_ecb_plain(data, encryption_key)
+ return decode_aes256_ecb_plain(data, encryption_key)
def decode_aes256_base64_auto(data, encryption_key):
@@ -220,18 +263,16 @@ def decode_aes256_base64_auto(data, encryption_key):
if length == 0:
return b''
- elif data[0] == b'!'[0]:
+ if data[0] == b'!'[0]:
return decode_aes256_cbc_base64(data, encryption_key)
- else:
- return decode_aes256_ecb_base64(data, encryption_key)
+ return decode_aes256_ecb_base64(data, encryption_key)
def decode_aes256_ecb_plain(data, encryption_key):
"""Decrypts AES-256 ECB bytes."""
if not data:
return b''
- else:
- return decode_aes256('ecb', '', data, encryption_key)
+ return decode_aes256('ecb', '', data, encryption_key)
def decode_aes256_ecb_base64(data, encryption_key):
@@ -243,27 +284,73 @@ def decode_aes256_cbc_plain(data, encryption_key):
"""Decrypts AES-256 CBC bytes."""
if not data:
return b''
- else:
- # LastPass AES-256/CBC encryted string starts with an "!".
- # Next 16 bytes are the IV for the cipher.
- # And the rest is the encrypted payload.
- return decode_aes256('cbc', data[1:17], data[17:], encryption_key)
+ # LastPass AES-256/CBC encryted string starts with an "!".
+ # Next 16 bytes are the IV for the cipher.
+ # And the rest is the encrypted payload.
+ return decode_aes256('cbc', data[1:17], data[17:], encryption_key)
def decode_aes256_cbc_base64(data, encryption_key):
"""Decrypts base64 encoded AES-256 CBC bytes."""
if not data:
return b''
- else:
- # LastPass AES-256/CBC/base64 encryted string starts with an "!".
- # Next 24 bytes are the base64 encoded IV for the cipher.
- # Then comes the "|".
- # And the rest is the base64 encoded encrypted payload.
- return decode_aes256(
- 'cbc',
- decode_base64(data[1:25]),
- decode_base64(data[26:]),
- encryption_key)
+ # LastPass AES-256/CBC/base64 encryted string starts with an "!".
+ # Next 24 bytes are the base64 encoded IV for the cipher.
+ # Then comes the "|".
+ # And the rest is the base64 encoded encrypted payload.
+ return decode_aes256(
+ 'cbc',
+ decode_base64(data[1:25]),
+ decode_base64(data[26:]),
+ encryption_key)
+
+
+def encode_aes256_cbc_base64(cleartext, encryption_key, iv):
+ """Encrypts base64 encoded AES-256 CBC bytes."""
+ if not cleartext:
+ return b''
+ # LastPass AES-256/CBC/base64 encryted string starts with an "!".
+ # Next 24 bytes are the base64 encoded IV for the cipher.
+ # Then comes the "|".
+ # And the rest is the base64 encoded encrypted payload.
+ return b'!' + b"%24s" % encode_base64(iv) + b'|' + encode_base64(encode_aes256('cbc', iv, cleartext, encryption_key))
+
+
+def pad(data):
+ """
+ Pad Data to PKCS 5 Encoding.
+ """
+ BS = 16
+ # see http://passingcuriosity.com/2009/aes-encryption-in-python-with-m2crypto/
+ padded = (BS - len(data) % BS) * chr(BS - len(data) % BS)
+ if isinstance(data, str):
+ try:
+ data = str.encode(data, 'latin1')
+ except Exception:
+ data = bytes(data)
+ if isinstance(padded, str):
+ try:
+ padded = str.encode(padded, 'latin1')
+ except Exception:
+ padded = bytes(data)
+ try:
+ result = bytes(data + padded)
+ except Exception:
+ result = data + padded
+ return result
+
+
+def unpad(data):
+ """
+ Unpad Data from PKCS 5 Encoding.
+ """
+ # see http://passingcuriosity.com/2009/aes-encryption-in-python-with-m2crypto/
+ if isinstance(data, str):
+ try:
+ data = str.encode(data, 'latin1')
+ except Exception:
+ data = bytes(data)
+ return data[0:-ord(data[-1:])]
def decode_aes256(cipher, iv, data, encryption_key):
@@ -278,7 +365,19 @@ def decode_aes256(cipher, iv, data, encryption_key):
aes = AES.new(encryption_key, AES.MODE_ECB)
else:
raise ValueError('Unknown AES mode')
- d = aes.decrypt(data)
- # http://passingcuriosity.com/2009/aes-encryption-in-python-with-m2crypto/
- unpad = lambda s: s[0:-ord(d[-1:])]
- return unpad(d)
+ return unpad(aes.decrypt(data))
+
+
+def encode_aes256(cipher, iv, data, encryption_key):
+ """
+ Encrypt AES-256 bytes.
+ Allowed ciphers are: :ecb, :cbc.
+ If for :ecb iv is not used and should be set to "".
+ """
+ if cipher == 'cbc':
+ aes = AES.new(encryption_key, AES.MODE_CBC, iv)
+ elif cipher == 'ecb':
+ aes = AES.new(encryption_key, AES.MODE_ECB)
+ else:
+ raise ValueError('Unknown AES mode')
+ return aes.encrypt(pad(data))
diff --git a/lastpass/session.py b/lastpass/session.py
index f68a0f4..261c348 100644
--- a/lastpass/session.py
+++ b/lastpass/session.py
@@ -1,8 +1,10 @@
# coding: utf-8
class Session(object):
- def __init__(self, id, key_iteration_count):
- self.id = id
+ def __init__(self, id_, key_iteration_count, token=None, trust_id=None):
+ self.id = id_
self.key_iteration_count = key_iteration_count
+ self.token = token
+ self.trust_id = trust_id
def __eq__(self, other):
- return self.id == other.id and self.key_iteration_count == other.key_iteration_count
+ return self.id == other.id and self.key_iteration_count == other.key_iteration_count and self.token == other.token
diff --git a/lastpass/vault.py b/lastpass/vault.py
index 1653eff..df88a9d 100644
--- a/lastpass/vault.py
+++ b/lastpass/vault.py
@@ -5,33 +5,67 @@
class Vault(object):
+ """
+ Lastpass Vault
+ """
@classmethod
- def open_remote(cls, username, password, multifactor_password=None, client_id=None):
+ def open_remote(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False, blob_filename=None):
"""Fetches a blob from the server and creates a vault"""
- blob = cls.fetch_blob(username, password, multifactor_password, client_id)
- return cls.open(blob, username, password)
+ (blob, trust_id) = cls.fetch_blob(username, password, multifactor_password, client_id, trust_id, trust_me, blob_filename)
+ return cls.open(blob, username, password, trust_id)
@classmethod
def open_local(cls, blob_filename, username, password):
"""Creates a vault from a locally stored blob"""
- # TODO: read the blob here
- raise NotImplementedError()
+ blob = cls.read_blob_local(username, password, blob_filename)
+ return cls.open(blob, username, password)
@classmethod
- def open(cls, blob, username, password):
+ def open(cls, blob, username, password, trust_id=None):
"""Creates a vault from a blob object"""
- return cls(blob, blob.encryption_key(username, password))
+ return cls(blob, blob.encryption_key(username, password), trust_id)
@classmethod
- def fetch_blob(cls, username, password, multifactor_password=None, client_id=None):
+ def fetch_blob(cls, username, password, multifactor_password=None, client_id=None, trust_id=None, trust_me=False, blob_filename=None):
"""Just fetches the blob, could be used to store it locally"""
- session = fetcher.login(username, password, multifactor_password, client_id)
+ session = fetcher.login(username, password, multifactor_password, client_id, trust_id=trust_id, trust_me=trust_me)
blob = fetcher.fetch(session)
fetcher.logout(session)
+ if blob_filename:
+ cls.write_blob_local(blob, username, password, blob_filename)
+ return (blob, session.trust_id)
+
+ @classmethod
+ def read_blob_local(cls, username, password, blob_filename):
+ """Read and decode a blob from a local file """
+ with open(blob_filename, 'r') as fp:
+ file_data = fp.read().replace('\n', '').strip()
+ username_read = file_data[4:104].strip()
+ assert username == username_read
+ key_iteration_count = 10
+ decoding_key = fetcher.make_key(username, password, key_iteration_count)
+ inner_decoded = parser.decode_aes256_cbc_base64(file_data[105:], decoding_key)
+ key_iteration_count = int(inner_decoded[1:17])
+ decoding_key = fetcher.make_key(username, password, key_iteration_count)
+ decoded = parser.decompress(parser.decode_aes256_cbc_base64(inner_decoded[17:], decoding_key))
+ blob = fetcher.blob.Blob(decoded, key_iteration_count)
return blob
- def __init__(self, blob, encryption_key):
+ @classmethod
+ def write_blob_local(cls, blob, username, password, blob_filename):
+ """write a blob to a local file"""
+ key = fetcher.make_key(username, password, blob.key_iteration_count)
+ iv = b"\x00" + parser.random(14) + b"\x00"
+ inner_encoded = "#" + "%16d" % blob.key_iteration_count + parser.encode_aes256_cbc_base64(parser.compress(blob.bytes), key, iv).decode()
+ iv = b"\x00" + parser.random(14) + b"\x00"
+ key_iteration_count = 10
+ key = fetcher.make_key(username, password, key_iteration_count)
+ file_data = "BLOB" + "%100s" % username + "#" + parser.encode_aes256_cbc_base64(inner_encoded, key, iv).decode()
+ with open(blob_filename, 'w') as fp:
+ fp.write(file_data)
+
+ def __init__(self, blob, encryption_key, trust_id=None):
"""This more of an internal method, use one of the static constructors instead"""
chunks = parser.extract_chunks(blob)
@@ -39,11 +73,16 @@ def __init__(self, blob, encryption_key):
raise InvalidResponseError('Blob is truncated')
self.accounts = self.parse_accounts(chunks, encryption_key)
+ self.trust_id = trust_id
+ @classmethod
def is_complete(self, chunks):
+ "Is Chunk Complete"
return len(chunks) > 0 and chunks[-1].id == b'ENDM' and chunks[-1].payload == b'OK'
+ @classmethod
def parse_accounts(self, chunks, encryption_key):
+ "Parse Account"
accounts = []
key = encryption_key
diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py
index d2f716f..bdcd105 100644
--- a/tests/test_fetcher.py
+++ b/tests/test_fetcher.py
@@ -16,22 +16,37 @@ def setUp(self):
self.hash = b'7880a04588cfab954aa1a2da98fd9c0d2c6eba4c53e36a94510e6dbf30759256'
self.session_id = '53ru,Hb713QnEVM5zWZ16jMvxS0'
- self.session = Session(self.session_id, self.key_iteration_count)
+ self.token = '54aa1a2da98fd9c0d2c6eba4c5'
+ self.session = Session(self.session_id, self.key_iteration_count, token=self.token)
self.blob_response = 'TFBBVgAAAAMxMjJQUkVNAAAACjE0MTQ5'
self.blob_bytes = b64decode(self.blob_response)
self.blob = Blob(self.blob_bytes, self.key_iteration_count)
- self.login_post_data = {'method': 'mobile',
- 'web': 1,
- 'xml': 1,
+ self.login_post_data = {'method': 'cli',
+ 'xml': 2,
+ 'outofbandsupported': 1,
+ 'includeprivatekeyenc': 1,
'username': self.username,
'hash': self.hash,
'iterations': self.key_iteration_count}
self.device_id = '492378378052455'
self.login_post_data_with_device_id = self.login_post_data.copy()
- self.login_post_data_with_device_id.update({'imei': self.device_id})
+ self.login_post_data_with_device_id.update({'trustlabel': self.device_id})
+ self.login_post_data_with_device_id.update({'uuid': self.device_id})
+
+ self.trust_id = '@2ykJ0Tp#dVi06qh6g6kvzOqjQGAWfKv'
+
+ self.request_trust_data = {
+ 'token': self.token,
+ 'trustlabel': self.device_id,
+ 'uuid': self.trust_id
+ }
+
+ self.request_trust_cookies = {
+ 'PHPSESSID': self.session_id
+ }
self.google_authenticator_code = '12345'
self.yubikey_password = 'emdbwzemyisymdnevznyqhqnklaqheaxszzvtnxjrmkb'
@@ -95,6 +110,9 @@ def test_request_login_makes_a_post_request(self):
def test_request_login_makes_a_post_request_with_device_id(self):
self._verify_request_login_post_request(None, self.device_id, self.login_post_data_with_device_id)
+ def test_request_login_makes_a_post_request_with_trust_requested(self):
+ self._verify_request_trust(None, self.device_id, self.request_trust_data, cookies=self.request_trust_cookies, trust_id=self.trust_id, trust_me=True)
+
def test_request_login_makes_a_post_request_with_google_authenticator_code(self):
self._verify_request_login_post_request(self.google_authenticator_code,
None,
@@ -106,7 +124,8 @@ def test_request_login_makes_a_post_request_with_yubikey_password(self):
self.login_post_data_with_yubikey_password)
def test_request_login_returns_a_session(self):
- self.assertEqual(self._request_login_with_xml(''.format(self.session_id)), self.session)
+ tested_session = self._request_login_with_xml(''.format(self.session_id, self.token))
+ self.assertEqual(tested_session, self.session)
def test_request_login_raises_an_exception_on_http_error(self):
self.assertRaises(lastpass.NetworkError, self._request_login_with_error)
@@ -149,6 +168,14 @@ def test_request_login_raises_an_exception_on_missing_or_incorrect_yubikey_passw
self.assertRaises(lastpass.LastPassIncorrectYubikeyPasswordError,
self._request_login_with_lastpass_error, 'yubikeyrestricted', message)
+ def test_request_login_raises_an_exception_on_lastpass_authenticator(self):
+ message = 'Multifactor authentication required! ' \
+ 'Upgrade your browser extension so you can enter it.'
+ self.assertRaises(lastpass.LastPassIncorrectOutOfBandRequiredError,
+ self._request_login_with_lastpass_multifactor_required, 'outofbandrequired', message)
+ self.assertRaises(lastpass.LastPassIncorrectMultiFactorResponseError,
+ self._request_login_with_lastpass_multifactor_required, 'multifactorresponsefailed', message)
+
def test_request_login_raises_an_exception_on_unknown_lastpass_error_without_a_message(self):
cause = 'Unknown cause'
self.assertRaises(lastpass.LastPassUnknownError,
@@ -164,7 +191,8 @@ def test_fetch_makes_a_get_request(self):
def test_fetch_returns_a_blob(self):
m = mock.Mock()
m.get.return_value = self._http_ok(self.blob_response)
- self.assertEqual(fetcher.fetch(self.session, m), self.blob)
+ returned_blob = fetcher.fetch(self.session, m)
+ self.assertEqual(returned_blob, self.blob)
def test_fetch_raises_exception_on_http_error(self):
m = mock.Mock()
@@ -199,14 +227,20 @@ def test_make_hash(self):
for iterations, hash in hashes:
self.assertEqual(hash, fetcher.make_hash('postlass@gmail.com', 'pl1234567890', iterations))
- def _verify_request_login_post_request(self, multifactor_password, device_id, post_data):
+ def _verify_request_login_post_request(self, multifactor_password, device_id, post_data, trust_me=False):
m = mock.Mock()
m.post.return_value = self._http_ok(''.format(self.session_id))
- fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m)
+ fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m, trust_id=device_id, trust_me=trust_me)
m.post.assert_called_with('https://lastpass.com/login.php',
data=post_data,
headers=fetcher.headers)
+ def _verify_request_trust(self, multifactor_password, device_id, post_data, cookies, trust_id, trust_me=False):
+ m = mock.Mock()
+ m.post.return_value = self._http_ok(''.format(self.session_id, self.token))
+ fetcher.request_login(self.username, self.password, self.key_iteration_count, multifactor_password, device_id, m, trust_id=trust_id, trust_me=trust_me)
+ m.post.assert_called_with('https://lastpass.com/trust.php', data=post_data, cookies=cookies)
+
@staticmethod
def _mock_response(code, body):
m = mock.Mock()
@@ -226,9 +260,19 @@ def _lastpass_error(cause, message):
return ''.format(cause, message)
return ''.format(cause)
+ @staticmethod
+ def _lastpass_multifactor_required(cause, message):
+ if message:
+ return ''.format(message, cause)
+ return ''.format(cause)
+
def _request_login_with_lastpass_error(self, cause, message=None):
return self._request_login_with_xml(self._lastpass_error(cause, message))
+ def _request_login_with_lastpass_multifactor_required(self, cause, message=None):
+ return self._request_login_with_xml(
+ self._lastpass_multifactor_required(cause, message))
+
def _request_login_with_xml(self, text):
return self._request_login_with_ok(text)