diff --git a/docs/advanced/authentication.md b/docs/advanced/authentication.md index 63d26e5f46..fdd6348fd7 100644 --- a/docs/advanced/authentication.md +++ b/docs/advanced/authentication.md @@ -40,6 +40,18 @@ HTTP digest authentication is a challenge-response authentication scheme. Unlike [] ``` +HTTPX also supports digest authentication using `auth-int` quality-of-protection, which provides message integrity protection. When `qop="auth-int"` is used, the authentication response includes a hash of the request body, protecting against tampering with the message content during transmission. This provides additional security over `qop="auth"` by ensuring that both the authentication credentials and the message body integrity are verified. + +```pycon +>>> auth = httpx.DigestAuth(username="olivia", password="secret") +>>> client = httpx.Client(auth=auth) +>>> response = client.get("https://httpbin.org/digest-auth/auth-int/olivia/secret") +>>> response + +>>> response.history +[] +``` + ## NetRC authentication HTTPX can be configured to use [a `.netrc` config file](https://everything.curl.dev/usingcurl/netrc) for authentication. @@ -229,4 +241,4 @@ class MyCustomAuth(httpx.Auth): async def async_auth_flow(self, request): raise RuntimeError("Cannot use a sync authentication class with httpx.AsyncClient") -``` \ No newline at end of file +``` diff --git a/httpx/_auth.py b/httpx/_auth.py index 9d24faed99..8450548a5b 100644 --- a/httpx/_auth.py +++ b/httpx/_auth.py @@ -264,7 +264,8 @@ def digest(data: bytes) -> bytes: path = request.url.raw_path A2 = b":".join((request.method.encode(), path)) - # TODO: implement auth-int + if challenge.qop == b"auth-int": + A2 += b":" + digest(request.content) HA2 = digest(A2) nc_value = b"%08x" % self._nonce_count @@ -294,7 +295,7 @@ def digest(data: bytes) -> bytes: if challenge.opaque: format_args["opaque"] = challenge.opaque if qop: - format_args["qop"] = b"auth" + format_args["qop"] = qop format_args["nc"] = nc_value format_args["cnonce"] = cnonce @@ -330,12 +331,13 @@ def _resolve_qop(self, qop: bytes | None, request: Request) -> bytes | None: if qop is None: return None qops = re.split(b", ?", qop) + + # Defer to the strongest supplied qop (auth-int > auth) + if b"auth-int" in qops: + return b"auth-int" if b"auth" in qops: return b"auth" - if qops == [b"auth-int"]: - raise NotImplementedError("Digest auth-int support is not yet implemented") - message = f'Unexpected qop value "{qop!r}" in digest auth' raise ProtocolError(message, request=request) diff --git a/tests/client/test_auth.py b/tests/client/test_auth.py index 72674e6f4b..304428e07d 100644 --- a/tests/client/test_auth.py +++ b/tests/client/test_auth.py @@ -501,15 +501,50 @@ async def test_digest_auth_qop_including_spaces_and_auth_returns_auth(qop: str) assert len(response.history) == 1 +@pytest.mark.parametrize( + "algorithm,expected_hash_length,expected_response_length", + [ + ("MD5", 64, 32), + ("MD5-SESS", 64, 32), + ("SHA", 64, 40), + ("SHA-SESS", 64, 40), + ("SHA-256", 64, 64), + ("SHA-256-SESS", 64, 64), + ("SHA-512", 64, 128), + ("SHA-512-SESS", 64, 128), + ], +) @pytest.mark.anyio -async def test_digest_auth_qop_auth_int_not_implemented() -> None: +async def test_digest_auth_qop_auth_int( + algorithm: str, expected_hash_length: int, expected_response_length: int +) -> None: url = "https://example.org/" auth = httpx.DigestAuth(username="user", password="password123") - app = DigestApp(qop="auth-int") + app = DigestApp(qop="auth-int", algorithm=algorithm) async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client: - with pytest.raises(NotImplementedError): - await client.get(url, auth=auth) + response = await client.get(url, auth=auth) + + assert response.status_code == 200 + assert len(response.history) == 1 + + authorization = typing.cast(typing.Dict[str, typing.Any], response.json())["auth"] + scheme, _, fields = authorization.partition(" ") + assert scheme == "Digest" + + response_fields = [field.strip() for field in fields.split(",")] + digest_data = dict(field.split("=") for field in response_fields) + + assert digest_data["username"] == '"user"' + assert digest_data["realm"] == '"httpx@example.org"' + assert "nonce" in digest_data + assert digest_data["uri"] == '"/"' + assert len(digest_data["response"]) == expected_response_length + 2 # extra quotes + assert len(digest_data["opaque"]) == expected_hash_length + 2 + assert digest_data["algorithm"] == algorithm + assert digest_data["qop"] == "auth-int" + assert digest_data["nc"] == "00000001" + assert len(digest_data["cnonce"]) == 16 + 2 @pytest.mark.anyio diff --git a/tests/test_auth.py b/tests/test_auth.py index 6b6df922ea..be2708e930 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -208,7 +208,7 @@ def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes: headers = { "WWW-Authenticate": ( 'Digest realm="http-auth@example.org", ' - 'qop="auth, auth-int", ' + "qop=auth, " "algorithm=MD5, " 'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", ' 'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"' @@ -232,7 +232,7 @@ def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes: 'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"' in request.headers["Authorization"] ) - assert "qop=auth" in request.headers["Authorization"] + assert "qop=auth," in request.headers["Authorization"] assert ( 'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"' in request.headers["Authorization"] @@ -268,7 +268,7 @@ def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes: headers = { "WWW-Authenticate": ( 'Digest realm="http-auth@example.org", ' - 'qop="auth, auth-int", ' + "qop=auth, " "algorithm=SHA-256, " 'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", ' 'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"' @@ -292,7 +292,7 @@ def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes: 'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"' in request.headers["Authorization"] ) - assert "qop=auth" in request.headers["Authorization"] + assert "qop=auth," in request.headers["Authorization"] assert ( 'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"' in request.headers["Authorization"] @@ -306,3 +306,119 @@ def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes: response = httpx.Response(content=b"Hello, world!", status_code=200) with pytest.raises(StopIteration): flow.send(response) + + +def test_digest_auth_int_rfc_7616_md5(monkeypatch): + def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes: + return "f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ".encode() + + auth = httpx.DigestAuth(username="Mufasa", password="Circle of Life") + monkeypatch.setattr(auth, "_get_client_nonce", mock_get_client_nonce) + + request = httpx.Request("GET", "https://www.example.com/dir/index.html") + + # The initial request should not include an auth header. + flow = auth.sync_auth_flow(request) + request = next(flow) + assert "Authorization" not in request.headers + + # If a 401 response is returned, then a digest auth request is made. + headers = { + "WWW-Authenticate": ( + 'Digest realm="http-auth@example.org", ' + "qop=auth-int, " + "algorithm=MD5, " + 'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", ' + 'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"' + ) + } + response = httpx.Response( + content=b"Auth required", status_code=401, headers=headers, request=request + ) + request = flow.send(response) + assert request.headers["Authorization"].startswith("Digest") + assert 'username="Mufasa"' in request.headers["Authorization"] + assert 'realm="http-auth@example.org"' in request.headers["Authorization"] + assert 'uri="/dir/index.html"' in request.headers["Authorization"] + assert "algorithm=MD5" in request.headers["Authorization"] + assert ( + 'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v"' + in request.headers["Authorization"] + ) + assert "nc=00000001" in request.headers["Authorization"] + assert ( + 'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"' + in request.headers["Authorization"] + ) + assert "qop=auth-int," in request.headers["Authorization"] + assert ( + 'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"' + in request.headers["Authorization"] + ) + assert ( + 'response="8804a53d3640a40a4f73cea12c5ba451"' + in request.headers["Authorization"] + ) + + # No other requests are made. + response = httpx.Response(content=b"Hello, world!", status_code=200) + with pytest.raises(StopIteration): + flow.send(response) + + +def test_digest_auth_int_rfc7616_sha256(monkeypatch): + def mock_get_client_nonce(nonce_count: int, nonce: bytes) -> bytes: + return "f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ".encode() + + auth = httpx.DigestAuth(username="Mufasa", password="Circle of Life") + monkeypatch.setattr(auth, "_get_client_nonce", mock_get_client_nonce) + + request = httpx.Request("GET", "https://www.example.com/dir/index.html") + + # The initial request should not include an auth header. + flow = auth.sync_auth_flow(request) + request = next(flow) + assert "Authorization" not in request.headers + + # If a 401 response is returned, then a digest auth request is made. + headers = { + "WWW-Authenticate": ( + 'Digest realm="http-auth@example.org", ' + "qop=auth-int, " + "algorithm=SHA-256, " + 'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v", ' + 'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"' + ) + } + response = httpx.Response( + content=b"Auth required", status_code=401, headers=headers, request=request + ) + request = flow.send(response) + assert request.headers["Authorization"].startswith("Digest") + assert 'username="Mufasa"' in request.headers["Authorization"] + assert 'realm="http-auth@example.org"' in request.headers["Authorization"] + assert 'uri="/dir/index.html"' in request.headers["Authorization"] + assert "algorithm=SHA-256" in request.headers["Authorization"] + assert ( + 'nonce="7ypf/xlj9XXwfDPEoM4URrv/xwf94BcCAzFZH4GiTo0v"' + in request.headers["Authorization"] + ) + assert "nc=00000001" in request.headers["Authorization"] + assert ( + 'cnonce="f2/wE4q74E6zIJEtWaHKaf5wv/H5QzzpXusqGemxURZJ"' + in request.headers["Authorization"] + ) + assert "qop=auth-int," in request.headers["Authorization"] + assert ( + 'opaque="FQhe/qaU925kfnzjCev0ciny7QMkPqMAFRtzCUYo5tdS"' + in request.headers["Authorization"] + ) + assert ( + 'response="8bdf6f15638e260831e905028de5450562816d093c9bfc5c13d3a46adcdde940"' + in request.headers["Authorization"] + ) + + # No other requests are made. + response = httpx.Response(content=b"Hello, world!", status_code=200) + with pytest.raises(StopIteration): + flow.send(response)