|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# -*- coding: utf-8 -*- |
| 3 | +# @Time : 2019/8/17 |
| 4 | +# @Author : github.com/guofei9987 |
| 5 | + |
| 6 | +import numpy as np |
| 7 | +from sko.base import SkoBase |
| 8 | +from sko.operators import mutation |
| 9 | +import multiprocessing as mp |
| 10 | + |
| 11 | +CPU_CORE_NUM = 12 |
| 12 | + |
| 13 | + |
| 14 | +class SimulatedAnnealingBase(SkoBase): |
| 15 | + """ |
| 16 | + DO SA(Simulated Annealing) |
| 17 | +
|
| 18 | + Parameters |
| 19 | + ---------------- |
| 20 | + func : function |
| 21 | + The func you want to do optimal |
| 22 | + n_dim : int |
| 23 | + number of variables of func |
| 24 | + x0 : array, shape is n_dim |
| 25 | + initial solution |
| 26 | + T_max :float |
| 27 | + initial temperature |
| 28 | + T_min : float |
| 29 | + end temperature |
| 30 | + L : int |
| 31 | + num of iteration under every temperature(Long of Chain) |
| 32 | +
|
| 33 | + Attributes |
| 34 | + ---------------------- |
| 35 | +
|
| 36 | +
|
| 37 | + Examples |
| 38 | + ------------- |
| 39 | + See https://github.com/guofei9987/scikit-opt/blob/master/examples/demo_sa.py |
| 40 | + """ |
| 41 | + |
| 42 | + def __init__(self, func, x0, T_max=100, T_min=1e-7, L=300, max_stay_counter=150, **kwargs): |
| 43 | + assert T_max > T_min > 0, 'T_max > T_min > 0' |
| 44 | + |
| 45 | + self.func = func |
| 46 | + self.T_max = T_max # initial temperature |
| 47 | + self.T_min = T_min # end temperature |
| 48 | + self.L = int(L) # num of iteration under every temperature(also called Long of Chain) |
| 49 | + # stop if best_y stay unchanged over max_stay_counter times (also called cooldown time) |
| 50 | + self.max_stay_counter = max_stay_counter |
| 51 | + |
| 52 | + self.n_dims = len(x0) |
| 53 | + |
| 54 | + self.best_x = np.array(x0) # initial solution |
| 55 | + self.best_y = self.func(self.best_x) |
| 56 | + self.T = self.T_max |
| 57 | + self.iter_cycle = 0 |
| 58 | + self.generation_best_X, self.generation_best_Y = [self.best_x], [self.best_y] |
| 59 | + # history reasons, will be deprecated |
| 60 | + self.best_x_history, self.best_y_history = self.generation_best_X, self.generation_best_Y |
| 61 | + |
| 62 | + def get_new_x(self, x): |
| 63 | + u = np.random.uniform(-1, 1, size=self.n_dims) |
| 64 | + x_new = x + 20 * np.sign(u) * self.T * ((1 + 1.0 / self.T) ** np.abs(u) - 1.0) |
| 65 | + return x_new |
| 66 | + |
| 67 | + def cool_down(self): |
| 68 | + self.T = self.T * 0.7 |
| 69 | + |
| 70 | + def isclose(self, a, b, rel_tol=1e-09, abs_tol=1e-30): |
| 71 | + return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol) |
| 72 | + |
| 73 | + def run(self): |
| 74 | + x_current, y_current = self.best_x, self.best_y |
| 75 | + stay_counter = 0 |
| 76 | + while True: |
| 77 | + # loop L times under the same Temperature |
| 78 | + for i in range(self.L): |
| 79 | + x_new = self.get_new_x(x_current) |
| 80 | + y_new = self.func(x_new) |
| 81 | + print("[Info: ]i/L = {}/{}".format( |
| 82 | + i, self.L |
| 83 | + )) |
| 84 | + |
| 85 | + # Metropolis |
| 86 | + df = y_new - y_current |
| 87 | + if df < 0 or np.exp(-df / self.T) > np.random.rand(): |
| 88 | + x_current, y_current = x_new, y_new |
| 89 | + if y_new < self.best_y: |
| 90 | + self.best_x, self.best_y = x_new, y_new |
| 91 | + print("[Info: ] iter_cycle = {} T = {} stay_counter = {}".format( |
| 92 | + self.iter_cycle, self.T, stay_counter |
| 93 | + )) |
| 94 | + print("[Info: ]origin minimalScoreAuto = {}\n" |
| 95 | + " minimalScoreManual = {}\n" |
| 96 | + " minimalScoreSeed = {}\n" |
| 97 | + " minimalScore2d = {}".format( |
| 98 | + self.best_x[0], self.best_x[1], self.best_x[2], self.best_x[3] |
| 99 | + )) |
| 100 | + self.iter_cycle += 1 |
| 101 | + self.cool_down() |
| 102 | + self.generation_best_Y.append(self.best_y) |
| 103 | + self.generation_best_X.append(self.best_x) |
| 104 | + |
| 105 | + # if best_y stay for max_stay_counter times, stop iteration |
| 106 | + if self.isclose(self.best_y_history[-1], self.best_y_history[-2]): |
| 107 | + stay_counter += 1 |
| 108 | + else: |
| 109 | + stay_counter = 0 |
| 110 | + |
| 111 | + if self.T < self.T_min: |
| 112 | + stop_code = 'Cooled to final temperature' |
| 113 | + break |
| 114 | + if stay_counter > self.max_stay_counter: |
| 115 | + stop_code = 'Stay unchanged in the last {stay_counter} iterations'.format(stay_counter=stay_counter) |
| 116 | + break |
| 117 | + |
| 118 | + return self.best_x, self.best_y |
| 119 | + |
| 120 | + fit = run |
| 121 | + |
| 122 | + |
| 123 | +class SAFast(SimulatedAnnealingBase): |
| 124 | + ''' |
| 125 | + u ~ Uniform(0, 1, size = d) |
| 126 | + y = sgn(u - 0.5) * T * ((1 + 1/T)**abs(2*u - 1) - 1.0) |
| 127 | +
|
| 128 | + xc = y * (upper - lower) |
| 129 | + x_new = x_old + xc |
| 130 | +
|
| 131 | + c = n * exp(-n * quench) |
| 132 | + T_new = T0 * exp(-c * k**quench) |
| 133 | + ''' |
| 134 | + |
| 135 | + def __init__(self, func, x0, T_max=100, T_min=1e-7, L=300, max_stay_counter=150, **kwargs): |
| 136 | + # nit parent class |
| 137 | + super().__init__(func, x0, T_max, T_min, L, max_stay_counter, **kwargs) |
| 138 | + self.m, self.n, self.quench = kwargs.get('m', 1), kwargs.get('n', 1), kwargs.get('quench', 1) |
| 139 | + # upper and down are range of the parameters. |
| 140 | + self.lower, self.upper = kwargs.get('lower', -10), kwargs.get('upper', 10) |
| 141 | + self.c = self.m * np.exp(-self.n * self.quench) |
| 142 | + |
| 143 | + def get_new_x(self, x): |
| 144 | + """randomly search for a new x point""" |
| 145 | + r = np.random.uniform(-1, 1, size=self.n_dims) |
| 146 | + xc = np.sign(r) * self.T * ((1 + 1.0 / self.T) ** np.abs(r) - 1.0) |
| 147 | + x_new = x + xc * (self.upper - self.lower) |
| 148 | + return x_new |
| 149 | + |
| 150 | + def cool_down(self): |
| 151 | + self.T = self.T_max * np.exp(-self.c * self.iter_cycle ** self.quench) |
| 152 | + |
| 153 | + |
| 154 | +# SA_fast is the default |
| 155 | +SA = SAFast |
| 156 | + |
0 commit comments