diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml deleted file mode 100644 index 52cd746ef9a8..000000000000 --- a/.github/workflows/build.yml +++ /dev/null @@ -1,80 +0,0 @@ -name: CI - -# Controls when the action will run. Triggers the workflow on push or pull request -# events but only for the master branch -on: - push: - branches: - - master - paths-ignore: - - README.md - - pull_request: - branches: - - master - paths-ignore: - - README.md - - 'docs/**' - -# A workflow run is made up of one or more jobs that can run sequentially or in parallel -jobs: - # This workflow contains a single job called "build" - build: - runs-on: ${{ matrix.os }} - - strategy: - matrix: - name: - - "CentOS 7" - include: - - name: "CentOS 7" - os: ubuntu-latest - docker_image: yugabyteci/yb_build_infra_centos7:v2020-05-24T22_16_17 - - if: > - (github.event_name == 'push' && - !contains(github.event.head_commit.message, 'skip ci') && - !contains(github.event.head_commit.message, 'ci skip')) || - github.event_name == 'pull_request' - - steps: - - uses: actions/checkout@v2 - - - name: Build code without tests - run: | - echo "OSTYPE (outside Docker): $OSTYPE" - echo "Build name: ${{ matrix.name }}" - echo "Docker image: ${{ matrix.docker_image }}" - if [[ $OSTYPE == linux* ]]; then - build_dir_in_container=/opt/yb-build/yugabyte-db - docker run \ - -i \ - "-w=$build_dir_in_container" \ - --mount type=bind,source="$PWD",target="$build_dir_in_container" \ - "${{ matrix.docker_image }}" \ - bash -c ' - set -euo pipefail -x - echo "OSTYPE (inside Docker): $OSTYPE" - echo "PATH: $PATH" - export PATH=/usr/local/bin:$PATH - ( set -x; ls -l /usr/local/bin ) - set +e - ( set -x; which ninja ) - ( set -x; which cmake ) - set -e - chown -R yugabyteci . 
- sudo -u yugabyteci bash -c " - set -euo pipefail - export PATH=/usr/local/bin:\$PATH - df -H / - echo ::group::Building the code without tests - ./yb_build.sh release packaged --download-thirdparty --ninja - echo ::endgroup:: - df -H / - " - ' - elif [[ $OSTYPE == darwin* ]]; then - system_profiler SPSoftwareDataType - sw_vers - ./yb_build.sh release - fi diff --git a/.github/workflows/yb-ctl-test.yml b/.github/workflows/yb-ctl-test.yml new file mode 100644 index 000000000000..b90b81bd236d --- /dev/null +++ b/.github/workflows/yb-ctl-test.yml @@ -0,0 +1,42 @@ +name: yb-ctl-test + +# Controls when the action will run. Triggers the workflow on push or pull request +# events but only for the master branch +on: + push: + branches: + - master + paths: + - '**/yb-ctl*' + + pull_request: + branches: + - master + paths: + - '**/yb-ctl*' + +# A workflow run is made up of one or more jobs that can run sequentially or in parallel +jobs: + yb-ctl-test: + runs-on: ${{ matrix.os }} + + strategy: + matrix: + os: [ macos-latest, ubuntu-18.04 ] + python-version: [ 2.7, 3.8 ] + + steps: + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - uses: actions/checkout@v2 + + - name: Run yb-ctl-test + run: | + set -x + tmpdir=$(mktemp -d) + cp -r scripts/installation "${tmpdir}/" + cd "${tmpdir}/installation" + test/yb-ctl-test.sh diff --git a/.gitmodules b/.gitmodules index 80481ac93be5..52786a1c2a15 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,3 @@ -[submodule "submodules/yugabyte-installation"] - path = submodules/yugabyte-installation - url = https://github.com/yugabyte/yugabyte-installation.git [submodule "submodules/yugabyte-bash-common"] path = submodules/yugabyte-bash-common url = https://github.com/yugabyte/yugabyte-bash-common.git diff --git a/bin/yb-ctl b/bin/yb-ctl index fb4a5d4bc935..689c5e523fdc 100755 --- a/bin/yb-ctl +++ b/bin/yb-ctl @@ -26,4 +26,4 @@ set -euo pipefail 
set_sanitizer_runtime_options # Invoke the actual yb-ctl script. -"$YB_SRC_ROOT"/submodules/yugabyte-installation/bin/yb-ctl "$@" +"$YB_SRC_ROOT"/scripts/installation/bin/yb-ctl "$@" diff --git a/scripts/installation/bin/yb-ctl b/scripts/installation/bin/yb-ctl new file mode 100755 index 000000000000..a18879f4ee9f --- /dev/null +++ b/scripts/installation/bin/yb-ctl @@ -0,0 +1,2027 @@ +#!/usr/bin/env python + +# Copyright (c) YugaByte, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations +# under the License. +# +"""A script to manage a local YugaByte cluster. + +We will aim to maintain https://docs.yugabyte.com/admin/yb-ctl/ as public facing documentation! 
+ +Example use cases: + +Creating a cluster with default settings + yb-ctl start (yb-ctl create) + +Creating a cluster with replication factor 5 + yb-ctl --rf 5 start + +Creating a cluster with placement_info + yb-ctl start + --placement_info "cloud1.region1.zone1,cloud2.region2.zone2,cloud3.region3.zone3" + +Creating a cluster with custom_flags + yb-ctl start --master_flags "flag1=value,flag2=value,flag3=value" + --tserver_flags "flag1=value,flag2=value,flag3=value" + +Destroying a cluster + yb-ctl destroy + +Restart the cluster + yb-ctl restart + +Wipe restart + yb-ctl wipe_restart + +Destroying your local cluster and its data + yb-ctl destroy + +Add node + yb-ctl add_node (--placement_info "cloud1.region1.zone1") + +Stopping node #X from your cluster + yb-ctl remove_node + +Start node + yb-ctl start_node (--placement_info "cloud1.region1.zone1") + +Stop node + yb-ctl stop_node + +Restart node + yb-ctl restart_node + +""" + +from __future__ import print_function + +import atexit +import argparse +import errno +import glob +import hashlib +import json +import logging +import os +import random +import re +import shutil +import signal +import subprocess +import sys +import time +import tempfile +import csv + +DAEMON_TYPE_MASTER = 'master' +DAEMON_TYPE_TSERVER = 'tserver' +PROTOCOL_TYPE_YSQL = 'ysql' +PROTOCOL_TYPE_YCQL = 'ycql' +PROTOCOL_TYPE_YEDIS = 'yedis' + +DAEMON_TYPES = [ + DAEMON_TYPE_MASTER, + DAEMON_TYPE_TSERVER +] + +PROTOCOL_TYPES = { + PROTOCOL_TYPE_YSQL, + PROTOCOL_TYPE_YCQL, + PROTOCOL_TYPE_YEDIS +} + +YSQL_DEFAULT_PORT = 5433 +YCQL_DEFAULT_PORT = 9042 +YEDIS_DEFAULT_PORT = 6379 + +LOCALHOST_IP = '127.0.0.1' + +SLEEP_TIME_IN_SEC = 1 +MAX_YB_ADMIN_WAIT_SEC = 45 +MAX_WAIT_FOR_PROCESSES_RUNNING_SEC = 10 +DEFAULT_REPLICATION_FACTOR = 1 +DEFAULT_IP_START = 1 + +# A regex to get data directories out a command line of a running process. +# Needs to match e.g. 
the following command line snippet: +# --fs_data_dirs /tmp/yb-ctl-test-data-2019-06-05T14_14_12-4841/node-1/disk-1 +FS_DATA_DIRS_ARG_RE = re.compile(r'--fs_data_dirs[ =](\S+)') + + +def call_get_output_maybe_error(cmd_list, should_get_error=False): + """ + Subprocess call the passed in command and on success, return the output. + :param cmd_list: the command to execute + :param should_get_error: if to capture and log the error on failure + :return: the output of the command, on success, else raises a CalledProcessError + """ + with open(os.devnull, "wb") as devnull: + stderr = subprocess.PIPE if should_get_error else devnull + stderr = subprocess.PIPE + proc = subprocess.Popen( + cmd_list, stdout=subprocess.PIPE, stderr=stderr) + output, error = proc.communicate() + if proc.returncode: + raise subprocess.CalledProcessError( + proc.returncode, cmd_list, output="{}\n{}".format(output, error)) + return output + + +def retry_call_with_timeout(fn, timeout_sec=MAX_YB_ADMIN_WAIT_SEC): + """ + This will retry a given function that is supposed to be doing subprocess callouts. Based on + the passed in function behavior, this function will behave accordingly: + - if fn returns True, we return True + - if fn does not return True, we keep retrying + - if fn throws a CalledProcessError, we keep retrying and log errors if the last run + - if we hit the timeout, we just return + Note: the function takes a bool arg to decide if to capture and log stderr on error. + :param fn: the function to retry calling + :param timeout_sec: the amount of time to keep retrying + """ + start_time = time.time() + while True: + time_elapsed = time.time() - start_time + if time_elapsed > timeout_sec: + return + is_last_iteration = time_elapsed + SLEEP_TIME_IN_SEC > timeout_sec + try: + ret = fn(is_last_iteration) + if ret: + return ret + except subprocess.CalledProcessError as e: + if is_last_iteration: + logging.error("Failed too many times. 
CMDLINE={} RETCODE={} OUTPUT={}".format( + e.cmd, e.returncode, e.output)) + time.sleep(SLEEP_TIME_IN_SEC) + + +def wait_for_proc_report_progress(proc): + """ + Poll process to check if it has terminated and print one . per second. + This is used as a way to indicate to user that process is still running. + :param proc: the process whose status needs to be checked + """ + INITIAL_SECONDS_WITHOUT_PROGRESS_REPORTING = 2 + start_time_sec = time.time() + printed_dots = False + while proc.poll() is None: + # Do not add newline at the end, just report one . per second. + if time.time() - start_time_sec > INITIAL_SECONDS_WITHOUT_PROGRESS_REPORTING: + print(".", end="") + sys.stdout.flush() + printed_dots = True + time.sleep(1) + if printed_dots: + # Add a newline, since we did not do so during above printing of dots. + print() + + +def is_env_var_true(env_var_name): + env_var_value = os.getenv(env_var_name) + return env_var_value and env_var_value.strip().lower() not in ['n', 'no', '0', 'f', 'false'] + + +def format_cmd_line_with_host_port(executable, host, port, default_port): + """ + Adds -h host -p port options if necessary. This works for ysqlsh (psql) and redis-cli. + """ + cmd_line = str(executable) + if host != LOCALHOST_IP: + cmd_line += ' -h %s' % host + if port != default_port: + cmd_line += ' -p %d' % port + return cmd_line + + +DISABLE_CALLHOME_ENV_VAR_SET = is_env_var_true('YB_DISABLE_CALLHOME') + + +class ExitWithError(Exception): + pass + + +def get_local_ip(index): + return "127.0.0.{}".format(index) + + +def validate_daemon_type(daemon_type): + if daemon_type not in DAEMON_TYPES: + raise RuntimeError("Invalid daemon type: '{}'".format(daemon_type)) + + +def get_binary_name_for_daemon_type(daemon_type): + return "yb-{}".format(daemon_type) + + +def adjust_env_for_ysql(): + # TODO: we should not need to do this if Linuxbrew's glibc bundled with the YB package has + # proper access to locale data. 
+ for k in os.environ.keys(): + if k == 'LANG' or k.startswith('LC_'): + del os.environ[k] + + +def get_home_dir(): + return os.path.expanduser('~') + + +def get_default_data_dir(): + return os.path.join(get_home_dir(), 'yugabyte-data') + + +def get_os_family(): + if sys.platform.startswith('linux'): + return 'linux' + if sys.platform.startswith('darwin'): + return 'darwin' + raise ValueError("Unsupported operating system: sys.platform=%s" % sys.platform) + + +def is_linux(): + return get_os_family() == 'linux' + + +def get_file_sha256_sum(file_path): + """ + Compute a SHA256 checksum of a file. Based on http://bit.ly/2uGHL8N + """ + sha256_hash = hashlib.sha256() + with open(file_path, "rb") as f: + for byte_block in iter(lambda: f.read(1048576), b""): + sha256_hash.update(byte_block) + return sha256_hash.hexdigest() + + +def download_file(url, dest_path): + """ + Download a file and return its SHA256 sum. + """ + try: + with open(dest_path, 'wb') as dest_file: + if sys.version_info[0] >= 3: + import urllib.request + req = urllib.request.Request(url, headers={'user-agent': 'Mozilla'}) + remote_file = urllib.request.urlopen(req) + else: + import urllib2 + req = urllib2.Request(url) + req.add_header('user-agent', 'Mozilla') + remote_file = urllib2.urlopen(req) + try: + sha256_hash = hashlib.sha256() + for byte_block in iter(lambda: remote_file.read(1048576), b""): + sha256_hash.update(byte_block) + dest_file.write(byte_block) + return sha256_hash.hexdigest() + finally: + remote_file.close() + except: # noqa + if os.path.exists(dest_path): + logging.warn("Deleting the unfinished download: %s", dest_path) + os.remove(dest_path) + raise + + +def mkdir_p(dir_path): + try: + if not os.path.exists(dir_path): + os.makedirs(dir_path) + except IOError as ex: + if os.path.isdir(dir_path): + # Concurrent directory creation. 
+ return + raise + + +def has_rel_paths(top_dir, rel_paths): + for rel_path in rel_paths: + if not os.path.exists(os.path.join(top_dir, rel_path)): + return False + return True + + +def is_yugabyte_db_installation_dir(top_dir): + """ + Checks if the given directory is a viable YugaByte DB installation directory. A build directory + with master/tserver/postgres binaries already built would match this definition. + """ + if top_dir is None: + return False + return has_rel_paths( + top_dir, [ + 'bin/yb-master', + 'bin/yb-tserver', + 'postgres/bin/postgres' + ]) + + +def remove_surrounding_quotes(s): + if len(s) >= 2: + for quote in ['"', "'"]: + if s.startswith(quote) and s.endswith(quote): + return s[1:-1] + return s + + +def is_flag_true(flags, name): + value = flags.get(name, '').lower() + return value == '1' or value == 'true' + + +class Installer: + YUGABYTE_DB_VERSION = '2.1.8.2' + DOWNLOAD_URL_PATTERN = 'https://downloads.yugabyte.com/yugabyte-{version}-{os}.tar.gz' + SHA256_SUM_BY_OS = { + 'darwin': 'dd6cbd63ad4dd150c9707ed5dc8f3696adf9828dff941bae2255bc04eff7e924', + 'linux': 'e4709b75bc6f180d91281b1c898b0dbe9ef7ea81ed7c9a5ee2368d78c66a664e' + } + + def __init__(self, only_find_existing=False): + self.installation_dir = None + self.only_find_existing = only_find_existing + + def get_download_cache_dir(self): + return os.path.join(get_home_dir(), '.cache', 'yugabyte', 'downloads') + + def get_software_installation_top_dir(self): + return os.path.join(get_home_dir(), 'yugabyte-db') + + def install_or_find_existing(self): + """ + Downloads/installs YugaByte DB, or only finds an existing installation, if + "only_find_existing" was set during this installer's creation. + + Returns True in case YugaByte DB was installed or an existing installation was found. + Returns False only if only_find_existing is specified and no existing installation found. + Errors are still handled by raising exceptions. 
+ """ + installation_top_dir = self.get_software_installation_top_dir() + self.installation_dir = os.path.join( + installation_top_dir, 'yugabyte-' + Installer.YUGABYTE_DB_VERSION) + if os.path.exists(self.installation_dir): + if not is_yugabyte_db_installation_dir(self.installation_dir): + logging.error( + "Directory %s exists but does not appear to be a valid YugaByte DB " + "installation directory. Remove that directory and re-run the script " + "to re-install YugaByte DB.", self.installation_dir) + sys.exit(1) + # Already installed. + logging.info("Found existing YugaByte DB installation at %s", self.installation_dir) + # This will re-run the post-install script if it did not complete initially. + return self.run_post_install_script() + + if self.only_find_existing: + # No installation found, and we're not allowed to make any changes. + return False + + cache_dir = self.get_download_cache_dir() + mkdir_p(cache_dir) + os_family = get_os_family() + download_url = Installer.DOWNLOAD_URL_PATTERN.format( + version=self.YUGABYTE_DB_VERSION, + os=os_family) + download_name = download_url.rsplit('/', 1)[-1] + download_dest_path = os.path.join(cache_dir, download_name) + expected_sha256_sum = Installer.SHA256_SUM_BY_OS[os_family] + + need_to_download = True + if os.path.exists(download_dest_path): + logging.info("File %s already exists, validating the checksum", download_dest_path) + existing_sha256_sum = get_file_sha256_sum(download_dest_path) + if existing_sha256_sum == expected_sha256_sum: + logging.info("Checksum is valid for %s", download_dest_path) + need_to_download = False + else: + logging.info( + "Existing file %s has an SHA-256 sum %s, different from the expected sum %s. 
" + "Removing the existing file and re-downloading.", + download_dest_path, existing_sha256_sum, expected_sha256_sum) + os.remove(download_dest_path) + + if need_to_download: + print("Downloading %s to %s", download_url, download_dest_path) + downloaded_sha256_sum = download_file(download_url, download_dest_path) + if downloaded_sha256_sum != expected_sha256_sum: + raise IOError( + "Downloaded file %s has SHA-256 sum %s, different from expected: %s" % ( + download_dest_path, downloaded_sha256_sum, expected_sha256_sum)) + + mkdir_p(installation_top_dir) + logging.info("Extracting %s in directory %s", download_dest_path, installation_top_dir) + subprocess.check_call( + ['tar', + 'xf', + download_dest_path], + cwd=installation_top_dir) + if not os.path.isdir(self.installation_dir): + raise RuntimeError( + "Extracting %s in directory %s failed to produce directory %s" % ( + download_dest_path, installation_top_dir, self.installation_dir)) + self.run_post_install_script() + + def run_post_install_script(self): + if not is_linux(): + # No post_install.sh script on macOS. + return True + post_install_script_path = os.path.join( + self.installation_dir, 'bin', 'post_install.sh') + post_install_completion_flag_path = post_install_script_path + '.completed' + if os.path.exists(post_install_completion_flag_path): + logging.debug( + "File %s already exists, meaning the post_install.sh script has already run.", + post_install_completion_flag_path) + return True + if self.only_find_existing: + logging.warning( + "The post-install script did not complete successfully earlier in %s. 
Specify " + "--install-if-needed to re-run it.", + self.installation_dir) + return False + + logging.info("Running the post-installation script %s", post_install_script_path) + process = subprocess.Popen( + post_install_script_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + std_out, std_err = process.communicate() + if process.returncode != 0: + logging.error( + "Failed running %s (exit code: %d). Standard output:\n%s\n. Standard error:\n%s", + post_install_script_path, process.returncode, std_out, std_err) + raise RuntimeError("Failed running %s" % post_install_script_path) + logging.info("Successfully ran the post-installation script") + + with open(post_install_completion_flag_path, 'w'): + # Write an empty file. + pass + + +class SetAndRestoreEnv: + """A utility class to save environment variables, and optionally set new environment. """ + def __init__(self, new_env=None): + self.old_env = {} + self.new_env = new_env + + def __enter__(self): + for k in os.environ: + self.old_env[k] = os.environ[k] + if self.new_env: + for k in self.new_env: + v = self.new_env[k] + if v is None: + del os.environ[k] + else: + os.environ[k] = v + + def __exit__(self, type, value, traceback): + for k in os.environ.keys(): + if k not in self.old_env: + del os.environ[k] + for k in self.old_env: + os.environ[k] = self.old_env[k] + + +class DaemonId: + def __init__(self, daemon_type, index): + validate_daemon_type(daemon_type) + + self.daemon_type = daemon_type + self.index = index + + def __str__(self): + return "{}-{}".format(self.daemon_type, self.index) + + def __hash__(self): + return hash(str(self)) + + def __eq__(self, other): + return self.daemon_type == other.daemon_type and self.index == other.index + + def is_master(self): + return self.daemon_type == DAEMON_TYPE_MASTER + + def is_tserver(self): + return self.daemon_type == DAEMON_TYPE_TSERVER + + def supports_placement(self): + return self.daemon_type in [DAEMON_TYPE_MASTER, DAEMON_TYPE_TSERVER] + + +def 
get_default_base_ports_dict(): + return { + DAEMON_TYPE_MASTER: { + "http": 7000, + "rpc": 7100 + }, + DAEMON_TYPE_TSERVER: { + "http": 9000, + "rpc": 9100, + }, + PROTOCOL_TYPE_YSQL: { + "http": 13000, + "rpc": YSQL_DEFAULT_PORT + }, + PROTOCOL_TYPE_YCQL: { + "http": 12000, + "rpc": YCQL_DEFAULT_PORT, + }, + PROTOCOL_TYPE_YEDIS: { + "http": 11000, + "rpc": YEDIS_DEFAULT_PORT + } + } + + +class ClusterOptions: + def __init__(self): + self.max_daemon_index = 20 + self.num_shards_per_tserver = None + self.ysql_num_shards_per_tserver = None + self.timeout_yb_admin_sec = None + self.timeout_processes_running_sec = None + self.master_memory_limit_ratio = 0.35 + self.tserver_memory_limit_ratio = 0.65 + + self.cluster_base_dir = None + + self.custom_binary_dir = None + self.script_dir = os.path.dirname(os.path.realpath(__file__)) + + self.installation_dir = None + + self.placement_cloud = "cloud" + self.placement_region = "region" + self.placement_zone = "zone" + + self.master_addresses = "" + self.base_ports = get_default_base_ports_dict() + + self.master_flags = [] + self.tserver_flags = [] + self.placement_info_raw = "" + self.placement_info = [] + self.verbose_level = 0 + self.use_cassandra_authentication = False + self.node_type = DAEMON_TYPE_TSERVER + self.is_shell_master = False + self.is_startup_command = False + self.install_if_needed = False + self.yb_ctl_verbose = False + + self.cluster_config = None + + # These fields are saved into the cluster configuration. 
+ self.enable_ysql = None + self.num_drives = None + self.replication_factor = DEFAULT_REPLICATION_FACTOR + self.ip_start = DEFAULT_IP_START + self.listen_ip = '' + + def parse_flag_args(self, flag_args): + flags = [] if flag_args is None else next(csv.reader([flag_args]), []) + return [item.strip() for item in flags] + + def _find_installation_dir(self): + yb_src_root_candidate = os.path.dirname(os.path.dirname(os.path.dirname(self.script_dir))) + # For development, assume $PARENT_DIR/yugabyte-installation/bin/yb-ctl will have sibling + # directories $PARENT_DIR/yugabyte or $PARENT_DIR/yugabyte-db. + installation_parent_dir = os.path.dirname(os.path.dirname(self.script_dir)) + + self.installation_dirs_considered = [ + # yb-ctl being run from the "bin" directory of an installation. + os.path.dirname(self.script_dir), + ] + + if os.environ.get('YB_CTL_NO_DEV_MODE') != '1': + # "YB_USE_EXTERNAL_BUILD_ROOT" is a special way to place the build directory. This is + # only relevant when running yb-ctl from the yugabyte-db source tree. 
+ use_external_build_root = os.environ.get('YB_USE_EXTERNAL_BUILD_ROOT') == '1' + + self.installation_dirs_considered += [ + os.path.join(yb_src_root_candidate + '__build', 'latest') if use_external_build_root + else os.path.join(yb_src_root_candidate, 'build', 'latest'), + os.path.join(installation_parent_dir, 'yugabyte', 'build', 'latest'), + os.path.join(installation_parent_dir, 'yugabyte-db', 'build', 'latest') + ] + + for installation_dir_candidate in self.installation_dirs_considered: + if self.yb_ctl_verbose: + logging.info( + "Considering YugaByte DB installation directory candidate: %s", + installation_dir_candidate) + if is_yugabyte_db_installation_dir(installation_dir_candidate): + self.installation_dir = installation_dir_candidate + if self.yb_ctl_verbose: + logging.info( + "Found YugaByte DB installation directory: %s", + self.installation_dir) + break + + def update_options_from_args(self, args, fallback_installation_dir=None): + self.yb_ctl_verbose = args.verbose + self._find_installation_dir() + self.replication_factor = args.replication_factor + self.custom_binary_dir = args.binary_dir + self.cluster_base_dir = args.data_dir + self.num_shards_per_tserver = args.num_shards_per_tserver + self.ysql_num_shards_per_tserver = args.ysql_num_shards_per_tserver + self.timeout_yb_admin_sec = args.timeout_yb_admin_sec + self.timeout_processes_running_sec = args.timeout_processes_running_sec + if hasattr(args, "master_memory_limit_ratio"): + self.master_memory_limit_ratio = args.master_memory_limit_ratio + if hasattr(args, "tserver_memory_limit_ratio"): + self.tserver_memory_limit_ratio = args.tserver_memory_limit_ratio + + if hasattr(args, "v"): + self.verbose_level = args.v + + if hasattr(args, "use_cassandra_authentication"): + self.use_cassandra_authentication = args.use_cassandra_authentication + + if hasattr(args, "ip_start"): + self.ip_start = args.ip_start + + if hasattr(args, "listen_ip"): + self.listen_ip = args.listen_ip + if self.listen_ip and 
self.replication_factor > 1: + raise RuntimeError("Invalid argument: listen_ip is only compatible with rf=1") + + for arg in ["master_flags", "tserver_flags"]: + try: + parser_arg = getattr(args, arg) + except AttributeError: + parser_arg = None + setattr(self, arg, self.parse_flag_args(parser_arg)) + + try: + self.placement_info_raw = getattr(args, "placement_info") + placement_list = self.placement_info_raw.split(",") + except AttributeError: + placement_list = [] + + for items in placement_list: + t_item = tuple(items.split(".")) + if len(t_item) != 3: + raise RuntimeError("Invalid argument: Each entry in placement info should " + "specify cloud, region and zone as cloud.region.zone, " + "separated by commas.") + self.placement_info.append(t_item) + + self.num_drives = getattr(args, "num_drives", None) + if self.num_drives is not None and self.num_drives <= 0: + raise ExitWithError("Invalid number of drives: {}".format(self.num_drives)) + + if hasattr(args, "master") and args.master: + self.node_type = DAEMON_TYPE_MASTER + + # ----------------------------------------------------------------------------------------- + # Controlling whether to enable or disable YSQL + # ----------------------------------------------------------------------------------------- + + ysql_explicitly_enabled = False + ysql_explicitly_disabled = False + + if hasattr(args, "disable_ysql") and args.disable_ysql: + self.enable_ysql = False + ysql_explicitly_disabled = True + if args.verbose: + logging.info("Found --disable_ysql command-line option, setting enable_ysql=%s", + self.enable_ysql) + + if hasattr(args, "enable_ysql") and args.enable_ysql: + self.enable_ysql = True + ysql_explicitly_enabled = True + if args.verbose: + logging.info("Found --enable_ysql command-line option, setting enable_ysql=%s", + self.enable_ysql) + + if ysql_explicitly_enabled and ysql_explicitly_disabled: + raise ExitWithError("--disable_ysql and --enable_ysql cannot both be specified") + + if self.enable_ysql 
is None: + self.enable_ysql = True + + # ----------------------------------------------------------------------------------------- + # Client protocol ports + # ----------------------------------------------------------------------------------------- + + for protocol_type in PROTOCOL_TYPES: + port_option_str = protocol_type + '_port' + port = getattr(args, port_option_str, None) + if port is not None: + if port < 1 or port > 65535: + raise ExitWithError("Invalid port specified for option --%s: %d" % ( + port_option_str, port)) + self.base_ports[protocol_type]['rpc'] = port + + # ----------------------------------------------------------------------------------------- + # Automatic YugaByte DB installation + # ----------------------------------------------------------------------------------------- + if args.install_if_needed and not self.installation_dir: + installer = Installer() + if installer.install_or_find_existing(): + self.installation_dir = installer.installation_dir + + if not self.installation_dir: + self.installation_dir = fallback_installation_dir + if not self.installation_dir: + installation_finder = Installer(only_find_existing=True) + if installation_finder.install_or_find_existing(): + self.installation_dir = installation_finder.installation_dir + if not self.installation_dir: + raise ExitWithError( + "Failed to determine YugaByte DB installation directory. Directories " + "considered:\n%s\nPlease specify --install-if-needed to download and " + "install YugaByte DB automatically." % + (" " + "\n ".join(self.installation_dirs_considered))) + + def validate_daemon_type(self, daemon_type): + if daemon_type not in DAEMON_TYPES: + raise RuntimeError("Invalid daemon type: {}".format(daemon_type)) + # Validate the binary. 
+ self.get_server_binary_path(daemon_type) + + def validate_daemon_index(self, daemon_index): + if daemon_index < 1 or daemon_index > self.max_daemon_index: + raise RuntimeError("Invalid daemon node_id: {}".format(daemon_index)) + + def get_server_binary_path(self, daemon_type): + binary_path = self.get_binary_path(get_binary_name_for_daemon_type(daemon_type)) + if self.yb_ctl_verbose: + logging.info("Found binary path for daemon type %s: %s", daemon_type, binary_path) + return binary_path + + def get_binary_path(self, binary_name): + # If the user specified a custom path, do not default back to anything else. + if self.custom_binary_dir: + binary_dirs = [self.custom_binary_dir] + logging.info("Using custom binaries path: {}".format(self.custom_binary_dir)) + else: + binary_dirs = [ + os.path.join(self.installation_dir, 'bin'), + os.path.join(self.installation_dir, 'postgres', 'bin') + ] + + for binary_dir in binary_dirs: + path = os.path.join(binary_dir, binary_name) + if not os.path.isfile(path) or not os.access(path, os.X_OK): + logging.debug("No binary found at {}".format(path)) + else: + return path + raise RuntimeError("No binary found for {}. Considered binary directories: {}".format( + binary_name, binary_dirs)) + + def get_ip_start(self): + return self.cluster_config.get("ip_start") or self.ip_start + + def get_ip_address(self, daemon_id): + # Subtract 1 because daemon_id.index starts from 1. 
+ return get_local_ip(self.get_ip_start() + daemon_id.index - 1) + + def get_client_protocol_port(self, protocol_type): + return self.base_ports[protocol_type]['rpc'] + + def get_port_str(self, daemon_id, port_type): + if port_type in PROTOCOL_TYPES: + return str(self.get_client_protocol_port(port_type)) + else: + return str(self.base_ports[daemon_id.daemon_type][port_type]) + + def get_host_port(self, daemon_id, port_type): + base_local_url = self.get_ip_address(daemon_id) + return "{}:{}".format(base_local_url, self.get_port_str(daemon_id, port_type)) + + def get_client_listen_host_port(self, daemon_id, port_type=None): + base_local_url = self.listen_ip if self.listen_ip else self.get_ip_address(daemon_id) + if port_type is None: + return base_local_url + else: + return "{}:{}".format(base_local_url, self.get_port_str(daemon_id, port_type)) + + def get_client_advertise_host_port(self, daemon_id, port_type=None): + base_ip = self.get_ip_address(daemon_id) + if self.listen_ip and self.listen_ip != "0.0.0.0": + base_ip = self.listen_ip + if port_type is None: + return base_ip + else: + return "{}:{}".format(base_ip, self.get_port_str(daemon_id, port_type)) + + def set_cluster_config(self, cluster_config): + self.cluster_config = cluster_config + base_ports_from_config = cluster_config.get('ports') + if base_ports_from_config is not None: + self.base_ports = base_ports_from_config + + def get_client_tool_path(self, client_tool_name): + tool_abs_path = os.path.abspath( + os.path.join(self.installation_dir, 'bin', client_tool_name)) + cur_dir_abs_path = os.path.abspath(os.getcwd()) + home_dir_abs_path = os.path.abspath(os.path.expanduser('~')) + path_rel_to_cur = os.path.relpath(tool_abs_path, cur_dir_abs_path) + path_rel_to_home = '~/' + os.path.relpath(tool_abs_path, home_dir_abs_path) + candidates = [tool_abs_path, path_rel_to_cur, path_rel_to_home] + min_len = None + for i in range(len(candidates)): + cur_len = len(candidates[i]) + if min_len is None or cur_len < 
min_len: + min_len = cur_len + shortest_path = candidates[i] + return shortest_path + + +# End of ClusterOptions +# ------------------------------------------------------------------------------------------------- + + +class ClusterControl: + def __init__(self): + self.options = ClusterOptions() + self.args = None + + # Parent subparser holding all common flags. + self.parent_parser = argparse.ArgumentParser(add_help=False) + self.setup_parent_parser() + self.parser = argparse.ArgumentParser() + self.subparsers = self.parser.add_subparsers(dest='command') + self.subparsers.required = True + + # This is a dictionary serialized into JSON and written to a configuration file in the data + # directory. + self.cluster_config = None + + # This is true only for the "create" command. + self.creating_cluster = False + + self.setup_parsing() + self.log_file = None + self.already_running_daemons = set() + + def setup_base_parser(self, command, help=None): + subparser = self.subparsers.add_parser(command, help=help, parents=[self.parent_parser]) + func = getattr(self, "%s_cmd_impl" % command, None) + if not func: + raise RuntimeError("Invalid command: {}".format(command)) + subparser.set_defaults(func=func) + return subparser + + # TODO: Combine this with setup_parsing when we drop support for `yb-ctl --rf 3 create` so + # self.parser doesn't require a parent parser. + def setup_parent_parser(self): + self.parent_parser.add_argument( + "--binary_dir", default=None, + help="Specify a custom directory in which to find the yugabyte binaries.") + self.parent_parser.add_argument( + "--data_dir", default=None, + help="Specify a custom directory where to store data.") + self.parent_parser.add_argument( + "--replication_factor", "--rf", type=int, default=None, + help="Replication factor for the cluster as well as default number of masters. 
") + self.parent_parser.add_argument( + "--num_shards_per_tserver", type=int, default=None, + help="Number of shards (tablets) to start per tablet server for each non-YSQL table.") + self.parent_parser.add_argument( + "--ysql_num_shards_per_tserver", type=int, default=None, + help="Number of shards (tablets) to start per tablet server for each YSQL table.") + self.parent_parser.add_argument( + "--timeout-yb-admin-sec", type=float, default=None, + help="Timeout in seconds for operations that call yb-admin and wait on the cluster.") + self.parent_parser.add_argument( + "--timeout-processes-running-sec", + type=float, default=None, + help="Timeout in seconds for operations that wait on the master and tserver processes " + "to come up and start running.") + self.parent_parser.add_argument( + "--verbose", action="store_true", default=None, + help="If specified, will log internal debug messages to stderr. Note --verbose " + "affects yb-ctl and --v affects server processes.") + self.parent_parser.add_argument( + "--install-if-needed", action='store_true', default=None, + help="With this option, if YugaByte DB is not yet installed on the system, the latest " + "version will be downloaded and installed automatically.") + + def get_cluster_config_file_path(self): + """ + :return: the path to a "cluster configuration file" that holds various options specified + at cluster creation time, e.g. whether YSQL is enabled. + """ + return os.path.join(self.args.data_dir, 'cluster_config.json') + + def load_cluster_config(self): + config_file_path = self.get_cluster_config_file_path() + if os.path.exists(config_file_path): + with open(config_file_path) as config_file: + self.cluster_config = json.load(config_file) + else: + # No configuration file -- let's create an empty one. 
+ self.cluster_config = {} + + loaded_cluster_config = json.loads(json.dumps(self.cluster_config, sort_keys=True)) + if 'enable_postgres' in self.cluster_config: + # Migrate the deprecated "enable_postgres" cluster config option to the new format. + self.cluster_config['enable_ysql'] = ( + self.cluster_config.get('enable_ysql', False) or + self.cluster_config['enable_postgres']) + del self.cluster_config['enable_postgres'] + if self.cluster_config != loaded_cluster_config: + self.save_cluster_config() + self.options.set_cluster_config(self.cluster_config) + + def save_cluster_config(self): + cluster_config_path = self.get_cluster_config_file_path() + with open(cluster_config_path, 'w') as config_file: + json.dump(self.cluster_config, config_file, indent=2) + + def is_ysql_enabled(self): + ysql_enabled = self.cluster_config.get("enable_ysql", self.options.enable_ysql) + if self.args.verbose: + logging.info("is_ysql_enabled returning %s", ysql_enabled) + return ysql_enabled + + def get_replication_factor(self): + return self.cluster_config.get("replication_factor") or self.options.replication_factor + + def get_num_drives_based_on_dir_structure(self): + # Just look for node-1, since that should always exist, if there is cluster data. + return len(glob.glob("{}/node-1/disk-*".format(self.options.cluster_base_dir))) + + def get_num_drives(self): + # If the config does not have an entry, use the old default. + return self.cluster_config.get("num_drives") or self.get_num_drives_based_on_dir_structure() + + def get_drive_paths(self): + return ["disk-{}".format(i) for i in range(1, self.get_num_drives() + 1)] + + def get_base_node_dirs(self, daemon_index): + return ["{}/node-{}/{}".format(self.options.cluster_base_dir, daemon_index, drive) + for drive in self.get_drive_paths()] + + def get_log_path(self, daemon_id, is_err=True): + """ + Get the out or error log path for a daemon. A log file may not necessarily exist at the + returned path. 
+ + :param daemon_id: daemon to get the log path for + :type daemon_id: :class:`DaemonId` + :param is_err: whether to get the error log path rather than the out log path + :type is_err: bool + :returns: path to the specified out or error log of the specified daemon + :rtype: str + """ + node_base_dirs = self.get_base_node_dirs(daemon_id.index) + first_base_dir = node_base_dirs[0] + extension = "err" if is_err else "out" + log_name = "{0}.{1}".format(daemon_id.daemon_type, extension) + return os.path.join(first_base_dir, log_name) + + @staticmethod + def add_extra_flags_arguments(subparser): + subparser.add_argument( + "--master_flags", default=None, + help="Specify extra master flags as a set of key value pairs. " + "Format (key=value,key=value)") + + subparser.add_argument( + "--tserver_flags", default=None, + help="Specify extra tserver flags as a set of key value pairs. " + "Format (key=value,key=value)") + + def setup_parsing(self): + """ + Sets up the command-line parser. Called from the constructor. + """ + self.parser.add_argument( + "--binary_dir", default=None, dest="binary_dir_dest", + help="Specify a custom directory in which to find the yugabyte binaries.") + self.parser.add_argument( + "--data_dir", default=get_default_data_dir(), dest="data_dir_dest", + help="Specify a custom directory where to store data.") + self.parser.add_argument( + "--replication_factor", "--rf", default=DEFAULT_REPLICATION_FACTOR, type=int, + dest="replication_factor_dest", + help="Replication factor for the cluster as well as default number of masters. 
") + self.parser.add_argument( + "--num_shards_per_tserver", default=2, type=int, dest="num_shards_per_tserver_dest", + help="Number of shards (tablets) to start per tablet server for each non-YSQL table.") + self.parser.add_argument( + "--ysql_num_shards_per_tserver", default=2, type=int, + dest="ysql_num_shards_per_tserver_dest", + help="Number of shards (tablets) to start per tablet server for each YSQL table.") + self.parser.add_argument( + "--timeout-yb-admin-sec", default=MAX_YB_ADMIN_WAIT_SEC, type=float, + dest="timeout_yb_admin_sec_dest", + help="Timeout in seconds for operations that call yb-admin and wait on the cluster.") + self.parser.add_argument( + "--timeout-processes-running-sec", default=MAX_WAIT_FOR_PROCESSES_RUNNING_SEC, + dest="timeout_processes_running_sec_dest", type=float, + help="Timeout in seconds for operations that wait on the master and tserver processes " + "to come up and start running.") + self.parser.add_argument( + "--verbose", action="store_true", dest="verbose_dest", + help="If specified, will log internal debug messages to stderr. Note --verbose " + "affects yb-ctl and --v affects server processes.") + self.parser.add_argument( + "--install-if-needed", dest="install_dest", action='store_true', + help="With this option, if YugaByte DB is not yet installed on the system, the latest " + "version will be downloaded and installed automatically.") + + subparsers = {} + for cmd_name, help in ( + ("create", "Create a new cluster"), + ("start", "Create a new cluster, or start existing cluster if it already exists."), + ("stop", "Stop the cluster"), + ("destroy", "Destroy the cluster"), + ("restart", "Restart the cluster"), + ("wipe_restart", "Stop the cluster, wipe all data files and start the cluster as " + "before. 
Will lose all the flags though."), + ("add_node", "Add a new node to the cluster"), + ("remove_node", "Stop the specified node in the cluster"), + ("start_node", "Start the specified node in the cluster"), + # Adding this to keep the start_node/stop_node nomenclature symmetric. + ("stop_node", "Stop the specified node in the cluster"), + ("restart_node", "Restart the specified node in the cluster"), + ("status", "Get cluster information"), + ("setup_redis", "Setup YugaByte to support Redis API")): + + subparsers[cmd_name] = self.setup_base_parser(cmd_name, help=help) + + # commands that take --master + per_daemon_commands = ["remove_node", "restart_node", "add_node", "start_node", "stop_node"] + # commands that take node_id + node_id_commands = ["remove_node", "restart_node", "start_node", "stop_node"] + # commands that take placement_info/cassandra auth/verbosity flags. + startup_commands = ["start", "create", "restart", "wipe_restart", + "add_node", "start_node", "restart_node"] + + for cmd in node_id_commands: + subparsers[cmd].add_argument("node_id", type=int, + help="The id of the tserver/master in range: 1-{}". + format(self.options.max_daemon_index)) + + for cmd in per_daemon_commands: + subparsers[cmd].add_argument( + "--master", action='store_true', help="Specifies the node type.") + + for cmd in startup_commands: + subparsers[cmd].add_argument("--placement_info", + help="Specify the placement info in the following format:" + "cloud.region.zone. Can be comma separated " + "in case you would want to pass more than one value.") + + subparsers[cmd].add_argument("--v", default=0, choices=[str(i) for i in range(5)], + help="Specify the verbosity of server processes, which " + "dictates the amount of logging on servers trace files. " + "Note --verbose affects yb-ctl and --v affects server " + "processes. 
Default is 0 and maximum is 4.") + + subparsers[cmd].add_argument( + "--tserver_memory_limit_ratio", default=0.65, type=float, + help="Ratio of total memory that tserver will be restricted to.") + + subparsers[cmd].add_argument( + "--master_memory_limit_ratio", default=0.35, type=float, + help="Ratio of total memory that master will be restricted to.") + + subparsers[cmd].add_argument("--use_cassandra_authentication", + action='store_true', + help="If specified, this flag will be " + "passed down to tservers as true.") + + subparsers[cmd].add_argument("--enable_ysql", "--enable_postgres", + action='store_true', + help="Enable YugaByte SQL API. Useful for clusters where " + "YSQL was initially disabled.", + default=None) + + subparsers[cmd].add_argument("--disable_ysql", + action='store_true', + help="Disable YugaByte SQL API.", + default=None) + + subparsers[cmd].add_argument( + "--ysql_port", + help="YSQL (PostgreSQL-compatible) API port. Default: %d." % YSQL_DEFAULT_PORT, + type=int) + + subparsers[cmd].add_argument( + "--ycql_port", + help="YCQL (Cassandra-compatible) API port. Default: %d." % YCQL_DEFAULT_PORT, + type=int) + + subparsers[cmd].add_argument( + "--yedis_port", + help="YEDIS (Redis-compatible) API port. Default. %d." % YEDIS_DEFAULT_PORT, + type=int) + + subparsers[cmd].add_argument( + "--num_drives", default=1, type=int, + help="We create separate directories and treat them as separate drives.") + + subparsers[cmd].add_argument( + "--ip_start", default=DEFAULT_IP_START, type=int, + help="Start index for IP address") + + subparsers[cmd].add_argument( + "--listen_ip", + dest='listen_ip_dest', + help="Specify network interfaces for clients to bind to", + default='') + + ClusterControl.add_extra_flags_arguments(subparsers[cmd]) + self.options.is_startup_command = True + + def modify_placement_info(self): + """ + This will use yb-admin to set the cluster config object to the desired placement. 
+ + This assumes you will have called set_master_addresses already! + """ + if not self.options.placement_info: + return + + cmd = self.yb_admin_cmd_list( + "modify_placement_info", self.options.placement_info_raw, + str(self.get_replication_factor())) + + def call_modify_placement_info(should_get_error): + call_get_output_maybe_error(cmd, should_get_error) + logging.info("Successfully modified placement info.") + return True + if not retry_call_with_timeout(call_modify_placement_info, + self.options.timeout_yb_admin_sec): + raise RuntimeError("Could not modify placement info for the cluster.") + + def get_number_of_servers(self, daemon_type): + # When the cluster is being created, YB data directory may not be ready on disk. + # In that case, use replication_factor to determine number of servers. + if self.creating_cluster: + num_servers = self.get_replication_factor() + explanation_str = "replication factor" + else: + drives = self.get_drive_paths() + if not drives: + num_servers = 0 + explanation_str = "the absence of drive paths" + else: + glob_pattern = "{}/*/{}/yb-data/{}".format( + self.options.cluster_base_dir, drives[0], daemon_type) + glob_results = glob.glob(glob_pattern) + num_servers = len(glob_results) + explanation_str = "glob results for data dirs: pattern is %s, results are: %s" % ( + glob_pattern, str(glob_results)) + if self.args.verbose: + logging.info("Number of servers for daemon type %s determined as %d based on %s", + daemon_type, num_servers, explanation_str) + return num_servers + + def get_number_of_servers_map(self): + return { + DAEMON_TYPE_MASTER: self.get_number_of_servers(DAEMON_TYPE_MASTER), + DAEMON_TYPE_TSERVER: self.get_number_of_servers(DAEMON_TYPE_TSERVER) + } + + def get_pgrep_regex(self, daemon_id): + # Regex to get process based on daemon type and bind address. 
Note the explicit space after + # rpc_bind_addresses: This is to be able to distinguish between bind addresses with the + # same prefix like 127.0.0.1, 127.0.0.11, 127.0.0.12. + return "yb-{} .* --rpc_bind_addresses {} ".format( + daemon_id.daemon_type, + self.options.get_ip_address(daemon_id)) + + def get_pid(self, daemon_id): + try: + if sys.platform == 'darwin': + # -l: Long output. For pgrep, print the process name in addition to the process ID + # for each matching process. If used in conjunction with -f, print the process ID + # and the full argument list for each matching process. For pkill, display the kill + # command used for each process killed. + pgrep_full_command_output_arg = '-l' + else: + # -a, --list-full + # List the full command line as well as the process ID. (pgrep only.) + pgrep_full_command_output_arg = '--list-full' + pgrep_regex_str = self.get_pgrep_regex(daemon_id) + pgrep_output = subprocess.check_output( + ["pgrep", pgrep_full_command_output_arg, "-f", pgrep_regex_str] + ).strip().decode('utf-8') + + data_dirs_re_match = FS_DATA_DIRS_ARG_RE.search(pgrep_output) + if data_dirs_re_match: + data_dirs_of_process = data_dirs_re_match.group(1) + expected_data_dirs = ','.join(self.get_base_node_dirs(daemon_id.index)) + if self.args.verbose: + logging.info( + "Data dirs of the running process with daemon id %s: %s", + daemon_id, data_dirs_of_process) + data_dirs_of_process = remove_surrounding_quotes(data_dirs_of_process) + if data_dirs_of_process != expected_data_dirs: + error_msg = ( + "When looking for process with daemon id %s, we found the following " + "running process with data dirs %s but expected data dirs to be %s: %s" % ( + daemon_id, data_dirs_of_process, expected_data_dirs, pgrep_output)) + raise ExitWithError(error_msg) + + if self.args.verbose: + logging.info( + "pgrep output when looking for daemon id %s: %s", + daemon_id, pgrep_output) + pgrep_output_lines = [line.strip() for line in pgrep_output.split("\n")] + 
pgrep_output_lines = [line for line in pgrep_output_lines] + if len(pgrep_output_lines) > 1: + raise ExitWithError( + 'Found multiple processes when looking for the %s process with pgrep based on ' + 'regular expression %s:\n%s' % (daemon_id, pgrep_regex_str, pgrep_output)) + + pid = pgrep_output_lines[0].split()[0] + return int(pid) + except subprocess.CalledProcessError as e: + # From man pgrep + # + # EXIT STATUS + # 0 One or more processes matched the criteria. + # 1 No processes matched. + # 2 Syntax error in the command line. + # 3 Fatal error: out of memory etc. + if e.returncode != 1: + raise RuntimeError("Error during pgrep: {}".format(e.output)) + return None + + def build_command(self, daemon_id, specific_arg_list): + node_base_dirs = self.get_base_node_dirs(daemon_id.index) + + binary_path = self.options.get_server_binary_path(daemon_id.daemon_type) + command_list = [ + # Start with the actual binary + binary_path + ] + + command_list += [ + # Add in all the shared flags + "--fs_data_dirs \"{}\"".format(",".join(node_base_dirs)), + "--webserver_interface {}".format(self.options.get_client_listen_host_port(daemon_id)), + "--rpc_bind_addresses {}".format(self.options.get_ip_address(daemon_id)), + "--v {}".format(self.options.verbose_level) + ] + + www_path = os.path.realpath(os.path.join(os.path.dirname(binary_path), "..", "www")) + version_metadata_path = os.path.realpath( + os.path.join(os.path.dirname(binary_path), "..")) + command_list.append("--version_file_json_path={}".format(version_metadata_path)) + if os.path.isdir(www_path): + command_list.append("--webserver_doc_root \"{}\"".format(www_path)) + if DISABLE_CALLHOME_ENV_VAR_SET: + command_list.append("--callhome_enabled=false") + + # Add custom args per type of server + command_list.extend(specific_arg_list) + + # Redirect out and err and launch in the background + command_list.append(">\"{0}\" 2>\"{1}\" &".format( + self.get_log_path(daemon_id, is_err=False), self.get_log_path(daemon_id, 
is_err=True))) + return " ".join(command_list) + + @staticmethod + def customize_flags(flags, extra_flags): + return flags + ["--{}".format(item) for item in extra_flags] + + def get_master_only_flags(self, daemon_id): + command_list = [ + "--replication_factor={}".format(self.get_replication_factor()), + "--yb_num_shards_per_tserver {}".format(self.options.num_shards_per_tserver), + "--ysql_num_shards_per_tserver={}".format(self.options.ysql_num_shards_per_tserver), + "--default_memory_limit_to_ram_ratio={}".format(self.options.master_memory_limit_ratio) + ] + + if not self.options.is_shell_master: + command_list += ["--master_addresses {}".format(self.options.master_addresses)] + + if self.options.enable_ysql: + command_list.append("--enable_ysql=true") + else: + command_list.append("--enable_ysql=false") + + return self.customize_flags(command_list, self.options.master_flags) + + def get_tserver_only_flags(self, daemon_id): + def get_host_port(port_type): + return self.options.get_host_port(daemon_id, port_type) + command_list = [ + "--tserver_master_addrs={}".format(self.options.master_addresses), + "--yb_num_shards_per_tserver={}".format(self.options.num_shards_per_tserver), + "--redis_proxy_bind_address=" + + self.options.get_client_listen_host_port(daemon_id, 'yedis'), + "--cql_proxy_bind_address=" + + self.options.get_client_listen_host_port(daemon_id, 'ycql'), + "--local_ip_for_outbound_sockets=" + self.options.get_ip_address(daemon_id), + # TODO ENG-2876: Enable this in master as well. 
+ "--use_cassandra_authentication={}".format( + str(self.options.use_cassandra_authentication).lower()), + "--ysql_num_shards_per_tserver={}".format(self.options.ysql_num_shards_per_tserver), + "--default_memory_limit_to_ram_ratio={}".format(self.options.tserver_memory_limit_ratio) + ] + if self.is_ysql_enabled(): + command_list += [ + "--enable_ysql=true", + "--pgsql_proxy_bind_address=" + + self.options.get_client_listen_host_port(daemon_id, 'ysql') + ] + else: + command_list += ["--enable_ysql=false"] + return self.customize_flags(command_list, self.options.tserver_flags) + + def get_placement_info_flags(self, placement_flags): + return [ + "--placement_cloud {}".format(placement_flags[0]), + "--placement_region {}".format(placement_flags[1]), + "--placement_zone {}".format(placement_flags[2]) + ] + + def set_master_addresses(self, running_only=False): + """ + :param running_only: if we're only interested in running masters and not in stopped ones + """ + num_servers = self.get_number_of_servers(DAEMON_TYPE_MASTER) + self.options.master_addresses = ",".join( + [self.options.get_host_port(DaemonId(DAEMON_TYPE_MASTER, i), "rpc") + for i in range(1, num_servers + 1) + if not running_only or self.get_pid(DaemonId(DAEMON_TYPE_MASTER, i)) is not None]) + + def start_daemon(self, daemon_id): + self.options.validate_daemon_type(daemon_id.daemon_type) + self.options.validate_daemon_index(daemon_id.index) + + if self.get_pid(daemon_id) is not None: + logging.info("Server {} already running".format(daemon_id)) + self.already_running_daemons.add(daemon_id) + return + + if not os.path.isdir(self.options.cluster_base_dir): + raise ExitWithError("Found no cluster data at {}, cannot start daemon {}".format( + self.options.cluster_base_dir, daemon_id)) + + for path in self.get_base_node_dirs(daemon_id.index): + if not os.path.exists(path): + os.makedirs(path) + + if daemon_id.is_master(): + custom_flags = self.get_master_only_flags(daemon_id) + elif daemon_id.is_tserver(): + 
custom_flags = self.get_tserver_only_flags(daemon_id) + else: + raise ValueError("Invalid daemon id: %s" % daemon_id) + if len(self.options.placement_info) > 0 and daemon_id.supports_placement(): + mod_val = (daemon_id.index - 1) % len(self.options.placement_info) + custom_flags.extend(self.get_placement_info_flags(self.options.placement_info[mod_val])) + command = self.build_command(daemon_id, custom_flags) + logging.info("Starting {} with:\n{}".format(daemon_id, command)) + + with SetAndRestoreEnv(): + adjust_env_for_ysql() + os.system(command) + + def stop_process(self, pid, name): + if pid is None: + logging.info("{} already stopped".format(name)) + return + logging.info("Stopping {} PID={}".format(name, pid)) + # Kill the process, if it exists. + try: + os.kill(pid, signal.SIGTERM) + except OSError as e: + if e.errno == os.errno.ESRCH: + logging.info("{} does not exist; nothing to stop".format(name)) + else: + raise e + + # Wait for process to stop. + last_msg_time = time.time() + + def log_waiting_msg(): + logging.info("Waiting for {} PID={} to stop...".format(name, pid)) + sys.stdout.flush() + sys.stderr.flush() + log_waiting_msg() + + while True: + try: + os.kill(pid, 0) + except OSError as err: + if err.errno == errno.ESRCH: + return + + time.sleep(0.5) + + current_time = time.time() + if current_time - last_msg_time >= 1: + log_waiting_msg() + last_msg_time = current_time + + def stop_daemon(self, daemon_id): + self.options.validate_daemon_index(daemon_id.index) + self.stop_process(self.get_pid(daemon_id), "Server {}".format(daemon_id)) + postgres_pid = self.find_postgres_pid(daemon_id) + if postgres_pid: + self.stop_process(postgres_pid, "Postmaster for {}".format(daemon_id)) + + def find_postgres_pid(self, daemon_id): + if not daemon_id.is_tserver(): + return None + base_dirs = self.get_base_node_dirs(daemon_id.index) + for dir in base_dirs: + postmaster_pid = os.path.join(dir, "pg_data", "postmaster.pid") + if os.path.exists(postmaster_pid): + try: + 
with open(postmaster_pid, 'rt') as inp: + return int(inp.readline()) + except OSError as err: + logging.info("Failed to read postmaster.pid for {}".format(daemon_id)) + return None + + def restart_daemon(self, daemon_id): + self.stop_daemon(daemon_id) + self.start_daemon(daemon_id) + + def pre_create_checks(self): + """ + Before moving forward with `create`, make a preliminary check to make sure things are ready. + """ + if len(self.options.placement_info) > self.get_replication_factor(): + raise RuntimeError("Number of placement info fields is larger than " + "the replication factor and hence the number of servers.") + + if os.path.isdir(self.options.cluster_base_dir): + raise ExitWithError( + ("Found cluster data at {}, cannot start new cluster. " + "Use --data_dir to specify a different data directory " + "if necessary, or 'destroy' / 'wipe_restart'." + "Commands to wipe out the old directory.").format( + self.options.cluster_base_dir)) + + if not os.access( + os.path.abspath(os.path.join(self.options.cluster_base_dir, '..')), os.W_OK): + raise ExitWithError( + ("No write permission on {}. " + "Use --data_dir to specify a different data directory.").format( + self.options.cluster_base_dir)) + + # Make sure that there are no master or tserver processes before the `create`. + pids = self.for_all_daemons(self.get_pid) + if any(sum(pids.values(), [])): + logging.info("PIDs found: {}".format(pids)) + raise ExitWithError("Processes are still up. 
Make sure to destroy old state.") + + def change_config_if_master(self, daemon_id, cmd_type): + if daemon_id.daemon_type == DAEMON_TYPE_TSERVER: + return + cmd = self.yb_admin_cmd_list( + "change_master_config", cmd_type, self.options.get_ip_address(daemon_id), + str(self.options.base_ports[DAEMON_TYPE_MASTER]["rpc"])) + + def call_change_config(should_get_error): + call_get_output_maybe_error(cmd, should_get_error) + return True + + if not retry_call_with_timeout(call_change_config, self.options.timeout_yb_admin_sec): + raise ExitWithError("Could not change master config...") + + def dump_file_to_stderr(self, file_path): + """ + Keep file intact and dump file contents to STDERR. + + :param file_path: path to the file, assumed to exist + :type file_path: str + """ + print("Viewing file {}:".format(file_path), file=sys.stderr) + with open(file_path, "rb") as f: + if sys.version_info[0] >= 3: + shutil.copyfileobj(f, sys.stderr.buffer) + else: + shutil.copyfileobj(f, sys.stderr) + + def dump_missing_process_error_logs(self): + pids = self.for_all_daemons(self.get_pid) + logging.info("PIDs found: {}".format(pids)) + for daemon_type, pid_list in pids.items(): + for index, pid in enumerate(pid_list, 1): + if pid is None: + daemon_id = DaemonId(daemon_type, index) + log_path = self.get_log_path(daemon_id) + if os.path.exists(log_path): + self.dump_file_to_stderr(log_path) + else: + print("Process {0} is down, and there is no corresponding error log: {1}". + format(daemon_id, log_path), file=sys.stderr) + + def wait_for_cluster(self): + """ + This will wait for the masters to elect a leader and then keep querying for how many TS + the master is aware of. When that number reaches the number of TS that yb-ctl believes are + alive (based on valid PID), then this function returns True. Anything else will end up + returning False. + """ + # Wait for master and tserver processes to be ready. 
+ def call_check_for_processes(should_get_error): + pids = self.for_all_daemons(self.get_pid) + return all(sum(pids.values(), [])) + + logging.info("Waiting for master and tserver processes to come up.") + if not retry_call_with_timeout(call_check_for_processes, + self.options.timeout_processes_running_sec): + self.dump_missing_process_error_logs() + logging.error("Failed waiting for master and tserver processes to come up.") + return False + + # Wait for master leader to be ready, and wait for enough tservers. + # 'Enough' here means that the number of live tservers reported to the master should be + # equal to the number of TS we have started -- based on directories we have on the FS. + cmd_list_tservers = self.yb_admin_cmd_list("list_all_tablet_servers") + + num_alive_ts = None + num_yb_admin_ts = None + + def call_check_for_ts(should_get_error): + if not call_check_for_processes(should_get_error): + self.dump_missing_process_error_logs() + raise ExitWithError("At least one master or tserver process is down.") + + max_num_tservers = self.get_number_of_servers(DAEMON_TYPE_TSERVER) + num_alive_ts = sum([self.get_pid(DaemonId(DAEMON_TYPE_TSERVER, i)) is not None + for i in range(1, max_num_tservers + 1)]) + # TODO: enhance this to tell us live vs dead. + # Tablet Server UUID RPC Host/Port + # 5d6cd15e0a6e48aba1c5128869f51328 127.0.0.5:9100 + # d0ed49b225c744f392b95b9d3eb32e64 127.0.0.1:9100 + # 8a46cace5d904423bf80bf1a6fc10d30 127.0.0.3:9100 + # 2dac590eefb3429bb4d315c51e20f774 127.0.0.2:9100 + # cb703e947033465a80c85577501cc93c 127.0.0.4:9100 + + output = call_get_output_maybe_error( + cmd_list_tservers, + should_get_error + ).decode('utf-8') + num_yb_admin_ts = len(output.splitlines()) - 1 + if num_yb_admin_ts == num_alive_ts: + # This will not work if you have stopped/removed a node and the master is still + # aware of it because we do not have a yb-admin API to return only live tablet + # servers. 
+ for i in range(num_alive_ts): + ts_hp = self.options.get_host_port( + DaemonId(DAEMON_TYPE_TSERVER, i + 1), "rpc") + if ts_hp not in output: + logging.error("Could not find TS info ('{}') in yb-admin output: {}".format( + ts_hp, output)) + return False + return True + elif num_yb_admin_ts < 0: + # We might get no output from yb-admin if the leader is not up yet... + logging.info("Master leader election still pending...") + else: + logging.info("Waiting for all tablet servers to register: {}/{}".format( + num_yb_admin_ts, num_alive_ts)) + + logging.info("Waiting for master leader election and tablet server registration.") + if not retry_call_with_timeout(call_check_for_ts, self.options.timeout_yb_admin_sec): + logging.error("Failed waiting for {} tservers, got {}".format( + num_alive_ts, num_yb_admin_ts)) + return False + + return True + + def wait_for_cluster_or_raise(self): + print("Waiting for cluster to be ready.") + if not self.wait_for_cluster(): + raise RuntimeError("Timed out waiting for a YugaByte DB cluster!") + + def print_status_box(self, title, body_kv_list): + print("-" * 100) + print("| {:<96} |".format(title)) + print("-" * 100) + for k, v in body_kv_list: + print("| {:20}: {:<74} |".format(k, v)) + print("-" * 100) + + def cluster_status(self, more_info=True): + server_counts = self.get_number_of_servers_map() + num_servers = max( + server_counts.get(DAEMON_TYPE_MASTER), server_counts.get(DAEMON_TYPE_TSERVER)) + title = "Node Count: {} | Replication Factor: {}".format( + num_servers, self.get_replication_factor()) + info_kv_list = self.gen_status_tserver_connectivity(DaemonId(DAEMON_TYPE_TSERVER, 1)) + info_kv_list.extend([ + # TODO: this might cause issues if the first master is down... 
+ ("Web UI", "http://{}/".format( + self.options.get_client_advertise_host_port( + DaemonId(DAEMON_TYPE_MASTER, 1), + "http"))), + ("Cluster Data", self.options.cluster_base_dir) + ]) + self.print_status_box(title, info_kv_list) + if more_info: + status_cmd = "yb-ctl" + if self.options.cluster_base_dir != get_default_data_dir(): + status_cmd += " --data_dir {}".format(self.options.cluster_base_dir) + status_cmd += " status" + print("") + print("For more info, please use: {}".format(status_cmd)) + + def gen_status_tserver_connectivity(self, tserver_daemon_id): + info_kv_list = [] + if not tserver_daemon_id: + return info_kv_list + + ip_address = self.options.get_client_advertise_host_port(tserver_daemon_id) + + # ----------------------------------------------------------------------------------------- + # ysqlsh command line + # ----------------------------------------------------------------------------------------- + + if self.is_ysql_enabled(): + ysql_port = self.options.get_client_protocol_port('ysql') + ysqlsh_cmd_line = format_cmd_line_with_host_port( + self.options.get_client_tool_path('ysqlsh'), ip_address, ysql_port, + YSQL_DEFAULT_PORT) + + info_kv_list.extend([ + ("JDBC", "jdbc:postgresql://{}:{}/postgres".format( + ip_address, ysql_port)), + ("YSQL Shell", ysqlsh_cmd_line) + ]) + + # ----------------------------------------------------------------------------------------- + # cqlsh command line + # ----------------------------------------------------------------------------------------- + + ycql_port = self.options.get_client_protocol_port('ycql') + cqlsh_cmd_line = self.options.get_client_tool_path('ycqlsh') + if ip_address != LOCALHOST_IP or ycql_port != YCQL_DEFAULT_PORT: + cqlsh_cmd_line += ' %s' % ip_address + if ycql_port != YCQL_DEFAULT_PORT: + cqlsh_cmd_line += ' %s' % ycql_port + + info_kv_list.append( + ("YCQL Shell", cqlsh_cmd_line)) + + # ----------------------------------------------------------------------------------------- + # redis-cli 
command line + # ----------------------------------------------------------------------------------------- + + # TODO: should probably not even display this if we can know it's not there... + yedis_port = self.options.get_client_protocol_port('yedis') + redis_cli_cmd_line = format_cmd_line_with_host_port( + self.options.get_client_tool_path('redis-cli'), ip_address, yedis_port, + YEDIS_DEFAULT_PORT) + info_kv_list.append(("YEDIS Shell", redis_cli_cmd_line)) + return info_kv_list + + def show_node_status(self, daemon_index, server_counts=None): + if server_counts is None: + server_counts = self.get_number_of_servers_map() + is_master = daemon_index <= server_counts.get(DAEMON_TYPE_MASTER) + is_tserver = daemon_index <= server_counts.get(DAEMON_TYPE_TSERVER) + tserver_daemon_id = DaemonId(DAEMON_TYPE_TSERVER, daemon_index) if is_tserver else None + master_daemon_id = DaemonId(DAEMON_TYPE_MASTER, daemon_index) if is_master else None + + info_kv_list = self.gen_status_tserver_connectivity(tserver_daemon_id) + log_kv_list = [] + for idx, node_dir in enumerate(self.get_base_node_dirs(daemon_index)): + data_path = os.path.join(node_dir, "yb-data") + if idx == 0: + if is_tserver: + log_kv_list.append( + ("yb-tserver Logs", os.path.join(data_path, DAEMON_TYPE_TSERVER, "logs"))) + if is_master: + log_kv_list.append( + ("yb-master Logs", os.path.join(data_path, DAEMON_TYPE_MASTER, "logs"))) + info_kv_list.append( + ("data-dir[{}]".format(idx), data_path)) + info_kv_list.extend(log_kv_list) + + master_pid = self.get_pid(master_daemon_id) if master_daemon_id else None + process_list = [] + if is_tserver: + pid = self.get_pid(tserver_daemon_id) + process = "pid {}".format(pid) if pid else "Stopped" + process_list.append("yb-{} ({})".format(DAEMON_TYPE_TSERVER, process)) + if is_master: + pid = self.get_pid(master_daemon_id) + process = "pid {}".format(pid) if pid else "Stopped" + process_list.append("yb-{} ({})".format(DAEMON_TYPE_MASTER, process)) + + title = "Node {}: 
{}".format(daemon_index, ", ".join(process_list))
+        self.print_status_box(title, info_kv_list)
+
+    def for_all_daemons(self, fn):
+        """
+        Run the given function for all daemons.
+
+        :returns: return values of each function call.
+        :rtype: dict
+        """
+        fn_returns = dict()
+        for daemon_type in DAEMON_TYPES:
+            num_servers = self.get_number_of_servers(daemon_type)
+            for daemon_index in range(1, num_servers + 1):
+                fn_returns.setdefault(daemon_type, []).append(
+                    fn(DaemonId(daemon_type, daemon_index)))
+        return fn_returns
+
+    def create_cmd_impl(self):
+        print("Creating cluster.")
+        self.creating_cluster = True
+        server_counts = self.options.replication_factor
+        self.set_master_addresses()
+        self.cluster_config = {}
+        self.options.set_cluster_config(self.cluster_config)
+
+        for attr_name in (
+                "enable_ysql",
+                "num_drives",
+                "replication_factor"
+        ):
+            attr_value = getattr(self.options, attr_name)
+            if attr_value is not None:
+                self.cluster_config[attr_name] = attr_value
+        self.cluster_config["ports"] = self.options.base_ports
+        self.cluster_config["ip_start"] = self.options.ip_start
+
+        self.pre_create_checks()
+        os.makedirs(self.options.cluster_base_dir)
+        self.for_all_daemons(self.start_daemon)
+        self.wait_for_cluster_or_raise()
+        self.modify_placement_info()
+
+        if self.options.installation_dir:
+            self.cluster_config["installation_dir"] = self.options.installation_dir
+        self.save_cluster_config()
+
+        if self.is_ysql_enabled():
+            self.run_cluster_wide_ysql_initdb()
+        self.cluster_status()
+
+    # Starts as well as creates. Check if the cluster exists.
+    # If it does not exist, create it; otherwise start the individual daemons.
+ def start_cmd_impl(self): + if os.path.isdir(self.options.cluster_base_dir): + print("Starting cluster with base directory %s" % self.options.cluster_base_dir) + + self.set_master_addresses() + self.for_all_daemons(self.start_daemon) + self.wait_for_cluster_or_raise() + self.modify_placement_info() + + if self.options.enable_ysql and not self.cluster_config.get('enable_ysql'): + if self.already_running_daemons: + sys.stderr.write( + "YugaByte DB was already running. If you are trying to enable YSQL " + "on an existing cluster, please stop/start the cluster.\n") + # To prevent YSQL from showing up in the status. + self.options.enable_ysql = False + else: + print("YSQL has been enabled on an existing cluster, running initdb") + self.run_cluster_wide_ysql_initdb() + self.cluster_config["enable_ysql"] = True + self.save_cluster_config() + + self.cluster_status() + else: + self.create_cmd_impl() + + # Stops the cluster. + def stop_cmd_impl(self): + print("Stopping cluster.") + self.for_all_daemons(self.stop_daemon) + + def destroy_cmd_impl(self): + print("Destroying cluster.") + self.for_all_daemons(self.stop_daemon) + + # Remove the top-level directory. 
+ top_level = self.options.cluster_base_dir + if os.path.exists(top_level) and os.path.isdir(top_level): + logging.info("Removing base directory: {}".format(top_level)) + shutil.rmtree(self.options.cluster_base_dir) + + def restart_cmd_impl(self): + self.set_master_addresses() + print("Stopping cluster.") + self.for_all_daemons(self.stop_daemon) + print("Starting cluster.") + self.for_all_daemons(self.start_daemon) + self.wait_for_cluster_or_raise() + self.modify_placement_info() + self.cluster_status() + + def wipe_restart_cmd_impl(self): + num_servers_map = self.get_number_of_servers_map() + self.set_master_addresses() + self.destroy_cmd_impl() + self.create_cmd_impl() + + def add_node_cmd_impl(self): + print("Adding node.") + self.set_master_addresses(running_only=True) + num_servers = self.get_number_of_servers(self.options.node_type) + if len(self.options.placement_info) > 1: + raise RuntimeError("Please specify exactly one placement_info value.") + daemon_id = DaemonId(self.options.node_type, num_servers + 1) + if self.options.node_type == DAEMON_TYPE_MASTER: + self.options.is_shell_master = True + self.start_daemon(daemon_id) + self.wait_for_cluster_or_raise() + self.change_config_if_master(daemon_id, "ADD_SERVER") + self.show_node_status(daemon_id.index) + + def remove_node_cmd_impl(self): + print("Stopping node {}-{}.".format(self.options.node_type, self.args.node_id)) + # Note: remove_node in its current implementation just stops a node and does not + # decommission it. To properly decommission a local "node", we'll need + # to remove it from the master's metadata and also delete the data directory. 
+ daemon_id = DaemonId(self.options.node_type, self.args.node_id) + logging.info("Stopping server {}".format(daemon_id)) + self.stop_daemon(daemon_id) + + def start_node_cmd_impl(self): + print("Starting node {}-{}.".format(self.options.node_type, self.args.node_id)) + daemon_id = DaemonId(self.options.node_type, self.args.node_id) + self.set_master_addresses() + self.start_daemon(daemon_id) + self.wait_for_cluster_or_raise() + self.show_node_status(daemon_id.index) + + def stop_node_cmd_impl(self): + self.remove_node_cmd_impl() + + def restart_node_cmd_impl(self): + print("Stopping node {}-{}.".format(self.options.node_type, self.args.node_id)) + daemon_id = DaemonId(self.options.node_type, self.args.node_id) + self.stop_daemon(daemon_id) + self.set_master_addresses() + if len(self.options.placement_info) > 1: + raise RuntimeError("Please specify exactly one placement_info value.") + print("Starting node {}-{}.".format(self.options.node_type, self.args.node_id)) + self.start_daemon(daemon_id) + self.wait_for_cluster_or_raise() + + def status_cmd_impl(self): + if not os.path.isdir(self.options.cluster_base_dir): + print("No cluster data found at: '{}'".format(self.options.cluster_base_dir)) + return + self.cluster_status(more_info=False) + server_counts = self.get_number_of_servers_map() + max_nodes = max( + server_counts.get(DAEMON_TYPE_MASTER), server_counts.get(DAEMON_TYPE_TSERVER)) + for index in range(1, max_nodes + 1): + self.show_node_status(index, server_counts) + + def setup_redis_cmd_impl(self): + print("Setting up YugaByte DB support for Redis API.") + self.set_master_addresses() + self.wait_for_cluster_or_raise() + cmd_setup_redis_table = self.yb_admin_cmd_list( + "--yb_num_shards_per_tserver", str(self.options.num_shards_per_tserver), + "setup_redis_table") + proc = subprocess.Popen( + cmd_setup_redis_table, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + result, _ = proc.communicate() + if proc.returncode: + logging.error(result) + raise 
RuntimeError("Failed to Setup Redis.") + print("Setup Redis successful.") + + def run_cluster_wide_ysql_initdb(self): + initdb_binary_path = self.options.get_binary_path("initdb") + + logging.info("Running initdb to initialize YSQL metadata in the YugaByte DB cluster.") + env = {'FLAGS_pggate_master_addresses': self.options.master_addresses, + 'YB_ENABLED_IN_POSTGRES': '1'} + certs_dir = self.get_certs_dir() + if certs_dir: + env.update({'FLAGS_use_client_to_server_encryption': '1', + 'FLAGS_certs_dir': certs_dir}) + with SetAndRestoreEnv(env): + adjust_env_for_ysql() + + tmp_pg_data_dir = os.path.join(self.args.data_dir, 'yb-ctl_tmp_pg_data_{}'.format( + ''.join([random.choice('0123456789abcdef') for _ in range(32)]))) + + initdb_log_path = os.path.join(self.args.data_dir, "initdb.log") + if os.path.exists(initdb_log_path): + os.remove(initdb_log_path) + succeeded = False + try: + with open(initdb_log_path, 'w') as initdb_stdout_file: + proc = subprocess.Popen([ + initdb_binary_path, + '-D', tmp_pg_data_dir, + '-U', 'postgres' + ], stdout=initdb_stdout_file, stderr=subprocess.STDOUT) + wait_for_proc_report_progress(proc) + succeeded = proc.returncode == 0 + finally: + if os.path.exists(tmp_pg_data_dir): + logging.info("Deleting temporary YSQL data directory: %s", + tmp_pg_data_dir) + shutil.rmtree(tmp_pg_data_dir, ignore_errors=True) + if not succeeded and os.path.exists(initdb_log_path): + logging.info("initdb log file (from %s):", initdb_log_path) + with open(initdb_log_path) as initdb_log_input_file: + for line in initdb_log_input_file: + logging.info(line.rstrip()) + + logging.info( + "Successfully ran initdb to initialize YSQL data in the YugaByte cluster") + + def get_certs_dir(self): + # We split parameters containing multiple '='s like 'vmodule=meta_cache=5,tablet=5' into + # key before first '=' and value for the rest by passing maxsplit = 1 into item.split as + # a second parameter. 
+ options = {k: v for k, v in (item.split("=", 1) for item in self.options.master_flags)} + if is_flag_true(options, "use_node_to_node_encryption") \ + and not is_flag_true(options, "allow_insecure_connections"): + return options.get( + "certs_dir", + "{}/yb-data/{}/certs".format(self.get_base_node_dirs(1)[0], DAEMON_TYPE_MASTER)) + return None + + def yb_admin_cmd_list(self, *actions): + if not self.options.master_addresses: + raise ValueError("Cannot form yb-admin command without knowing master addresses") + result = [self.options.get_binary_path("yb-admin"), + "--master_addresses", + self.options.master_addresses] + certs_dir = self.get_certs_dir() + if certs_dir: + result.extend(("--certs_dir_name", certs_dir)) + result.extend(actions) + return result + + # --------------------------------------------------------------------------------------------- + # The "main" function of the ClusterControl class + # --------------------------------------------------------------------------------------------- + + def run(self): + self.args = self.parser.parse_args() + # TODO: This is required for `yb-ctl --FLAGS CMD` syntax to work. Delete this when we + # update the docs so we only support `yb-ctl CMD --FLAGS` syntax. + for flag in dir(self.args): + if "_dest" in flag: + real_flag = flag[:-5] + parent_arg = getattr(self.args, flag, None) + child_arg = getattr(self.args, real_flag, None) + if child_arg is None: + setattr(self.args, real_flag, parent_arg) + if not self.args.verbose: + fd, filename = tempfile.mkstemp(text=True) + os.close(fd) + self.log_file = filename + atexit.register(lambda: self.cleanup(keep_log_file_and_dump_to_stderr=True)) + + logging.basicConfig( + filename=self.log_file, + level=logging.INFO, + format="%(asctime)s %(levelname)s: %(message)s") + + # Load cluster configuration for the non-creation case -- however, we will override + # this configuration if the command is "create". 
+        self.load_cluster_config()
+
+        self.options.update_options_from_args(
+            self.args,
+            fallback_installation_dir=self.cluster_config.get("installation_dir"))
+
+        if hasattr(self.args, 'func'):
+            self.args.func()
+        else:
+            self.parser.print_help()
+
+    def cleanup(self, keep_log_file_and_dump_to_stderr=False):
+        # Never created a temp log file to begin with.
+        if not self.log_file:
+            return
+        # If we have a file, we either failed and should keep the file, or wrapped up
+        # successfully, in which case we should remove the file.
+        if os.path.isfile(self.log_file):
+            # If we called the atexit handler and we have a file, it means we must have had errors.
+            if keep_log_file_and_dump_to_stderr:
+                self.dump_file_to_stderr(self.log_file)
+                print("^^^ Encountered errors ^^^")
+            # Whichever cleanup comes first should remove the file.
+            # TODO: Maybe keep the log file around and allow the user to upload a gist with the log.
+            os.remove(self.log_file)
+
+
+if __name__ == "__main__":
+    try:
+        control = ClusterControl()
+        control.run()
+        control.cleanup()
+    except ExitWithError as ex:
+        logging.error(ex)
+        sys.exit(1)
diff --git a/scripts/installation/test/yb-ctl-test.sh b/scripts/installation/test/yb-ctl-test.sh
new file mode 100755
index 000000000000..dfa8d847aa9e
--- /dev/null
+++ b/scripts/installation/test/yb-ctl-test.sh
@@ -0,0 +1,388 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+export YB_PG_FALLBACK_SYSTEM_USER_NAME=$USER
+export YB_DISABLE_CALLHOME=1
+
+readonly YSQL_DEFAULT_PORT=5433
+ysql_ip=127.0.0.1
+
+# This will be auto-detected the first time yb-ctl auto-downloads and installs YugaByte DB.
+installation_dir="" + +log() { + echo >&2 "[$( date +%Y-%m-%dT%H:%M:%S )] $*" +} + +fatal() { + log "$@" + exit 1 +} + +detect_installation_dir() { + if [[ -z $installation_dir ]]; then + installation_dir=$( ls -td "$HOME/yugabyte-db/yugabyte-"* | grep -v .tar.gz | head -1 ) + log "YugaByte DB has been automatically installed into directory: $installation_dir" + fi +} + +verify_ysqlsh() { + local node_number=${1:-1} + local ysql_port=${2:-$YSQL_DEFAULT_PORT} + + local ysql_ip=127.0.0.$node_number + log "Waiting for YSQL to listen on port $ysql_ip:$ysql_port" + + local attempts=0 + while ! nc -z "$ysql_ip" "$ysql_port"; do + if [[ $attempts -gt 600 ]]; then + fatal "Timed out waiting for YSQL on $ysql_ip:$ysql_port after $(( $attempts / 10 )) sec" + fi + sleep 0.1 + let attempts+=1 + done + + log "YSQL listening on port $ysql_ip:$ysql_port" + + # Give the YSQL a chance to start up, or we'll get an error: + # FATAL: the database system is starting up + sleep 1 + + local ysqlsh_cmd=( "$installation_dir"/bin/ysqlsh -h "$ysql_ip" -p "$ysql_port" -U postgres ) + local table_name="mytable$RANDOM" + log "Creating a YSQL table and inserting a bit of data" + "${ysqlsh_cmd[@]}" <<-EOF +create table $table_name (k int primary key, v text); +insert into $table_name (k, v) values (10, 'sometextvalue'); +insert into $table_name (k, v) values (20, 'someothertextvalue'); +EOF + log "Running a simple select from our YSQL table" + echo "select * from $table_name where k = 10; drop table $table_name;" | \ + "${ysqlsh_cmd[@]}"| \ + grep "sometextvalue" +} + +start_cluster_run_tests() { + if [[ $# -ne 1 ]]; then + fatal "One arg expected: root directory to run in" + fi + local root_dir=$1 + ( set -x; "$python_interpreter" "$root_dir"/yb-ctl start $create_flags "${yb_ctl_args[@]}" ) + verify_ysqlsh + ( + set -x + "$python_interpreter" "$root_dir"/yb-ctl add_node $create_flags "${yb_ctl_args[@]}" + ) + verify_ysqlsh + ( set -x; "$python_interpreter" "$root_dir"/yb-ctl stop_node 1 
"${yb_ctl_args[@]}" ) + + # It looks like if we try to create a table in this state, the master is trying to assign + # tablets to node 1, which is down, and times out: + # + # TODO: re-enable when https://github.com/YugaByte/yugabyte-db/issues/1508 is fixed. + if false; then + verify_ysqlsh 2 + fi + ( + set -x + "$python_interpreter" "$root_dir"/yb-ctl start_node 1 $create_flags "${yb_ctl_args[@]}" + ) + verify_ysqlsh + ( set -x; "$python_interpreter" "$root_dir"/yb-ctl stop "${yb_ctl_args[@]}" ) + ( set -x; "$python_interpreter" "$root_dir"/yb-ctl destroy "${yb_ctl_args[@]}" ) +} + +readonly yb_data_dir_parent="/tmp/yb-ctl-test-data-$( date +%Y-%m-%dT%H_%M_%S )-$RANDOM" +readonly yb_data_dir=$yb_data_dir_parent/single_univ +readonly yb_univ_1_data_dir=$yb_data_dir_parent/cdc_1 +readonly yb_univ_2_data_dir=$yb_data_dir_parent/cdc_2 + +yb_ctl_args=( + --data_dir "$yb_data_dir" +) + +create_flags="" + +thick_log_heading() { + ( + echo + echo "========================================================================================" + echo "$@" + echo "========================================================================================" + echo + ) >&2 +} + +log_heading() { + ( + echo + echo "----------------------------------------------------------------------------------------" + echo "$@" + echo "----------------------------------------------------------------------------------------" + echo + ) >&2 +} + +cleanup() { + local exit_code=$? 
+  if [[ $exit_code -ne 0 ]]; then
+    log "^^^ SEE THE ERROR MESSAGE ABOVE ^^^"
+    thick_log_heading "Dumping all the log files below:"
+  fi
+
+  if [[ -d $yb_data_dir_parent ]]; then
+    find "$yb_data_dir_parent" \
+      -name "*.out" -or \
+      -name "*.err" -or \
+      -name "*.INFO" -or \
+      \( -name "*.log" -and -not -wholename "*/tablet-*/*.log" \) | sort |
+    while read log_path; do
+      log_heading "$log_path"
+      cat "$log_path" >&2
+    done
+  else
+    log_heading "No data at $yb_data_dir_parent"
+  fi
+
+  thick_log_heading "End of dumping various logs"
+
+  if [[ $exit_code -ne 0 ]]; then
+    echo "Scroll up past the various logs to where it says 'SEE THE ERROR MESSAGE'."
+  fi
+
+  if "$keep"; then
+    log "Not killing yb-master/yb-tserver processes or removing data directories at " \
+      "$yb_data_dir_parent"
+  else
+    if [[ -d $yb_data_dir_parent ]]; then
+      find "$yb_data_dir_parent" \
+        -maxdepth 1 -type d | sort |
+      while read data_dir; do
+        log "Killing yb-master/yb-tserver processes for $data_dir"
+        set +e
+        (
+          set -x
+          pkill -f "yb-master --fs_data_dirs $data_dir/" -SIGKILL
+          pkill -f "yb-tserver --fs_data_dirs $data_dir/" -SIGKILL
+        )
+
+        set -e
+        if [[ -d $data_dir ]]; then
+          log "Removing data directory at $data_dir"
+          ( set -x; rm -rf "$data_dir" )
+        fi
+      done
+    fi
+  fi
+
+  log "Exiting with code $exit_code"
+  exit "$exit_code"
+}
+
+print_usage() {
+  cat <<-EOT
+Usage: ${0##*/} [<options>]
+Options:
+  -h, --help
+    Print usage information.
+  -k, --keep
+    Keep the cluster data directory around and keep servers running on shutdown.
+  --python3
+    Use python3
+  --verbose
+    Produce verbose output -- passed down to yb-ctl.
+EOT +} + +# ------------------------------------------------------------------------------------------------- +# Parsing test arguments +# ------------------------------------------------------------------------------------------------- + +verbose=false +keep=false +python_interpreter=python + +while [[ $# -gt 0 ]]; do + case "$1" in + -h|--help) + print_usage + exit 0 + ;; + -v|--verbose) + verbose=true + ;; + -k|--keep) + keep=true + ;; + --python27) + python_interpreter=python2.7 + ;; + --python3) + python_interpreter=python3 + ;; + --python_interpreter) + python_interpreter=$2 + shift + ;; + *) + print_usage >&2 + echo >&2 + echo "Invalid option: $1" >&2 + exit 1 + esac + shift +done + +if "$verbose"; then + yb_ctl_args+=( --verbose ) +fi + +# ------------------------------------------------------------------------------------------------- +# Main test code +# ------------------------------------------------------------------------------------------------- + +script_dir=$( cd "$( dirname "$0" )" && pwd ) +cd "$script_dir"/.. + + + + +CI_RUN=${TRAVIS:-${GITHUB_ACTIONS:-undefined}} +log "OSTYPE: $OSTYPE" +log "USER: $USER" +log "CI_RUN: $CI_RUN" + +# Mac needs loopback aliases explicitly created: +# https://docs.yugabyte.com/latest/quick-start/install/ +if [[ ${CI_RUN} == "true" && $OSTYPE == darwin* ]]; then + sudo ifconfig lo0 alias 127.0.0.2 + sudo ifconfig lo0 alias 127.0.0.3 + sudo ifconfig lo0 alias 127.0.0.4 + sudo ifconfig lo0 alias 127.0.0.5 + sudo ifconfig lo0 alias 127.0.0.6 + sudo ifconfig lo0 alias 127.0.0.7 +fi + +# Make top level data dir. 
+mkdir -p "$yb_data_dir_parent" + +trap cleanup EXIT + +log_heading "Running basic tests" +( + set -x + "$python_interpreter" bin/yb-ctl create $create_flags "${yb_ctl_args[@]}" --install-if-needed +) + +detect_installation_dir +verify_ysqlsh + +( + set -x + "$python_interpreter" bin/yb-ctl stop "${yb_ctl_args[@]}" + "$python_interpreter" bin/yb-ctl destroy "${yb_ctl_args[@]}" +) + +start_cluster_run_tests "bin" + +log_heading "Testing YSQL port override" +custom_ysql_port=54320 +( + set -x + "$python_interpreter" bin/yb-ctl create --ysql_port "$custom_ysql_port" \ + $create_flags "${yb_ctl_args[@]}" +) +verify_ysqlsh 1 "$custom_ysql_port" +( + set -x + "$python_interpreter" bin/yb-ctl stop "${yb_ctl_args[@]}" +) +log "Checking that the custom YSQL port persists across restarts" +( + set -x + "$python_interpreter" bin/yb-ctl start $create_flags "${yb_ctl_args[@]}" +) +verify_ysqlsh 1 "$custom_ysql_port" +( + set -x + "$python_interpreter" bin/yb-ctl destroy "${yb_ctl_args[@]}" +) + +log_heading "Test creating multiple universes" +( + set -x + "$python_interpreter" bin/yb-ctl create $create_flags --data_dir "$yb_univ_1_data_dir" --rf 1 +) +verify_ysqlsh + +log_heading "Creating second universe with custom ip_start" +custom_ip_start=2 +( + set -x + "$python_interpreter" bin/yb-ctl create --ip_start $custom_ip_start $create_flags \ + --data_dir "$yb_univ_2_data_dir" --rf 1 +) +verify_ysqlsh $custom_ip_start + +( + set -x + "$python_interpreter" bin/yb-ctl stop --data_dir "$yb_univ_1_data_dir" +) +( + set -x + "$python_interpreter" bin/yb-ctl stop --data_dir "$yb_univ_2_data_dir" +) + +log "Checking that the universes and custom ip addresses persist across restarts" +( + set -x + "$python_interpreter" bin/yb-ctl start $create_flags --data_dir "$yb_univ_1_data_dir" +) +( + set -x + "$python_interpreter" bin/yb-ctl start $create_flags --data_dir "$yb_univ_2_data_dir" +) +verify_ysqlsh +verify_ysqlsh $custom_ip_start +( + set -x + "$python_interpreter" bin/yb-ctl destroy 
--data_dir "$yb_univ_1_data_dir" +) +( + set -x + "$python_interpreter" bin/yb-ctl destroy --data_dir "$yb_univ_2_data_dir" +) + +# ------------------------------------------------------------------------------------------------- +log_heading "Testing putting this version of yb-ctl inside the installation directory" +( set -x; cp bin/yb-ctl "${installation_dir}/bin" ) +start_cluster_run_tests "${installation_dir}/bin" + +log_heading \ + "Pretending we've just built the code and are running yb-ctl from the bin directory in the code" +yb_src_root=$HOME/yugabyte-db-src-root +submodule_bin_dir=$yb_src_root/scripts/installation/bin +mkdir -p "$submodule_bin_dir" +cp bin/yb-ctl "$submodule_bin_dir" +mkdir -p "$yb_src_root/build" +yb_build_root=$yb_src_root/build/latest + +( set -x; time cp -R "$installation_dir" "$yb_build_root" ) + +if [[ $OSTYPE == linux* ]]; then + ( set -x; time "$yb_build_root/bin/post_install.sh" ) +fi + +( + set -x + installation_dir=$yb_build_root + "$python_interpreter" "$submodule_bin_dir/yb-ctl" start $create_flags "${yb_ctl_args[@]}" +) +verify_ysqlsh +( + "$python_interpreter" "$submodule_bin_dir/yb-ctl" stop "${yb_ctl_args[@]}" + "$python_interpreter" "$submodule_bin_dir/yb-ctl" destroy "${yb_ctl_args[@]}" +) + +log_heading "TESTS SUCCEEDED" diff --git a/submodules/yugabyte-installation b/submodules/yugabyte-installation deleted file mode 160000 index bfa1f8428a6f..000000000000 --- a/submodules/yugabyte-installation +++ /dev/null @@ -1 +0,0 @@ -Subproject commit bfa1f8428a6fcb8eeaddf8779b36202fe4c7adb9 diff --git a/yb_release_manifest.json b/yb_release_manifest.json index 0fde1d20998b..084d50ae1035 100644 --- a/yb_release_manifest.json +++ b/yb_release_manifest.json @@ -26,7 +26,7 @@ "bin/yb-check-consistency.py", "bin/yugabyted", "build-support/post_install.sh", - "submodules/yugabyte-installation/bin/yb-ctl", + "scripts/installation/bin/yb-ctl", "thirdparty/installed/common/bin/redis-cli", "thirdparty/installed/common/cqlsh/bin/cqlsh", 
"thirdparty/installed/common/cqlsh/bin/ycqlsh",