8 changes: 5 additions & 3 deletions README.md
@@ -67,9 +67,11 @@ High-throughput analyses rely on predictable reference data access. Daylily prov
## Remote Data Staging & Pipeline Execution
Daylily’s workflow helpers bridge the gap between local manifests and remote execution:

1. Use `bin/daylily-stage-analysis-samples` from your workstation to pick a cluster, upload an `analysis_samples.tsv`, and invoke the head-node staging utility. The script downloads data from S3/HTTP/local paths, merges multi-lane FASTQs, and writes canonical `config/samples.tsv` and `config/units.tsv` files to your chosen staging directory (a sketch of the lane-merge step follows this list).
2. The staging utility automatically validates AWS credentials, materializes concordance/control payloads, normalizes metadata, and reports where to copy the generated configs.
3. When you are ready to launch a workflow, `bin/daylily-run-ephemeral-cluster-remote-tests` can log into the head node, clone the selected pipeline (as configured in the YAML registry), and start the run in a tmux session for detached execution.
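
The staging utility's internals are not part of this diff, so the snippet below is only a minimal sketch of what the lane-merge step amounts to; the `merge_lanes` helper name and the FASTQ naming pattern in the comment are illustrative assumptions. It relies on the fact that concatenated gzip streams remain a valid gzip file, so lane-split `.fastq.gz` inputs can be joined without decompressing.

```python
from pathlib import Path
import shutil


def merge_lanes(lane_fastqs: list[Path], merged: Path) -> Path:
    """Concatenate lane-split FASTQ(.gz) files into one per-sample file."""
    merged.parent.mkdir(parents=True, exist_ok=True)
    with merged.open("wb") as out:
        for lane in sorted(lane_fastqs):  # deterministic lane order
            with lane.open("rb") as src:
                shutil.copyfileobj(src, out)
    return merged


# Hypothetical usage; adjust the glob to your actual lane naming scheme:
# merge_lanes(list(Path("staging/raw").glob("SAMPLE1_L00*_R1_*.fastq.gz")),
#             Path("staging/SAMPLE1_R1.fastq.gz"))
```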

For fully automated operations, the repository now includes `bin/daylily-monitor-worksets`, a long-running daemon that watches an S3 prefix for ready worksets, acquires a cooperative lock via sentinel files, stages inputs, provisions clusters, launches the requested pipeline, and mirrors the finished run back to S3. See [docs/workset-monitor.md](docs/workset-monitor.md) for configuration details.
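
Because `daylib.workset_monitor` itself is not shown in this diff, the sketch below is only one plausible shape for the cooperative sentinel-file lock it describes; the `claim_workset` function, the `.daylily_lock` key name, and the boto3 usage are assumptions, and the check-then-write pattern is intentionally simplified (it is not atomic).

```python
import boto3
from botocore.exceptions import ClientError


def claim_workset(bucket: str, prefix: str, owner: str) -> bool:
    """Best-effort cooperative lock: write a sentinel key if none exists yet.

    Note: head-then-put is not atomic; a production monitor would need a
    stronger guard (e.g. conditional writes or an external lock table).
    """
    s3 = boto3.client("s3")
    sentinel = f"{prefix.rstrip('/')}/.daylily_lock"  # hypothetical key name
    try:
        s3.head_object(Bucket=bucket, Key=sentinel)
        return False  # sentinel already present: another monitor owns this workset
    except ClientError as err:
        if err.response["Error"]["Code"] not in ("404", "NoSuchKey"):
            raise  # unexpected error, not just "key missing"
    s3.put_object(Bucket=bucket, Key=sentinel, Body=owner.encode())
    return True
```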

These tools make it straightforward to stage data once, reuse it across pipelines, and keep critical control material co-located with sample inputs.

66 changes: 66 additions & 0 deletions bin/daylily-monitor-worksets
@@ -0,0 +1,66 @@
#!/usr/bin/env python3
"""Monitor S3 workset directories and launch Daylily pipelines automatically."""

from __future__ import annotations

import argparse
import logging
from pathlib import Path
from typing import Optional

from daylib.workset_monitor import S3Location, S3WorksetMonitor


def _configure_logging(level: str, log_file: Optional[Path]) -> None:
log_level = getattr(logging, level.upper(), logging.INFO)
handlers = [logging.StreamHandler()]
if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True)
handlers.append(logging.FileHandler(log_file))

logging.basicConfig(
level=log_level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
handlers=handlers,
)


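# Example invocation (all arguments below are defined by parse_args):
#   daylily-monitor-worksets s3://bucket/worksets --aws-profile my-profile --run-once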
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("s3_root", help="Root S3 URI (e.g. s3://bucket/worksets)")
parser.add_argument("--aws-profile", dest="aws_profile", help="AWS profile name", default=None)
parser.add_argument("--aws-region", dest="aws_region", help="AWS region", default=None)
parser.add_argument("--poll-seconds", type=int, default=60, help="Polling interval in seconds")
parser.add_argument("--local-root", type=Path, help="Local directory for staging worksets")
parser.add_argument("--log-level", default="INFO", help="Logging level (default: INFO)")
parser.add_argument("--log-file", type=Path, help="Optional log file path")
parser.add_argument(
"--run-once",
action="store_true",
help="Process currently ready worksets once and exit instead of looping",
)
return parser.parse_args()


def main() -> None:
args = parse_args()
_configure_logging(args.log_level, args.log_file)

location = S3Location.parse(args.s3_root)
monitor = S3WorksetMonitor(
root=location,
aws_profile=args.aws_profile,
aws_region=args.aws_region,
poll_seconds=args.poll_seconds,
local_root=args.local_root,
)

if args.run_once:
logging.getLogger(__name__).info("Processing ready worksets once")
monitor.process_ready_once()
else:
monitor.run_forever()


if __name__ == "__main__":
main()