1- function escapeRegex ( str : string ) : string {
2- return str . replace ( / [ . * + ? ^ $ { } ( ) | [ \] \\ ] / g, '\\$&' ) ;
/**
 * Stateful, streaming implementation of the Knuth-Morris-Pratt substring
 * search.
 *
 * The haystack may be supplied in several fragments, one `match` call per
 * fragment. Partial-match progress is remembered between calls, so a needle
 * that straddles a fragment boundary is still found.
 */
class StreamingKMP {
  /**
   * The string being searched for. Read-only because the lookup table below
   * is computed from it exactly once, in the constructor.
   */
  public readonly needle: string;

  /**
   * KMP failure table: entry `i` is the length of the longest proper prefix of
   * `needle.slice(0, i + 1)` that is also a suffix of it. Empty for an empty
   * needle.
   */
  private readonly _lookupVector: number[];

  /**
   * Number of needle characters matched by the tail of the input seen so far.
   * When a fragment ends with a prefix of the needle, the next `match` call
   * resumes comparing from this needle index.
   */
  private _lastMatchingIndex = 0;

  /**
   * @param needle The string to search for.
   */
  constructor(needle: string) {
    this.needle = needle;
    this._lookupVector = this._createLookupVector();
  }

  /**
   * Builds the failure table for `needle`.
   *
   * @returns An array with one entry per needle character.
   */
  private _createLookupVector(): number[] {
    const length = this.needle.length;
    // Pre-filling with zeros covers entry 0 and keeps the table empty for an
    // empty needle (no out-of-bounds write).
    const vector = new Array<number>(length).fill(0);
    let j = 0;

    for (let i = 1; i < length; i++) {
      // Fall back to shorter borders until the next character extends one.
      while (j > 0 && this.needle[i] !== this.needle[j]) {
        j = vector[j - 1];
      }

      if (this.needle[i] === this.needle[j]) {
        j++;
      }

      vector[i] = j;
    }

    return vector;
  }

  /**
   * Scans `haystackFragment` for the needle, continuing any partial match left
   * over from previous calls.
   *
   * The returned index points **after** the needle because the match may have
   * started in an earlier fragment, in which case the start index would be
   * negative. Scanning stops at the first match; the rest of the fragment is
   * not examined and the partial-match state is cleared.
   *
   * An empty needle matches at index 0 of any non-empty fragment; an empty
   * fragment never produces a match.
   *
   * @param haystackFragment The next piece of the haystack.
   * @returns The index in `haystackFragment` just past the end of the needle,
   *   or `-1` if the needle was not completed within this fragment.
   */
  public match(haystackFragment: string): number {
    const needle = this.needle;

    if (needle.length === 0) {
      this._lastMatchingIndex = 0;
      return haystackFragment.length > 0 ? 0 : -1;
    }

    let j = this._lastMatchingIndex; // number of needle characters matched

    for (let i = 0; i < haystackFragment.length; i++) {
      const ch = haystackFragment[i];

      // On a mismatch, reuse the longest border of what has been matched
      // instead of rewinding the haystack.
      while (j > 0 && ch !== needle[j]) {
        j = this._lookupVector[j - 1];
      }

      if (ch === needle[j]) {
        j++;
      }

      if (j === needle.length) {
        this._lastMatchingIndex = 0;
        return i + 1;
      }
    }

    // Remember how much of the needle the fragment's tail matched.
    this._lastMatchingIndex = j;
    return -1;
  }

  /** Discards any partial match carried over from previous fragments. */
  public reset(): void {
    this._lastMatchingIndex = 0;
  }
}
76+
/**
 * State machine that consumes a stream of text fragments and reports the
 * content found between each start identifier and the following end
 * identifier.
 *
 * There are two states:
 *  1. "waiting for start identifier" - `_matchedContent === undefined`
 *  2. "waiting for end identifier"   - `_matchedContent !== undefined`
 *
 * The state flips whenever the corresponding identifier is matched in the
 * fragment stream. Identifiers may be split across fragments.
 */
class FragmentMatcher {
  private readonly _startMatcher: StreamingKMP;
  private readonly _endMatcher: StreamingKMP;

  /**
   * Text accumulated since the start identifier (including the end identifier
   * once it arrives), or `undefined` while waiting for a start identifier.
   */
  private _matchedContent?: string;

  private readonly _onContentMatched: (content: string) => void;
  private readonly _onFragmentProcessed: (content: string) => void;

  /**
   * @param options.identifier The start and end markers delimiting content.
   * @param options.onContentMatched Called with the text between a start and
   *   end identifier (identifiers excluded) each time an end identifier is
   *   matched.
   * @param options.onFragmentProcessed Called with every piece of input as it
   *   is consumed, whether or not it contained a match.
   */
  constructor({
    identifier,
    onContentMatched,
    onFragmentProcessed,
  }: {
    identifier: {
      start: string;
      end: string;
    };
    onContentMatched: (content: string) => void;
    onFragmentProcessed: (content: string) => void;
  }) {
    this._startMatcher = new StreamingKMP(identifier.start);
    this._endMatcher = new StreamingKMP(identifier.end);
    this._onContentMatched = onContentMatched;
    this._onFragmentProcessed = onFragmentProcessed;
  }

  /**
   * Emits the accumulated content (if any) and returns to the
   * "waiting for start identifier" state.
   */
  private _contentMatched(): void {
    const content = this._matchedContent;
    if (content !== undefined) {
      // The accumulated text ends with the end identifier; strip it.
      this._onContentMatched(
        content.slice(0, content.length - this._endMatcher.needle.length)
      );
    }

    this._matchedContent = undefined;
    this._startMatcher.reset();
    this._endMatcher.reset();
  }

  /**
   * Notifies subscribers about the part of `fragment` that has just been
   * consumed, regardless of whether it contained a match.
   *
   * @param fragment The text currently being processed.
   * @param index When given, only `fragment.slice(0, index)` is reported;
   *   otherwise the whole fragment is.
   */
  private _partialFragmentProcessed(fragment: string, index?: number): void {
    this._onFragmentProcessed(
      index === undefined ? fragment : fragment.slice(0, index)
    );
  }

  /**
   * Feeds the next fragment of the stream into the state machine.
   *
   * The fragment is consumed piece by piece: each identifier match reports the
   * text up to and including that identifier via `onFragmentProcessed`,
   * switches state, and continues with the remainder. The final remainder is
   * always reported too, so when an identifier ends exactly at the end of the
   * fragment `onFragmentProcessed` is additionally called once with `''`.
   *
   * Implemented as a loop rather than recursion so that a fragment containing
   * many identifiers cannot exhaust the call stack.
   *
   * @param fragment The next piece of the input stream.
   * @throws RangeError If both identifiers are empty and `fragment` is not:
   *   every position would match both identifiers without consuming input, so
   *   processing could never finish.
   */
  public process(fragment: string): void {
    if (
      fragment.length > 0 &&
      this._startMatcher.needle.length === 0 &&
      this._endMatcher.needle.length === 0
    ) {
      throw new RangeError(
        'FragmentMatcher cannot process input when both the start and end identifiers are empty'
      );
    }

    let remaining = fragment;

    for (;;) {
      if (this._matchedContent === undefined) {
        // Waiting for the start identifier.
        const startIndex = this._startMatcher.match(remaining);
        if (startIndex === -1) {
          this._partialFragmentProcessed(remaining);
          return;
        }

        // Start identifier found: begin accumulating content and continue
        // with whatever follows the identifier.
        this._matchedContent = '';
        this._partialFragmentProcessed(remaining, startIndex);
        remaining = remaining.slice(startIndex);
      } else {
        // Waiting for the end identifier.
        const endIndex = this._endMatcher.match(remaining);
        if (endIndex === -1) {
          // Not closed yet: keep the text and wait for a future fragment.
          this._matchedContent += remaining;
          this._partialFragmentProcessed(remaining);
          return;
        }

        // End identifier found: emit the content and continue with whatever
        // follows the identifier.
        this._matchedContent += remaining.slice(0, endIndex);
        this._partialFragmentProcessed(remaining, endIndex);
        this._contentMatched();
        remaining = remaining.slice(endIndex);
      }
    }
  }
}
4165
5166/**
@@ -22,74 +183,13 @@ export async function processStreamWithIdentifiers({
22183 end : string ;
23184 } ;
24185} ) : Promise < void > {
25- const escapedIdentifierStart = escapeRegex ( identifier . start ) ;
26- const escapedIdentifierEnd = escapeRegex ( identifier . end ) ;
27- const regex = new RegExp (
28- `${ escapedIdentifierStart } ([\\s\\S]*?)${ escapedIdentifierEnd } ` ,
29- 'g'
30- ) ;
31-
32- let contentSinceLastIdentifier = '' ;
33- for await ( const fragment of inputIterable ) {
34- contentSinceLastIdentifier += fragment ;
186+ const fragmentMatcher = new FragmentMatcher ( {
187+ identifier,
188+ onContentMatched : onStreamIdentifier ,
189+ onFragmentProcessed : processStreamFragment ,
190+ } ) ;
35191
36- let lastIndex = 0 ;
37- let match : RegExpExecArray | null ;
38- while ( ( match = regex . exec ( contentSinceLastIdentifier ) ) !== null ) {
39- const endIndex = regex . lastIndex ;
40-
41- // Stream content up to the end of the identifier.
42- const contentToStream = contentSinceLastIdentifier . slice (
43- lastIndex ,
44- endIndex
45- ) ;
46- processStreamFragment ( contentToStream ) ;
47-
48- const identifierContent = match [ 1 ] ;
49- onStreamIdentifier ( identifierContent ) ;
50-
51- lastIndex = endIndex ;
52- }
53-
54- if ( lastIndex > 0 ) {
55- // Remove all of the processed content.
56- contentSinceLastIdentifier = contentSinceLastIdentifier . slice ( lastIndex ) ;
57- // Reset the regex.
58- regex . lastIndex = 0 ;
59- } else {
60- // Clear as much of the content as we can safely.
61- const maxUnprocessedLength = identifier . start . length - 1 ;
62- if ( contentSinceLastIdentifier . length > maxUnprocessedLength ) {
63- const identifierIndex = contentSinceLastIdentifier . indexOf (
64- identifier . start
65- ) ;
66- if ( identifierIndex > - 1 ) {
67- // We have an identifier, so clear up until the identifier.
68- const contentToStream = contentSinceLastIdentifier . slice (
69- 0 ,
70- identifierIndex
71- ) ;
72- processStreamFragment ( contentToStream ) ;
73- contentSinceLastIdentifier =
74- contentSinceLastIdentifier . slice ( identifierIndex ) ;
75- } else {
76- // No identifier, so clear up until the last maxUnprocessedLength.
77- const processUpTo =
78- contentSinceLastIdentifier . length - maxUnprocessedLength ;
79- const contentToStream = contentSinceLastIdentifier . slice (
80- 0 ,
81- processUpTo
82- ) ;
83- processStreamFragment ( contentToStream ) ;
84- contentSinceLastIdentifier =
85- contentSinceLastIdentifier . slice ( processUpTo ) ;
86- }
87- }
88- }
89- }
90-
91- // Finish up anything not streamed yet.
92- if ( contentSinceLastIdentifier . length > 0 ) {
93- processStreamFragment ( contentSinceLastIdentifier ) ;
192+ for await ( const fragment of inputIterable ) {
193+ fragmentMatcher . process ( fragment ) ;
94194 }
95195}
0 commit comments