@@ -164,6 +164,40 @@ def extract_wheel_file(
164164 logger .debug (
165165 f"[Rank { rank } ] Starting extraction of { wheel_path } to { extract_dir } "
166166 )
167+
168+ # Check for stale/corrupt extraction and clean up if needed
169+ if lock_file .exists ():
170+ try :
171+ lock_age = time .time () - lock_file .stat ().st_mtime
172+ if lock_age > 300 : # 5 minutes
173+ logger .warning (
174+ f"[Rank { rank } ] Detected stale lock file (age: { lock_age :.1f} s). "
175+ f"Previous extraction likely crashed. Cleaning up and retrying..."
176+ )
177+ # Clean up stale extraction
178+ import shutil
179+
180+ if extract_dir .exists ():
181+ shutil .rmtree (extract_dir , ignore_errors = True )
182+ logger .info (
183+ f"[Rank { rank } ] Cleaned up stale extraction directory: { extract_dir } "
184+ )
185+ elif not plugin_lib_path .exists ():
186+ logger .warning (
187+ f"[Rank { rank } ] Lock file exists but plugin missing. "
188+ f"Previous extraction incomplete. Cleaning up and retrying..."
189+ )
190+ # Clean up incomplete extraction
191+ import shutil
192+
193+ if extract_dir .exists ():
194+ shutil .rmtree (extract_dir , ignore_errors = True )
195+ logger .info (
196+ f"[Rank { rank } ] Cleaned up incomplete extraction directory: { extract_dir } "
197+ )
198+ except Exception as e :
199+ logger .warning (f"[Rank { rank } ] Error checking stale lock: { e } " )
200+
167201 # If another job already finished earlier, skip immediately
168202 if plugin_lib_path .exists ():
169203 logger .debug (
@@ -189,14 +223,14 @@ def extract_wheel_file(
189223 # Only one process should be able to create the lock file with O_EXCL
190224 logger .debug (f"[Rank { rank } ] Attempting to acquire lock: { lock_file } " )
191225 acquire_start_time = time .time ()
226+ # Re-check in case extractor finished while we waited
227+ if plugin_lib_path .exists ():
228+ logger .debug (
229+ f"[Rank { rank } ] Plugin appeared at { plugin_lib_path } during acquire, skipping extraction"
230+ )
231+ return
192232 while True :
193233 try :
194- # Re-check in case extractor finished while we waited
195- if plugin_lib_path .exists ():
196- logger .debug (
197- f"[Rank { rank } ] Plugin appeared at { plugin_lib_path } during acquire, skipping extraction"
198- )
199- return
200234 lock_fd = os .open (str (lock_file ), os .O_CREAT | os .O_EXCL | os .O_RDWR )
201235 logger .debug (f"[Rank { rank } ] Successfully acquired lock" )
202236 # write lock owner metadata for race condition time logging
@@ -223,6 +257,16 @@ def extract_wheel_file(
223257 logger .debug (
224258 f"[Rank { rank } ] Plugin already present at { plugin_lib_path } after acquire, skipping extraction"
225259 )
260+ # Clean up lock and return, since lock already acquired
261+ try :
262+ if lock_fd is not None :
263+ os .close (lock_fd )
264+ except Exception as e :
265+ logger .debug (f"[Rank { rank } ] Failed to close lock fd: { e } " )
266+ try :
267+ lock_file .unlink (missing_ok = True )
268+ except Exception as e :
269+ logger .debug (f"[Rank { rank } ] Failed to unlink lock file: { e } " )
226270 return
227271 # With lock held, perform extraction
228272 logger .debug (
@@ -426,9 +470,33 @@ def load_tensorrt_llm_for_nccl() -> bool:
426470 Attempts to load the TensorRT-LLM plugin and initialize it.
427471 Either the env variable TRTLLM_PLUGINS_PATH can specify the path
428472 Or the user can specify USE_TRTLLM_PLUGINS as either of (1, true, yes, on) to download the TRT-LLM distribution and load it
473+
474+ Environment Variables:
475+ TRTLLM_PLUGINS_PATH: Path to pre-installed TensorRT-LLM plugin library
476+ USE_TRTLLM_PLUGINS: Set to 1/true/yes/on to auto-download plugin
477+ TRTLLM_FORCE_CLEANUP: Set to 1/true/yes/on to force cleanup of cached files on startup (useful after mpirun bus errors)
478+
429479 Returns:
430480 bool: True if the plugin was successfully loaded and initialized, False otherwise.
431481 """
482+ if os .environ .get ("TRTLLM_FORCE_CLEANUP" , "0" ).lower () in (
483+ "1" ,
484+ "true" ,
485+ "yes" ,
486+ "on" ,
487+ ):
488+ import shutil
489+
490+ cache_root = _cache_root ()
491+ if cache_root .exists ():
492+ logger .warning (
493+ f"TRTLLM_FORCE_CLEANUP=1 detected. Cleaning up cache: { cache_root } "
494+ )
495+ shutil .rmtree (cache_root , ignore_errors = True )
496+ logger .info (
497+ "Cache cleaned up. Proceeding with fresh download/extraction..."
498+ )
499+
432500 if not is_platform_supported_for_trtllm ():
433501 return False
434502 plugin_lib_path = os .environ .get ("TRTLLM_PLUGINS_PATH" )
0 commit comments