You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Configured python-jose to use cryptography backend.
Since python-jose-cryptography is an RSA public key object that cannot be pickled. Cached the raw JWKS response, upon retrieval from cache, construct the key objects with jwk.construct(...) is done
Refactor KeycloakOIDC to separate signing key retrieval from public key fetching
The cache now stores raw JWKS while callers expect a dict of constructed signing keys; ensure all call paths consistently transform JWKS to signing keys to avoid type mismatches and repeated conversions.
asyncdef__get_public_keys(self) ->Dict[str, Any]:
"""Retrieve public keys from cache or fetch if not present."""public_keys=self.cache.get("public_keys")
ifpublic_keysisNone:
public_keys=awaitself.__fetch_keys()
self.cache.set("public_keys", public_keys)
signing_keys=awaitself.__get__signing_keys(public_keys)
returnsigning_keys
The private helper method is named __get__signing_keys with a double underscore in the middle; this is unconventional and could cause confusion. Consider renaming to __get_signing_keys for clarity.
asyncdef__get__signing_keys(self, public_keys) ->Dict[str, Any]:
"""Retrieving signing public keys from the public keys."""keys=public_keys.get("keys", [])
signing_public_keys= {
key["kid"]: jwk.construct(key)
forkeyinkeysifkey.get("use") =="sig"andkey.get("alg") =="RS256"andkey.get("kid")
}
returnsigning_public_keys
Uninstalling ecdsa at build time assumes it's installed in the image; confirm it’s present or remove the uninstall step to avoid unnecessary layer work and potential build warnings.
Refresh order is inconsistent: you set cache with raw JWKS before deriving signing keys. Derive signing keys first and only then update the cache used by verification to avoid mismatched cache content.
public_keys = await self.__get_public_keys()
try:
logger.info("Token Verification started")
headers = jwt.get_unverified_header(token)
kid = headers.get("kid")
if not kid or kid not in public_keys:
# Force refresh keys if kid not found (possible key rotation)
- public_keys = await self.__fetch_keys()- self.cache.set("public_keys", public_keys)- kid = headers.get("kid")- public_keys = await self.__get__signing_keys(public_keys)+ raw_jwks = await self.__fetch_keys()+ signing_keys = await self.__get__signing_keys(raw_jwks)+ self.cache.set("public_keys", raw_jwks)+ public_keys = signing_keys
if not kid or kid not in public_keys:
raise JWTError("Public key not found for 'kid'")
public_key = public_keys[kid]
payload = jwt.decode(
Suggestion importance[1-10]: 8
__
Why: The suggestion correctly identifies inconsistent caching of raw JWKS vs signing keys during refresh and proposes a precise fix that prevents mismatched cache content affecting verification; it's a meaningful correctness improvement.
Medium
Validate JWKS response format
Validate the JWKS structure before returning to avoid propagating malformed data to cache. Ensure jwks is a dict containing a keys list with dict items to prevent runtime errors in downstream processing.
async def __fetch_keys(self) -> Dict[str, Any]:
"""Fetch JWKS keys from Keycloak."""
try:
logger.info("Fetching Public key of keycloak")
async with httpx.AsyncClient() as client:
response = await client.get(self.jwks_url, timeout=5)
response.raise_for_status()
jwks = response.json()
logger.info("Got response form keycloak [public key]")
+ if not isinstance(jwks, dict) or not isinstance(jwks.get("keys"), list):+ raise RuntimeError("Invalid JWKS format from Keycloak")
return jwks
except Exception as e:
raise RuntimeError(f"Failed to fetch Keycloak public keys: {str(e)}") from e
Suggestion importance[1-10]: 7
__
Why: Validating JWKS structure before caching reduces downstream errors and aligns with new flow that processes keys later. It's accurate and moderately improves robustness without being critical.
Medium
General
Harden JWK construction
Guard against missing or malformed key parameters when constructing JWKs to avoid exceptions crashing verification. Skip keys lacking required fields and catch construction errors per key.
async def __get__signing_keys(self, public_keys) -> Dict[str, Any]:
"""Retrieving signing public keys from the public keys."""
keys = public_keys.get("keys", [])
- signing_public_keys = {- key["kid"]: jwk.construct(key)- for key in keys- if key.get("use") == "sig" and key.get("alg") == "RS256" and key.get("kid")- }+ signing_public_keys: Dict[str, Any] = {}+ for key in keys:+ try:+ if key.get("use") == "sig" and key.get("alg") == "RS256" and key.get("kid"):+ signing_public_keys[key["kid"]] = jwk.construct(key)+ except Exception:+ # Skip malformed key entries+ continue
return signing_public_keys
Suggestion importance[1-10]: 6
__
Why: Adding per-key error handling prevents a bad entry from breaking verification; it's contextually correct and low-risk, but impact is moderate since upstream validation may mitigate many cases.
Cache the derived signing keys, not the raw JWKS, to ensure lookups by kid work without re-deriving each call. This avoids inconsistencies between cached content and expected structure in verify_token.
async def __get_public_keys(self) -> Dict[str, Any]:
"""Retrieve public keys from cache or fetch if not present."""
- public_keys = self.cache.get("public_keys")- if public_keys is None:- public_keys = await self.__fetch_keys()- self.cache.set("public_keys", public_keys)- signing_keys= await self.__get__signing_keys(public_keys)+ signing_keys = self.cache.get("public_keys")+ if signing_keys is None:+ jwks = await self.__fetch_keys()+ signing_keys = await self.__get__signing_keys(jwks)+ self.cache.set("public_keys", signing_keys)
return signing_keys
Suggestion importance[1-10]: 8
__
Why: Caching derived signing keys aligns the cache content with later lookups by kid, avoiding repeated derivation and inconsistencies. It directly addresses a correctness/consistency gap introduced by the PR and has meaningful impact.
Medium
Normalize refreshed cache content
Derive signing keys before caching on refresh to keep cache consistent, and only then check membership. This prevents caching raw JWKS that breaks subsequent lookups.
if not kid or kid not in public_keys:
# Force refresh keys if kid not found (possible key rotation)
- public_keys = await self.__fetch_keys()- self.cache.set("public_keys", public_keys)+ jwks = await self.__fetch_keys()+ refreshed_signing_keys = await self.__get__signing_keys(jwks)+ self.cache.set("public_keys", refreshed_signing_keys)+ public_keys = refreshed_signing_keys
kid = headers.get("kid")
- public_keys= await self.__get__signing_keys(public_keys)
if not kid or kid not in public_keys:
raise JWTError("Public key not found for 'kid'")
Suggestion importance[1-10]: 8
__
Why: On refresh, caching the derived signing keys instead of raw JWKS keeps the cache consistent with expected structure and prevents subsequent lookup failures. The change is precise to the shown lines and improves correctness during key rotation.
Medium
Validate JWKS structure
Validate that the JWKS response contains a list under keys before returning it to prevent downstream errors. If the structure is invalid, raise a clear error instead of caching unusable data.
async def __fetch_keys(self) -> Dict[str, Any]:
"""Fetch JWKS keys from Keycloak."""
try:
logger.info("Fetching Public key of keycloak")
async with httpx.AsyncClient() as client:
response = await client.get(self.jwks_url, timeout=5)
response.raise_for_status()
jwks = response.json()
logger.info("Got response form keycloak [public key]")
+ if not isinstance(jwks, dict) or not isinstance(jwks.get("keys"), list):+ raise RuntimeError("Invalid JWKS format from Keycloak")
return jwks
except Exception as e:
raise RuntimeError(f"Failed to fetch Keycloak public keys: {str(e)}") from e
Suggestion importance[1-10]: 7
__
Why: Validating that jwks["keys"] is a list prevents caching malformed data and avoids downstream errors when deriving signing keys. This is accurate to the new code and improves robustness, though not a critical bug fix.
Avoid uninstalling ecdsa at runtime as a non-root user; it may target the user site-packages and not the system environment, leaving vulnerable code in place. If removal is required, perform it in the same layer and context as installation, or ensure it targets the correct environment.
Why: It correctly flags that uninstalling ecdsa after switching users could be ineffective; moving the uninstall earlier in the layer improves security and determinism. The proposal is accurate and relevant, though not a critical bug fix.
Medium
Pin base image by digest
Pin the base image by digest to prevent supply-chain drift and ensure reproducible builds with the audited image. This mitigates unexpected upgrades that could reintroduce vulnerabilities.
Why: Pinning by digest improves supply-chain security and reproducibility, and the change applies directly to the updated base image line. It's a good practice but not strictly required for functionality.
Low
General
Ensure crypto dependencies present
Verify that the cryptography extra is installed successfully during build; some environments require system libraries. If build fails, pre-install required OS deps before pip to avoid runtime cryptography issues.
-python-jose[cryptography]==3.4.0+# In Dockerfile before pip install:+RUN apt-get update && apt-get install -y --no-install-recommends \+ libssl3 \+ && rm -rf /var/lib/apt/lists/*
Suggestion importance[1-10]: 5
__
Why: The advice to ensure system libs for python-jose[cryptography] can avoid build/runtime issues and aligns with the added extra; however, it is conditional and not directly derived from a failure in the diff, so its impact is moderate.
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
User description
📝 Pull Request Summary
Description:
Data layer
python:3.13.9-slim-bookwormecdsapython-joseto usecryptographybackend.Related Jira Ticket:
FWF-5604=3
DEPENDENCY PR:
Type of Change:
💻 Frontend Changes
Modules/Components Affected:
Summary of Frontend Changes:
UI/UX Review:
Screenshots / Screen Recordings (if applicable):
⚙️ Backend Changes (Java / Python)
Modules/Endpoints Affected:
Summary of Backend Changes:
API Testing:
Screenshots / Screen Recordings (if applicable):
✅ Checklist
Details:
👥 Reviewer Notes
PR Type
Bug fix, Enhancement, Dependencies
Description
Refactor JWKS handling and key selection
Fix token verification with signing keys
Update base image to python:3.13.9-slim-bookworm
Replace ecdsa via python-jose cryptography backend
Diagram Walkthrough
File Walkthrough
keycloak_oidc.py
Robust JWKS fetch and signing key derivationforms-flow-data-layer/src/utils/keycloak_oidc.py
__fetch_keys.__get__signing_keysto filter RS256 signing keys.forms-flow-data-layer-ci.yml
CI Python version bump.github/workflows/forms-flow-data-layer-ci.yml
Dockerfile
Harden base image and remove ecdsaforms-flow-data-layer/Dockerfile
ecdsaduring build.requirements.txt
Dependabot-style crypto backend switchforms-flow-data-layer/requirements.txt
ecdsadependency.python-jose[cryptography].