1515from typing import NamedTuple , List
1616from uuid import UUID
1717from yb .txndump .model import DocHybridTime , HybridTime , SubDocKey
18- from yb .txndump .parser import AnalyzerBase , DumpProcessor , TransactionBase
18+ from yb .txndump .parser import AnalyzerBase , DumpProcessor , TransactionBase , Update
1919
2020kValueColumn = 1
2121
@@ -40,59 +40,18 @@ def __repr__(self) -> str:
4040 return result + " }"
4141
4242
43- class Update (NamedTuple ):
44- doc_ht : DocHybridTime
45- txn_id : UUID
46- value : int
47- log_ht : HybridTime
48-
49-
50- class Read (NamedTuple ):
51- read_time : HybridTime
52- value : int
53- write_time : DocHybridTime
54- txn_id : UUID
55- same_transaction : bool
56-
57-
58- class KeyData (NamedTuple ):
59- updates : List [Update ] = []
60- reads : List [Read ] = []
61-
62-
63- class BankAccountsAnalyzer (AnalyzerBase ):
43+ class BankAccountsAnalyzer (AnalyzerBase [int , int ]):
6444 def __init__ (self ):
6545 super ().__init__ ()
66- self .rows = {}
6746 self .log = []
6847
69- def apply_row (self , txn_id : UUID , key : SubDocKey , value : int , log_ht : HybridTime ):
48+ def extract_key (self , tablet : str , key : SubDocKey ):
7049 if key .sub_keys [0 ] == kValueColumn :
71- row = key .hash_components [0 ]
72- self .get_row (row ).updates .append (Update (key .doc_ht , txn_id , value , log_ht ))
73-
74- def read_value (
75- self , txn_id : UUID , key , value , read_time : HybridTime , write_time : DocHybridTime ,
76- same_transaction : bool ):
77- if key .sub_keys [0 ] == kValueColumn and (write_time .hybrid_time <= read_time ):
78- self .get_row (key .hash_components [0 ]).reads .append (Read (
79- read_time , value , write_time , txn_id , same_transaction ))
80-
81- def get_row (self , key ) -> KeyData :
82- if key not in self .rows :
83- self .rows [key ] = KeyData ()
84- return self .rows [key ]
85-
86- def get_transaction (self , txn_id : UUID ):
87- if txn_id not in self .txns :
88- self .txns [txn_id ] = BankAccountTransaction (txn_id )
89- return self .txns [txn_id ]
90-
91- def check_same_updates (self , key : int , update : Update , same_updates : int ):
92- if same_updates < 3 :
93- err_fmt = "Wrong number of same updates for key {}, update {}: {}"
94- self .error (
95- update .doc_ht .hybrid_time , update .txn_id , err_fmt .format (key , update , same_updates ))
50+ return key .hash_components [0 ]
51+ return None
52+
53+ def create_transaction (self , txn_id : UUID ):
54+ return BankAccountTransaction (txn_id )
9655
9756 def analyze (self ):
9857 self .check_status_logs ()
@@ -107,33 +66,10 @@ def analyze(self):
10766 for line in sorted (self .log ):
10867 print (line )
10968
110- def analyze_key (self , key ):
111- updates = sorted (self .rows [key ].updates ,
112- key = lambda upd : (upd .doc_ht , upd .txn_id ))
113- reads = sorted (self .rows [key ].reads ,
114- key = lambda read : read .read_time )
115- read_idx = 0
116- old_balance = 100 if key == 0 else 0
117- prev_update = None
118- same_updates = 3
119- for update in updates :
120- if prev_update is not None and prev_update == update :
121- same_updates += 1
122- continue
123- else :
124- self .check_same_updates (key , prev_update , same_updates )
125- same_updates = 1
126-
127- new_balance : int = self .analyze_update (key , update , old_balance )
69+ def initial_value (self , key : int ):
70+ return 100 if key == 0 else 0
12871
129- read_idx = self .analyze_read (
130- key , reads , read_idx , update .doc_ht .hybrid_time , old_balance )
131-
132- old_balance = new_balance
133- prev_update = update
134- self .check_same_updates (key , prev_update , same_updates )
135-
136- def analyze_update (self , key : int , update : Update , old_balance : int ) -> int :
72+ def analyze_update (self , key : int , update : Update [int ], old_balance : int ) -> int :
13773 new_balance = update .value
13874 hybrid_time = update .doc_ht .hybrid_time
13975 txn = self .txns [update .txn_id ]
@@ -168,23 +104,6 @@ def analyze_update(self, key: int, update: Update, old_balance: int) -> int:
168104 self .log .append ((hybrid_time , 'w' , txn ))
169105 return new_balance
170106
171- def analyze_read (
172- self , key : int , reads : List [Read ], read_idx : int , hybrid_time : HybridTime ,
173- old_balance : int ) -> int :
174- while read_idx < len (reads ) and hybrid_time > reads [read_idx ].read_time :
175- read_txn = reads [read_idx ].txn_id
176- if read_txn in self .txns :
177- read_balance = reads [read_idx ].value
178- if old_balance != read_balance :
179- self .error (
180- reads [read_idx ].read_time ,
181- read_txn ,
182- "Bad read key: {}, actual: {}, read: {}" .format (
183- key , old_balance , reads [read_idx ]))
184- self .log .append ((reads [read_idx ].read_time , 'r' , read_txn , key , read_balance ))
185- read_idx += 1
186- return read_idx
187-
188107 def check_transaction (self , txn : BankAccountTransaction ):
189108 cnt_keys = (1 if txn .key1 is not None else 0 ) + (1 if txn .key2 is not None else 0 )
190109 if cnt_keys == 0 :
0 commit comments