Skip to content

Commit f5c7e83

Browse files
committed
Replace regex fragment matching with streaming KMP
1 parent 8f667a3 commit f5c7e83

File tree

1 file changed

+152
-70
lines changed

1 file changed

+152
-70
lines changed

src/participant/streamParsing.ts

Lines changed: 152 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,148 @@
1-
function escapeRegex(str: string): string {
2-
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
1+
// This is a stateful streaming implementation of the Knuth-Morris-Pratt algorithm
2+
// for substring search. It supports being invoked with multiple fragments of the
3+
// haystack and is capable of finding matches spanning multiple fragments.
4+
class StreamingKMP {
5+
public needle: string;
6+
private _lookupVector: number[];
7+
8+
// In cases where we are fed a string that has a suffix that matches a prefix
9+
// of the needle, we're storing the index in the needle which we last matched.
10+
// Then when we get a new haystack, we start matching from that needle.
11+
private _lastMatchingIndex = 0;
12+
13+
constructor(needle: string) {
14+
this.needle = needle;
15+
this._lookupVector = this._createLookupVector();
16+
}
17+
18+
private _createLookupVector(): number[] {
19+
const vector = new Array<number>(this.needle.length);
20+
let j = 0;
21+
vector[0] = 0;
22+
23+
for (let i = 1; i < this.needle.length; i++) {
24+
while (j > 0 && this.needle[i] !== this.needle[j]) {
25+
j = vector[j - 1];
26+
}
27+
28+
if (this.needle[i] === this.needle[j]) {
29+
j++;
30+
}
31+
32+
vector[i] = j;
33+
}
34+
35+
return vector;
36+
}
37+
38+
// Returns the index in the haystackFragment **after** the needle.
39+
// This is done because the match may have occurred over multiple fragments,
40+
// so the index of the needle start would be negative.
41+
public match(haystackFragment: string): number {
42+
let j = this._lastMatchingIndex; // index in needle
43+
let i = 0; // index in haystack
44+
45+
while (i < haystackFragment.length) {
46+
if (haystackFragment[i] === this.needle[j]) {
47+
i++;
48+
j++;
49+
}
50+
51+
if (j === this.needle.length) {
52+
this._lastMatchingIndex = 0;
53+
return i;
54+
}
55+
56+
if (
57+
i < haystackFragment.length &&
58+
haystackFragment[i] !== this.needle[j]
59+
) {
60+
if (j !== 0) {
61+
j = this._lookupVector[j - 1];
62+
} else {
63+
i++;
64+
}
65+
}
66+
}
67+
68+
this._lastMatchingIndex = j;
69+
return -1;
70+
}
71+
72+
public reset(): void {
73+
this._lastMatchingIndex = 0;
74+
}
75+
}
76+
77+
class FragmentMatcher {
78+
private _startMatcher: StreamingKMP;
79+
private _endMatcher: StreamingKMP;
80+
private _matchedContent?: string;
81+
private _onContentMatched: (content: string) => void;
82+
83+
constructor({
84+
identifier,
85+
onContentMatched,
86+
}: {
87+
identifier: {
88+
start: string;
89+
end: string;
90+
};
91+
onContentMatched: (content: string) => void;
92+
}) {
93+
this._startMatcher = new StreamingKMP(identifier.start);
94+
this._endMatcher = new StreamingKMP(identifier.end);
95+
this._onContentMatched = onContentMatched;
96+
}
97+
98+
private _contentMatched(): void {
99+
const content = this._matchedContent;
100+
if (content !== undefined) {
101+
// Strip the trailing end identifier from the matched content
102+
this._onContentMatched(
103+
content.slice(0, content.length - this._endMatcher.needle.length)
104+
);
105+
}
106+
107+
this._matchedContent = undefined;
108+
this._startMatcher.reset();
109+
this._endMatcher.reset();
110+
}
111+
112+
public process(fragment: string): void {
113+
if (this._matchedContent === undefined) {
114+
// We haven't matched the start identifier yet, so try and do that
115+
const startIndex = this._startMatcher.match(fragment);
116+
if (startIndex !== -1) {
117+
let endIndex = this._endMatcher.match(fragment.slice(startIndex));
118+
if (endIndex !== -1) {
119+
// This is the case where both the start and the end identifiers are contained in the same fragment.
120+
// In this case, we emit the content between the two identifiers and continue processing the rest of the fragment.
121+
122+
// endIndex is relative to the slice, so we need to add startIndex to it.
123+
endIndex = startIndex + endIndex;
124+
125+
this._matchedContent = fragment.slice(startIndex, endIndex);
126+
this._contentMatched();
127+
this.process(fragment.slice(endIndex));
128+
} else {
129+
// If we only matched the start, we add the partial fragment to the matched content and
130+
// wait for another fragment to complete the match.
131+
this._matchedContent = fragment.slice(startIndex);
132+
}
133+
}
134+
} else {
135+
const endIndex = this._endMatcher.match(fragment);
136+
if (endIndex !== -1) {
137+
// We've matched the end - emit the matched content and continue processing the partial fragment
138+
this._matchedContent += fragment.slice(0, endIndex);
139+
this._contentMatched();
140+
this.process(fragment.slice(endIndex));
141+
} else {
142+
this._matchedContent += fragment;
143+
}
144+
}
145+
}
3146
}
4147

5148
/**
@@ -22,74 +165,13 @@ export async function processStreamWithIdentifiers({
22165
end: string;
23166
};
24167
}): Promise<void> {
25-
const escapedIdentifierStart = escapeRegex(identifier.start);
26-
const escapedIdentifierEnd = escapeRegex(identifier.end);
27-
const regex = new RegExp(
28-
`${escapedIdentifierStart}([\\s\\S]*?)${escapedIdentifierEnd}`,
29-
'g'
30-
);
31-
32-
let contentSinceLastIdentifier = '';
33-
for await (const fragment of inputIterable) {
34-
contentSinceLastIdentifier += fragment;
35-
36-
let lastIndex = 0;
37-
let match: RegExpExecArray | null;
38-
while ((match = regex.exec(contentSinceLastIdentifier)) !== null) {
39-
const endIndex = regex.lastIndex;
40-
41-
// Stream content up to the end of the identifier.
42-
const contentToStream = contentSinceLastIdentifier.slice(
43-
lastIndex,
44-
endIndex
45-
);
46-
processStreamFragment(contentToStream);
168+
const fragmentMatcher = new FragmentMatcher({
169+
identifier,
170+
onContentMatched: onStreamIdentifier,
171+
});
47172

48-
const identifierContent = match[1];
49-
onStreamIdentifier(identifierContent);
50-
51-
lastIndex = endIndex;
52-
}
53-
54-
if (lastIndex > 0) {
55-
// Remove all of the processed content.
56-
contentSinceLastIdentifier = contentSinceLastIdentifier.slice(lastIndex);
57-
// Reset the regex.
58-
regex.lastIndex = 0;
59-
} else {
60-
// Clear as much of the content as we can safely.
61-
const maxUnprocessedLength = identifier.start.length - 1;
62-
if (contentSinceLastIdentifier.length > maxUnprocessedLength) {
63-
const identifierIndex = contentSinceLastIdentifier.indexOf(
64-
identifier.start
65-
);
66-
if (identifierIndex > -1) {
67-
// We have an identifier, so clear up until the identifier.
68-
const contentToStream = contentSinceLastIdentifier.slice(
69-
0,
70-
identifierIndex
71-
);
72-
processStreamFragment(contentToStream);
73-
contentSinceLastIdentifier =
74-
contentSinceLastIdentifier.slice(identifierIndex);
75-
} else {
76-
// No identifier, so clear up until the last maxUnprocessedLength.
77-
const processUpTo =
78-
contentSinceLastIdentifier.length - maxUnprocessedLength;
79-
const contentToStream = contentSinceLastIdentifier.slice(
80-
0,
81-
processUpTo
82-
);
83-
processStreamFragment(contentToStream);
84-
contentSinceLastIdentifier =
85-
contentSinceLastIdentifier.slice(processUpTo);
86-
}
87-
}
88-
}
89-
}
90-
91-
// Finish up anything not streamed yet.
92-
if (contentSinceLastIdentifier.length > 0) {
93-
processStreamFragment(contentSinceLastIdentifier);
173+
for await (const fragment of inputIterable) {
174+
processStreamFragment(fragment);
175+
fragmentMatcher.process(fragment);
94176
}
95177
}

0 commit comments

Comments
 (0)