|
7 | 7 | from collections import defaultdict |
8 | 8 | from time import sleep |
9 | 9 | from typing import Any, Dict, List, Optional, Set, Tuple, Union |
| 10 | +from unittest import result |
10 | 11 |
|
11 | 12 | from rich import box |
12 | 13 | from rich.console import Console |
13 | 14 | from rich.panel import Panel |
14 | 15 | from rich.table import Table |
| 16 | +from rich.live import Live |
15 | 17 |
|
16 | 18 | from cratedb_toolkit.admin.xmover.model import ( |
17 | 19 | DistributionStats, |
@@ -840,88 +842,95 @@ def plan_node_decommission(self, node_name: str, min_free_space_gb: float = 100. |
840 | 842 | class ShardMonitor: |
841 | 843 | def __init__(self, analyzer: ShardAnalyzer): |
842 | 844 | self.analyzer = analyzer |
843 | | - self.shard_time_deltas: dict[str, int] = {} |
844 | | - self.shard_seq_deltas: dict[str, int] = {} |
845 | | - self.shards: dict[str, ShardInfo] = {} |
| 845 | + self.reference_shards: dict[str, ShardInfo] |
| 846 | + self.latest_shards: list[ShardInfo] |
| 847 | + self.seq_deltas: dict[str, int] |
| 848 | + self.size_deltas: dict[str, float] |
846 | 849 |
|
847 | 850 | def _get_shard_compound_id(self, shard: ShardInfo) -> str: |
848 | 851 | return f"{shard.node_id}-{shard.shard_id}" |
849 | | - |
850 | | - def refresh_deltas(self, refresh_data: bool=True): |
851 | | - if refresh_data: |
852 | | - self.analyzer._refresh_data() |
853 | 852 |
|
854 | | - shards: list[ShardInfo] = self.analyzer.shards |
| 853 | + def calculate_heat_deltas(self, reference_shards: dict[str, ShardInfo], updated_shards: list[ShardInfo]): |
| 854 | + seq_result: dict[str, int] = {} |
| 855 | + size_result: dict[str, float] = {} |
855 | 856 |
|
856 | | - refreshed_shards: list[str] = [] |
857 | | - self.shards = {} |
858 | | - for shard in shards: |
| 857 | + for shard in updated_shards: |
859 | 858 | shard_compound_id = self._get_shard_compound_id(shard) |
860 | | - refreshed_shards.append(shard_compound_id) |
861 | | - self.shards[shard_compound_id] = shard |
862 | | - |
863 | | - hot_timestamp = datetime.datetime.fromtimestamp(float(shard.hot_timestamp) / 1000) |
864 | | - hot_delta = int((datetime.datetime.now() - hot_timestamp).total_seconds()) if shard.hot_timestamp else None |
865 | 859 |
|
866 | | - self.shard_time_deltas[shard_compound_id] = hot_delta |
867 | | - if not shard_compound_id in self.shard_seq_deltas: |
868 | | - self.shard_seq_deltas[shard_compound_id] = shard.seq_stats_max_seq_no |
| 860 | + if shard_compound_id not in reference_shards: |
| 861 | + seq_result[shard_compound_id] = 0 |
| 862 | + size_result[shard_compound_id] = shard.size_gb |
869 | 863 | else: |
870 | | - self.shard_seq_deltas[shard_compound_id] = shard.seq_stats_max_seq_no - self.shard_seq_deltas[shard_compound_id] |
| 864 | + refreshed_number = shard.seq_stats_max_seq_no |
| 865 | + reference = reference_shards[shard_compound_id].seq_stats_max_seq_no |
871 | 866 |
|
872 | | - # Forget shards that are not reported |
873 | | - to_delete: set[str] = set(self.shard_seq_deltas.keys()).difference(refreshed_shards) |
874 | | - for id in to_delete: |
875 | | - del(self.shard_seq_deltas[id]) |
876 | | - del(self.shard_time_deltas[id]) |
| 867 | + if refreshed_number < reference: |
| 868 | + refreshed_number += 2 ** 63 - 1 |
877 | 869 |
|
878 | | - def filter_shards(self, max_seconds_old: int) -> list[ShardInfo]: |
879 | | - """ |
880 | | - Filter shards not touched in the last `max_seconds_old` seconds |
881 | | - """ |
882 | | - return [self.shards.get(k) for k, v in self.shard_time_deltas.items() if v <= max_seconds_old] |
| 870 | + seq_result[shard_compound_id] = refreshed_number - reference |
| 871 | + size_result[shard_compound_id] = shard.size_gb - reference_shards[shard_compound_id].size_gb |
| 872 | + |
| 873 | + self.seq_deltas = seq_result |
| 874 | + self.size_deltas = size_result |
| 875 | + |
| 876 | + def refresh_data(self): |
| 877 | + self.analyzer._refresh_data() |
| 878 | + updated_shards: list[ShardInfo] = self.analyzer.shards |
| 879 | + self.calculate_heat_deltas(self.reference_shards, updated_shards) |
| 880 | + self.latest_shards = sorted(updated_shards, key=lambda s: self.seq_deltas[self._get_shard_compound_id(s)], reverse=True) |
| 881 | + self.latest_shards = self.latest_shards[:20] |
883 | 882 |
|
884 | 883 | def monitor_shards(self): |
885 | | - max_seconds_old = 300 |
| 884 | + self.reference_shards = {self._get_shard_compound_id(shard): shard for shard in self.analyzer.shards} |
| 885 | + self.refresh_data() |
886 | 886 |
|
887 | | - self.refresh_deltas(refresh_data=False) |
888 | | - sleep(10) |
889 | | - self.refresh_deltas() |
| 887 | + console.print(Panel.fit("[bold blue]The 25 most Hot Shards[/bold blue]")) |
| 888 | + shards_table = self.display_shards_table_header() |
890 | 889 |
|
891 | | - shards: list[ShardInfo] = self.filter_shards(max_seconds_old) |
892 | | - sorted_shards: list[ShardInfo] = sorted(shards, key=lambda s: self.shard_seq_deltas[self._get_shard_compound_id(s)]) |
| 890 | + with Live(self.generate_table(self.latest_shards, self.seq_deltas), refresh_per_second=4, console=console) as live: |
| 891 | + while True: |
| 892 | + sleep(10) |
| 893 | + self.refresh_data() |
| 894 | + # self.display_shards_table_rows(shards_table, self.latest_shards, self.deltas) |
| 895 | + live.update(self.generate_table(self.latest_shards, self.seq_deltas)) |
893 | 896 |
|
894 | | - console.print(Panel.fit("[bold blue]CrateDB Hot Shard Monitor[/bold blue]")) |
| 897 | + def generate_table(self, sorted_shards: list[ShardInfo], deltas: dict[str, int]): |
| 898 | + t = self.display_shards_table_header() |
| 899 | + self.display_shards_table_rows(t, self.latest_shards, self.seq_deltas) |
| 900 | + return t |
895 | 901 |
|
896 | 902 | # Cluster summary table |
| 903 | + def display_shards_table_header(self): |
897 | 904 | shards_table = Table(title="Hot shards", box=box.ROUNDED) |
898 | 905 | shards_table.add_column("Schema", style="cyan") |
899 | 906 | shards_table.add_column("Table", style="cyan") |
900 | 907 | shards_table.add_column("ID", style="cyan") |
901 | 908 | shards_table.add_column("Node", style="cyan") |
902 | 909 | shards_table.add_column("Primary", style="cyan") |
903 | 910 | shards_table.add_column("Size", style="magenta") |
| 911 | + shards_table.add_column("Size Delta", style="magenta") |
904 | 912 | shards_table.add_column("Seq Delta", style="magenta") |
905 | | - shards_table.add_column("Seconds ago", style="magenta") |
| 913 | + return shards_table |
| 914 | + |
| 915 | + def display_shards_table_rows(self, shards_table: Table, sorted_shards: list[ShardInfo], deltas: dict[str, int]): |
| 916 | + shards_table.rows.clear() |
906 | 917 |
|
907 | 918 | for shard in sorted_shards: |
908 | 919 | shard_compound_id = self._get_shard_compound_id(shard) |
909 | | - shards_table.add_row( |
910 | | - shard.schema_name, |
911 | | - shard.table_name, |
912 | | - str(shard.shard_id), |
913 | | - shard.node_name, |
914 | | - str(shard.is_primary), |
915 | | - format_size(shard.size_gb), |
916 | | - str(self.shard_seq_deltas[shard_compound_id]), |
917 | | - str(self.shard_time_deltas[shard_compound_id]), |
918 | | - ) |
| 920 | + seq_delta = deltas.get(shard_compound_id, 0) |
| 921 | + if seq_delta != 0: |
| 922 | + shards_table.add_row( |
| 923 | + shard.schema_name, |
| 924 | + shard.table_name, |
| 925 | + str(shard.shard_id), |
| 926 | + shard.node_name, |
| 927 | + str(shard.is_primary), |
| 928 | + format_size(shard.size_gb), |
| 929 | + format_size(self.size_deltas[shard_compound_id]), |
| 930 | + str(seq_delta), |
| 931 | + ) |
919 | 932 | console.print(shards_table) |
920 | 933 |
|
921 | | - console.print(Panel.fit("[bold blue]CrateDB Heat per Node[/bold blue]")) |
922 | | - |
923 | | - |
924 | | - |
925 | 934 |
|
926 | 935 | class ShardReporter: |
927 | 936 | def __init__(self, analyzer: ShardAnalyzer): |
|
0 commit comments