Skip to content

Commit e0591a9

Browse files
authored
Include pending tasks in quota calculations (#4651)
When creating tasks in parallel, a race condition could lead to quota over-commitment. Tasks are first created in a `PENDING` state, which was not included in quota calculations. This allowed multiple tasks to be created and submitted to Diego for execution, even if their combined resource usage exceeded the defined quota. When the task's state was subsequently updated, the model validation would fail due to the exceeded quota, incorrectly marking the task as `FAILED`. This created a state mismatch, as the task had already been successfully submitted to Diego. This change resolves the issue by including `PENDING` tasks in the memory and log rate limit calculations, treating the state as a desired state similar to apps. This prevents the initial over-submission of tasks. Additionally, the quota validation is now skipped when a task transitions from `PENDING` to `RUNNING`. This ensures that a task successfully submitted to Diego is correctly reflected as `RUNNING` in the Cloud Controller database, aligning the desired state with the action taken.
1 parent 6cab54a commit e0591a9

File tree

9 files changed

+143
-63
lines changed

9 files changed

+143
-63
lines changed

app/models/runtime/constraints/max_log_rate_limit_policy.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ class TaskMaxLogRateLimitPolicy < BaseMaxLogRateLimitPolicy
6969
private
7070

7171
def additional_checks
72-
IGNORED_STATES.exclude?(resource.state)
72+
IGNORED_STATES.exclude?(resource.state) &&
73+
# Skipping the TaskMaxLogRateLimitPolicy if the task is transitioning from PENDING to RUNNING state as it might be already running on Diego
74+
resource.column_change(:state) != [VCAP::CloudController::TaskModel::PENDING_STATE, VCAP::CloudController::TaskModel::RUNNING_STATE]
7375
end
7476

7577
def field

app/models/runtime/constraints/max_memory_policy.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ class TaskMaxMemoryPolicy < BaseMaxMemoryPolicy
5656
private
5757

5858
def additional_checks
59-
IGNORED_STATES.exclude?(resource.state)
59+
IGNORED_STATES.exclude?(resource.state) &&
60+
# Skipping the TaskMaxMemoryPolicy if the task is transitioning from PENDING to RUNNING state as it might be already running on Diego
61+
resource.column_change(:state) != [VCAP::CloudController::TaskModel::PENDING_STATE, VCAP::CloudController::TaskModel::RUNNING_STATE]
6062
end
6163

6264
def field

app/models/runtime/organization.rb

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ def validate
228228
end
229229

230230
def memory_used
231-
started_app_memory + running_task_memory
231+
started_app_memory + running_task_memory + pending_task_memory
232232
end
233233

234234
def has_remaining_memory(mem)
@@ -333,13 +333,17 @@ def memory_remaining
333333
end
334334

335335
def log_rate_limit_remaining
336-
quota_definition.log_rate_limit - (started_app_log_rate_limit + running_task_log_rate_limit)
336+
quota_definition.log_rate_limit - (started_app_log_rate_limit + running_task_log_rate_limit + pending_task_log_rate_limit)
337337
end
338338

339339
def running_task_memory
340340
tasks_dataset.where(state: TaskModel::RUNNING_STATE).sum(:memory_in_mb) || 0
341341
end
342342

343+
def pending_task_memory
344+
tasks_dataset.where(state: TaskModel::PENDING_STATE).sum(:memory_in_mb) || 0
345+
end
346+
343347
def started_app_memory
344348
processes_dataset.where(state: ProcessModel::STARTED).sum(Sequel.*(:memory, :instances)) || 0
345349
end
@@ -348,6 +352,10 @@ def running_task_log_rate_limit
348352
tasks_dataset.where(state: TaskModel::RUNNING_STATE).sum(:log_rate_limit) || 0
349353
end
350354

355+
def pending_task_log_rate_limit
356+
tasks_dataset.where(state: TaskModel::PENDING_STATE).sum(:log_rate_limit) || 0
357+
end
358+
351359
def started_app_log_rate_limit
352360
processes_dataset.where(state: ProcessModel::STARTED).sum(Sequel.*(:log_rate_limit, :instances)) || 0
353361
end

app/models/runtime/space.rb

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ def members
334334
end
335335

336336
def memory_used
337-
started_app_memory + running_task_memory
337+
started_app_memory + running_task_memory + pending_task_memory
338338
end
339339

340340
def running_and_pending_tasks_count
@@ -356,13 +356,17 @@ def memory_remaining
356356
end
357357

358358
def log_rate_limit_remaining
359-
space_quota_definition.log_rate_limit - (started_app_log_rate_limit + running_task_log_rate_limit)
359+
space_quota_definition.log_rate_limit - (started_app_log_rate_limit + running_task_log_rate_limit + pending_task_log_rate_limit)
360360
end
361361

362362
def running_task_memory
363363
tasks_dataset.where(state: TaskModel::RUNNING_STATE).sum(:memory_in_mb) || 0
364364
end
365365

366+
def pending_task_memory
367+
tasks_dataset.where(state: TaskModel::PENDING_STATE).sum(:memory_in_mb) || 0
368+
end
369+
366370
def started_app_memory
367371
processes_dataset.where(state: ProcessModel::STARTED).sum(Sequel.*(:memory, :instances)) || 0
368372
end
@@ -371,6 +375,10 @@ def running_task_log_rate_limit
371375
tasks_dataset.where(state: TaskModel::RUNNING_STATE).sum(:log_rate_limit) || 0
372376
end
373377

378+
def pending_task_log_rate_limit
379+
tasks_dataset.where(state: TaskModel::PENDING_STATE).sum(:log_rate_limit) || 0
380+
end
381+
374382
def started_app_log_rate_limit
375383
processes_dataset.where(state: ProcessModel::STARTED).sum(Sequel.*(:log_rate_limit, :instances)) || 0
376384
end

spec/unit/models/runtime/constraints/max_log_rate_limit_policy_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,5 +169,12 @@
169169
expect(validator).to validate_without_error(task)
170170
end
171171
end
172+
173+
context 'when the task state changes from PENDING to RUNNING' do
174+
it 'does not register error' do
175+
allow(task).to receive(:column_change).with(:state).and_return([VCAP::CloudController::TaskModel::PENDING_STATE, VCAP::CloudController::TaskModel::RUNNING_STATE])
176+
expect(validator).to validate_without_error(task)
177+
end
178+
end
172179
end
173180
end

spec/unit/models/runtime/constraints/max_memory_policy_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,5 +82,12 @@
8282
expect(validator).to validate_without_error(task)
8383
end
8484
end
85+
86+
context 'when the task state changes from PENDING to RUNNING' do
87+
it 'does not register error' do
88+
allow(task).to receive(:column_change).with(:state).and_return([VCAP::CloudController::TaskModel::PENDING_STATE, VCAP::CloudController::TaskModel::RUNNING_STATE])
89+
expect(validator).to validate_without_error(task)
90+
end
91+
end
8592
end
8693
end

spec/unit/models/runtime/organization_spec.rb

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -447,61 +447,55 @@ module VCAP::CloudController
447447
end
448448

449449
context 'memory quota' do
450-
let(:quota) do
451-
QuotaDefinition.make(memory_limit: 500)
452-
end
450+
let(:quota) { QuotaDefinition.make(memory_limit: 500) }
451+
let(:org) { Organization.make(quota_definition: quota) }
452+
let(:space) { Space.make(organization: org) }
453453

454454
it 'returns the memory available when no processes are running' do
455-
org = Organization.make(quota_definition: quota)
456-
space = Space.make(organization: org)
457455
ProcessModelFactory.make(space: space, memory: 200, instances: 2)
458456

459457
expect(org.has_remaining_memory(500)).to be(true)
460458
expect(org.has_remaining_memory(501)).to be(false)
461459
end
462460

463461
it 'returns the memory remaining when processes are consuming memory' do
464-
org = Organization.make(quota_definition: quota)
465-
space = Space.make(organization: org)
466-
467-
ProcessModelFactory.make(space: space, memory: 200, instances: 2, state: 'STARTED', type: 'worker')
468-
ProcessModelFactory.make(space: space, memory: 50, instances: 1, state: 'STARTED')
462+
ProcessModelFactory.make(space: space, memory: 200, instances: 2, state: ProcessModel::STARTED, type: 'worker')
463+
ProcessModelFactory.make(space: space, memory: 50, instances: 1, state: ProcessModel::STARTED)
469464

470465
expect(org.has_remaining_memory(50)).to be(true)
471466
expect(org.has_remaining_memory(51)).to be(false)
472467
end
473468

474469
it 'includes RUNNING tasks when returning remaining memory' do
475-
org = Organization.make(quota_definition: quota)
476-
space = Space.make(organization: org)
477-
478-
process = ProcessModelFactory.make(space: space, memory: 250, instances: 1, state: 'STARTED', type: 'worker')
470+
process = ProcessModelFactory.make(space: space, memory: 250, instances: 1, state: ProcessModel::STARTED, type: 'worker')
479471
TaskModel.make(app: process.app, memory_in_mb: 25, state: TaskModel::RUNNING_STATE)
480472
TaskModel.make(app: process.app, memory_in_mb: 25, state: TaskModel::RUNNING_STATE)
481473

482474
expect(org.has_remaining_memory(200)).to be(true)
483475
expect(org.has_remaining_memory(201)).to be(false)
484476
end
485477

478+
it 'includes PENDING tasks when determining available memory' do
479+
process = ProcessModelFactory.make(space: space, memory: 200, instances: 2, state: ProcessModel::STARTED)
480+
TaskModel.make(app: process.app, memory_in_mb: 50, state: TaskModel::PENDING_STATE)
481+
482+
expect(org.has_remaining_memory(50)).to be(true)
483+
expect(org.has_remaining_memory(51)).to be(false)
484+
end
485+
486486
context 'when memory quota is unlimited (-1)' do
487-
let(:quota) do
488-
QuotaDefinition.make(memory_limit: -1)
489-
end
487+
let(:quota) { QuotaDefinition.make(memory_limit: -1) }
490488

491489
it "indicates that there's more memory remaining" do
492-
org = Organization.make(quota_definition: quota)
493-
494490
expect(org.has_remaining_memory(1 << 63)).to be(true) # a very big number
495491
end
496492
end
497493

498494
it 'does NOT include non-RUNNING tasks when returning remaining memory' do
499-
org = Organization.make(quota_definition: quota)
500-
space = Space.make(organization: org)
501-
502-
process = ProcessModelFactory.make(space: space, memory: 250, instances: 1, state: 'STARTED', type: 'worker')
503-
TaskModel.make(app: process.app, memory_in_mb: 25, state: TaskModel::PENDING_STATE)
495+
process = ProcessModelFactory.make(space: space, memory: 250, instances: 1, state: ProcessModel::STARTED, type: 'worker')
504496
TaskModel.make(app: process.app, memory_in_mb: 25, state: TaskModel::SUCCEEDED_STATE)
497+
TaskModel.make(app: process.app, memory_in_mb: 25, state: TaskModel::FAILED_STATE)
498+
TaskModel.make(app: process.app, memory_in_mb: 25, state: TaskModel::CANCELING_STATE)
505499

506500
expect(org.has_remaining_memory(250)).to be(true)
507501
expect(org.has_remaining_memory(251)).to be(false)
@@ -548,6 +542,19 @@ module VCAP::CloudController
548542
expect(org.has_remaining_log_rate_limit(4)).to be_falsey
549543
end
550544

545+
it 'considers tasks in state PENDING' do
546+
TaskModel.make(app: app_model, log_rate_limit: 2, state: TaskModel::PENDING_STATE)
547+
expect(org.has_remaining_log_rate_limit(8)).to be_truthy
548+
expect(org.has_remaining_log_rate_limit(9)).to be_falsey
549+
end
550+
551+
it 'does not consider tasks in other states' do
552+
TaskModel.make(app: app_model, log_rate_limit: 2, state: TaskModel::SUCCEEDED_STATE)
553+
TaskModel.make(app: app_model, log_rate_limit: 2, state: TaskModel::FAILED_STATE)
554+
TaskModel.make(app: app_model, log_rate_limit: 2, state: TaskModel::CANCELING_STATE)
555+
expect(org.has_remaining_log_rate_limit(10)).to be_truthy
556+
end
557+
551558
context 'when something else is running in another org' do
552559
it 'only accounts for things running in the owning org' do
553560
ProcessModelFactory.make(space: space_org2, log_rate_limit: 1, instances: 2, state: 'STARTED')

spec/unit/models/runtime/space_spec.rb

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -652,24 +652,34 @@ module VCAP::CloudController
652652
end
653653

654654
it 'returns true if there is enough memory remaining when processes are consuming memory' do
655-
ProcessModelFactory.make(space: space, memory: 200, instances: 2, state: 'STARTED', type: 'other')
656-
ProcessModelFactory.make(space: space, memory: 50, instances: 1, state: 'STARTED')
655+
ProcessModelFactory.make(space: space, memory: 200, instances: 2, state: ProcessModel::STARTED, type: 'other')
656+
ProcessModelFactory.make(space: space, memory: 50, instances: 1, state: ProcessModel::STARTED)
657657

658658
expect(space.has_remaining_memory(50)).to be(true)
659659
expect(space.has_remaining_memory(51)).to be(false)
660660
end
661661

662662
it 'includes RUNNING tasks when determining available memory' do
663-
process = ProcessModelFactory.make(space: space, memory: 200, instances: 2, state: 'STARTED')
664-
TaskModel.make(app: process.app, memory_in_mb: 50, state: 'RUNNING')
663+
process = ProcessModelFactory.make(space: space, memory: 200, instances: 2, state: ProcessModel::STARTED)
664+
TaskModel.make(app: process.app, memory_in_mb: 50, state: TaskModel::RUNNING_STATE)
665+
666+
expect(space.has_remaining_memory(50)).to be(true)
667+
expect(space.has_remaining_memory(51)).to be(false)
668+
end
669+
670+
it 'includes PENDING tasks when determining available memory' do
671+
process = ProcessModelFactory.make(space: space, memory: 200, instances: 2, state: ProcessModel::STARTED)
672+
TaskModel.make(app: process.app, memory_in_mb: 50, state: TaskModel::PENDING_STATE)
665673

666674
expect(space.has_remaining_memory(50)).to be(true)
667675
expect(space.has_remaining_memory(51)).to be(false)
668676
end
669677

670678
it 'does not include non-RUNNING tasks when determining available memory' do
671-
process = ProcessModelFactory.make(space: space, memory: 200, instances: 2, state: 'STARTED')
672-
TaskModel.make(app: process.app, memory_in_mb: 50, state: 'SUCCEEDED')
679+
process = ProcessModelFactory.make(space: space, memory: 200, instances: 2, state: ProcessModel::STARTED)
680+
TaskModel.make(app: process.app, memory_in_mb: 50, state: TaskModel::SUCCEEDED_STATE)
681+
TaskModel.make(app: process.app, memory_in_mb: 51, state: TaskModel::FAILED_STATE)
682+
TaskModel.make(app: process.app, memory_in_mb: 52, state: TaskModel::CANCELING_STATE)
673683

674684
expect(space.has_remaining_memory(100)).to be(true)
675685
expect(space.has_remaining_memory(101)).to be(false)
@@ -710,11 +720,11 @@ module VCAP::CloudController
710720

711721
context 'when something else is running' do
712722
it 'takes all things in the space into account' do
713-
ProcessModelFactory.make(space: space, log_rate_limit: 5, state: 'STARTED')
723+
ProcessModelFactory.make(space: space, log_rate_limit: 5, state: ProcessModel::STARTED)
714724
expect(space.has_remaining_log_rate_limit(5)).to be_truthy
715725
expect(space.has_remaining_log_rate_limit(6)).to be_falsey
716726

717-
ProcessModelFactory.make(space: space, log_rate_limit: 1, state: 'STARTED')
727+
ProcessModelFactory.make(space: space, log_rate_limit: 1, state: ProcessModel::STARTED)
718728
expect(space.has_remaining_log_rate_limit(4)).to be_truthy
719729
expect(space.has_remaining_log_rate_limit(5)).to be_falsey
720730

@@ -723,9 +733,22 @@ module VCAP::CloudController
723733
expect(space.has_remaining_log_rate_limit(4)).to be_falsey
724734
end
725735

736+
it 'considers tasks in state PENDING' do
737+
TaskModel.make(app: app_model, log_rate_limit: 2, state: TaskModel::PENDING_STATE)
738+
expect(space.has_remaining_log_rate_limit(8)).to be_truthy
739+
expect(space.has_remaining_log_rate_limit(9)).to be_falsey
740+
end
741+
742+
it 'does not consider tasks in other states' do
743+
TaskModel.make(app: app_model, log_rate_limit: 2, state: TaskModel::SUCCEEDED_STATE)
744+
TaskModel.make(app: app_model, log_rate_limit: 2, state: TaskModel::FAILED_STATE)
745+
TaskModel.make(app: app_model, log_rate_limit: 2, state: TaskModel::CANCELING_STATE)
746+
expect(space.has_remaining_log_rate_limit(10)).to be_truthy
747+
end
748+
726749
context 'when processes are running in another space' do
727750
it 'only accounts for processes running in the owning space' do
728-
ProcessModelFactory.make(space: space2, log_rate_limit: 1, instances: 2, state: 'STARTED')
751+
ProcessModelFactory.make(space: space2, log_rate_limit: 1, instances: 2, state: ProcessModel::STARTED)
729752

730753
expect(space.has_remaining_log_rate_limit(10)).to be_truthy
731754
expect(space.has_remaining_log_rate_limit(11)).to be_falsey

0 commit comments

Comments
 (0)