Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ LastPass Python API

**This is unofficial LastPass API**

Lastpass Authenticator reverse engineering by `Terry Hardie <https://github.com/thardie>`_


Install
-------
Expand All @@ -32,6 +34,8 @@ Example
for i in vault.accounts:
print(i.id, i.username, i.password, i.url)

for i in vault.authenticator:
print(i.accountID, i.issuerName, i.userName, i.secret)


Testing
Expand Down
13 changes: 13 additions & 0 deletions lastpass/authenticator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# coding: utf-8
class Authenticator(object):
def __init__(self, accountID, digits, issuerName, lmiUserId, originalIssuerName, originalUserName, pushNotification, secret, timeStep, userName):
self.accountID = accountID
self.digits = digits
self.issuerName = issuerName
self.lmiUserId = lmiUserId
self.originalIssuerName = originalIssuerName
self.originalUserName = originalUserName
self.pushNotification = pushNotification
self.secret = secret
self.timeStep = timeStep
self.userName = userName
46 changes: 39 additions & 7 deletions lastpass/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# coding: utf-8
import hashlib
import json
from base64 import b64decode
from binascii import hexlify
import requests
Expand All @@ -17,7 +18,6 @@
)
from .session import Session


http = requests


Expand All @@ -37,14 +37,43 @@ def logout(session, web_client=http):
raise NetworkError()


def fetch_authenticator(session, web_client=http):
return fetch_override(session,
web_client,
'https://lastpass.com/lmiapi/authenticator/backup',
headers={
'X-CSRF-TOKEN': session.csrf_token,
'X-SESSION-ID': session.id,
},
decode_json=True)


def fetch(session, web_client=http):
response = web_client.get('https://lastpass.com/getaccts.php?mobile=1&b64=1&hash=0.0&hasplugin=3.0.23&requestsrc=android',
cookies={'PHPSESSID': session.id})
return fetch_override(session,
web_client,
'https://lastpass.com/getaccts.php?mobile=1&b64=1&hash=0.0&hasplugin=3.0.23&requestsrc=android',
cookies={'PHPSESSID': session.id})


def fetch_override(session,
web_client=http,
url='https://lastpass.com/getaccts.php?mobile=1&b64=1&hash=0.0&hasplugin=3.0.23&requestsrc=android',
cookies=None,
headers=None,
decode_json=False):
response = web_client.get(url, cookies=cookies, headers=headers)

if response.status_code != requests.codes.ok:
raise NetworkError()

return blob.Blob(decode_blob(response.content), session.key_iteration_count)
if decode_json:
if 'content-type' in response.headers and response.headers['content-type'].startswith('application/json'):
data = json.loads(response.content)
return blob.Blob(data['userData'].encode('utf-8'), session.key_iteration_count)
else:
raise InvalidResponseError('Expected JSON response')
else:
return blob.Blob(decode_blob(response.content), session.key_iteration_count)


def request_iteration_count(username, web_client=http):
Expand Down Expand Up @@ -102,8 +131,9 @@ def request_login(username, password, key_iteration_count, multifactor_password=
def create_session(parsed_response, key_iteration_count):
if parsed_response.tag == 'ok':
session_id = parsed_response.attrib.get('sessionid')
csrf_token = parsed_response.attrib.get('token')
if isinstance(session_id, str):
return Session(session_id, key_iteration_count)
return Session(session_id, key_iteration_count, csrf_token)


def login_error(parsed_response):
Expand Down Expand Up @@ -136,13 +166,15 @@ def make_key(username, password, key_iteration_count):
if key_iteration_count == 1:
return hashlib.sha256(username.encode('utf-8') + password.encode('utf-8')).digest()
else:
return hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), username.encode('utf-8'), key_iteration_count, 32)
return hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), username.encode('utf-8'), key_iteration_count,
32)


def make_hash(username, password, key_iteration_count):
# type: (str, str, int) -> bytes
if key_iteration_count == 1:
return bytearray(hashlib.sha256(hexlify(make_key(username, password, 1)) + password.encode('utf-8')).hexdigest(), 'ascii')
return bytearray(
hashlib.sha256(hexlify(make_key(username, password, 1)) + password.encode('utf-8')).hexdigest(), 'ascii')
else:
return hexlify(hashlib.pbkdf2_hmac(
'sha256',
Expand Down
23 changes: 23 additions & 0 deletions lastpass/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
from io import BytesIO
import struct
import re
import json

from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Util import number
from Crypto.PublicKey import RSA

from .account import Account
from .chunk import Chunk
from .authenticator import Authenticator


# Secure note types that contain account-like information
Expand All @@ -37,6 +39,27 @@ def extract_chunks(blob):
return chunks


def parse_Authenticator(chunk, encryption_key):
json_result = decode_aes256_base64_auto(chunk.bytes, encryption_key)
print(json_result)
decoded_json_result = json.loads(json_result)
accounts = []
for account in decoded_json_result['accounts']:
accounts.append(Authenticator(
accountID=account['accountID'],
digits=account['digits'],
issuerName=account['issuerName'],
lmiUserId=account['lmiUserId'],
originalIssuerName=account['originalIssuerName'],
originalUserName=account['originalUserName'],
pushNotification=account['pushNotification'],
secret=account['secret'],
timeStep=account['timeStep'],
userName=account['userName'],
))
return accounts


def parse_ACCT(chunk, encryption_key):
"""
Parses an account chunk, decrypts and creates an Account object.
Expand Down
5 changes: 3 additions & 2 deletions lastpass/session.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# coding: utf-8
class Session(object):
def __init__(self, id, key_iteration_count):
def __init__(self, id, key_iteration_count, csrf_token=None):
self.id = id
self.key_iteration_count = key_iteration_count
self.csrf_token = csrf_token

def __eq__(self, other):
return self.id == other.id and self.key_iteration_count == other.key_iteration_count
return self.id == other.id and self.key_iteration_count == other.key_iteration_count and self.csrf_token == other.csrf_token
17 changes: 11 additions & 6 deletions lastpass/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ class Vault(object):
@classmethod
def open_remote(cls, username, password, multifactor_password=None, client_id=None):
"""Fetches a blob from the server and creates a vault"""
blob = cls.fetch_blob(username, password, multifactor_password, client_id)
return cls.open(blob, username, password)
blob, authenticator_blob = cls.fetch_blob(username, password, multifactor_password, client_id)
return cls.open(blob, authenticator_blob, username, password)

@classmethod
def open_local(cls, blob_filename, username, password):
Expand All @@ -18,27 +18,29 @@ def open_local(cls, blob_filename, username, password):
raise NotImplementedError()

@classmethod
def open(cls, blob, username, password):
def open(cls, blob, authenticator_blob, username, password):
"""Creates a vault from a blob object"""
return cls(blob, blob.encryption_key(username, password))
return cls(blob, authenticator_blob, blob.encryption_key(username, password))

@classmethod
def fetch_blob(cls, username, password, multifactor_password=None, client_id=None):
"""Just fetches the blob, could be used to store it locally"""
session = fetcher.login(username, password, multifactor_password, client_id)
blob = fetcher.fetch(session)
authenticator_blob = fetcher.fetch_authenticator(session)
fetcher.logout(session)

return blob
return blob, authenticator_blob

def __init__(self, blob, encryption_key):
def __init__(self, blob, authenticator_blob, encryption_key):
"""This more of an internal method, use one of the static constructors instead"""
chunks = parser.extract_chunks(blob)

if not self.is_complete(chunks):
raise InvalidResponseError('Blob is truncated')

self.accounts = self.parse_accounts(chunks, encryption_key)
self.authenticator = self.parse_authenticator(authenticator_blob, encryption_key)

def is_complete(self, chunks):
return len(chunks) > 0 and chunks[-1].id == b'ENDM' and chunks[-1].payload == b'OK'
Expand All @@ -62,3 +64,6 @@ def parse_accounts(self, chunks, encryption_key):
key = parser.parse_SHAR(i, encryption_key, rsa_private_key)['encryption_key']

return accounts

def parse_authenticator(self, chunks, encryption_key):
return parser.parse_Authenticator(chunks, encryption_key)
2 changes: 2 additions & 0 deletions tests/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
b'SYTE', b'WOTE', b'TATE', b'WPTE', b'SPMT', b'NMAC', b'ACCT', b'EQDN', b'URUL', b'ENDM',
]

TEST_AUTHENTICATOR = b'RC8qYiDYw5a2FQTitjN8aW39jR/xKRqUYpQHYx1RtoYaYmMb6YT/dCwOufPxbLKqgA47yVA7AFDj/ECf6JQPzeJEA25z1SdNlc1bzC/+i4oWhD+Yb0tQgehJ61/cKdnpHScYotuxNzZ9p4+gq1GgCRh3xHG8+nA+wtohzxNjm3wJNXTAXoYc9wNf3H80Q6HpklphIw0VbjijdxvQsUMz+8Ic/sHuijk6Iv72Uy8ukaoGBAPhewSWq+PMr/cTiRP4TBD77GvWbLh1HHhvumKN3f01K8266Jwp4Orw0XoFuMwuyYOsPdDwi5/O2htt3P/VYyLjmzy+2HhgQSxyTjvyuWbw5snC5o5oFyK1SdyCZ2dJR+fpmbq8/KV8BuOoWJdNp/s0ab2LMRUhPGJCA6yYbLmg370N74sywbIJC4gqEz5cI6Evhnl/dx+wnxJ3fnoFBKcR1LNxIr9iJvXPatzi5e9DUY7iGEqgXpiauRarQKHHnu5y6Vr61Q7Dq7gt1xvDx57ucula+tUOw6u4Ldcbww=='

TEST_BLOB = b64decode(
"TFBBVgAAAAMxMThBVFZSAAAAAzEwMkVOQ1UAAABGITExVkVjWFVnelFramxranB2" +
"VGZyR3c9PXxYUklhVVdQYlQzRXdRQlZCRUYxZUJTTDI3bWZ1cVViZmFCV3JCYnd5" +
Expand Down
2 changes: 1 addition & 1 deletion tests/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def test_fetch_makes_a_get_request(self):
m.get.return_value = self._http_ok(self.blob_response)
fetcher.fetch(self.session, m)
m.get.assert_called_with('https://lastpass.com/getaccts.php?mobile=1&b64=1&hash=0.0&hasplugin=3.0.23&requestsrc=android',
cookies={'PHPSESSID': self.session_id})
cookies={'PHPSESSID': self.session_id}, headers=None)

def test_fetch_returns_a_blob(self):
m = mock.Mock()
Expand Down
7 changes: 4 additions & 3 deletions tests/test_vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@
from lastpass import InvalidResponseError
from lastpass.blob import Blob
from lastpass.vault import Vault
from tests.test_data import TEST_BLOB, TEST_KEY_ITERATION_COUNT, TEST_ENCRYPTION_KEY, TEST_ACCOUNTS
from tests.test_data import TEST_BLOB, TEST_KEY_ITERATION_COUNT, TEST_ENCRYPTION_KEY, TEST_ACCOUNTS, TEST_AUTHENTICATOR


class VaultTestCase(unittest.TestCase):
def setUp(self):
self.vault = Vault(Blob(TEST_BLOB, TEST_KEY_ITERATION_COUNT), TEST_ENCRYPTION_KEY)
self.vault = Vault(Blob(TEST_BLOB, TEST_KEY_ITERATION_COUNT), Blob(TEST_AUTHENTICATOR, TEST_KEY_ITERATION_COUNT), TEST_ENCRYPTION_KEY)

def test_init_raises_an_exception_on_truncated_blob(self):
for i in [1, 2, 3, 4, 5, 10, 100, 1000]:
blob = Blob(TEST_BLOB[:-i], TEST_KEY_ITERATION_COUNT)
authenticator_blob = Blob(TEST_AUTHENTICATOR, TEST_KEY_ITERATION_COUNT)
with self.assertRaises(Exception) as context:
Vault(blob, TEST_ENCRYPTION_KEY)
Vault(blob, authenticator_blob, TEST_ENCRYPTION_KEY)

self.assertIn(type(context.exception), [InvalidResponseError, struct.error])
# self.assertEqual(context.exception.message, 'Blob is truncated')
Expand Down