@@ -28,6 +28,7 @@ import (
2828 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2929 "k8s.io/client-go/informers"
3030 infov1 "k8s.io/client-go/informers/core/v1"
31+ policyv1 "k8s.io/client-go/informers/policy/v1beta1"
3132 storagev1 "k8s.io/client-go/informers/storage/v1"
3233 "k8s.io/client-go/kubernetes"
3334 corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -59,6 +60,7 @@ type SchedulerCache struct {
5960
6061 podInformer infov1.PodInformer
6162 nodeInformer infov1.NodeInformer
63+ pdbInformer policyv1.PodDisruptionBudgetInformer
6264 nsInformer infov1.NamespaceInformer
6365 podGroupInformer kbinfov1.PodGroupInformer
6466 queueInformer kbinfov1.QueueInformer
@@ -269,6 +271,13 @@ func newSchedulerCache(config *rest.Config, schedulerName string, nsAsQueue bool
269271 },
270272 })
271273
274+ sc .pdbInformer = informerFactory .Policy ().V1beta1 ().PodDisruptionBudgets ()
275+ sc .pdbInformer .Informer ().AddEventHandler (cache.ResourceEventHandlerFuncs {
276+ AddFunc : sc .AddPDB ,
277+ UpdateFunc : sc .UpdatePDB ,
278+ DeleteFunc : sc .DeletePDB ,
279+ })
280+
272281 kbinformer := kbinfo .NewSharedInformerFactory (sc .kbclient , 0 )
273282 // create informer for PodGroup information
274283 sc .podGroupInformer = kbinformer .Scheduling ().V1alpha1 ().PodGroups ()
@@ -300,6 +309,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, nsAsQueue bool
300309}
301310
302311func (sc * SchedulerCache ) Run (stopCh <- chan struct {}) {
312+ go sc .pdbInformer .Informer ().Run (stopCh )
303313 go sc .podInformer .Informer ().Run (stopCh )
304314 go sc .nodeInformer .Informer ().Run (stopCh )
305315 go sc .podGroupInformer .Informer ().Run (stopCh )
@@ -329,6 +339,7 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
329339 }
330340
331341 return cache .WaitForCacheSync (stopCh ,
342+ sc .pdbInformer .Informer ().HasSynced ,
332343 sc .podInformer .Informer ().HasSynced ,
333344 sc .podGroupInformer .Informer ().HasSynced ,
334345 sc .nodeInformer .Informer ().HasSynced ,
@@ -558,7 +569,7 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
558569
559570 for _ , value := range sc .Jobs {
560571 // If no scheduling spec, does not handle it.
561- if value .PodGroup == nil {
572+ if value .PodGroup == nil && value . PDB == nil {
562573 glog .V (4 ).Infof ("The scheduling spec of Job <%v:%s/%s> is nil, ignore it." ,
563574 value .UID , value .Namespace , value .Name )
564575
@@ -622,26 +633,23 @@ func (sc *SchedulerCache) String() string {
622633 return str
623634}
624635
625- // UpdateJobStatus update the status of job and its tasks .
626- func (sc * SchedulerCache ) UpdateJobStatus (job * kbapi.JobInfo ) ( * kbapi. JobInfo , error ) {
636+ // RecordJobStatusEvent records related events according to job status .
637+ func (sc * SchedulerCache ) RecordJobStatusEvent (job * kbapi.JobInfo ) {
627638 jobErrMsg := job .FitError ()
628639
629- // If pending or unschedulable, record unschedulable event.
630- if job .PodGroup .Status .Phase == v1alpha1 .PodGroupUnknown ||
631- job .PodGroup .Status .Phase == v1alpha1 .PodGroupPending {
640+ pgUnschedulable := job .PodGroup != nil &&
641+ (job .PodGroup .Status .Phase == v1alpha1 .PodGroupUnknown ||
642+ job .PodGroup .Status .Phase == v1alpha1 .PodGroupPending )
643+ pdbUnschedulabe := job .PDB != nil && len (job .TaskStatusIndex [api .Pending ]) != 0
632644
645+ // If pending or unschedulable, record unschedulable event.
646+ if pgUnschedulable || pdbUnschedulabe {
633647 msg := fmt .Sprintf ("%v/%v tasks in gang unschedulable: %v" ,
634648 len (job .TaskStatusIndex [api .Pending ]), len (job .Tasks ), job .FitError ())
635649 sc .Recorder .Eventf (job .PodGroup , v1 .EventTypeWarning ,
636650 string (v1alpha1 .PodGroupUnschedulableType ), msg )
637651 }
638652
639- pg , err := sc .StatusUpdater .UpdatePodGroup (job .PodGroup )
640- if err != nil {
641- return nil , err
642- }
643- job .PodGroup = pg
644-
645653 // Update podCondition for tasks Allocated and Pending before job discarded
646654 for _ , status := range []api.TaskStatus {api .Allocated , api .Pending } {
647655 for _ , taskInfo := range job .TaskStatusIndex [status ] {
@@ -651,6 +659,17 @@ func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo) (*kbapi.JobInfo, e
651659 }
652660 }
653661 }
662+ }
663+
664+ // UpdateJobStatus update the status of job and its tasks.
665+ func (sc * SchedulerCache ) UpdateJobStatus (job * kbapi.JobInfo ) (* kbapi.JobInfo , error ) {
666+ pg , err := sc .StatusUpdater .UpdatePodGroup (job .PodGroup )
667+ if err != nil {
668+ return nil , err
669+ }
670+ job .PodGroup = pg
671+
672+ sc .RecordJobStatusEvent (job )
654673
655674 return job , nil
656675}
0 commit comments