|
2 | 2 | import platform |
3 | 3 | import os, re, time, sys |
4 | 4 | import subprocess |
| 5 | +import socket |
| 6 | +from contextlib import closing |
5 | 7 | import wget |
6 | 8 | import staroid |
7 | 9 | import requests |
@@ -98,13 +100,20 @@ def __download_jar(self, download_dir, url): |
98 | 100 | if not os.path.exists("{}/{}".format(download_dir, filename)): |
99 | 101 | wget.download(url, download_dir) |
100 | 102 |
|
| 103 | + def __find_free_port(self): |
| 104 | + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: |
| 105 | + s.bind(('localhost', 0)) |
| 106 | + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 107 | + return s.getsockname()[1] |
| 108 | + |
101 | 109 | def start(self): |
102 | 110 | "Start cluster" |
103 | 111 |
|
104 | 112 | # run previous steps |
105 | 113 | self.install() |
106 | 114 |
|
107 | | - local_kube_api_port = 8001 |
| 115 | + local_kube_api_port = self.__find_free_port() |
| 116 | + self.__local_kube_api_port = local_kube_api_port |
108 | 117 | local_kube_api_addr = "http://localhost:{}".format(local_kube_api_port) |
109 | 118 |
|
110 | 119 | # create start namespace |
@@ -243,7 +252,7 @@ def session(self): |
243 | 252 | jars_packages = [] |
244 | 253 | session_builder = SparkSession.builder \ |
245 | 254 | .appName(self.__cluster_name) \ |
246 | | - .config("spark.master", "k8s://http://localhost:8001") \ |
| 255 | + .config("spark.master", "k8s://http://localhost:{}".format(self.__local_kube_api_port)) \ |
247 | 256 | .config("spark.kubernetes.namespace", ns.namespace()) \ |
248 | 257 | .config("spark.kubernetes.container.image", SPARK_ARTIFACTS[self.__spark_version]["image"]) \ |
249 | 258 | .config("spark.driver.host", "driver-{}".format(self.__cluster_name)) \ |
|
0 commit comments