From 12ceecf077e5289e0e4a55fe2150b9b85087bb70 Mon Sep 17 00:00:00 2001 From: sayakpaul Date: Wed, 20 Aug 2025 11:04:28 +0530 Subject: [PATCH 1/4] feat: implement requirements validation for custom blocks. --- src/diffusers/commands/custom_blocks.py | 2 - .../modular_pipelines/modular_pipeline.py | 66 +++++++++++++++---- 2 files changed, 55 insertions(+), 13 deletions(-) diff --git a/src/diffusers/commands/custom_blocks.py b/src/diffusers/commands/custom_blocks.py index 43d9ea88577a..953240c5a2c3 100644 --- a/src/diffusers/commands/custom_blocks.py +++ b/src/diffusers/commands/custom_blocks.py @@ -89,8 +89,6 @@ def run(self): # automap = self._create_automap(parent_class=parent_class, child_class=child_class) # with open(CONFIG, "w") as f: # json.dump(automap, f) - with open("requirements.txt", "w") as f: - f.write("") def _choose_block(self, candidates, chosen=None): for cls, base in candidates: diff --git a/src/diffusers/modular_pipelines/modular_pipeline.py b/src/diffusers/modular_pipelines/modular_pipeline.py index 8a05cce209c5..e45dc5068a40 100644 --- a/src/diffusers/modular_pipelines/modular_pipeline.py +++ b/src/diffusers/modular_pipelines/modular_pipeline.py @@ -32,6 +32,7 @@ from ..utils import PushToHubMixin, is_accelerate_available, logging from ..utils.dynamic_modules_utils import get_class_from_dynamic_module, resolve_trust_remote_code from ..utils.hub_utils import load_or_create_model_card, populate_model_card +from ..utils.import_utils import _is_package_available from .components_manager import ComponentsManager from .modular_pipeline_utils import ( ComponentSpec, @@ -231,6 +232,7 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): config_name = "modular_config.json" model_name = None + _requirements: Union[List[Tuple[str, str], Tuple[str, str]]] = None @classmethod def _get_signature_keys(cls, obj): @@ -270,6 +272,28 @@ def _get_required_inputs(self): return input_names + def _get_requirements(self): + if getattr(self, "_requirements", None) is not None: + defined_reqs = self._requirements + if not isinstance(defined_reqs): + defined_reqs = [defined_reqs] + + final_reqs = [] + for pkg, specified_ver in defined_reqs: + pkg_available, pkg_actual_ver = _is_package_available(pkg) + if not pkg_available: + raise ValueError( + f"{pkg} was specified in the requirements but wasn't found. Please check your environment." + ) + if specified_ver != pkg_actual_ver: + logger.warning( + f"Version for {pkg} was specified to be {specified_ver} whereas the actual version found is {pkg_actual_ver}. Ignore if this is not concerning." + ) + final_reqs.append((pkg, specified_ver)) + + else: + return None + @property def required_inputs(self) -> List[InputParam]: return self._get_required_inputs() @@ -293,6 +317,31 @@ def from_pretrained( trust_remote_code: Optional[bool] = None, **kwargs, ): + config = cls.load_config(pretrained_model_name_or_path) + has_remote_code = "auto_map" in config and cls.__name__ in config["auto_map"] + trust_remote_code = resolve_trust_remote_code( + trust_remote_code, pretrained_model_name_or_path, has_remote_code + ) + if not (has_remote_code and trust_remote_code): + raise ValueError( + "Selected model repository does not happear to have any custom code or does not have a valid `config.json` file." + ) + + if "requirements" in config and config["requirements"] is not None: + requirements: Union[List[Tuple[str, str]], Tuple[str, str]] = config["requirements"] + if not isinstance(requirements, list): + requirements = [requirements] + for pkg, fetched_ver in requirements: + pkg_available, pkg_actual_ver = _is_package_available(pkg) + if not pkg_available: + raise ValueError( + f"{pkg} was specified in the requirements but wasn't found in the current environment." + ) + if fetched_ver != pkg_actual_ver: + logger.warning( + f"Version of {pkg} was specified to be {fetched_ver} in the configuration. However, the actual installed version if {pkg_actual_ver}. Things might work unexpected." + ) + hub_kwargs_names = [ "cache_dir", "force_download", @@ -305,16 +354,6 @@ def from_pretrained( ] hub_kwargs = {name: kwargs.pop(name) for name in hub_kwargs_names if name in kwargs} - config = cls.load_config(pretrained_model_name_or_path) - has_remote_code = "auto_map" in config and cls.__name__ in config["auto_map"] - trust_remote_code = resolve_trust_remote_code( - trust_remote_code, pretrained_model_name_or_path, has_remote_code - ) - if not (has_remote_code and trust_remote_code): - raise ValueError( - "Selected model repository does not happear to have any custom code or does not have a valid `config.json` file." - ) - class_ref = config["auto_map"][cls.__name__] module_file, class_name = class_ref.split(".") module_file = module_file + ".py" @@ -340,8 +379,13 @@ def save_pretrained(self, save_directory, push_to_hub=False, **kwargs): module = full_mod.rsplit(".", 1)[-1].replace("__dynamic__", "") parent_module = self.save_pretrained.__func__.__qualname__.split(".", 1)[0] auto_map = {f"{parent_module}": f"{module}.{cls_name}"} - self.register_to_config(auto_map=auto_map) + + # resolve requirements + requirements = self._get_requirements() + if requirements is not None: + self.register_to_config(requirements=requirements) + self.save_config(save_directory=save_directory, push_to_hub=push_to_hub, **kwargs) config = dict(self.config) self._internal_dict = FrozenDict(config) From 127e9a39d885dea964e054e7335932550e05f51a Mon Sep 17 00:00:00 2001 From: sayakpaul Date: Wed, 20 Aug 2025 11:51:15 +0530 Subject: [PATCH 2/4] up --- src/diffusers/modular_pipelines/modular_pipeline.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/diffusers/modular_pipelines/modular_pipeline.py b/src/diffusers/modular_pipelines/modular_pipeline.py index e45dc5068a40..5171223a6ff8 100644 --- a/src/diffusers/modular_pipelines/modular_pipeline.py +++ b/src/diffusers/modular_pipelines/modular_pipeline.py @@ -232,7 +232,7 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): config_name = "modular_config.json" model_name = None - _requirements: Union[List[Tuple[str, str], Tuple[str, str]]] = None + _requirements: Union[List[Tuple[str, str]], Tuple[str, str]] = None @classmethod def _get_signature_keys(cls, obj): @@ -275,7 +275,7 @@ def _get_required_inputs(self): def _get_requirements(self): if getattr(self, "_requirements", None) is not None: defined_reqs = self._requirements - if not isinstance(defined_reqs): + if not isinstance(defined_reqs, list): defined_reqs = [defined_reqs] final_reqs = [] @@ -290,6 +290,7 @@ def _get_requirements(self): f"Version for {pkg} was specified to be {specified_ver} whereas the actual version found is {pkg_actual_ver}. Ignore if this is not concerning." ) final_reqs.append((pkg, specified_ver)) + return final_reqs else: return None From 37d3887194c4b54c145101a38ad703331aa6a0b8 Mon Sep 17 00:00:00 2001 From: sayakpaul Date: Wed, 20 Aug 2025 12:09:33 +0530 Subject: [PATCH 3/4] unify. --- .../modular_pipelines/modular_pipeline.py | 71 +++++++++---------- 1 file changed, 33 insertions(+), 38 deletions(-) diff --git a/src/diffusers/modular_pipelines/modular_pipeline.py b/src/diffusers/modular_pipelines/modular_pipeline.py index 5171223a6ff8..0fdfd8735550 100644 --- a/src/diffusers/modular_pipelines/modular_pipeline.py +++ b/src/diffusers/modular_pipelines/modular_pipeline.py @@ -272,29 +272,6 @@ def _get_required_inputs(self): return input_names - def _get_requirements(self): - if getattr(self, "_requirements", None) is not None: - defined_reqs = self._requirements - if not isinstance(defined_reqs, list): - defined_reqs = [defined_reqs] - - final_reqs = [] - for pkg, specified_ver in defined_reqs: - pkg_available, pkg_actual_ver = _is_package_available(pkg) - if not pkg_available: - raise ValueError( - f"{pkg} was specified in the requirements but wasn't found. Please check your environment." - ) - if specified_ver != pkg_actual_ver: - logger.warning( - f"Version for {pkg} was specified to be {specified_ver} whereas the actual version found is {pkg_actual_ver}. Ignore if this is not concerning." - ) - final_reqs.append((pkg, specified_ver)) - return final_reqs - - else: - return None - @property def required_inputs(self) -> List[InputParam]: return self._get_required_inputs() @@ -329,19 +306,7 @@ def from_pretrained( ) if "requirements" in config and config["requirements"] is not None: - requirements: Union[List[Tuple[str, str]], Tuple[str, str]] = config["requirements"] - if not isinstance(requirements, list): - requirements = [requirements] - for pkg, fetched_ver in requirements: - pkg_available, pkg_actual_ver = _is_package_available(pkg) - if not pkg_available: - raise ValueError( - f"{pkg} was specified in the requirements but wasn't found in the current environment." - ) - if fetched_ver != pkg_actual_ver: - logger.warning( - f"Version of {pkg} was specified to be {fetched_ver} in the configuration. However, the actual installed version if {pkg_actual_ver}. Things might work unexpected." - ) + _ = _validate_requirements(config["requirements"]) hub_kwargs_names = [ "cache_dir", @@ -383,8 +348,8 @@ def save_pretrained(self, save_directory, push_to_hub=False, **kwargs): self.register_to_config(auto_map=auto_map) # resolve requirements - requirements = self._get_requirements() - if requirements is not None: + requirements = _validate_requirements(getattr(self, "_requirements", None)) + if requirements: self.register_to_config(requirements=requirements) self.save_config(save_directory=save_directory, push_to_hub=push_to_hub, **kwargs) @@ -2489,3 +2454,33 @@ def __call__(self, state: PipelineState = None, output: Union[str, List[str]] = return state.get(output) else: raise ValueError(f"Output '{output}' is not a valid output type") + + +def _validate_requirements(reqs): + normalized_reqs = _normalize_requirements(reqs) + if not normalized_reqs: + return [] + + final: List[Tuple[str, str]] = [] + for req, specified_ver in normalized_reqs: + req_available, req_actual_ver = _is_package_available(req) + if not req_available: + raise ValueError(f"{req} was specified in the requirements but wasn't found in the current environment.") + if specified_ver != req_actual_ver: + logger.warning( + f"Version of {req} was specified to be {specified_ver} in the configuration. However, the actual installed version if {req_actual_ver}. Things might work unexpected." + ) + + final.append((req, specified_ver)) + + return final + + +def _normalize_requirements(reqs): + if not reqs: + return [] + if isinstance(reqs, tuple) and len(reqs) == 2 and isinstance(reqs[0], str): + req_seq: List[Tuple[str, str]] = [reqs] # single pair + else: + req_seq = reqs + return req_seq From 1de4402c2678febb9db4e8f609981c5341d1018c Mon Sep 17 00:00:00 2001 From: sayakpaul Date: Mon, 27 Oct 2025 13:55:17 +0530 Subject: [PATCH 4/4] up --- .../modular_pipelines/modular_pipeline.py | 42 +++------ .../modular_pipeline_utils.py | 85 +++++++++++++++++++ 2 files changed, 95 insertions(+), 32 deletions(-) diff --git a/src/diffusers/modular_pipelines/modular_pipeline.py b/src/diffusers/modular_pipelines/modular_pipeline.py index 8d5a943f458d..02232c7f609e 100644 --- a/src/diffusers/modular_pipelines/modular_pipeline.py +++ b/src/diffusers/modular_pipelines/modular_pipeline.py @@ -32,7 +32,6 @@ from ..utils import PushToHubMixin, is_accelerate_available, logging from ..utils.dynamic_modules_utils import get_class_from_dynamic_module, resolve_trust_remote_code from ..utils.hub_utils import load_or_create_model_card, populate_model_card -from ..utils.import_utils import _is_package_available from .components_manager import ComponentsManager from .modular_pipeline_utils import ( ComponentSpec, @@ -40,6 +39,7 @@ InputParam, InsertableDict, OutputParam, + _validate_requirements, format_components, format_configs, make_doc_string, @@ -240,7 +240,7 @@ class ModularPipelineBlocks(ConfigMixin, PushToHubMixin): config_name = "modular_config.json" model_name = None - _requirements: Union[List[Tuple[str, str]], Tuple[str, str]] = None + _requirements: Optional[Dict[str, str]] = None @classmethod def _get_signature_keys(cls, obj): @@ -1143,6 +1143,14 @@ def doc(self): expected_configs=self.expected_configs, ) + @property + def _requirements(self) -> Dict[str, str]: + requirements = {} + for block_name, block in self.sub_blocks.items(): + if getattr(block, "_requirements", None): + requirements[block_name] = block._requirements + return requirements + class LoopSequentialPipelineBlocks(ModularPipelineBlocks): """ @@ -2547,33 +2555,3 @@ def __call__(self, state: PipelineState = None, output: Union[str, List[str]] = return state.get(output) else: raise ValueError(f"Output '{output}' is not a valid output type") - - -def _validate_requirements(reqs): - normalized_reqs = _normalize_requirements(reqs) - if not normalized_reqs: - return [] - - final: List[Tuple[str, str]] = [] - for req, specified_ver in normalized_reqs: - req_available, req_actual_ver = _is_package_available(req) - if not req_available: - raise ValueError(f"{req} was specified in the requirements but wasn't found in the current environment.") - if specified_ver != req_actual_ver: - logger.warning( - f"Version of {req} was specified to be {specified_ver} in the configuration. However, the actual installed version if {req_actual_ver}. Things might work unexpected." - ) - - final.append((req, specified_ver)) - - return final - - -def _normalize_requirements(reqs): - if not reqs: - return [] - if isinstance(reqs, tuple) and len(reqs) == 2 and isinstance(reqs[0], str): - req_seq: List[Tuple[str, str]] = [reqs] # single pair - else: - req_seq = reqs - return req_seq diff --git a/src/diffusers/modular_pipelines/modular_pipeline_utils.py b/src/diffusers/modular_pipelines/modular_pipeline_utils.py index b15126868634..50190305be5e 100644 --- a/src/diffusers/modular_pipelines/modular_pipeline_utils.py +++ b/src/diffusers/modular_pipelines/modular_pipeline_utils.py @@ -19,9 +19,11 @@ from typing import Any, Dict, List, Literal, Optional, Type, Union import torch +from packaging.specifiers import InvalidSpecifier, SpecifierSet from ..configuration_utils import ConfigMixin, FrozenDict from ..utils import is_torch_available, logging +from ..utils.import_utils import _is_package_available if is_torch_available(): @@ -670,3 +672,86 @@ def make_doc_string( output += format_output_params(outputs, indent_level=2) return output + + +def _validate_requirements(reqs): + if reqs is None: + normalized_reqs = {} + else: + if not isinstance(reqs, dict): + raise ValueError( + "Requirements must be provided as a dictionary mapping package names to version specifiers." + ) + normalized_reqs = _normalize_requirements(reqs) + + if not normalized_reqs: + return {} + + final: Dict[str, str] = {} + for req, specified_ver in normalized_reqs.items(): + req_available, req_actual_ver = _is_package_available(req) + if not req_available: + logger.warning(f"{req} was specified in the requirements but wasn't found in the current environment.") + + if specified_ver: + try: + specifier = SpecifierSet(specified_ver) + except InvalidSpecifier as err: + raise ValueError(f"Requirement specifier '{specified_ver}' for {req} is invalid.") from err + + if req_actual_ver == "N/A": + logger.warning( + f"Version of {req} could not be determined to validate requirement '{specified_ver}'. Things might work unexpected." + ) + elif not specifier.contains(req_actual_ver, prereleases=True): + logger.warning( + f"{req} requirement '{specified_ver}' is not satisfied by the installed version {req_actual_ver}. Things might work unexpected." + ) + + final[req] = specified_ver + + return final + + +def _normalize_requirements(reqs): + if not reqs: + return {} + + normalized: "OrderedDict[str, str]" = OrderedDict() + + def _accumulate(mapping: Dict[str, Any]): + for pkg, spec in mapping.items(): + if isinstance(spec, dict): + # This is recursive because blocks are composable. This way, we can merge requirements + # from multiple blocks. + _accumulate(spec) + continue + + pkg_name = str(pkg).strip() + if not pkg_name: + raise ValueError("Requirement package name cannot be empty.") + + spec_str = "" if spec is None else str(spec).strip() + if spec_str and not spec_str.startswith(("<", ">", "=", "!", "~")): + spec_str = f"=={spec_str}" + + existing_spec = normalized.get(pkg_name) + if existing_spec is not None: + if not existing_spec and spec_str: + normalized[pkg_name] = spec_str + elif existing_spec and spec_str and existing_spec != spec_str: + try: + combined_spec = SpecifierSet(",".join(filter(None, [existing_spec, spec_str]))) + except InvalidSpecifier: + logger.warning( + f"Conflicting requirements for '{pkg_name}' detected: '{existing_spec}' vs '{spec_str}'. Keeping '{existing_spec}'." + ) + else: + normalized[pkg_name] = str(combined_spec) + continue + + normalized[pkg_name] = spec_str + + _accumulate(reqs) + + return normalized