|
31 | 31 |
|
32 | 32 | import java.util.*; |
33 | 33 | import java.util.concurrent.ConcurrentHashMap; |
| 34 | +import java.util.concurrent.Executors; |
| 35 | +import java.util.concurrent.ScheduledExecutorService; |
34 | 36 | import java.util.concurrent.Semaphore; |
| 37 | +import java.util.concurrent.TimeUnit; |
| 38 | +import java.util.concurrent.atomic.AtomicBoolean; |
35 | 39 | import java.util.stream.Collectors; |
36 | 40 |
|
37 | 41 | import static cd.go.contrib.elasticagents.docker.DockerPlugin.LOG; |
|
40 | 44 | public class DockerContainers implements AgentInstances<DockerContainer> { |
41 | 45 | private final Map<String, DockerContainer> instances = new ConcurrentHashMap<>(); |
42 | 46 | private List<JobIdentifier> jobsWaitingForAgentCreation = new ArrayList<>(); |
43 | | - private boolean refreshed; |
| 47 | + private AtomicBoolean refreshed = new AtomicBoolean(false); |
| 48 | + private final int FORCE_REFRESH_TIMEOUT_MINUTES = 60; |
| 49 | + |
44 | 50 | public Clock clock = Clock.DEFAULT; |
45 | 51 |
|
46 | 52 | final Semaphore semaphore = new Semaphore(0, true); |
47 | 53 |
|
| 54 | + private final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor(); |
| 55 | + |
| 56 | + public DockerContainers() { |
| 57 | + scheduleForceRefresh(); |
| 58 | + } |
| 59 | + |
48 | 60 | @Override |
49 | 61 | public DockerContainer create(CreateAgentRequest request, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) throws Exception { |
50 | 62 | LOG.info(String.format("[Create Agent] Processing create agent request for %s", request.jobIdentifier())); |
@@ -77,12 +89,20 @@ public DockerContainer create(CreateAgentRequest request, PluginRequest pluginRe |
77 | 89 | } |
78 | 90 | } |
79 | 91 |
|
| 92 | + private void scheduleForceRefresh() { |
| 93 | + timerService.scheduleAtFixedRate(this::forceNextRefresh, 0, FORCE_REFRESH_TIMEOUT_MINUTES, TimeUnit.MINUTES); |
| 94 | + } |
| 95 | + |
80 | 96 | private void doWithLockOnSemaphore(Runnable runnable) { |
81 | 97 | synchronized (semaphore) { |
82 | 98 | runnable.run(); |
83 | 99 | } |
84 | 100 | } |
85 | 101 |
|
| 102 | + protected void forceNextRefresh() { |
| 103 | + refreshed.set(false); |
| 104 | + } |
| 105 | + |
86 | 106 | @Override |
87 | 107 | public void terminate(String agentId, PluginSettings settings) throws Exception { |
88 | 108 | DockerContainer instance = instances.get(agentId); |
@@ -125,24 +145,22 @@ public Agents instancesCreatedAfterTimeout(PluginSettings settings, Agents agent |
125 | 145 | if (instance == null) { |
126 | 146 | continue; |
127 | 147 | } |
128 | | - |
129 | 148 | if (clock.now().isAfter(instance.createdAt().plus(settings.getAutoRegisterPeriod()))) { |
130 | 149 | oldAgents.add(agent); |
131 | 150 | } |
132 | 151 | } |
133 | | - |
134 | 152 | return new Agents(oldAgents); |
135 | 153 | } |
136 | 154 |
|
137 | 155 | @Override |
138 | 156 | public void refreshAll(ClusterProfileProperties clusterProfileProperties) throws Exception { |
139 | | - if (!refreshed) { |
| 157 | + if (!refreshed.get()){ |
140 | 158 | DockerClient docker = docker(clusterProfileProperties); |
141 | 159 | List<Container> containers = docker.listContainers(DockerClient.ListContainersParam.withLabel(Constants.CREATED_BY_LABEL_KEY, Constants.PLUGIN_ID)); |
142 | 160 | for (Container container : containers) { |
143 | 161 | register(DockerContainer.fromContainerInfo(docker.inspectContainer(container.id()))); |
144 | 162 | } |
145 | | - refreshed = true; |
| 163 | + refreshed.set(true); |
146 | 164 | } |
147 | 165 | } |
148 | 166 |
|
|
0 commit comments