Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use hyperqueue::client::commands::data::command_task_data;
use hyperqueue::client::commands::doc::command_doc;
use hyperqueue::client::commands::job::{
JobCancelOpts, JobCatOpts, JobCloseOpts, JobForgetOpts, JobInfoOpts, JobListOpts,
JobTaskIdsOpts, cancel_job, close_job, forget_job, output_job_cat, output_job_detail,
output_job_list, output_job_summary,
JobTaskIdsOpts, JobWorkdirOpts, cancel_job, close_job, forget_job, output_job_cat,
output_job_detail, output_job_list, output_job_summary, output_job_workdir,
};
use hyperqueue::client::commands::journal::command_journal;
use hyperqueue::client::commands::outputlog::command_reader;
Expand All @@ -31,8 +31,9 @@ use hyperqueue::client::output::outputs::{Output, Outputs};
use hyperqueue::client::output::quiet::Quiet;
use hyperqueue::client::status::Status;
use hyperqueue::client::task::{
TaskCommand, TaskExplainOpts, TaskInfoOpts, TaskListOpts, output_job_task_explain,
output_job_task_ids, output_job_task_info, output_job_task_list,
TaskCommand, TaskExplainOpts, TaskInfoOpts, TaskListOpts, TaskWorkdirOpts,
output_job_task_explain, output_job_task_ids, output_job_task_info, output_job_task_list,
output_job_task_workdir,
};
use hyperqueue::common::cli::{
ColorPolicy, CommonOpts, DeploySshOpts, GenerateCompletionOpts, HwDetectOpts, JobCommand,
Expand Down Expand Up @@ -140,6 +141,14 @@ async fn command_job_close(gsettings: &GlobalSettings, opts: JobCloseOpts) -> an
close_job(gsettings, &mut connection, opts.selector).await
}

async fn command_job_workdir(
gsettings: &GlobalSettings,
opts: JobWorkdirOpts,
) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
output_job_workdir(gsettings, &mut connection, opts.selector).await
}

async fn command_job_delete(gsettings: &GlobalSettings, opts: JobForgetOpts) -> anyhow::Result<()> {
let mut connection = get_client_session(gsettings.server_directory()).await?;
forget_job(gsettings, &mut connection, opts).await
Expand Down Expand Up @@ -212,6 +221,14 @@ async fn command_task_explain(
output_job_task_explain(gsettings, &mut session, opts).await
}

async fn command_task_workdir(
gsettings: &GlobalSettings,
opts: TaskWorkdirOpts,
) -> anyhow::Result<()> {
let mut session = get_client_session(gsettings.server_directory()).await?;
output_job_task_workdir(gsettings, &mut session, opts).await
}

async fn command_worker_start(
gsettings: &GlobalSettings,
opts: WorkerStartOpts,
Expand Down Expand Up @@ -496,6 +513,7 @@ async fn main() -> hyperqueue::Result<()> {
JobCommand::TaskIds(opts) => command_job_task_ids(&gsettings, opts).await,
JobCommand::Open(opts) => command_job_open(&gsettings, opts).await,
JobCommand::Close(opts) => command_job_close(&gsettings, opts).await,
JobCommand::Workdir(opts) => command_job_workdir(&gsettings, opts).await,
},
SubCommand::Submit(opts) => {
command_job_submit(&gsettings, OptsWithMatches::new(opts, matches)).await
Expand All @@ -504,6 +522,7 @@ async fn main() -> hyperqueue::Result<()> {
TaskCommand::List(opts) => command_task_list(&gsettings, opts).await,
TaskCommand::Info(opts) => command_task_info(&gsettings, opts).await,
TaskCommand::Explain(opts) => command_task_explain(&gsettings, opts).await,
TaskCommand::Workdir(opts) => command_task_workdir(&gsettings, opts).await,
},
SubCommand::Data(opts) => command_task_data(&gsettings, opts).await,
#[cfg(feature = "dashboard")]
Expand Down
41 changes: 41 additions & 0 deletions crates/hyperqueue/src/client/commands/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ pub struct JobCatOpts {
pub stream: OutputStream,
}

#[derive(Parser)]
pub struct JobWorkdirOpts {
/// Single ID, ID range or `last` to display the most recently submitted job
#[arg(value_parser = parse_last_all_range)]
pub selector: IdSelector,
}

pub async fn output_job_list(
gsettings: &GlobalSettings,
session: &mut ClientSession,
Expand Down Expand Up @@ -340,3 +347,37 @@ pub async fn forget_job(

Ok(())
}

pub async fn output_job_workdir(
gsettings: &GlobalSettings,
session: &mut ClientSession,
selector: IdSelector,
) -> anyhow::Result<()> {
let message = FromClientMessage::JobDetail(JobDetailRequest {
job_id_selector: selector,
task_selector: Some(TaskSelector {
id_selector: TaskIdSelector::All,
status_selector: TaskStatusSelector::All,
}),
});
let response =
rpc_call!(session.connection(), message, ToClientMessage::JobDetailResponse(r) => r)
.await?;

let jobs: Vec<JobDetail> = response
.details
.into_iter()
.filter_map(|(id, job)| match job {
Some(job) => Some(job),
None => {
log::error!("Job {id} not found");
None
}
})
.collect();

gsettings
.printer()
.print_job_workdir(jobs, &response.server_uid);
Ok(())
}
47 changes: 47 additions & 0 deletions crates/hyperqueue/src/client/output/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,19 @@ impl Output for CliOutput {
}
}

fn print_task_workdir(&self, jobs: Vec<(JobId, JobDetail)>, server_uid: &str) {
for (job_id, job) in jobs {
let task_paths = resolve_task_paths(&job, server_uid);

println!("Job {}:", job_id);
for (task_id, resolved_paths) in task_paths.iter() {
if let Some(paths) = resolved_paths {
println!(" Task {}: {}", task_id.as_num(), paths.cwd.display());
}
}
}
}

fn print_job_list(&self, jobs: Vec<JobInfo>, total_jobs: usize) {
let job_count = jobs.len();
let mut has_opened = false;
Expand Down Expand Up @@ -632,6 +645,40 @@ impl Output for CliOutput {
}
}

fn print_job_workdir(&self, jobs: Vec<JobDetail>, server_uid: &str) {
for job in jobs {
let task_paths = resolve_task_paths(&job, server_uid);

// Collect unique working directories
let mut workdirs: std::collections::BTreeSet<String> =
std::collections::BTreeSet::new();

// Add submission directory(s)
for submit_desc in &job.submit_descs {
workdirs.insert(
submit_desc
.description()
.submit_dir
.to_string_lossy()
.to_string(),
);
}

// Add task working directories
for (_, resolved_paths) in task_paths.iter() {
if let Some(paths) = resolved_paths {
workdirs.insert(paths.cwd.to_string_lossy().to_string());
}
}

// Print job header and working directories
println!("Job {}:", job.info.id);
for workdir in workdirs {
println!(" {}", workdir);
}
}
}

fn print_job_wait(
&self,
duration: Duration,
Expand Down
60 changes: 60 additions & 0 deletions crates/hyperqueue/src/client/output/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,43 @@ impl Output for JsonOutput {
self.print(Value::Array(job_details));
}

fn print_job_workdir(&self, jobs: Vec<JobDetail>, server_uid: &str) {
let job_workdirs: Vec<_> = jobs
.into_iter()
.map(|job| {
let task_paths = resolve_task_paths(&job, server_uid);

// Collect unique working directories
let mut workdirs: std::collections::BTreeSet<String> =
std::collections::BTreeSet::new();

// Add submission directory(s)
for submit_desc in &job.submit_descs {
workdirs.insert(
submit_desc
.description()
.submit_dir
.to_string_lossy()
.to_string(),
);
}

// Add task working directories
for (_, resolved_paths) in task_paths.iter() {
if let Some(paths) = resolved_paths {
workdirs.insert(paths.cwd.to_string_lossy().to_string());
}
}

json!({
"job_id": job.info.id,
"workdirs": workdirs.into_iter().collect::<Vec<_>>()
})
})
.collect();
self.print(Value::Array(job_workdirs));
}

fn print_job_wait(
&self,
duration: Duration,
Expand Down Expand Up @@ -221,6 +258,29 @@ impl Output for JsonOutput {
self.print(json!(map));
}

fn print_task_workdir(&self, jobs: Vec<(JobId, JobDetail)>, server_uid: &str) {
let task_workdirs: Vec<_> = jobs
.into_iter()
.map(|(job_id, job)| {
let task_paths = resolve_task_paths(&job, server_uid);
let tasks: HashMap<u32, String> = task_paths
.iter()
.filter_map(|(task_id, resolved_paths)| {
resolved_paths.as_ref().map(|paths| {
(task_id.as_num(), paths.cwd.to_string_lossy().to_string())
})
})
.collect();

json!({
"job_id": job_id,
"tasks": tasks
})
})
.collect();
self.print(Value::Array(task_workdirs));
}

fn print_summary(&self, filename: &Path, summary: Summary) {
let json = json!({
"filename": filename,
Expand Down
2 changes: 2 additions & 0 deletions crates/hyperqueue/src/client/output/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub trait Output {
fn print_job_list(&self, jobs: Vec<JobInfo>, total_jobs: usize);
fn print_job_summary(&self, jobs: Vec<JobInfo>);
fn print_job_detail(&self, jobs: Vec<JobDetail>, worker_map: WorkerMap, server_uid: &str);
fn print_job_workdir(&self, jobs: Vec<JobDetail>, server_uid: &str);
fn print_job_wait(
&self,
duration: Duration,
Expand Down Expand Up @@ -80,6 +81,7 @@ pub trait Output {
verbosity: Verbosity,
);
fn print_task_ids(&self, jobs_task_id: Vec<(JobId, IntArray)>);
fn print_task_workdir(&self, jobs: Vec<(JobId, JobDetail)>, server_uid: &str);

// Stream
fn print_summary(&self, path: &Path, summary: Summary);
Expand Down
4 changes: 4 additions & 0 deletions crates/hyperqueue/src/client/output/quiet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ impl Output for Quiet {
}
fn print_job_detail(&self, _jobs: Vec<JobDetail>, _worker_map: WorkerMap, _server_uid: &str) {}

fn print_job_workdir(&self, _jobs: Vec<JobDetail>, _server_uid: &str) {}

fn print_job_wait(
&self,
_duration: Duration,
Expand Down Expand Up @@ -139,6 +141,8 @@ impl Output for Quiet {

fn print_task_ids(&self, _job_task_ids: Vec<(JobId, IntArray)>) {}

fn print_task_workdir(&self, _jobs: Vec<(JobId, JobDetail)>, _server_uid: &str) {}

// Stream
fn print_summary(&self, _filename: &Path, _summary: Summary) {}

Expand Down
53 changes: 53 additions & 0 deletions crates/hyperqueue/src/client/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum TaskCommand {
Info(TaskInfoOpts),
/// Explain if task can run on a selected worker
Explain(TaskExplainOpts),
/// Display working directory of selected task(s)
Workdir(TaskWorkdirOpts),
}

#[derive(clap::Parser)]
Expand Down Expand Up @@ -65,6 +67,16 @@ pub struct TaskExplainOpts {
pub task_id: JobTaskId,
}

#[derive(clap::Parser)]
pub struct TaskWorkdirOpts {
/// Select specific job
#[arg(value_parser = parse_last_single_id)]
pub job_selector: SingleIdSelector,

/// Select specific task(s)
pub task_selector: IntArray,
}

pub async fn output_job_task_list(
gsettings: &GlobalSettings,
session: &mut ClientSession,
Expand Down Expand Up @@ -200,3 +212,44 @@ pub async fn output_job_task_explain(
.print_explanation(response.task_id, &response.explanation);
Ok(())
}

pub async fn output_job_task_workdir(
gsettings: &GlobalSettings,
session: &mut ClientSession,
opts: TaskWorkdirOpts,
) -> anyhow::Result<()> {
let task_selector = TaskSelector {
id_selector: TaskIdSelector::Specific(opts.task_selector),
status_selector: TaskStatusSelector::All,
};

let job_id_selector = match opts.job_selector {
SingleIdSelector::Specific(id) => IdSelector::Specific(IntArray::from_id(id)),
SingleIdSelector::Last => IdSelector::LastN(1),
};

let message = FromClientMessage::JobDetail(JobDetailRequest {
job_id_selector,
task_selector: Some(task_selector),
});
let response =
rpc_call!(session.connection(), message, ToClientMessage::JobDetailResponse(r) => r)
.await?;

let jobs = response
.details
.into_iter()
.filter_map(|(job_id, opt_job)| match opt_job {
Some(job) => Some((job_id, job)),
None => {
log::warn!("Job {job_id} not found");
None
}
})
.collect();

gsettings
.printer()
.print_task_workdir(jobs, &response.server_uid);
Ok(())
}
4 changes: 3 additions & 1 deletion crates/hyperqueue/src/common/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::client::commands::data::DataOpts;
use crate::client::commands::doc::DocOpts;
use crate::client::commands::job::{
JobCancelOpts, JobCatOpts, JobCloseOpts, JobForgetOpts, JobInfoOpts, JobListOpts,
JobTaskIdsOpts,
JobTaskIdsOpts, JobWorkdirOpts,
};
use crate::client::commands::journal::JournalOpts;
use crate::client::commands::outputlog::OutputLogOpts;
Expand Down Expand Up @@ -374,6 +374,8 @@ pub enum JobCommand {
Open(SubmitJobConfOpts),
/// Close an open job
Close(JobCloseOpts),
/// Display working directory of selected job(s)
Workdir(JobWorkdirOpts),
}

#[derive(Parser)]
Expand Down
Loading
Loading