diff --git a/scripts/batch_run.yml b/scripts/batch_run.yml index f2c5e05..5c22460 100644 --- a/scripts/batch_run.yml +++ b/scripts/batch_run.yml @@ -1,10 +1,10 @@ -run_time: 4 +run_time: 3 params: request_freq: - low: - - middle: - - high: + # - middle: + # - high: dag_type: # - single: # - mix: @@ -16,9 +16,9 @@ params: mech_scale_sche: scale_sche_joint: scale_num: - # - hpa: + - hpa: # - temp_scaler: - - ensure_scaler: + # - ensure_scaler: scale_down_exec: - default: scale_up_exec: @@ -26,30 +26,33 @@ mech_scale_sche: sche: # - bp_balance: # - pos: greedy - - ensure_scheduler: + # - ensure_scheduler: + - priority: a + - heft: + - priority: b filter: # - [] - [{'careful_down':''}] - scale_sche_separated: - scale_num: - - temp_scaler: - - hpa: - - lass: - scale_down_exec: - - default: - scale_up_exec: - - least_task: - sche: - # - greedy: - # - hash: - # - random: - - load_least: - # - rotate: - - pass: - filter: - # - [] - - [{'careful_down':''}] + # scale_sche_separated: + # scale_num: + # - temp_scaler: + # - hpa: + # - lass: + # scale_down_exec: + # - default: + # scale_up_exec: + # - least_task: + # sche: + # # - greedy: + # # - hash: + # # - random: + # - load_least: + # # - rotate: + # - pass: + # filter: + # # - [] + # - [{'careful_down':''}] # no_scale: # scale_num: diff --git a/scripts/fast_draw.yml b/scripts/fast_draw.yml index 95c5c75..379889e 100644 --- a/scripts/fast_draw.yml +++ b/scripts/fast_draw.yml @@ -11,22 +11,9 @@ filter: ## each group bars targets_alias: -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'ensure_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'ensure_scheduler.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(ensure_scaler.)(default.)(least_task.)[(careful_down.)].scd(ensure_scheduler.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'bp_balance.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(bp_balance.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pos.greedy', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(pos.greedy).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'bp_balance.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(bp_balance.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pos.greedy', 
'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(pos.greedy).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'greedy.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(greedy.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'hash.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(hash.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(load_least.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pass.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(pass.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'rotate.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(rotate.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'lass.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(lass.)(default.)(least_task.)[(careful_down.)].scd(load_least.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'lass.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pass.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(lass.)(default.)(least_task.)[(careful_down.)].scd(pass.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'greedy.', 
'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(greedy.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(load_least.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pass.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(pass.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'rotate.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(rotate.).ic(no_evict.)'] +- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'heft.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(heft.).ic(no_evict.)'] +- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'priority.a', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(priority.a).ic(no_evict.)'] +- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'priority.b', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(priority.b).ic(no_evict.)'] ## group on x axis: diff --git a/serverless_sim/module_conf_es.json b/serverless_sim/module_conf_es.json index 625ae38..86f3731 100644 --- a/serverless_sim/module_conf_es.json +++ b/serverless_sim/module_conf_es.json @@ -1,17 +1,17 @@ { "mech_type": { - "no_scale": null, "scale_sche_separated": null, - "scale_sche_joint": null + "scale_sche_joint": null, + "no_scale": null }, "scale_num": { - "rela": null, + "lass": null, + "temp_scaler": null, + "no": null, "hpa": null, "full_placement": null, - "ensure_scaler": null, - "no": null, - "lass": null, - "temp_scaler": null + "rela": null, + "ensure_scaler": null }, "scale_down_exec": 
{ "default": null @@ -21,25 +21,27 @@ "no": null }, "sche": { - "consistenthash": null, - "ensure_scheduler": null, + "heft": null, + "random": null, + "hash": null, + "pass": null, "faasflow": null, - "load_least": null, "rotate": null, - "hash": null, - "pos": null, - "random": null, - "bp_balance": null, + "ensure_scheduler": null, "fnsche": null, + "pos": null, "greedy": null, - "pass": null + "consistenthash": null, + "load_least": null, + "bp_balance": null, + "priority": null }, "filter": { "careful_down": null }, "instance_cache_policy": { - "lru": null, "fifo": null, + "lru": null, "no_evict": null } } \ No newline at end of file diff --git a/serverless_sim/src/mechanism.rs b/serverless_sim/src/mechanism.rs index 9cc2c2d..f79138a 100644 --- a/serverless_sim/src/mechanism.rs +++ b/serverless_sim/src/mechanism.rs @@ -78,7 +78,7 @@ impl CheckDup for Vec { } } -pub const SCHE_NAMES: [&'static str; 12] = [ +pub const SCHE_NAMES: [&'static str; 14] = [ "rotate", "hash", "bp_balance", @@ -91,6 +91,8 @@ pub const SCHE_NAMES: [&'static str; 12] = [ "consistenthash", // "gofs", "ensure_scheduler", "load_least", + "priority", + "heft" // "load_least", // "random", ]; @@ -222,7 +224,7 @@ impl ConfigNewMec for Config { } } "scale_sche_joint" => { - let allow_sche = vec!["pos", "bp_balance", "ensure_scheduler"]; + let allow_sche = vec!["pos", "bp_balance", "ensure_scheduler", "priority", "heft"]; let allow_scale_num = vec!["hpa", "lass", "temp_scaler", "full_placement", "rela", "ensure_scaler"]; let allow_scale_down_exec = vec!["default"]; let allow_scale_up_exec = vec!["least_task"]; diff --git a/serverless_sim/src/node.rs b/serverless_sim/src/node.rs index f48da46..429079c 100644 --- a/serverless_sim/src/node.rs +++ b/serverless_sim/src/node.rs @@ -100,8 +100,8 @@ impl Node { Self { node_id, rsc_limit: NodeRscLimit { - // cpu: 1000.0, - cpu: 200.0, + cpu: 1000.0, + // cpu: 200.0, mem: 8000.0, }, fn_containers: HashMap::new().into(), @@ -264,9 +264,9 @@ impl Node { Box::new(move |to_replace| { let node = node.as_ref(); // log::info!("节点{}要移除的容器{}", node.node_id, to_replace); - for (_k, v) in node.fn_containers.borrow().iter() { - // log::info!("{}", v.fn_id); - } + // for (_k, v) in node.fn_containers.borrow().iter() { + // log::info!("{}", v.fn_id); + // } node.container(*to_replace).unwrap().is_idle() }), ); diff --git a/serverless_sim/src/request.rs b/serverless_sim/src/request.rs index 700b089..138ea41 100644 --- a/serverless_sim/src/request.rs +++ b/serverless_sim/src/request.rs @@ -359,13 +359,13 @@ impl SimEnv { for (dag_i, &(mut avg_frequency, cv)) in env.help.fn_call_frequency().iter() { if env.help.config().request_freq_low() { - avg_frequency *= 0.1; + avg_frequency *= 0.3; } else if env.help.config().request_freq_middle() { - avg_frequency *= 0.2; + avg_frequency *= 0.5; } else { - avg_frequency *= 0.3; + avg_frequency *= 0.7; } // avg_frequency *= 100.0; // avg_frequency *= 10.0; diff --git a/serverless_sim/src/sche/heft.rs b/serverless_sim/src/sche/heft.rs new file mode 100644 index 0000000..de59226 --- /dev/null +++ b/serverless_sim/src/sche/heft.rs @@ -0,0 +1,242 @@ +use std::{cmp::Ordering, collections::HashMap}; + +use daggy::Walker; +use rand::Rng; + +use crate::{ + fn_dag::{DagId, EnvFnExt, FnId}, + mechanism::{MechanismImpl, ScheCmd, SimEnvObserve}, + mechanism_thread::{MechCmdDistributor, MechScheduleOnceRes}, + node::{EnvNodeExt, NodeId}, + request::Request, + sim_run::{schedule_helper, Scheduler}, + with_env_sub::{WithEnvCore, WithEnvHelp}, +}; + +pub struct 
HEFTScheduler {
+    dag_fns_priority: HashMap<DagId, Vec<(FnId, f32)>>,
+
+    node_task_count: HashMap<NodeId, (usize, usize)>,
+
+    node2node_all_bw: Vec<f32>,
+}
+
+impl HEFTScheduler {
+    pub fn new() -> Self {
+        HEFTScheduler {
+            dag_fns_priority: HashMap::new(),
+
+            node_task_count: HashMap::new(),
+
+            node2node_all_bw: Vec::new(),
+        }
+    }
+
+    fn prepare(&mut self, env: &SimEnvObserve) {
+        for node in env.nodes().iter() {
+            let node_id = node.node_id();
+
+            self.node_task_count
+                .insert(node_id, (node.all_task_cnt(), node.running_task_cnt()));
+        }
+    }
+
+    fn prepare_node2node_all_bw(&mut self, env: &SimEnvObserve) {
+        let node_count = env.nodes().len();
+
+        let node2node_graph = env.core().node2node_graph();
+
+        for i in 0..node_count {
+            for j in 0..i {
+                self.node2node_all_bw.push(node2node_graph[i][j]);
+            }
+        }
+    }
+
+    fn calculate_priority_for_dag_fns(&mut self, req: &Request, env: &SimEnvObserve) {
+        let dag = env.dag(req.dag_i);
+
+        if !self.dag_fns_priority.contains_key(&dag.dag_i) {
+            let mut walker = dag.new_dag_walker();
+
+            let mut map = HashMap::new();
+
+            let mut stack = vec![];
+
+            let n = env.nodes().len();
+
+            while let Some(func_g_i) = walker.next(&dag.dag_inner) {
+                let fnid = dag.dag_inner[func_g_i];
+                let func = env.func(fnid);
+
+                let mut t_sum_exec = 0.0;
+                for node in env.nodes().iter() {
+                    let node_running_task_count =
+                        self.node_task_count.get(&node.node_id()).unwrap().1;
+                    let each_running_task_cpu = node.rsc_limit.cpu / node_running_task_count as f32;
+                    t_sum_exec += func.cpu / each_running_task_cpu;
+                }
+                let t_avg_exec = t_sum_exec / n as f32;
+
+                let mut t_sum_trans = 0.0;
+                for bw in &self.node2node_all_bw {
+                    t_sum_trans += func.out_put_size / bw * 5.0;
+                }
+                let t_avg_trans = t_sum_trans / self.node2node_all_bw.len() as f32;
+
+                let initial_priority = t_avg_exec + t_avg_trans;
+                map.insert(fnid, initial_priority);
+                stack.push(func_g_i);
+            }
+
+            while let Some(func_g_i) = stack.pop() {
+                let nexts = dag.dag_inner.children(func_g_i);
+                if let Some(max_node) = nexts.iter(&dag.dag_inner).max_by(|a, b| {
+                    let fnid_a = dag.dag_inner[a.1];
+                    let fnid_b = dag.dag_inner[b.1];
+
+                    map.get(&fnid_a)
+                        .unwrap()
+                        .total_cmp(map.get(&fnid_b).unwrap())
+                }) {
+                    let fnid_max = dag.dag_inner[max_node.1];
+                    let max = *map.get(&fnid_max).unwrap();
+
+                    let fnid = dag.dag_inner[func_g_i];
+                    (*map.get_mut(&fnid).unwrap()) += max;
+                }
+            }
+
+            let mut priority_order = map.into_iter().collect::<Vec<_>>();
+            priority_order.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or_else(|| Ordering::Equal));
+            self.dag_fns_priority.insert(dag.dag_i, priority_order);
+        }
+    }
+
+    fn schedule_one_req(
+        &mut self,
+        env: &SimEnvObserve,
+        mech: &MechanismImpl,
+        req: &Request,
+        cmd_distributor: &MechCmdDistributor,
+    ) {
+        self.calculate_priority_for_dag_fns(req, env);
+
+        let mech_metric = || env.help().mech_metric_mut();
+
+        let dag_fns_priority = self.dag_fns_priority.get(&req.dag_i).unwrap();
+
+        let scheduleable_fns = schedule_helper::collect_task_to_sche(
+            req,
+            env,
+            schedule_helper::CollectTaskConfig::All,
+        );
+
+        // Sort the schedulable functions by priority
+        let sorted_scheduleable_fns = {
+            let mut sorted = Vec::new();
+            for (fn_id, _) in dag_fns_priority {
+                if scheduleable_fns.contains(fn_id) {
+                    sorted.push(*fn_id);
+                }
+            }
+            sorted
+        };
+
+        let mut scheduleable_fns_nodes = schedule_helper::collect_node_to_sche_task_to(&sorted_scheduleable_fns, env);
+
+        for fnid in sorted_scheduleable_fns {
+            let func = env.func(fnid);
+
+            let mut target_cnt = mech.scale_num(fnid);
+            if target_cnt == 0 {
+                target_cnt = 1;
+            }
+
+            let fn_scale_up_cmds =
+                mech.scale_up_exec()
+                    .exec_scale_up(target_cnt, fnid, env, cmd_distributor);
+
+            for cmd in fn_scale_up_cmds.iter() {
+                scheduleable_fns_nodes
+                    .get_mut(&cmd.fnid)
+                    .unwrap()
+                    .insert(cmd.nid);
+            }
+
+            let scheduleable_nodes = scheduleable_fns_nodes
+                .get(&fnid)
+                .unwrap()
+                .iter()
+                .cloned()
+                .collect::<Vec<_>>();
+
+            let mut best_node_id = 999;
+            let mut min_exec_time = 100.0;
+
+            for node_id in scheduleable_nodes {
+                let node = env.node(node_id);
+                let node_running_task_count = self.node_task_count.get(&node_id).unwrap().1;
+                let each_running_task_cpu = node.rsc_limit.cpu / node_running_task_count as f32;
+
+                let exec_time = func.cpu / each_running_task_cpu;
+
+                if min_exec_time > exec_time {
+                    min_exec_time = exec_time;
+                    best_node_id = node_id;
+                }
+            }
+
+            if best_node_id == 999 {
+                best_node_id = rand::thread_rng().gen_range(0..env.nodes().len());
+            }
+
+            mech_metric().add_node_task_new_cnt(best_node_id);
+
+            if let Some((all_task_count, _)) = self.node_task_count.get_mut(&best_node_id) {
+                *all_task_count += 1;
+            }
+
+            cmd_distributor
+                .send(MechScheduleOnceRes::ScheCmd(ScheCmd {
+                    reqid: req.req_id,
+                    fnid,
+                    nid: best_node_id,
+                    memlimit: None,
+                }))
+                .unwrap();
+        }
+    }
+}
+
+impl Scheduler for HEFTScheduler {
+    fn schedule_some(
+        &mut self,
+        env: &SimEnvObserve,
+        mech: &MechanismImpl,
+        cmd_distributor: &MechCmdDistributor,
+    ) {
+        self.node2node_all_bw.clear();
+        self.node_task_count.clear();
+
+        self.prepare(env);
+        self.prepare_node2node_all_bw(env);
+
+        for (_, req) in env.core().requests().iter() {
+            self.schedule_one_req(env, mech, req, cmd_distributor);
+        }
+
+        for func in env.core().fns().iter() {
+            let target = mech.scale_num(func.fn_id);
+            let cur = env.fn_container_cnt(func.fn_id);
+            if target < cur {
+                mech.scale_down_exec().exec_scale_down(
+                    env,
+                    func.fn_id,
+                    cur - target,
+                    cmd_distributor,
+                );
+            }
+        }
+    }
+}
diff --git a/serverless_sim/src/sche/mod.rs b/serverless_sim/src/sche/mod.rs
index 3973168..bf56e21 100644
--- a/serverless_sim/src/sche/mod.rs
+++ b/serverless_sim/src/sche/mod.rs
@@ -15,6 +15,8 @@ use self::{
     rotate::RotateScheduler,
     ensure_scheduler::EnsureScheduler,
     load_least::LoadLeastScheduler,
+    priority::PriorityScheduler,
+    heft::HEFTScheduler
 };
 
 pub mod consistenthash;
@@ -29,6 +31,9 @@ pub mod hash;
 pub mod rotate;
 pub mod ensure_scheduler;
 pub mod load_least;
+pub mod priority;
+pub mod heft;
+
 // pub mod rule_based;
 // pub mod time_aware;
 
@@ -76,6 +81,12 @@ pub fn prepare_spec_scheduler(config: &Config) -> Option<Box<dyn Scheduler>> {
             return Some(Box::new(LoadLeastScheduler::new()));
         }
+        "priority" => {
+            return Some(Box::new(PriorityScheduler::new(&sche_attr)));
+        }
+        "heft" => {
+            return Some(Box::new(HEFTScheduler::new()));
+        }
         _ => {
             return None;
         }
     }
diff --git a/serverless_sim/src/sche/pass.rs b/serverless_sim/src/sche/pass.rs
index 4fef07f..24973ef 100644
--- a/serverless_sim/src/sche/pass.rs
+++ b/serverless_sim/src/sche/pass.rs
@@ -197,17 +197,6 @@ impl PassScheduler {
     }
 }
 
-// 图形调度器中分组和调度算法的关键步骤如下所示。
-// 在初始化阶段,每个函数节点都作为单独的组进行初始化,并且工作节点是随机分配的(第1-2行)。
-// 首先,算法从拓扑排序和迭代开始。在每次迭代的开始,它将使用贪婪方法来定位DAG图中关键路径上具有最长边的两个函数,
-// 并确定这两个函数是否可以合并到同一组(第3-8行)。
-// 如果这两个函数被分配到不同的组中,它们将被合并(第9行)。
-// 在合并组时,需要考虑额外的因素。
-// 首先,算法需要确保合并的函数组不超过工作节点的最大容量(第10-12行)。
-// 否则,合并的组将无法部署在任何节点上。其次,组内局部化的数据总量不能违反内存约束(第13-18行)。
-// 同时,在合并的组中不能存在任何资源竞争的函数对𝑐𝑜𝑛𝑡 (𝐺) = {(𝑓𝑖, 𝑓𝑗 )}(第19-20行)。
-// 最后,调度算法将采用装箱策略,根据节点容量为每个函数组选择适当的工作节点(第21-23行)。
-// 根据上述逻辑,算法迭代直到收敛,表示函数组不再更新。
 impl Scheduler for PassScheduler {
     fn schedule_some(
         &mut self,
@@ -220,19 +209,5 @@
             self.schedule_for_one_req(req, env, cmd_distributor);
         }
     }
-        // let mut to_scale_down = vec![];
-        // // 回收空闲container
-        // for n in env.nodes.borrow().iter() {
-        //     for (_, c) in n.fn_containers.iter() {
-        //         if c.recent_frame_is_idle(3) && c.req_fn_state.len() == 0 {
-        //             to_scale_down.push((n.node_id(), c.fn_id));
-        //         }
-        //     }
-        // }
-        // for (n, f) in to_scale_down {
-        //     env.scale_executor
-        //         .borrow_mut()
-        //         .scale_down(env, ScaleOption::ForSpecNodeFn(n, f));
-        // }
     }
 }
diff --git a/serverless_sim/src/sche/priority.rs b/serverless_sim/src/sche/priority.rs
new file mode 100644
index 0000000..c6e13ad
--- /dev/null
+++ b/serverless_sim/src/sche/priority.rs
@@ -0,0 +1,394 @@
+use std::{
+    cmp::Ordering,
+    collections::HashMap,
+};
+
+use daggy::Walker;
+use rand::Rng;
+
+use crate::{
+    fn_dag::{DagId, EnvFnExt, FnId},
+    mechanism::{MechanismImpl, ScheCmd, SimEnvObserve},
+    mechanism_thread::{MechCmdDistributor, MechScheduleOnceRes},
+    node::{EnvNodeExt, NodeId}, request::Request,
+    sim_run::{schedule_helper, Scheduler},
+    util,
+    with_env_sub::{WithEnvCore, WithEnvHelp}
+};
+
+pub struct PriorityScheduler {
+    // Priority of every function in each kind of DAG (Vec sorted in descending order; higher-priority functions are scheduled first)
+    dag_fns_priority: HashMap<DagId, Vec<(FnId, f32)>>,
+
+    // Remaining resources on each node (cpu, mem)
+    node_resource_left: HashMap<NodeId, (f32, f32)>,
+
+    // Task counts on each node (running + pending, running)
+    node_task_count: HashMap<NodeId, (usize, usize)>,
+
+    // Bandwidth between every pair of nodes
+    node2node_all_bw: Vec<f32>,
+
+    // Cost per unit of memory
+    mem_cost_per_unit: f32,
+
+    mode: String,
+}
+
+impl PriorityScheduler {
+    pub fn new(arg: &str) -> Self {
+        Self {
+            dag_fns_priority: HashMap::new(),
+
+            node_resource_left: HashMap::new(),
+
+            node_task_count: HashMap::new(),
+
+            node2node_all_bw: Vec::new(),
+
+            // mem_cost_per_unit: arg.parse::<f32>().unwrap(),
+            mem_cost_per_unit: 0.005,
+
+            mode: arg.to_string(),
+        }
+    }
+
+    // Initialize node_resource_left and node_task_count
+    fn prepare(&mut self, env: &SimEnvObserve) {
+
+        for node in env.nodes().iter() {
+            let node_id = node.node_id();
+
+            self.node_resource_left.insert(
+                node_id,
+                (
+                    node.rsc_limit.cpu - node.last_frame_cpu,
+                    node.rsc_limit.mem - node.last_frame_mem,
+                ),
+            );
+
+            self.node_task_count
+                .insert(node_id, (node.all_task_cnt(), node.running_task_cnt()));
+        }
+    }
+
+    // Initialize node2node_all_bw
+    fn prepare_node2node_all_bw(&mut self, env: &SimEnvObserve) {
+
+        let node_count = env.nodes().len();
+
+        // Bandwidth between nodes
+        let node2node_graph = env.core().node2node_graph();
+
+        for i in 0..node_count {
+            for j in 0..i {
+                self.node2node_all_bw
+                    .push(node2node_graph[i][j]);
+            }
+        }
+    }
+
+    // Initialize dag_fns_priority
+    fn calculate_priority_for_dag_fns(&mut self, req: &Request, env: &SimEnvObserve) {
+
+        // The DAG this request corresponds to
+        let dag = env.dag(req.dag_i);
+
+        // Different requests may share the same DAG; a DAG whose priorities were already computed is not computed again
+        if !self.dag_fns_priority.contains_key(&dag.dag_i) {
+            // Priority of each function in the DAG
+            let mut map = HashMap::new();
+
+            let mut walker = dag.new_dag_walker();
+
+            // Record the reverse topological order; priorities are assigned in that order
+            let mut stack = vec![];
+
+            // Graph nodes on the critical path of the DAG
+            let critical_path_nodes = util::graph::aoe_critical_path(&dag.dag_inner);
+
+            let m = self.node2node_all_bw.len();
+            let n = env.nodes().len();
+
+            // Topological traversal
+            while let Some(func_g_i) = walker.next(&dag.dag_inner) {
+                let fnid = dag.dag_inner[func_g_i];
+                let func = env.func(fnid);
+
+                // Whether this function lies on the DAG's critical path
+                let flag = critical_path_nodes.contains(&func_g_i);
+
+                let mut t_sum_exec = 0.0;
+                for node in env.nodes().iter() {
+                    let node_running_task_count =
+                        self.node_task_count.get(&node.node_id()).unwrap().1;
+
+                    let each_running_task_cpu = node.rsc_limit.cpu / node_running_task_count as f32;
+                    t_sum_exec += func.cpu / each_running_task_cpu;
+                }
+                // Average execution time of the function
+                let t_avg_exec = t_sum_exec / n as f32;
+
+                let mut t_sum_trans = 0.0;
+                for bw in &self.node2node_all_bw {
+                    t_sum_trans += func.out_put_size / bw * 5.0;
+                }
+                // Average data-transfer time
+                let t_avg_trans = t_sum_trans / m as f32;
+
+                // Memory cost of the function
+                let mem_cost = func.mem as f32 * self.mem_cost_per_unit;
+
+                // log::info!(
+                //     "t_avg_exec{} t_avg_trans{} mem_cost{}",
+                //     t_avg_exec,
+                //     t_avg_trans,
+                //     mem_cost
+                // );
+
+                // Initial priority of the function
+                let mut initial_priority = t_avg_exec + t_avg_trans - mem_cost;
+
+                // Account for the critical path's influence on the priority
+                if self.mode == "b" && flag {
+                    initial_priority += 1.0;
+                }
+
+                map.insert(fnid, initial_priority);
+
+                stack.push(func_g_i);
+            }
+
+            // Compute the priority of every function in reverse topological order, since a function's priority depends on its successors
+            while let Some(func_g_i) = stack.pop() {
+                let nexts = dag.dag_inner.children(func_g_i);
+                // Take the successor with the highest priority
+                if let Some(max_node) = nexts.iter(&dag.dag_inner).max_by(|a, b| {
+                    let fnid_a = dag.dag_inner[a.1];
+                    let fnid_b = dag.dag_inner[b.1];
+
+                    map.get(&fnid_a)
+                        .unwrap()
+                        .total_cmp(map.get(&fnid_b).unwrap())
+                }) {
+                    let fnid_max = dag.dag_inner[max_node.1];
+                    let max = *map.get(&fnid_max).unwrap();
+
+                    let fnid = dag.dag_inner[func_g_i];
+                    (*map.get_mut(&fnid).unwrap()) += max;
+                }
+            }
+
+            let mut priority_order = map.into_iter().collect::<Vec<_>>();
+
+            // Sort in descending order; higher-priority functions are scheduled first
+            priority_order.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or_else(|| Ordering::Equal));
+
+            // Record this DAG's priority order to avoid recomputation
+            self.dag_fns_priority.insert(dag.dag_i, priority_order);
+        }
+    }
+
+    fn schedule_one_req(
+        &mut self,
+        env: &SimEnvObserve,
+        mech: &MechanismImpl,
+        req: &Request,
+        cmd_distributor: &MechCmdDistributor,
+    ) {
+        // Compute the priorities of the functions in this request's DAG
+        self.calculate_priority_for_dag_fns(req, env);
+
+        let mech_metric = || env.help().mech_metric_mut();
+
+        // Priorities of the functions in this request's DAG
+        let dag_fns_priority = self.dag_fns_priority.get(&req.dag_i).unwrap();
+
+        // Schedulable functions (node selection must account for inter-function data-transfer time, so a function is schedulable only after all of its predecessors have been scheduled)
+        let scheduleable_fns = schedule_helper::collect_task_to_sche(
+            req,
+            env,
+            schedule_helper::CollectTaskConfig::PreAllSched
+        );
+
+        // Schedulable functions and the nodes that already hold their containers
+        let mut scheduleable_fns_nodes = schedule_helper::collect_node_to_sche_task_to(&scheduleable_fns, env);
+
+        // Functions schedulable in this round
+        let sorted_scheduleable_fns =
+            // // Schedule directly without sorting
+            // if self.mode == "a" {
+            //     fns
+            // }
+            // // Schedule after sorting by priority
+            // else
+            {
+                let mut sorted = Vec::new();
+                for (fn_id, _) in dag_fns_priority {
+                    if scheduleable_fns.contains(fn_id) {
+                        sorted.push(*fn_id);
+                    }
+                }
+                sorted
+            };
+
+        for fnid in sorted_scheduleable_fns {
+            let func = env.func(fnid);
+
+            // scale_sche_joint has already updated the number of containers the function needs before scheduling; fetch it
+            let mut target_cnt = mech.scale_num(fnid);
+            if target_cnt == 0 {
+                target_cnt = 1;
+            }
+
+            // Scale up
+            let fn_scale_up_cmds =
+                mech.scale_up_exec()
+                    .exec_scale_up(target_cnt, fnid, env, cmd_distributor);
+
+            // Nodes holding a container of this function = nodes that already had one + nodes chosen by scale-up
+            for cmd in fn_scale_up_cmds.iter() {
+                scheduleable_fns_nodes
+                    .get_mut(&cmd.fnid)
+                    .unwrap()
+                    .insert(cmd.nid);
+            }
+
+            // Schedulable nodes
+            let scheduleable_nodes =
+                // // All nodes
+                // if self.mode == "a" {
+                //     env.nodes().iter().map(|node| node.node_id()).collect::<Vec<_>>()
+                // }
+                // // Nodes holding a container of this function
+                // else
+                {
+                    scheduleable_fns_nodes.get(&fnid).unwrap().iter().cloned().collect::<Vec<_>>()
+                };
+
+            // Best node to schedule onto
+            let mut best_node_id = 999;
+            // Score of the best node
+            let mut best_score = -10.0;
+
+            for node_id in scheduleable_nodes {
+                let node = env.node(node_id);
+
+                // Predecessors of the function
+                let pred_fnids = env.func(fnid).parent_fns(env);
+
+                // Number of predecessor functions not on the current node
+                let mut not_in_the_same_node = 0;
+
+                // Total data-transfer time from predecessor functions not on the current node
+                let mut transmission_time = 0.0;
+                for pred in pred_fnids {
+                    // Node where the predecessor was scheduled
+                    let &pred_node_id = req.fn_node.get(&pred).unwrap();
+
+                    // Not on the same node as the predecessor; account for the data-transfer time
+                    if pred_node_id != node_id {
+                        not_in_the_same_node += 1;
+
+                        transmission_time += env.func(pred).out_put_size
+                            / env.node_get_speed_btwn(pred_node_id, node_id);
+                    }
+                }
+
+                let node_all_task_count = self.node_task_count.get(&node_id).unwrap().0;
+
+                let node_cpu_left = self.node_resource_left.get(&node_id).unwrap().0;
+
+                let node_mem_left = self.node_resource_left.get(&node_id).unwrap().1;
+
+                // Score of this node
+                let score_this_node =
+                    // Ignore the node's remaining resources
+                    // if self.mode == "a" {
+                    //     1.0 / (node_all_task_count as f32 + 1.0)
+                    //         // + node_cpu_left / node.rsc_limit.cpu
+                    //         // + node_mem_left / node.rsc_limit.mem
+                    //         - transmission_time
+                    // }
+                    // // Consider the node's remaining resources
+                    // else
+                    {
+                        1.0 / (node_all_task_count as f32 + 1.0)
+                            + node_cpu_left / node.rsc_limit.cpu
+                            + node_mem_left / node.rsc_limit.mem
+                            - transmission_time
+                    };
+
+                // log::info!("score_this_node {}", score_this_node);
+                // log::info!("best_score {}", best_score);
+
+                if score_this_node > best_score {
+                    best_score = score_this_node;
+                    best_node_id = node_id;
+                }
+            }
+
+            if best_node_id == 999 {
+                best_node_id = rand::thread_rng().gen_range(0..env.nodes().len());
+            }
+
+            mech_metric().add_node_task_new_cnt(best_node_id);
+
+            // log::info!("best_node_id {}", best_node_id);
+
+            // Increase the best node's total task count by 1
+            if let Some((all_task_count, _)) = self.node_task_count.get_mut(&best_node_id) {
+                *all_task_count += 1;
+            }
+
+            // Emit the schedule command
+            cmd_distributor
+                .send(MechScheduleOnceRes::ScheCmd(ScheCmd {
+                    reqid: req.req_id,
+                    fnid,
+                    nid: best_node_id,
+                    memlimit: None,
+                }))
+                .unwrap();
+        }
+    }
+}
+
+impl Scheduler for PriorityScheduler {
+    fn schedule_some(
+        &mut self,
+        env: &SimEnvObserve,
+        mech: &MechanismImpl,
+        cmd_distributor: &MechCmdDistributor,
+    ) {
+        // Clear data from the previous scheduling round
+        self.node_resource_left.clear();
+        self.node2node_all_bw.clear();
+        self.node_task_count.clear();
+
+        // Collect each node's remaining resources and task counts
+        self.prepare(env);
+
+        // Collect the bandwidth between every pair of nodes
+        self.prepare_node2node_all_bw(env);
+
+        // Schedule every request
+        for (_, req) in env.core().requests().iter() {
+            self.schedule_one_req(env, mech, req, cmd_distributor);
+        }
+
+        // Scale down
+        for func in env.core().fns().iter() {
+            let target = mech.scale_num(func.fn_id);
+            let cur = env.fn_container_cnt(func.fn_id);
+            if target < cur {
+                mech.scale_down_exec().exec_scale_down(
+                    env,
+                    func.fn_id,
+                    cur - target,
+                    cmd_distributor,
+                );
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/serverless_sim/src/sim_run.rs b/serverless_sim/src/sim_run.rs
index 40449e0..c86f807 100644
--- a/serverless_sim/src/sim_run.rs
+++ b/serverless_sim/src/sim_run.rs
@@ -22,10 +22,14 @@ pub trait Scheduler: Send {
 }
 
 pub mod schedule_helper {
+    use std::collections::{HashMap, HashSet};
+
     use crate::{
         fn_dag::{EnvFnExt, FnId},
         mechanism::SimEnvObserve,
+        node::NodeId,
         request::Request,
+        with_env_sub::WithEnvCore,
     };
     pub enum CollectTaskConfig {
         All,
@@ -80,6 +84,24 @@ pub mod schedule_helper {
         }
         collect
     }
+
+    pub fn collect_node_to_sche_task_to(
+        scheduleable_fns: &Vec<FnId>,
+        env: &SimEnvObserve,
+    ) -> HashMap<FnId, HashSet<NodeId>> {
+        let mut scheduleable_fns_nodes = HashMap::new();
+        for fnid in scheduleable_fns {
+            scheduleable_fns_nodes.insert(
+                *fnid,
+                env.core()
+                    .fn_2_nodes()
+                    .get(&fnid)
+                    .map(|v| v.clone())
+                    .unwrap_or(HashSet::new()),
+            );
+        }
+        scheduleable_fns_nodes
+    }
 }
 
 #[derive(Clone, Debug)]
@@ -262,11 +284,9 @@ impl SimEnv {
         let nodes_cnt = self.nodes().len();
 
         for x in 0..nodes_cnt {
-            for y in 0..nodes_cnt {
-                if x > y {
-                    let connection_count = node2node_trans.len();
-                    self.node_set_connection_count_between(x, y, connection_count);
-                }
+            for y in 0..x {
+                let connection_count = node2node_trans.len();
+                self.node_set_connection_count_between(x, y, connection_count);
             }
         }
 
@@ -291,11 +311,9 @@ impl SimEnv {
         // p.1.recv_paths=new_recv_paths;
         // }
         for x in 0..nodes_cnt {
-            for y in 0..nodes_cnt {
-                if x > y {
-                    // simu transfer between node x and y
-                    self.sim_transfer_btwn_nodes(x, y, &mut node2node_trans);
-                }
+            for y in 0..x {
+                // simu transfer between node x and y
+                self.sim_transfer_btwn_nodes(x, y, &mut node2node_trans);
             }
         }
     }
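
For reference, the priority that both new schedulers assign to a function f can be restated compactly; the notation below is introduced here and is not part of the patch. heft.rs uses the plain upward rank, while priority.rs additionally subtracts a memory cost and, when its mode attribute is "b", adds a bonus of 1.0 for functions on the DAG's critical path:

    \bar{t}_{exec}(f) = \frac{1}{N} \sum_{n=1}^{N} \frac{cpu(f)}{cpu\_limit(n) / running(n)},
    \qquad
    \bar{t}_{trans}(f) = \frac{1}{|B|} \sum_{b \in B} \frac{5 \cdot out(f)}{b}

    rank(f) = \bar{t}_{exec}(f) + \bar{t}_{trans}(f)
        \; - \; \underbrace{c_{mem} \cdot mem(f)}_{\text{priority.rs only}}
        \; + \; \underbrace{\mathbf{1}[f \text{ on critical path}]}_{\text{priority.rs, mode b}}
        \; + \; \max_{s \in succ(f)} rank(s)

where N is the number of nodes, B is the set of pairwise bandwidths collected by prepare_node2node_all_bw, running(n) is the node's running-task count, and the max term is dropped for sink functions. Each request's schedulable functions are then dispatched in descending rank(f).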
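
A minimal, self-contained sketch of the reverse-topological rank propagation performed in calculate_priority_for_dag_fns; the toy DAG, base ranks, and names below are illustrative only and are not part of the patch:

    use std::collections::HashMap;

    fn main() {
        // Toy DAG: 0 -> 1, 0 -> 2, 1 -> 3, 2 -> 3.
        let succs: HashMap<u32, Vec<u32>> = HashMap::from([
            (0, vec![1, 2]),
            (1, vec![3]),
            (2, vec![3]),
            (3, vec![]),
        ]);
        // Base rank per function, standing in for avg exec + avg transfer time.
        let mut rank: HashMap<u32, f32> =
            HashMap::from([(0, 1.0), (1, 2.0), (2, 4.0), (3, 1.5)]);

        // Walk functions in reverse topological order (sinks first), mirroring the
        // `stack` pass in calculate_priority_for_dag_fns: add the best successor rank.
        for f in [3u32, 2, 1, 0] {
            let best_succ = succs[&f].iter().map(|s| rank[s]).fold(f32::MIN, f32::max);
            if best_succ > f32::MIN {
                *rank.get_mut(&f).unwrap() += best_succ;
            }
        }

        // Schedule in descending rank order; here that is 0, 2, 1, 3.
        let mut order: Vec<(u32, f32)> = rank.into_iter().collect();
        order.sort_by(|a, b| b.1.total_cmp(&a.1));
        println!("{:?}", order);
    }

The sketch runs standalone with rustc; in the simulator the same propagation is keyed by FnId per DAG rather than the toy u32 ids used here.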