diff --git a/logparser/LogMine/src/alignment.py b/logparser/LogMine/src/alignment.py index 779be042..8e3bd3ab 100644 --- a/logparser/LogMine/src/alignment.py +++ b/logparser/LogMine/src/alignment.py @@ -1,113 +1,105 @@ # ========================================================================= -# This software is a free software. Thus, it is licensed under GNU General Public License. -# Python implementation to Smith-Waterman Algorithm for Homework 1 of Bioinformatics class. -# Forrest Bao, Sept. 26 +# This software is licensed Apache License, Version 2.0. +# Python implementation of the Smith-Waterman Algorithm. -# zeros() was origianlly from NumPy. -# This version is implemented by alevchuk 2011-04-10 +# This version is implemented by lopozz 2025-03-01 +# Implementation reference https://en.wikipedia.org/wiki/Smith%E2%80%93Waterman_algorithm#Algorithm # ========================================================================= +class SWParams: + def __init__(self, similarity_score=10, gap_penalty=0, mismatch_penalty=1): + self.similarity_score = similarity_score + self.gap_penalty = gap_penalty + self.mismatch_penalty = mismatch_penalty + + def zeros(shape): - retval = [] - for x in range(shape[0]): - retval.append([]) - for y in range(shape[1]): - retval[-1].append(0) - return retval - -match_award = 10 -mismatch_penalty = 1 -gap_penalty = 0 # both for opening and extanding - -def match_score(alpha, beta): - if alpha == beta: - return match_award - elif alpha == '-' or beta == '-': - return gap_penalty - else: - return mismatch_penalty - -def finalize(align1, align2): - align1.reverse() #reverse sequence 1 - align2.reverse() #reverse sequence 2 - - i,j = 0,0 - - #calcuate identity, score and aligned sequeces - symbol = '' - found = 0 - score = 0 - identity = 0 - for i in range(0,len(align1)): - # if two AAs are the same, then output the letter - if align1[i] == align2[i]: - symbol = symbol + align1[i] - identity = identity + 1 - score += match_score(align1[i], align2[i]) - - # if they are not identical and none of them is gap - elif align1[i] != align2[i] and align1[i] != '-' and align2[i] != '-': - score += match_score(align1[i], align2[i]) - symbol += ' ' - found = 0 - - #if one of them is a gap, output a space - elif align1[i] == '-' or align2[i] == '-': - symbol += ' ' - score += gap_penalty - - identity = float(identity) / len(align1) * 100 - - return align1, align2 - -def water(seq1, seq2): - m, n = len(seq1), len(seq2) # length of two sequences - - # Generate DP table and traceback path pointer matrix - score = zeros((m+1, n+1)) # the DP table - pointer = zeros((m+1, n+1)) # to store the traceback path - - max_score = 0 # initial maximum score in DP table - # Calculate DP table and mark pointers + m = [] + for _ in range(shape[0]): + m.append([]) + for _ in range(shape[1]): + m[-1].append(0) + return m + + +def match_score(a, b, params=None): + if params is None: + params = SWParams() + + return ( + params.similarity_score + if a == b + else params.gap_penalty + if a == "-" or b == "-" + else params.mismatch_penalty + ) + + +def identity_score(align_s1, align_s2): + mathches = sum([1 for i, _ in enumerate(align_s1) if align_s1[i] == align_s2[i]]) + identity = float(mathches) / len(align_s1) * 100 + return identity + + +def water(s1, s2, params=None): + if params is None: + params = SWParams() + + # 1. Initialize a scoring matrix H + m, n = len(s1), len(s2) + H = zeros((m + 1, n + 1)) + + # Initialize traceback path pointer matrix + P = zeros((m + 1, n + 1)) + + # 2. Fill the scoring matrix + max_score = 0 for i in range(1, m + 1): for j in range(1, n + 1): - score_diagonal = score[i-1][j-1] + match_score(seq1[i-1], seq2[j-1]) - score_up = score[i][j-1] + gap_penalty - score_left = score[i-1][j] + gap_penalty - score[i][j] = max(0,score_left, score_up, score_diagonal) - if score[i][j] == 0: - pointer[i][j] = 0 # 0 means end of the path - if score[i][j] == score_left: - pointer[i][j] = 1 # 1 means trace up - if score[i][j] == score_up: - pointer[i][j] = 2 # 2 means trace left - if score[i][j] == score_diagonal: - pointer[i][j] = 3 # 3 means trace diagonal - if score[i][j] >= max_score: - max_i = i - max_j = j - max_score = score[i][j]; - - align1 = [] - align2 = [] # initial sequences - - i,j = max_i,max_j # indices of path starting point - - #traceback, follow pointers - while pointer[i][j] != 0: - if pointer[i][j] == 3: - align1.append(seq1[i-1]) - align2.append(seq2[j-1]) + match = H[i - 1][j - 1] + match_score(s1[i - 1], s2[j - 1]) + insert = H[i][j - 1] + params.gap_penalty + delete = H[i - 1][j] + params.gap_penalty + zero = 0 + H[i][j] = max(match, delete, insert, zero) + + # Determine pointer direction + if H[i][j] == zero: + P[i][j] = 0 # End of the path + if H[i][j] == delete: + P[i][j] = 1 # Trace up + if H[i][j] == insert: + P[i][j] = 2 # Trace left + if H[i][j] == match: + P[i][j] = 3 # Trace diagonal + + # Update max score position + if H[i][j] >= max_score: + max_i, max_j, max_score = i, j, H[i][j] + + # 3. Traceback + align_s1, align_s2 = [], [] + i, j = max_i, max_j + + while True: + pointer = P[i][j] + if pointer == 0: # End of the path + break + if pointer == 3: + align_s1.append(s1[i - 1]) + align_s2.append(s2[j - 1]) i -= 1 j -= 1 - elif pointer[i][j] == 2: - align1.append('-') - align2.append(seq2[j-1]) + elif pointer == 2: + align_s1.append("-") + align_s2.append(s2[j - 1]) j -= 1 - elif pointer[i][j] == 1: - align1.append(seq1[i-1]) - align2.append('-') + elif pointer == 1: + align_s1.append(s1[i - 1]) + align_s2.append("-") i -= 1 - return finalize(align1, align2) + align_s1.reverse() + align_s2.reverse() + + return align_s1, align_s2