From c4ef7eac123008cf3eace5f83d4055b0b438361d Mon Sep 17 00:00:00 2001 From: chao fang <939915473@qq.com> Date: Fri, 1 Nov 2024 15:09:26 +0800 Subject: [PATCH 1/4] feat: priority --- scripts/batch_run.yml | 50 ++-- scripts/fast_draw.yml | 18 +- serverless_sim/module_conf_es.json | 39 +-- serverless_sim/src/mechanism.rs | 5 +- serverless_sim/src/node.rs | 10 +- serverless_sim/src/request.rs | 6 +- serverless_sim/src/sche/mod.rs | 6 + serverless_sim/src/sche/pass.rs | 25 -- serverless_sim/src/sche/priority.rs | 420 ++++++++++++++++++++++++++++ serverless_sim/src/sim_run.rs | 16 +- 10 files changed, 491 insertions(+), 104 deletions(-) create mode 100644 serverless_sim/src/sche/priority.rs diff --git a/scripts/batch_run.yml b/scripts/batch_run.yml index f2c5e05..5501b9e 100644 --- a/scripts/batch_run.yml +++ b/scripts/batch_run.yml @@ -1,4 +1,4 @@ -run_time: 4 +run_time: 10 params: request_freq: @@ -16,40 +16,42 @@ params: mech_scale_sche: scale_sche_joint: scale_num: - # - hpa: + - hpa: # - temp_scaler: - - ensure_scaler: + # - ensure_scaler: scale_down_exec: - default: scale_up_exec: - least_task: sche: # - bp_balance: - # - pos: greedy - - ensure_scheduler: + - pos: greedy + # - ensure_scheduler: + - priority: a + - priority: b filter: # - [] - [{'careful_down':''}] - scale_sche_separated: - scale_num: - - temp_scaler: - - hpa: - - lass: - scale_down_exec: - - default: - scale_up_exec: - - least_task: - sche: - # - greedy: - # - hash: - # - random: - - load_least: - # - rotate: - - pass: - filter: - # - [] - - [{'careful_down':''}] + # scale_sche_separated: + # scale_num: + # - temp_scaler: + # - hpa: + # - lass: + # scale_down_exec: + # - default: + # scale_up_exec: + # - least_task: + # sche: + # # - greedy: + # # - hash: + # # - random: + # - load_least: + # # - rotate: + # - pass: + # filter: + # # - [] + # - [{'careful_down':''}] # no_scale: # scale_num: diff --git a/scripts/fast_draw.yml b/scripts/fast_draw.yml index 95c5c75..ba4aa12 100644 --- a/scripts/fast_draw.yml +++ b/scripts/fast_draw.yml @@ -11,22 +11,8 @@ filter: ## each group bars targets_alias: -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'ensure_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'ensure_scheduler.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(ensure_scaler.)(default.)(least_task.)[(careful_down.)].scd(ensure_scheduler.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'bp_balance.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(bp_balance.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pos.greedy', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(pos.greedy).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 
'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'bp_balance.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(bp_balance.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pos.greedy', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(pos.greedy).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'greedy.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(greedy.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'hash.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(hash.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(load_least.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pass.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(pass.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'rotate.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(rotate.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'lass.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(lass.)(default.)(least_task.)[(careful_down.)].scd(load_least.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 
'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'lass.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pass.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(lass.)(default.)(least_task.)[(careful_down.)].scd(pass.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'greedy.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(greedy.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(load_least.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pass.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(pass.).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'rotate.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtsingle.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(rotate.).ic(no_evict.)'] +- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'priority.a', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(priority.a).ic(no_evict.)'] +- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(load_least.).ic(no_evict.)'] ## group on x axis: diff --git a/serverless_sim/module_conf_es.json b/serverless_sim/module_conf_es.json index 625ae38..3a1129c 100644 --- a/serverless_sim/module_conf_es.json +++ b/serverless_sim/module_conf_es.json @@ -1,45 +1,46 @@ { "mech_type": { - "no_scale": null, "scale_sche_separated": null, - "scale_sche_joint": null + "scale_sche_joint": null, + "no_scale": null }, "scale_num": { - "rela": null, + "no": null, "hpa": null, - 
"full_placement": null, "ensure_scaler": null, - "no": null, - "lass": null, - "temp_scaler": null + "full_placement": null, + "temp_scaler": null, + "rela": null, + "lass": null }, "scale_down_exec": { "default": null }, "scale_up_exec": { - "least_task": null, - "no": null + "no": null, + "least_task": null }, "sche": { - "consistenthash": null, - "ensure_scheduler": null, - "faasflow": null, - "load_least": null, "rotate": null, - "hash": null, "pos": null, + "ensure_scheduler": null, "random": null, - "bp_balance": null, + "load_least": null, + "faasflow": null, + "pass": null, + "priority": null, + "hash": null, + "consistenthash": null, "fnsche": null, - "greedy": null, - "pass": null + "bp_balance": null, + "greedy": null }, "filter": { "careful_down": null }, "instance_cache_policy": { - "lru": null, + "no_evict": null, "fifo": null, - "no_evict": null + "lru": null } } \ No newline at end of file diff --git a/serverless_sim/src/mechanism.rs b/serverless_sim/src/mechanism.rs index 9cc2c2d..bb43992 100644 --- a/serverless_sim/src/mechanism.rs +++ b/serverless_sim/src/mechanism.rs @@ -78,7 +78,7 @@ impl CheckDup for Vec { } } -pub const SCHE_NAMES: [&'static str; 12] = [ +pub const SCHE_NAMES: [&'static str; 13] = [ "rotate", "hash", "bp_balance", @@ -91,6 +91,7 @@ pub const SCHE_NAMES: [&'static str; 12] = [ "consistenthash", // "gofs", "ensure_scheduler", "load_least", + "priority" // "load_least", // "random", ]; @@ -222,7 +223,7 @@ impl ConfigNewMec for Config { } } "scale_sche_joint" => { - let allow_sche = vec!["pos", "bp_balance", "ensure_scheduler"]; + let allow_sche = vec!["pos", "bp_balance", "ensure_scheduler", "priority"]; let allow_scale_num = vec!["hpa", "lass", "temp_scaler", "full_placement", "rela", "ensure_scaler"]; let allow_scale_down_exec = vec!["default"]; let allow_scale_up_exec = vec!["least_task"]; diff --git a/serverless_sim/src/node.rs b/serverless_sim/src/node.rs index f48da46..429079c 100644 --- a/serverless_sim/src/node.rs +++ b/serverless_sim/src/node.rs @@ -100,8 +100,8 @@ impl Node { Self { node_id, rsc_limit: NodeRscLimit { - // cpu: 1000.0, - cpu: 200.0, + cpu: 1000.0, + // cpu: 200.0, mem: 8000.0, }, fn_containers: HashMap::new().into(), @@ -264,9 +264,9 @@ impl Node { Box::new(move |to_replace| { let node = node.as_ref(); // log::info!("节点{}要移除的容器{}", node.node_id, to_replace); - for (_k, v) in node.fn_containers.borrow().iter() { - // log::info!("{}", v.fn_id); - } + // for (_k, v) in node.fn_containers.borrow().iter() { + // log::info!("{}", v.fn_id); + // } node.container(*to_replace).unwrap().is_idle() }), ); diff --git a/serverless_sim/src/request.rs b/serverless_sim/src/request.rs index 700b089..138ea41 100644 --- a/serverless_sim/src/request.rs +++ b/serverless_sim/src/request.rs @@ -359,13 +359,13 @@ impl SimEnv { for (dag_i, &(mut avg_frequency, cv)) in env.help.fn_call_frequency().iter() { if env.help.config().request_freq_low() { - avg_frequency *= 0.1; + avg_frequency *= 0.3; } else if env.help.config().request_freq_middle() { - avg_frequency *= 0.2; + avg_frequency *= 0.5; } else { - avg_frequency *= 0.3; + avg_frequency *= 0.7; } // avg_frequency *= 100.0; // avg_frequency *= 10.0; diff --git a/serverless_sim/src/sche/mod.rs b/serverless_sim/src/sche/mod.rs index 3973168..fefccb6 100644 --- a/serverless_sim/src/sche/mod.rs +++ b/serverless_sim/src/sche/mod.rs @@ -15,6 +15,7 @@ use self::{ rotate::RotateScheduler, ensure_scheduler::EnsureScheduler, load_least::LoadLeastScheduler, + priority::PriorityScheduler }; pub mod 
consistenthash;
@@ -29,6 +30,8 @@ pub mod hash;
 pub mod rotate;
 pub mod ensure_scheduler;
 pub mod load_least;
+pub mod priority;
+
 // pub mod rule_based;
 // pub mod time_aware;
@@ -76,6 +79,9 @@ pub fn prepare_spec_scheduler(config: &Config) -> Option<Box<dyn Scheduler>> {
             return Some(Box::new(LoadLeastScheduler::new()));
         }
+        "priority" => {
+            return Some(Box::new(PriorityScheduler::new(&sche_attr)));
+        }
         _ => {
             return None;
         }
diff --git a/serverless_sim/src/sche/pass.rs b/serverless_sim/src/sche/pass.rs
index 4fef07f..24973ef 100644
--- a/serverless_sim/src/sche/pass.rs
+++ b/serverless_sim/src/sche/pass.rs
@@ -197,17 +197,6 @@ impl PassScheduler {
     }
 }
 
-// 图形调度器中分组和调度算法的关键步骤如下所示。
-// 在初始化阶段,每个函数节点都作为单独的组进行初始化,并且工作节点是随机分配的(第1-2行)。
-// 首先,算法从拓扑排序和迭代开始。在每次迭代的开始,它将使用贪婪方法来定位DAG图中关键路径上具有最长边的两个函数,
-// 并确定这两个函数是否可以合并到同一组(第3-8行)。
-// 如果这两个函数被分配到不同的组中,它们将被合并(第9行)。
-// 在合并组时,需要考虑额外的因素。
-// 首先,算法需要确保合并的函数组不超过工作节点的最大容量(第10-12行)。
-// 否则,合并的组将无法部署在任何节点上。其次,组内局部化的数据总量不能违反内存约束(第13-18行)。
-// 同时,在合并的组中不能存在任何资源竞争的函数对𝑐𝑜𝑛𝑡 (𝐺) = {(𝑓𝑖, 𝑓𝑗 )}(第19-20行)。
-// 最后,调度算法将采用装箱策略,根据节点容量为每个函数组选择适当的工作节点(第21-23行)。
-// 根据上述逻辑,算法迭代直到收敛,表示函数组不再更新。
 impl Scheduler for PassScheduler {
     fn schedule_some(
         &mut self,
@@ -220,19 +209,5 @@ impl Scheduler for PassScheduler {
             self.schedule_for_one_req(req, env, cmd_distributor);
         }
     }
-    // let mut to_scale_down = vec![];
-    // // 回收空闲container
-    // for n in env.nodes.borrow().iter() {
-    //     for (_, c) in n.fn_containers.iter() {
-    //         if c.recent_frame_is_idle(3) && c.req_fn_state.len() == 0 {
-    //             to_scale_down.push((n.node_id(), c.fn_id));
-    //         }
-    //     }
-    // }
-    // for (n, f) in to_scale_down {
-    //     env.scale_executor
-    //         .borrow_mut()
-    //         .scale_down(env, ScaleOption::ForSpecNodeFn(n, f));
-    // }
 }
 }
diff --git a/serverless_sim/src/sche/priority.rs b/serverless_sim/src/sche/priority.rs
new file mode 100644
index 0000000..7f5a206
--- /dev/null
+++ b/serverless_sim/src/sche/priority.rs
@@ -0,0 +1,420 @@
+use std::{
+    borrow::{Borrow, BorrowMut},
+    cmp::Ordering,
+    collections::{HashMap, HashSet},
+};
+
+use daggy::Walker;
+use rand::Rng;
+
+use crate::{
+    fn_dag::{DagId, EnvFnExt, FnId}, mechanism::{MechanismImpl, ScheCmd, SimEnvObserve}, mechanism_thread::{MechCmdDistributor, MechScheduleOnceRes}, node::{self, EnvNodeExt, NodeId}, request::Request, scale::num::no, sim_run::Scheduler, with_env_sub::{WithEnvCore, WithEnvHelp}
+};
+
+pub struct PriorityScheduler {
+    // 每种DAG中每个函数的优先级(Vec降序,优先调度优先级高的函数)
+    dag_fns_priority: HashMap<DagId, Vec<(FnId, f32)>>,
+
+    // 每个节点剩余的资源(cpu,mem)
+    node_resource_left: HashMap<NodeId, (f32, f32)>,
+
+    // 每个节点上的任务数(running + pending,running)
+    node_task_count: HashMap<NodeId, (usize, usize)>,
+
+    // 每一对节点之间的带宽bandwidth
+    node2node_all_bw: Vec<f32>,
+
+    // 单位内存的开销
+    mem_cost_per_unit: f32,
+
+    mode: String,
+}
+
+impl PriorityScheduler {
+    pub fn new(arg: &str) -> Self {
+        Self {
+            dag_fns_priority: HashMap::new(),
+
+            node_resource_left: HashMap::new(),
+
+            node_task_count: HashMap::new(),
+
+            node2node_all_bw: Vec::new(),
+
+            // mem_cost_per_unit: arg.parse::<f32>().unwrap(),
+            mem_cost_per_unit: 0.005,
+
+            mode: arg.to_string(),
+        }
+    }
+
+    // 初始化node_resource_left、node_task_count
+    fn prepare(&mut self, env: &SimEnvObserve) {
+
+        for node in env.nodes().iter() {
+            let node_id = node.node_id();
+
+            self.node_resource_left.insert(
+                node_id,
+                (
+                    node.rsc_limit.cpu - node.last_frame_cpu,
+                    node.rsc_limit.mem - node.last_frame_mem,
+                ),
+            );
+
+            self.node_task_count
+                .insert(node_id, (node.all_task_cnt(), node.running_task_cnt()));
+        }
+    }
+
+    // 初始化node2node_all_bw
+    fn prepare_node2node_all_bw(&mut self, env: &SimEnvObserve) {
+
+        let node_count = env.nodes().len();
+
+        // 节点间带宽
+        let node2node_graph = env.core().node2node_graph();
+
+        for i in 0..node_count {
+            for j in 0..i {
+                self.node2node_all_bw
+                    .push(node2node_graph[i][j]);
+            }
+        }
+    }
+
+    // 初始化dag_fns_priority
+    fn calculate_priority_for_dag_fns(&mut self, req: &Request, env: &SimEnvObserve) {
+
+        // 请求对应的DAG
+        let dag = env.dag(req.dag_i);
+
+        // 不同请求可能对应相同的DAG,已经计算过的DAG不再重复计算
+        if !self.dag_fns_priority.contains_key(&dag.dag_i) {
+            // DAG中每个函数对应的优先级
+            let mut map = HashMap::new();
+
+            let mut walker = dag.new_dag_walker();
+
+            // 记录逆拓扑排序,按此顺序给函数赋予优先级
+            let mut stack = vec![];
+
+            // 拓扑排序
+            while let Some(func_g_i) = walker.next(&dag.dag_inner) {
+                let fnid = dag.dag_inner[func_g_i];
+                let func = env.func(fnid);
+
+                let mut t_sum_exec = 0.0;
+                for node in env.nodes().iter() {
+                    let node_cpu_left = self.node_resource_left.get(&node.node_id()).unwrap().0;
+                    t_sum_exec +=
+                        // if self.mode == "a" {
+                        //     func.cpu / node_cpu_left
+                        // } else
+                        {
+                            let node_running_task_count =
+                                self.node_task_count.get(&node.node_id()).unwrap().1;
+
+                            let each_running_task_cpu = node_cpu_left / node_running_task_count as f32;
+
+                            func.cpu / each_running_task_cpu
+                        };
+                }
+                // 函数平均执行时间
+                let t_avg_exec = t_sum_exec / self.node_resource_left.len() as f32;
+
+                let mut t_sum_trans = 0.0;
+                for bw in &self.node2node_all_bw {
+                    t_sum_trans += func.out_put_size / bw * 5.0;
+                }
+                // 平均数据传输时间
+                let t_avg_trans = t_sum_trans / self.node2node_all_bw.len() as f32;
+
+                // 函数内存占用
+                let t_mem_cost = func.mem as f32 * self.mem_cost_per_unit;
+
+                // log::info!(
+                //     "t_avg_exec{} t_avg_trans{} t_mem_cost{}",
+                //     t_avg_exec,
+                //     t_avg_trans,
+                //     t_mem_cost
+                // );
+
+                // 总开销,用于后续定义优先级
+                let total_cost = t_avg_exec + t_avg_trans - t_mem_cost;
+
+                map.insert(fnid, total_cost);
+
+                stack.push(func_g_i);
+            }
+
+            // 按逆拓扑排序为每一个函数计算priority,因为函数的优先级与其后继有关
+            while let Some(func_g_i) = stack.pop() {
+                let nexts: daggy::Children = dag.dag_inner.children(func_g_i);
+                // 取后继中优先级最大的
+                if let Some(max_node) = nexts.iter(&dag.dag_inner).max_by(|a, b| {
+                    let fnid_a = dag.dag_inner[a.1];
+                    let fnid_b = dag.dag_inner[b.1];
+
+                    map.get(&fnid_a)
+                        .unwrap()
+                        .total_cmp(map.get(&fnid_b).unwrap())
+                }) {
+                    let fnid_max = dag.dag_inner[max_node.1];
+                    let max = *map.get(&fnid_max).unwrap();
+
+                    let fnid = dag.dag_inner[func_g_i];
+                    (*map.get_mut(&fnid).unwrap()) += max;
+                }
+            }
+
+            let mut prio_order = map.into_iter().collect::<Vec<_>>();
+
+            // 降序排序,优先调度优先级高的函数
+            prio_order.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or_else(|| Ordering::Equal));
+
+            // 记录当前dag中函数的优先级序列,避免重复计算
+            self.dag_fns_priority.insert(dag.dag_i, prio_order);
+        }
+    }
+
+    fn schedule_one_req(
+        &mut self,
+        env: &SimEnvObserve,
+        mech: &MechanismImpl,
+        req: &Request,
+        cmd_distributor: &MechCmdDistributor,
+    ) {
+        // 计算请求对应的DAG中函数的优先级
+        self.calculate_priority_for_dag_fns(req, env);
+
+        // 获取该请求中可以被调度的函数(即前驱已被调度的函数)以及含有该函数的容器的节点
+        let mut scheduleable_fns_nodes = HashMap::new();
+
+        let dag = env.dag(req.dag_i);
+
+        let mut walker = dag.new_dag_walker();
+
+        'next_fn: while let Some(func_g_i) = walker.next(&dag.dag_inner) {
+            let fnid = dag.dag_inner[func_g_i];
+
+            // 函数已被调度,跳过
+            if req.fn_node.contains_key(&fnid) {
+                continue;
+            }
+
+            // 函数的前驱尚未被调度,跳过
+            let predecessors = env.func(fnid).parent_fns(env);
+            for p in &predecessors {
+                if !req.fn_node.contains_key(p) {
+                    continue 'next_fn;
+                }
+            }
+
+            if scheduleable_fns_nodes.contains_key(&fnid) {
+                continue;
+            }
+
+            scheduleable_fns_nodes.insert(
+                fnid,
+                env.core()
+                    .fn_2_nodes()
+                    .get(&fnid)
+                    .map(|v| v.clone())
+                    .unwrap_or(HashSet::new()),
+            );
+        }
+
+        let mech_metric = || env.help().mech_metric_mut();
+
+        // 该请求对应的DAG中函数的优先级
+        let dag_fns_priority = self.dag_fns_priority.get(&req.dag_i).unwrap();
+
+        // 本次可调度的函数按优先级排序
+        let scheduleable_fns =
+            // if self.mode == "a" {
+            //     scheduleable_fns_nodes
+            //         .clone()
+            //         .into_keys()
+            //         .collect::<Vec<_>>()
+            // } else
+            {
+                let mut temp = Vec::new();
+                for (fn_id, _) in dag_fns_priority {
+                    if scheduleable_fns_nodes.contains_key(fn_id) {
+                        temp.push(*fn_id);
+                    }
+                }
+                temp
+            };
+
+        for fnid in scheduleable_fns {
+            let func = env.func(fnid);
+
+            // scale_sche_joint在调度前已经更新了函数所需容器的数量,获取
+            let mut target_cnt = mech.scale_num(fnid);
+            if target_cnt == 0 {
+                target_cnt = 1;
+            }
+
+            // 扩容
+            let fn_scale_up_cmds =
+                mech.scale_up_exec()
+                    .exec_scale_up(target_cnt, fnid, env, cmd_distributor);
+
+            // 含有该函数的容器的节点 = 已经有容器的节点 + 扩容所选的节点
+            for cmd in fn_scale_up_cmds.iter() {
+                scheduleable_fns_nodes
+                    .get_mut(&cmd.fnid)
+                    .unwrap()
+                    .insert(cmd.nid);
+            }
+
+
+
+            // 函数的可调度节点 = 含有该函数的容器的节点
+            let scheduleable_nodes = scheduleable_fns_nodes.get(&fnid).unwrap();
+
+            let mut best_score = -10.0;
+            let mut best_node_id = 999;
+
+            let node_ids =
+                // if self.mode == "a" {
+                    scheduleable_nodes.iter().cloned().collect::<Vec<_>>();
+                // } else {
+                //     env.nodes().iter().map(|node| node.node_id()).collect::<Vec<_>>()
+                // };
+
+            for node_id in node_ids {
+                let node = env.node(node_id);
+
+                // 函数的前驱列表
+                let pred_fnids = env.func(fnid).parent_fns(env);
+
+                // 不在当前节点的前驱函数的个数
+                let mut not_in_the_same_node = 0;
+
+                // 不在当前节点的前驱函数的数据传输时间之和
+                let mut transmission_time = 0.0;
+                for pred in pred_fnids {
+                    // 前驱所在节点
+                    let &pred_node_id = req.fn_node.get(&pred).unwrap();
+
+                    // 前驱没有调度到当前节点,计算数据传输时间
+                    if pred_node_id != node_id {
+                        not_in_the_same_node += 1;
+
+                        transmission_time += env.func(pred).out_put_size
+                            / env.node_get_speed_btwn(pred_node_id, node_id);
+                    }
+                }
+
+                let node_all_task_count = self.node_task_count.get(&node_id).unwrap().0;
+
+                let node_running_task_count = self.node_task_count.get(&node_id).unwrap().1;
+
+                let node_cpu_left = self.node_resource_left.get(&node_id).unwrap().0;
+
+                let node_mem_left = self.node_resource_left.get(&node_id).unwrap().1;
+
+                let each_running_task_cpu = node_cpu_left / node_running_task_count as f32;
+
+                // 优先调度到任务总数少、无需数据传输(即与前驱部署到同一节点)的节点
+                let score_this_node =
+                    if self.mode == "a" {
+                        1.0 / (node_all_task_count as f32 + 1.0)
+                            // + 1.0 / (not_in_the_same_node as f32 + 1.0)
+                            - transmission_time
+                            // + func.cpu / each_running_task_cpu as f32
+                            // + node_cpu_left / node.rsc_limit.cpu
+                            // + node_mem_left / node.rsc_limit.mem
+                    } else {
+                        1.0 / (node_all_task_count as f32 + 1.0)
+                            // + 1.0 / (not_in_the_same_node as f32 + 1.0)
+                            - transmission_time
+                            // + func.cpu / each_running_task_cpu as f32
+                            + node_cpu_left / node.rsc_limit.cpu
+                            + node_mem_left / node.rsc_limit.mem
+                    };
+                    // else {
+                    //     1.0 / (node_all_task_count as f32 + 1.0)
+                    //         // + 1.0 / (not_in_the_same_node as f32 + 1.0)
+                    //         // - transmission_time
+                    //         // + func.cpu / each_running_task_cpu as f32
+                    //         + node_cpu_left / node.rsc_limit.cpu
+                    //         + node_mem_left / node.rsc_limit.mem
+                    // };
+
+                // log::info!("score_this_node {}", score_this_node);
+                // log::info!("best_score {}", best_score);
+
+                if score_this_node > best_score {
+                    best_score = score_this_node;
+                    best_node_id = node_id;
+                }
+            }
+
+            if best_node_id == 999 {
+                best_node_id = rand::thread_rng().gen_range(0..env.nodes().len());
+            }
+
+            mech_metric().add_node_task_new_cnt(best_node_id);
+
+            // 
log::info!("best_node_id {}", best_node_id); + + // best_node任务总数 + 1 + if let Some((all_task_count, _)) = self.node_task_count.get_mut(&best_node_id) { + *all_task_count += 1; + } + + // 调度指令 + cmd_distributor + .send(MechScheduleOnceRes::ScheCmd(ScheCmd { + reqid: req.req_id, + fnid, + nid: best_node_id, + memlimit: None, + })) + .unwrap(); + } + } +} + +impl Scheduler for PriorityScheduler { + fn schedule_some( + &mut self, + env: &SimEnvObserve, + mech: &MechanismImpl, + cmd_distributor: &MechCmdDistributor, + ) { + // 清理上一次调度的数据 + self.node_resource_left.clear(); + self.node2node_all_bw.clear(); + self.node_task_count.clear(); + + // 获取每个节点的资源剩余、任务数 + self.prepare(env); + + // 获取每一对节点的bandwidth + self.prepare_node2node_all_bw(env); + + // 调度每一个请求 + for (_, req) in env.core().requests().iter() { + self.schedule_one_req(env, mech, req, cmd_distributor); + } + + // 缩容 + for func in env.core().fns().iter() { + let target = mech.scale_num(func.fn_id); + let cur = env.fn_container_cnt(func.fn_id); + if target < cur { + mech.scale_down_exec().exec_scale_down( + env, + func.fn_id, + cur - target, + cmd_distributor, + ); + } + } + } +} \ No newline at end of file diff --git a/serverless_sim/src/sim_run.rs b/serverless_sim/src/sim_run.rs index 40449e0..b0508da 100644 --- a/serverless_sim/src/sim_run.rs +++ b/serverless_sim/src/sim_run.rs @@ -262,11 +262,9 @@ impl SimEnv { let nodes_cnt = self.nodes().len(); for x in 0..nodes_cnt { - for y in 0..nodes_cnt { - if x > y { - let connection_count = node2node_trans.len(); - self.node_set_connection_count_between(x, y, connection_count); - } + for y in 0..x { + let connection_count = node2node_trans.len(); + self.node_set_connection_count_between(x, y, connection_count); } } @@ -291,11 +289,9 @@ impl SimEnv { // p.1.recv_paths=new_recv_paths; // } for x in 0..nodes_cnt { - for y in 0..nodes_cnt { - if x > y { - // simu transfer between node x and y - self.sim_transfer_btwn_nodes(x, y, &mut node2node_trans); - } + for y in 0..x { + // simu transfer between node x and y + self.sim_transfer_btwn_nodes(x, y, &mut node2node_trans); } } } From 2e3e769c7853b82d7858f89e008dc78a61547c4e Mon Sep 17 00:00:00 2001 From: chao fang <939915473@qq.com> Date: Mon, 18 Nov 2024 09:19:23 +0800 Subject: [PATCH 2/4] =?UTF-8?q?feat:=20optimize=20priority=E3=80=81collect?= =?UTF-8?q?=20node?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/batch_run.yml | 4 +- scripts/fast_draw.yml | 3 +- serverless_sim/module_conf_es.json | 34 +++--- serverless_sim/src/sche/priority.rs | 168 +++++++++++++--------------- serverless_sim/src/sim_run.rs | 22 ++++ 5 files changed, 119 insertions(+), 112 deletions(-) diff --git a/scripts/batch_run.yml b/scripts/batch_run.yml index 5501b9e..072963e 100644 --- a/scripts/batch_run.yml +++ b/scripts/batch_run.yml @@ -3,8 +3,8 @@ run_time: 10 params: request_freq: - low: - - middle: - - high: + # - middle: + # - high: dag_type: # - single: # - mix: diff --git a/scripts/fast_draw.yml b/scripts/fast_draw.yml index ba4aa12..897cf87 100644 --- a/scripts/fast_draw.yml +++ b/scripts/fast_draw.yml @@ -11,8 +11,9 @@ filter: ## each group bars targets_alias: +- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pos.greedy', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 
'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(pos.greedy).ic(no_evict.)'] - [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'priority.a', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(priority.a).ic(no_evict.)'] -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_separated.scl(temp_scaler.)(default.)(least_task.)[(careful_down.)].scd(load_least.).ic(no_evict.)'] +- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'priority.b', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(priority.b).ic(no_evict.)'] ## group on x axis: diff --git a/serverless_sim/module_conf_es.json b/serverless_sim/module_conf_es.json index 3a1129c..aea82c9 100644 --- a/serverless_sim/module_conf_es.json +++ b/serverless_sim/module_conf_es.json @@ -1,46 +1,46 @@ { "mech_type": { - "scale_sche_separated": null, "scale_sche_joint": null, + "scale_sche_separated": null, "no_scale": null }, "scale_num": { "no": null, - "hpa": null, - "ensure_scaler": null, - "full_placement": null, "temp_scaler": null, + "lass": null, + "full_placement": null, "rela": null, - "lass": null + "hpa": null, + "ensure_scaler": null }, "scale_down_exec": { "default": null }, "scale_up_exec": { - "no": null, - "least_task": null + "least_task": null, + "no": null }, "sche": { - "rotate": null, + "faasflow": null, + "pass": null, "pos": null, - "ensure_scheduler": null, + "bp_balance": null, "random": null, + "ensure_scheduler": null, "load_least": null, - "faasflow": null, - "pass": null, - "priority": null, + "rotate": null, "hash": null, + "greedy": null, + "priority": null, "consistenthash": null, - "fnsche": null, - "bp_balance": null, - "greedy": null + "fnsche": null }, "filter": { "careful_down": null }, "instance_cache_policy": { - "no_evict": null, "fifo": null, - "lru": null + "lru": null, + "no_evict": null } } \ No newline at end of file diff --git a/serverless_sim/src/sche/priority.rs b/serverless_sim/src/sche/priority.rs index 7f5a206..992a6d9 100644 --- a/serverless_sim/src/sche/priority.rs +++ b/serverless_sim/src/sche/priority.rs @@ -1,5 +1,4 @@ use std::{ - borrow::{Borrow, BorrowMut}, cmp::Ordering, collections::{HashMap, HashSet}, }; @@ -8,7 +7,14 @@ use daggy::Walker; use rand::Rng; use crate::{ - fn_dag::{DagId, EnvFnExt, FnId}, mechanism::{MechanismImpl, ScheCmd, SimEnvObserve}, mechanism_thread::{MechCmdDistributor, MechScheduleOnceRes}, node::{self, EnvNodeExt, NodeId}, request::Request, scale::num::no, sim_run::Scheduler, with_env_sub::{WithEnvCore, WithEnvHelp} + fn_dag::{DagId, EnvFnExt, FnId}, + mechanism::{MechanismImpl, ScheCmd, SimEnvObserve}, + 
mechanism_thread::{MechCmdDistributor, MechScheduleOnceRes},
+    node::{EnvNodeExt, NodeId},
+    request::Request,
+    sim_run::{schedule_helper, Scheduler},
+    util,
+    with_env_sub::{WithEnvCore, WithEnvHelp}
 };
 
 pub struct PriorityScheduler {
@@ -99,11 +105,17 @@ impl PriorityScheduler {
         // 记录逆拓扑排序,按此顺序给函数赋予优先级
         let mut stack = vec![];
 
+        // DAG中关键路径上的节点
+        let critical_path_nodes = util::graph::aoe_critical_path(&dag.dag_inner);
+
         // 拓扑排序
         while let Some(func_g_i) = walker.next(&dag.dag_inner) {
             let fnid = dag.dag_inner[func_g_i];
             let func = env.func(fnid);
 
+            // 是否为DAG中关键路径上的节点
+            let flag = critical_path_nodes.contains(&func_g_i);
+
             let mut t_sum_exec = 0.0;
             for node in env.nodes().iter() {
                 let node_cpu_left = self.node_resource_left.get(&node.node_id()).unwrap().0;
@@ -131,19 +143,24 @@ impl PriorityScheduler {
             let t_avg_trans = t_sum_trans / self.node2node_all_bw.len() as f32;
 
             // 函数内存占用
-            let t_mem_cost = func.mem as f32 * self.mem_cost_per_unit;
+            let mem_cost = func.mem as f32 * self.mem_cost_per_unit;
 
             // log::info!(
-            //     "t_avg_exec{} t_avg_trans{} t_mem_cost{}",
+            //     "t_avg_exec{} t_avg_trans{} mem_cost{}",
            //     t_avg_exec,
             //     t_avg_trans,
-            //     t_mem_cost
+            //     mem_cost
             // );
 
-            // 总开销,用于后续定义优先级
-            let total_cost = t_avg_exec + t_avg_trans - t_mem_cost;
+            // 函数初始优先级
+            let mut initial_priority = t_avg_exec + t_avg_trans - mem_cost;
 
-            map.insert(fnid, total_cost);
+            // 考虑关键路径对优先级的影响
+            if self.mode == "b" && flag {
+                initial_priority += 1.0;
+            }
+
+            map.insert(fnid, initial_priority);
 
             stack.push(func_g_i);
         }
@@ -168,13 +185,13 @@ impl PriorityScheduler {
             }
         }
 
-        let mut prio_order = map.into_iter().collect::<Vec<_>>();
+        let mut priority_order = map.into_iter().collect::<Vec<_>>();
 
         // 降序排序,优先调度优先级高的函数
-        prio_order.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or_else(|| Ordering::Equal));
+        priority_order.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or_else(|| Ordering::Equal));
 
         // 记录当前dag中函数的优先级序列,避免重复计算
-        self.dag_fns_priority.insert(dag.dag_i, prio_order);
+        self.dag_fns_priority.insert(dag.dag_i, priority_order);
     }
 }
@@ -188,64 +205,37 @@ impl PriorityScheduler {
         // 计算请求对应的DAG中函数的优先级
         self.calculate_priority_for_dag_fns(req, env);
 
-        // 获取该请求中可以被调度的函数(即前驱已被调度的函数)以及含有该函数的容器的节点
-        let mut scheduleable_fns_nodes = HashMap::new();
-
-        let dag = env.dag(req.dag_i);
-
-        let mut walker = dag.new_dag_walker();
-
-        'next_fn: while let Some(func_g_i) = walker.next(&dag.dag_inner) {
-            let fnid = dag.dag_inner[func_g_i];
-
-            // 函数已被调度,跳过
-            if req.fn_node.contains_key(&fnid) {
-                continue;
-            }
-
-            // 函数的前驱尚未被调度,跳过
-            let predecessors = env.func(fnid).parent_fns(env);
-            for p in &predecessors {
-                if !req.fn_node.contains_key(p) {
-                    continue 'next_fn;
-                }
-            }
-
-            if scheduleable_fns_nodes.contains_key(&fnid) {
-                continue;
-            }
-
-            scheduleable_fns_nodes.insert(
-                fnid,
-                env.core()
-                    .fn_2_nodes()
-                    .get(&fnid)
-                    .map(|v| v.clone())
-                    .unwrap_or(HashSet::new()),
-            );
-        }
-
         let mech_metric = || env.help().mech_metric_mut();
 
         // 该请求对应的DAG中函数的优先级
         let dag_fns_priority = self.dag_fns_priority.get(&req.dag_i).unwrap();
 
-        // 本次可调度的函数按优先级排序
+        // 可调度的函数
+        let fns = schedule_helper::collect_task_to_sche(
+            req,
+            env,
+            schedule_helper::CollectTaskConfig::PreAllSched
+        );
+
+        // 可调度的函数及已经有其容器的节点
+        let mut scheduleable_fns_nodes = schedule_helper::collect_node_to_sche_task_to(&fns, env);
+
+        // 本次可调度的函数
         let scheduleable_fns =
+            // // 不排序直接调度
             // if self.mode == "a" {
-            //     scheduleable_fns_nodes
-            //         .clone()
-            //         .into_keys()
-            //         .collect::<Vec<_>>()
-            // } else
+            //     fns
+            // }
+            // // 根据优先级排序后调度
+            // else
             {
-                let mut temp = Vec::new();
+                let mut sorted = Vec::new();
                 for (fn_id, _) in dag_fns_priority {
-                    if scheduleable_fns_nodes.contains_key(fn_id) {
-                        temp.push(*fn_id);
+                    if fns.contains(fn_id) {
+                        sorted.push(*fn_id);
                    }
                 }
-                temp
+                sorted
             };
 
         for fnid in scheduleable_fns {
@@ -270,22 +260,24 @@ impl PriorityScheduler {
                     .insert(cmd.nid);
             }
 
-
-
-            // 函数的可调度节点 = 含有该函数的容器的节点
-            let scheduleable_nodes = scheduleable_fns_nodes.get(&fnid).unwrap();
-
-            let mut best_score = -10.0;
-            let mut best_node_id = 999;
-
-            let node_ids =
+            // 可调度的节点
+            let scheduleable_nodes =
+                // // 所有节点
                 // if self.mode == "a" {
-                    scheduleable_nodes.iter().cloned().collect::<Vec<_>>();
-                // } else {
                 //     env.nodes().iter().map(|node| node.node_id()).collect::<Vec<_>>()
-                // };
+                // }
+                // // 含有该函数的容器的节点
+                // else
+                {
+                    scheduleable_fns_nodes.get(&fnid).unwrap().iter().cloned().collect::<Vec<_>>()
+                };
+
+            // 最适合调度的节点
+            let mut best_node_id = 999;
+            // 最适合调度的节点的分数
+            let mut best_score = -10.0;
 
-            for node_id in node_ids {
+            for node_id in scheduleable_nodes {
                 let node = env.node(node_id);
 
                 // 函数的前驱列表
@@ -300,7 +292,7 @@ impl PriorityScheduler {
                     // 前驱所在节点
                     let &pred_node_id = req.fn_node.get(&pred).unwrap();
 
-                    // 前驱没有调度到当前节点,计算数据传输时间
+                    // 与前驱不在同一节点,计算数据传输时间
                     if pred_node_id != node_id {
                         not_in_the_same_node += 1;
 
                         transmission_time += env.func(pred).out_put_size
@@ -319,31 +311,23 @@ impl PriorityScheduler {
 
                 let each_running_task_cpu = node_cpu_left / node_running_task_count as f32;
 
-                // 优先调度到任务总数少、无需数据传输(即与前驱部署到同一节点)的节点
+                // 节点分数
                 let score_this_node =
-                    if self.mode == "a" {
-                        1.0 / (node_all_task_count as f32 + 1.0)
-                            // + 1.0 / (not_in_the_same_node as f32 + 1.0)
-                            - transmission_time
-                            // + func.cpu / each_running_task_cpu as f32
-                            // + node_cpu_left / node.rsc_limit.cpu
-                            // + node_mem_left / node.rsc_limit.mem
-                    } else {
+                    // 不考虑节点剩余资源量
+                    // if self.mode == "a" {
+                    //     1.0 / (node_all_task_count as f32 + 1.0)
+                    //         // + node_cpu_left / node.rsc_limit.cpu
+                    //         // + node_mem_left / node.rsc_limit.mem
+                    //         - transmission_time
+                    // }
+                    // // 考虑节点剩余资源量
+                    // else
+                    {
                         1.0 / (node_all_task_count as f32 + 1.0)
-                            // + 1.0 / (not_in_the_same_node as f32 + 1.0)
-                            - transmission_time
-                            // + func.cpu / each_running_task_cpu as f32
                             + node_cpu_left / node.rsc_limit.cpu
                             + node_mem_left / node.rsc_limit.mem
+                            - transmission_time
                     };
-                    // else {
-                    //     1.0 / (node_all_task_count as f32 + 1.0)
-                    //         // + 1.0 / (not_in_the_same_node as f32 + 1.0)
-                    //         // - transmission_time
-                    //         // + func.cpu / each_running_task_cpu as f32
-                    //         + node_cpu_left / node.rsc_limit.cpu
-                    //         + node_mem_left / node.rsc_limit.mem
-                    // };
 
                 // log::info!("score_this_node {}", score_this_node);
                 // log::info!("best_score {}", best_score);
diff --git a/serverless_sim/src/sim_run.rs b/serverless_sim/src/sim_run.rs
index b0508da..c86f807 100644
--- a/serverless_sim/src/sim_run.rs
+++ b/serverless_sim/src/sim_run.rs
@@ -22,10 +22,14 @@ pub trait Scheduler: Send {
 }
 
 pub mod schedule_helper {
+    use std::collections::{HashMap, HashSet};
+
     use crate::{
         fn_dag::{EnvFnExt, FnId},
         mechanism::SimEnvObserve,
+        node::NodeId,
         request::Request,
+        with_env_sub::WithEnvCore,
     };
     pub enum CollectTaskConfig {
         All,
@@ -80,6 +84,24 @@ pub mod schedule_helper {
         }
         collect
     }
+
+    pub fn collect_node_to_sche_task_to(
+        scheduleable_fns: &Vec<FnId>,
+        env: &SimEnvObserve,
+    ) -> HashMap<FnId, HashSet<NodeId>> {
+        let mut scheduleable_fns_nodes = HashMap::new();
+        for fnid in scheduleable_fns {
+            scheduleable_fns_nodes.insert(
+                *fnid,
+                env.core()
+                    .fn_2_nodes()
+                    .get(&fnid)
+                    .map(|v| v.clone())
+                    .unwrap_or(HashSet::new()),
+            );
+        }
+        scheduleable_fns_nodes
+    }
 }
 
 #[derive(Clone, Debug)]
From 9ad2e05a090dcb884469ae2281fc97fac73926ab Mon Sep 17
00:00:00 2001 From: chao fang <939915473@qq.com> Date: Fri, 22 Nov 2024 16:12:56 +0800 Subject: [PATCH 3/4] =?UTF-8?q?feat:=20heft=E3=80=81optimize=20priority?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/batch_run.yml | 5 +- scripts/fast_draw.yml | 2 +- serverless_sim/module_conf_es.json | 27 ++-- serverless_sim/src/mechanism.rs | 7 +- serverless_sim/src/sche/heft.rs | 241 ++++++++++++++++++++++++++++ serverless_sim/src/sche/mod.rs | 7 +- serverless_sim/src/sche/priority.rs | 30 ++-- 7 files changed, 279 insertions(+), 40 deletions(-) create mode 100644 serverless_sim/src/sche/heft.rs diff --git a/scripts/batch_run.yml b/scripts/batch_run.yml index 072963e..5c22460 100644 --- a/scripts/batch_run.yml +++ b/scripts/batch_run.yml @@ -1,4 +1,4 @@ -run_time: 10 +run_time: 3 params: request_freq: @@ -25,9 +25,10 @@ mech_scale_sche: - least_task: sche: # - bp_balance: - - pos: greedy + # - pos: greedy # - ensure_scheduler: - priority: a + - heft: - priority: b filter: # - [] diff --git a/scripts/fast_draw.yml b/scripts/fast_draw.yml index 897cf87..379889e 100644 --- a/scripts/fast_draw.yml +++ b/scripts/fast_draw.yml @@ -11,7 +11,7 @@ filter: ## each group bars targets_alias: -- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pos.greedy', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(pos.greedy).ic(no_evict.)'] +- [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'heft.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(heft.).ic(no_evict.)'] - [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'priority.a', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(priority.a).ic(no_evict.)'] - [{'rand_seed': '', 'request_freq': 'low', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'priority.b', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'sd.rflow.dtdag.cshigh.ftcpu.nml1.mtscale_sche_joint.scl(hpa.)(default.)(least_task.)[(careful_down.)].scd(priority.b).ic(no_evict.)'] diff --git a/serverless_sim/module_conf_es.json b/serverless_sim/module_conf_es.json index aea82c9..86f3731 100644 --- a/serverless_sim/module_conf_es.json +++ b/serverless_sim/module_conf_es.json @@ -1,16 +1,16 @@ { "mech_type": { - "scale_sche_joint": null, "scale_sche_separated": null, + "scale_sche_joint": null, "no_scale": null }, "scale_num": { - "no": null, - "temp_scaler": null, "lass": null, + "temp_scaler": null, + "no": null, + "hpa": null, "full_placement": null, "rela": null, - "hpa": null, "ensure_scaler": null }, "scale_down_exec": { @@ -21,19 +21,20 
@@
         "no": null
     },
     "sche": {
-        "faasflow": null,
-        "pass": null,
-        "pos": null,
-        "bp_balance": null,
+        "heft": null,
         "random": null,
-        "ensure_scheduler": null,
-        "load_least": null,
-        "rotate": null,
         "hash": null,
+        "pass": null,
+        "faasflow": null,
+        "rotate": null,
+        "ensure_scheduler": null,
+        "fnsche": null,
+        "pos": null,
         "greedy": null,
-        "priority": null,
         "consistenthash": null,
-        "fnsche": null
+        "load_least": null,
+        "bp_balance": null,
+        "priority": null
     },
     "filter": {
         "careful_down": null
diff --git a/serverless_sim/src/mechanism.rs b/serverless_sim/src/mechanism.rs
index bb43992..f79138a 100644
--- a/serverless_sim/src/mechanism.rs
+++ b/serverless_sim/src/mechanism.rs
@@ -78,7 +78,7 @@ impl CheckDup for Vec {
     }
 }
 
-pub const SCHE_NAMES: [&'static str; 13] = [
+pub const SCHE_NAMES: [&'static str; 14] = [
     "rotate",
     "hash",
     "bp_balance",
@@ -91,7 +91,8 @@ pub const SCHE_NAMES: [&'static str; 13] = [
     "consistenthash", // "gofs",
     "ensure_scheduler",
     "load_least",
-    "priority"
+    "priority",
+    "heft"
     // "load_least",
     // "random",
 ];
@@ -223,7 +224,7 @@ impl ConfigNewMec for Config {
             }
         }
         "scale_sche_joint" => {
-            let allow_sche = vec!["pos", "bp_balance", "ensure_scheduler", "priority"];
+            let allow_sche = vec!["pos", "bp_balance", "ensure_scheduler", "priority", "heft"];
             let allow_scale_num = vec!["hpa", "lass", "temp_scaler", "full_placement", "rela", "ensure_scaler"];
             let allow_scale_down_exec = vec!["default"];
             let allow_scale_up_exec = vec!["least_task"];
diff --git a/serverless_sim/src/sche/heft.rs b/serverless_sim/src/sche/heft.rs
new file mode 100644
index 0000000..5c5433b
--- /dev/null
+++ b/serverless_sim/src/sche/heft.rs
@@ -0,0 +1,241 @@
+use std::{cmp::Ordering, collections::HashMap};
+
+use daggy::Walker;
+use rand::Rng;
+
+use crate::{
+    fn_dag::{DagId, EnvFnExt, FnId},
+    mechanism::{MechanismImpl, ScheCmd, SimEnvObserve},
+    mechanism_thread::{MechCmdDistributor, MechScheduleOnceRes},
+    node::{EnvNodeExt, NodeId},
+    request::Request,
+    sim_run::{schedule_helper, Scheduler},
+    with_env_sub::{WithEnvCore, WithEnvHelp},
+};
+
+pub struct HEFTScheduler {
+    dag_fns_priority: HashMap<DagId, Vec<(FnId, f32)>>,
+
+    node_task_count: HashMap<NodeId, (usize, usize)>,
+
+    node2node_all_bw: Vec<f32>,
+}
+
+impl HEFTScheduler {
+    pub fn new() -> Self {
+        HEFTScheduler {
+            dag_fns_priority: HashMap::new(),
+
+            node_task_count: HashMap::new(),
+
+            node2node_all_bw: Vec::new(),
+        }
+    }
+
+    fn prepare(&mut self, env: &SimEnvObserve) {
+        for node in env.nodes().iter() {
+            let node_id = node.node_id();
+
+            self.node_task_count
+                .insert(node_id, (node.all_task_cnt(), node.running_task_cnt()));
+        }
+    }
+
+    fn prepare_node2node_all_bw(&mut self, env: &SimEnvObserve) {
+        let node_count = env.nodes().len();
+
+        let node2node_graph = env.core().node2node_graph();
+
+        for i in 0..node_count {
+            for j in 0..i {
+                self.node2node_all_bw.push(node2node_graph[i][j]);
+            }
+        }
+    }
+
+    fn calculate_priority_for_dag_fns(&mut self, req: &Request, env: &SimEnvObserve) {
+        let dag = env.dag(req.dag_i);
+
+        if !self.dag_fns_priority.contains_key(&dag.dag_i) {
+            let mut walker = dag.new_dag_walker();
+
+            let mut map = HashMap::new();
+
+            let mut stack = vec![];
+
+            let n = env.nodes().len();
+
+            while let Some(func_g_i) = walker.next(&dag.dag_inner) {
+                let fnid = dag.dag_inner[func_g_i];
+                let func = env.func(fnid);
+
+                let mut t_sum_exec = 0.0;
+                for node in env.nodes().iter() {
+                    let node_running_task_count =
+                        self.node_task_count.get(&node.node_id()).unwrap().1;
+                    let each_running_task_cpu = node.rsc_limit.cpu / node_running_task_count as f32;
+                    t_sum_exec += func.cpu / each_running_task_cpu
+                }
+                let t_avg_exec = t_sum_exec / n as f32;
+
+                let mut t_sum_trans = 0.0;
+                for bw in &self.node2node_all_bw {
+                    t_sum_trans += func.out_put_size / bw * 5.0;
+                }
+                let t_avg_trans = t_sum_trans / self.node2node_all_bw.len() as f32;
+
+                let initial_priority = t_avg_exec + t_avg_trans;
+                map.insert(fnid, initial_priority);
+                stack.push(func_g_i);
+            }
+
+            while let Some(func_g_i) = stack.pop() {
+                let nexts: daggy::Children = dag.dag_inner.children(func_g_i);
+                if let Some(max_node) = nexts.iter(&dag.dag_inner).max_by(|a, b| {
+                    let fnid_a = dag.dag_inner[a.1];
+                    let fnid_b = dag.dag_inner[b.1];
+
+                    map.get(&fnid_a)
+                        .unwrap()
+                        .total_cmp(map.get(&fnid_b).unwrap())
+                }) {
+                    let fnid_max = dag.dag_inner[max_node.1];
+                    let max = *map.get(&fnid_max).unwrap();
+
+                    let fnid = dag.dag_inner[func_g_i];
+                    (*map.get_mut(&fnid).unwrap()) += max;
+                }
+            }
+
+            let mut priority_order = map.into_iter().collect::<Vec<_>>();
+            priority_order.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or_else(|| Ordering::Equal));
+            self.dag_fns_priority.insert(dag.dag_i, priority_order);
+        }
+    }
+
+    fn schedule_one_req(
+        &mut self,
+        env: &SimEnvObserve,
+        mech: &MechanismImpl,
+        req: &Request,
+        cmd_distributor: &MechCmdDistributor,
+    ) {
+        self.calculate_priority_for_dag_fns(req, env);
+
+        let mech_metric = || env.help().mech_metric_mut();
+
+        let dag_fns_priority = self.dag_fns_priority.get(&req.dag_i).unwrap();
+
+        let fns = schedule_helper::collect_task_to_sche(
+            req,
+            env,
+            schedule_helper::CollectTaskConfig::All,
+        );
+
+        let scheduleable_fns = {
+            let mut sorted = Vec::new();
+            for (fn_id, _) in dag_fns_priority {
+                if fns.contains(fn_id) {
+                    sorted.push(*fn_id);
+                }
+            }
+            sorted
+        };
+
+        let mut scheduleable_fns_nodes = schedule_helper::collect_node_to_sche_task_to(&fns, env);
+
+        for fnid in scheduleable_fns {
+            let func = env.func(fnid);
+
+            let mut target_cnt = mech.scale_num(fnid);
+            if target_cnt == 0 {
+                target_cnt = 1;
+            }
+
+            let fn_scale_up_cmds =
+                mech.scale_up_exec()
+                    .exec_scale_up(target_cnt, fnid, env, cmd_distributor);
+
+            for cmd in fn_scale_up_cmds.iter() {
+                scheduleable_fns_nodes
+                    .get_mut(&cmd.fnid)
+                    .unwrap()
+                    .insert(cmd.nid);
+            }
+
+            let scheduleable_nodes = scheduleable_fns_nodes
+                .get(&fnid)
+                .unwrap()
+                .iter()
+                .cloned()
+                .collect::<Vec<_>>();
+
+            let mut best_node_id = 999;
+            let mut min_exec_time = 100.0;
+
+            for node_id in scheduleable_nodes {
+                let node = env.node(node_id);
+                let node_running_task_count = self.node_task_count.get(&node_id).unwrap().1;
+                let each_running_task_cpu = node.rsc_limit.cpu / node_running_task_count as f32;
+
+                let exec_time = func.cpu / each_running_task_cpu;
+
+                if min_exec_time > exec_time {
+                    min_exec_time = exec_time;
+                    best_node_id = node_id;
+                }
+            }
+
+            if best_node_id == 999 {
+                best_node_id = rand::thread_rng().gen_range(0..env.nodes().len());
+            }
+
+            mech_metric().add_node_task_new_cnt(best_node_id);
+
+            if let Some((all_task_count, _)) = self.node_task_count.get_mut(&best_node_id) {
+                *all_task_count += 1;
+            }
+
+            cmd_distributor
+                .send(MechScheduleOnceRes::ScheCmd(ScheCmd {
+                    reqid: req.req_id,
+                    fnid,
+                    nid: best_node_id,
+                    memlimit: None,
+                }))
+                .unwrap();
+        }
+    }
+}
+
+impl Scheduler for HEFTScheduler {
+    fn schedule_some(
+        &mut self,
+        env: &SimEnvObserve,
+        mech: &MechanismImpl,
+        cmd_distributor: &MechCmdDistributor,
+    ) {
+        self.node2node_all_bw.clear();
+        self.node_task_count.clear();
+
+        self.prepare(env);
+        self.prepare_node2node_all_bw(env);
+
+        for (_, req) in env.core().requests().iter() {
self.schedule_one_req(env, mech, req, cmd_distributor); + } + + for func in env.core().fns().iter() { + let target = mech.scale_num(func.fn_id); + let cur = env.fn_container_cnt(func.fn_id); + if target < cur { + mech.scale_down_exec().exec_scale_down( + env, + func.fn_id, + cur - target, + cmd_distributor, + ); + } + } + } +} diff --git a/serverless_sim/src/sche/mod.rs b/serverless_sim/src/sche/mod.rs index fefccb6..bf56e21 100644 --- a/serverless_sim/src/sche/mod.rs +++ b/serverless_sim/src/sche/mod.rs @@ -15,7 +15,8 @@ use self::{ rotate::RotateScheduler, ensure_scheduler::EnsureScheduler, load_least::LoadLeastScheduler, - priority::PriorityScheduler + priority::PriorityScheduler, + heft::HEFTScheduler }; pub mod consistenthash; @@ -31,6 +32,7 @@ pub mod rotate; pub mod ensure_scheduler; pub mod load_least; pub mod priority; +pub mod heft; // pub mod rule_based; @@ -82,6 +84,9 @@ pub fn prepare_spec_scheduler(config: &Config) -> Option { return Some(Box::new(PriorityScheduler::new(&sche_attr))); } + "heft" => { + return Some(Box::new(HEFTScheduler::new())); + } _ => { return None; } diff --git a/serverless_sim/src/sche/priority.rs b/serverless_sim/src/sche/priority.rs index 992a6d9..827e0df 100644 --- a/serverless_sim/src/sche/priority.rs +++ b/serverless_sim/src/sche/priority.rs @@ -1,6 +1,6 @@ use std::{ cmp::Ordering, - collections::{HashMap, HashSet}, + collections::HashMap, }; use daggy::Walker; @@ -10,8 +10,7 @@ use crate::{ fn_dag::{DagId, EnvFnExt, FnId}, mechanism::{MechanismImpl, ScheCmd, SimEnvObserve}, mechanism_thread::{MechCmdDistributor, MechScheduleOnceRes}, - node::{EnvNodeExt, NodeId}, - request::Request, + node::{EnvNodeExt, NodeId}, request::Request, sim_run::{schedule_helper, Scheduler}, util, with_env_sub::{WithEnvCore, WithEnvHelp} @@ -108,6 +107,9 @@ impl PriorityScheduler { // DAG中关键路径上的节点 let critical_path_nodes = util::graph::aoe_critical_path(&dag.dag_inner); + let m = self.node2node_all_bw.len(); + let n = env.nodes().len(); + // 拓扑排序 while let Some(func_g_i) = walker.next(&dag.dag_inner) { let fnid = dag.dag_inner[func_g_i]; @@ -118,29 +120,21 @@ impl PriorityScheduler { let mut t_sum_exec = 0.0; for node in env.nodes().iter() { - let node_cpu_left = self.node_resource_left.get(&node.node_id()).unwrap().0; - t_sum_exec += - // if self.mode == "a" { - // func.cpu / node_cpu_left - // } else - { - let node_running_task_count = + let node_running_task_count = self.node_task_count.get(&node.node_id()).unwrap().1; - let each_running_task_cpu = node_cpu_left / node_running_task_count as f32; - - func.cpu / each_running_task_cpu - }; + let each_running_task_cpu = node.rsc_limit.cpu / node_running_task_count as f32; + t_sum_exec += func.cpu / each_running_task_cpu } // 函数平均执行时间 - let t_avg_exec = t_sum_exec / self.node_resource_left.len() as f32; + let t_avg_exec = t_sum_exec / n as f32; let mut t_sum_trans = 0.0; for bw in &self.node2node_all_bw { t_sum_trans += func.out_put_size / bw * 5.0; } // 平均数据传输时间 - let t_avg_trans = t_sum_trans / self.node2node_all_bw.len() as f32; + let t_avg_trans = t_sum_trans / m as f32; // 函数内存占用 let mem_cost = func.mem as f32 * self.mem_cost_per_unit; @@ -303,14 +297,10 @@ impl PriorityScheduler { let node_all_task_count = self.node_task_count.get(&node_id).unwrap().0; - let node_running_task_count = self.node_task_count.get(&node_id).unwrap().1; - let node_cpu_left = self.node_resource_left.get(&node_id).unwrap().0; let node_mem_left = self.node_resource_left.get(&node_id).unwrap().1; - let each_running_task_cpu = node_cpu_left / 
node_running_task_count as f32;
-
                 // 节点分数

From 52c9762cd4e80594ca9a380693f407d4310bac3c Mon Sep 17 00:00:00 2001
From: chao fang <939915473@qq.com>
Date: Fri, 29 Nov 2024 19:44:54 +0800
Subject: [PATCH 4/4] =?UTF-8?q?refactor:=20heft=E3=80=81priority?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 serverless_sim/src/sche/heft.rs     | 11 ++++++-----
 serverless_sim/src/sche/priority.rs | 12 ++++++------
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/serverless_sim/src/sche/heft.rs b/serverless_sim/src/sche/heft.rs
index 5c5433b..de59226 100644
--- a/serverless_sim/src/sche/heft.rs
+++ b/serverless_sim/src/sche/heft.rs
@@ -126,25 +126,26 @@ impl HEFTScheduler {
 
         let dag_fns_priority = self.dag_fns_priority.get(&req.dag_i).unwrap();
 
-        let fns = schedule_helper::collect_task_to_sche(
+        let scheduleable_fns = schedule_helper::collect_task_to_sche(
             req,
             env,
             schedule_helper::CollectTaskConfig::All,
         );
 
-        let scheduleable_fns = {
+        // 可调度的函数按优先级排序
+        let sorted_scheduleable_fns = {
             let mut sorted = Vec::new();
             for (fn_id, _) in dag_fns_priority {
-                if fns.contains(fn_id) {
+                if scheduleable_fns.contains(fn_id) {
                     sorted.push(*fn_id);
                 }
             }
             sorted
         };
 
-        let mut scheduleable_fns_nodes = schedule_helper::collect_node_to_sche_task_to(&fns, env);
+        let mut scheduleable_fns_nodes = schedule_helper::collect_node_to_sche_task_to(&sorted_scheduleable_fns, env);
 
-        for fnid in scheduleable_fns {
+        for fnid in sorted_scheduleable_fns {
             let func = env.func(fnid);
 
             let mut target_cnt = mech.scale_num(fnid);
diff --git a/serverless_sim/src/sche/priority.rs b/serverless_sim/src/sche/priority.rs
index 827e0df..c6e13ad 100644
--- a/serverless_sim/src/sche/priority.rs
+++ b/serverless_sim/src/sche/priority.rs
@@ -204,18 +204,18 @@ impl PriorityScheduler {
         // 该请求对应的DAG中函数的优先级
         let dag_fns_priority = self.dag_fns_priority.get(&req.dag_i).unwrap();
 
-        // 可调度的函数
-        let fns = schedule_helper::collect_task_to_sche(
+        // 可调度的函数(选择节点时需要考虑函数间的数据传输时间,所以每次可调度的函数是前驱函数已经完成调度的)
+        let scheduleable_fns = schedule_helper::collect_task_to_sche(
             req,
             env,
             schedule_helper::CollectTaskConfig::PreAllSched
         );
 
         // 可调度的函数及已经有其容器的节点
-        let mut scheduleable_fns_nodes = schedule_helper::collect_node_to_sche_task_to(&fns, env);
+        let mut scheduleable_fns_nodes = schedule_helper::collect_node_to_sche_task_to(&scheduleable_fns, env);
 
         // 本次可调度的函数
-        let scheduleable_fns =
+        let sorted_scheduleable_fns =
             // // 不排序直接调度
             // if self.mode == "a" {
             //     fns
             // }
             // // 根据优先级排序后调度
             // else
             {
                 let mut sorted = Vec::new();
                 for (fn_id, _) in dag_fns_priority {
-                    if fns.contains(fn_id) {
+                    if scheduleable_fns.contains(fn_id) {
                         sorted.push(*fn_id);
                     }
                 }
                 sorted
             };
 
-        for fnid in scheduleable_fns {
+        for fnid in sorted_scheduleable_fns {
             let func = env.func(fnid);
 
             // scale_sche_joint在调度前已经更新了函数所需容器的数量,获取
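
Note on the ranking step shared by priority.rs and heft.rs in this series: a function's priority is its own estimated cost (average execution time plus average transfer time, minus a memory penalty in PriorityScheduler) plus the maximum priority of its successors, accumulated over a reverse topological order; functions are then dispatched in descending rank, i.e. an upward rank in the HEFT sense. Below is a minimal standalone sketch of that recurrence. `upward_rank`, the toy DAG, and the cost values are illustrative stand-ins and not code from these patches, which walk a daggy graph and derive costs from node load and bandwidth instead.

use std::collections::HashMap;

/// rank(f) = cost(f) + max over successors s of rank(s); sinks keep cost(f).
/// This mirrors the `stack.pop()` loop over the recorded reverse topological
/// order in priority.rs / heft.rs.
fn upward_rank(
    cost: &HashMap<usize, f32>,
    children: &HashMap<usize, Vec<usize>>,
    topo: &[usize],
) -> Vec<(usize, f32)> {
    let mut rank = cost.clone();
    // Walk the topological order backwards so every successor's rank is
    // final before its predecessors are visited.
    for &f in topo.iter().rev() {
        let best_child = children
            .get(&f)
            .into_iter()
            .flatten()
            .map(|c| rank[c])
            .fold(f32::NEG_INFINITY, f32::max);
        if best_child.is_finite() {
            *rank.get_mut(&f).unwrap() += best_child;
        }
    }
    // Descending order: the highest-rank function is scheduled first.
    let mut order: Vec<(usize, f32)> = rank.into_iter().collect();
    order.sort_by(|a, b| b.1.total_cmp(&a.1));
    order
}

fn main() {
    // Toy diamond DAG: 0 -> {1, 2} -> 3, with per-function costs standing in
    // for t_avg_exec + t_avg_trans (- mem_cost).
    let cost: HashMap<usize, f32> = HashMap::from([(0, 1.0), (1, 2.0), (2, 0.5), (3, 1.5)]);
    let children: HashMap<usize, Vec<usize>> =
        HashMap::from([(0, vec![1, 2]), (1, vec![3]), (2, vec![3])]);
    let topo: [usize; 4] = [0, 1, 2, 3];
    for (f, r) in upward_rank(&cost, &children, &topo) {
        println!("fn {f}: rank {r}");
    }
    // Prints fn 0 (4.5), fn 1 (3.5), fn 2 (2.0), fn 3 (1.5): upstream functions
    // on the longest-cost path come first, matching the descending sort stored
    // in dag_fns_priority.
}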