Skip to content

Commit 6802960

Browse files
committed
Change workerpools config name to executionModels, add override queue name mechanism
1 parent 94adc51 commit 6802960

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

functions/kubernetes/amqpConnector.js

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,28 @@ async function initialize(queue_name) {
99
conn = await amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}`, "heartbeat=60");
1010
}
1111
let ch = await conn.createChannel()
12-
await ch.assertQueue(queue_name, {durable: false, expires: 6000000}); // TODO: implement dynamic queue creation & cleanup
12+
await ch.assertQueue(queue_name, {durable: false, expires: 6000000});
1313
channels[queue_name] = ch
1414

1515
}
1616

17+
function getQueueName(context) {
18+
if ("executionModels" in context.appConfig) {
19+
for (const taskType of context.appConfig.executionModels) {
20+
if (taskType.name === context['name']) {
21+
if ("queue" in taskType) {
22+
return taskType.queue;
23+
}
24+
}
25+
}
26+
}
27+
let namespace = process.env.HF_VAR_NAMESPACE || 'default'
28+
return namespace + "." + context['name']
29+
}
30+
1731
async function enqueueJobs(jobArr, taskIdArr, contextArr, customParams) {
1832
let context = contextArr[0];
19-
let namespace = process.env.HF_VAR_NAMESPACE || 'default'
20-
let queue_name = namespace + "." + context['name']
33+
let queue_name = getQueueName(context)
2134
if (conn === null || !(queue_name in channels)) {
2235
await initialize(queue_name)
2336
}

functions/kubernetes/k8sCommand.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ let restartCounter = new RestartCounter(backoffLimit);
2323
// * cb
2424

2525
function getExecutorType(context) {
26-
if ("workerpools" in context.appConfig) {
27-
for (const taskType of context.appConfig.workerpools) {
26+
if ("executionModels" in context.appConfig) {
27+
for (const taskType of context.appConfig.executionModels) {
2828
if (taskType.name === context['name']) {
2929
return "WORKER_POOL"
3030
}

0 commit comments

Comments
 (0)