From f5c7e8362ee59827fbfac8f6459789c7b85fa981 Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Fri, 27 Sep 2024 00:11:11 +0200 Subject: [PATCH 1/4] Replace regex fragment matching with streaming KMP --- src/participant/streamParsing.ts | 222 +++++++++++++++++++++---------- 1 file changed, 152 insertions(+), 70 deletions(-) diff --git a/src/participant/streamParsing.ts b/src/participant/streamParsing.ts index 93bb5dad9..c90eb97ed 100644 --- a/src/participant/streamParsing.ts +++ b/src/participant/streamParsing.ts @@ -1,5 +1,148 @@ -function escapeRegex(str: string): string { - return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); +// This is a stateful streaming implementation of the Knuth-Morris-Pratt algorithm +// for substring search. It supports being invoked with multiple fragments of the +// haystack and is capable of finding matches spanning multiple fragments. +class StreamingKMP { + public needle: string; + private _lookupVector: number[]; + + // In cases where we are fed a string that has a suffix that matches a prefix + // of the needle, we're storing the index in the needle which we last matched. + // Then when we get a new haystack, we start matching from that needle. + private _lastMatchingIndex = 0; + + constructor(needle: string) { + this.needle = needle; + this._lookupVector = this._createLookupVector(); + } + + private _createLookupVector(): number[] { + const vector = new Array(this.needle.length); + let j = 0; + vector[0] = 0; + + for (let i = 1; i < this.needle.length; i++) { + while (j > 0 && this.needle[i] !== this.needle[j]) { + j = vector[j - 1]; + } + + if (this.needle[i] === this.needle[j]) { + j++; + } + + vector[i] = j; + } + + return vector; + } + + // Returns the index in the haystackFragment **after** the needle. + // This is done because the match may have occurred over multiple fragments, + // so the index of the needle start would be negative. + public match(haystackFragment: string): number { + let j = this._lastMatchingIndex; // index in needle + let i = 0; // index in haystack + + while (i < haystackFragment.length) { + if (haystackFragment[i] === this.needle[j]) { + i++; + j++; + } + + if (j === this.needle.length) { + this._lastMatchingIndex = 0; + return i; + } + + if ( + i < haystackFragment.length && + haystackFragment[i] !== this.needle[j] + ) { + if (j !== 0) { + j = this._lookupVector[j - 1]; + } else { + i++; + } + } + } + + this._lastMatchingIndex = j; + return -1; + } + + public reset(): void { + this._lastMatchingIndex = 0; + } +} + +class FragmentMatcher { + private _startMatcher: StreamingKMP; + private _endMatcher: StreamingKMP; + private _matchedContent?: string; + private _onContentMatched: (content: string) => void; + + constructor({ + identifier, + onContentMatched, + }: { + identifier: { + start: string; + end: string; + }; + onContentMatched: (content: string) => void; + }) { + this._startMatcher = new StreamingKMP(identifier.start); + this._endMatcher = new StreamingKMP(identifier.end); + this._onContentMatched = onContentMatched; + } + + private _contentMatched(): void { + const content = this._matchedContent; + if (content !== undefined) { + // Strip the trailing end identifier from the matched content + this._onContentMatched( + content.slice(0, content.length - this._endMatcher.needle.length) + ); + } + + this._matchedContent = undefined; + this._startMatcher.reset(); + this._endMatcher.reset(); + } + + public process(fragment: string): void { + if (this._matchedContent === undefined) { + // We haven't matched the start identifier yet, so try and do that + const startIndex = this._startMatcher.match(fragment); + if (startIndex !== -1) { + let endIndex = this._endMatcher.match(fragment.slice(startIndex)); + if (endIndex !== -1) { + // This is the case where both the start and the end identifiers are contained in the same fragment. + // In this case, we emit the content between the two identifiers and continue processing the rest of the fragment. + + // endIndex is relative to the slice, so we need to add startIndex to it. + endIndex = startIndex + endIndex; + + this._matchedContent = fragment.slice(startIndex, endIndex); + this._contentMatched(); + this.process(fragment.slice(endIndex)); + } else { + // If we only matched the start, we add the partial fragment to the matched content and + // wait for another fragment to complete the match. + this._matchedContent = fragment.slice(startIndex); + } + } + } else { + const endIndex = this._endMatcher.match(fragment); + if (endIndex !== -1) { + // We've matched the end - emit the matched content and continue processing the partial fragment + this._matchedContent += fragment.slice(0, endIndex); + this._contentMatched(); + this.process(fragment.slice(endIndex)); + } else { + this._matchedContent += fragment; + } + } + } } /** @@ -22,74 +165,13 @@ export async function processStreamWithIdentifiers({ end: string; }; }): Promise { - const escapedIdentifierStart = escapeRegex(identifier.start); - const escapedIdentifierEnd = escapeRegex(identifier.end); - const regex = new RegExp( - `${escapedIdentifierStart}([\\s\\S]*?)${escapedIdentifierEnd}`, - 'g' - ); - - let contentSinceLastIdentifier = ''; - for await (const fragment of inputIterable) { - contentSinceLastIdentifier += fragment; - - let lastIndex = 0; - let match: RegExpExecArray | null; - while ((match = regex.exec(contentSinceLastIdentifier)) !== null) { - const endIndex = regex.lastIndex; - - // Stream content up to the end of the identifier. - const contentToStream = contentSinceLastIdentifier.slice( - lastIndex, - endIndex - ); - processStreamFragment(contentToStream); + const fragmentMatcher = new FragmentMatcher({ + identifier, + onContentMatched: onStreamIdentifier, + }); - const identifierContent = match[1]; - onStreamIdentifier(identifierContent); - - lastIndex = endIndex; - } - - if (lastIndex > 0) { - // Remove all of the processed content. - contentSinceLastIdentifier = contentSinceLastIdentifier.slice(lastIndex); - // Reset the regex. - regex.lastIndex = 0; - } else { - // Clear as much of the content as we can safely. - const maxUnprocessedLength = identifier.start.length - 1; - if (contentSinceLastIdentifier.length > maxUnprocessedLength) { - const identifierIndex = contentSinceLastIdentifier.indexOf( - identifier.start - ); - if (identifierIndex > -1) { - // We have an identifier, so clear up until the identifier. - const contentToStream = contentSinceLastIdentifier.slice( - 0, - identifierIndex - ); - processStreamFragment(contentToStream); - contentSinceLastIdentifier = - contentSinceLastIdentifier.slice(identifierIndex); - } else { - // No identifier, so clear up until the last maxUnprocessedLength. - const processUpTo = - contentSinceLastIdentifier.length - maxUnprocessedLength; - const contentToStream = contentSinceLastIdentifier.slice( - 0, - processUpTo - ); - processStreamFragment(contentToStream); - contentSinceLastIdentifier = - contentSinceLastIdentifier.slice(processUpTo); - } - } - } - } - - // Finish up anything not streamed yet. - if (contentSinceLastIdentifier.length > 0) { - processStreamFragment(contentSinceLastIdentifier); + for await (const fragment of inputIterable) { + processStreamFragment(fragment); + fragmentMatcher.process(fragment); } } From 6d6770da0d8e05d5e473e105bb23bd0d806ff460 Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Fri, 27 Sep 2024 01:42:02 +0200 Subject: [PATCH 2/4] Simplify content capturing --- src/participant/streamParsing.ts | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/src/participant/streamParsing.ts b/src/participant/streamParsing.ts index c90eb97ed..47b8cdece 100644 --- a/src/participant/streamParsing.ts +++ b/src/participant/streamParsing.ts @@ -114,22 +114,10 @@ class FragmentMatcher { // We haven't matched the start identifier yet, so try and do that const startIndex = this._startMatcher.match(fragment); if (startIndex !== -1) { - let endIndex = this._endMatcher.match(fragment.slice(startIndex)); - if (endIndex !== -1) { - // This is the case where both the start and the end identifiers are contained in the same fragment. - // In this case, we emit the content between the two identifiers and continue processing the rest of the fragment. - - // endIndex is relative to the slice, so we need to add startIndex to it. - endIndex = startIndex + endIndex; - - this._matchedContent = fragment.slice(startIndex, endIndex); - this._contentMatched(); - this.process(fragment.slice(endIndex)); - } else { - // If we only matched the start, we add the partial fragment to the matched content and - // wait for another fragment to complete the match. - this._matchedContent = fragment.slice(startIndex); - } + // We found a match for the start identifier - update `_matchedContent` to an empty string + // and recursively call `process` with the remainder of the fragment. + this._matchedContent = ''; + this.process(fragment.slice(startIndex)); } } else { const endIndex = this._endMatcher.match(fragment); @@ -139,6 +127,8 @@ class FragmentMatcher { this._contentMatched(); this.process(fragment.slice(endIndex)); } else { + // We haven't matched the end yet - append the fragment to the matched content and wait + // for a future fragment to contain the end identifier. this._matchedContent += fragment; } } From c8f09f4e253816dd9e757193716b21d3293ad8ab Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Mon, 21 Oct 2024 15:13:59 +0200 Subject: [PATCH 3/4] Add an explanation for FragmentMatcher --- src/participant/streamParsing.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/participant/streamParsing.ts b/src/participant/streamParsing.ts index 47b8cdece..6f1f9987f 100644 --- a/src/participant/streamParsing.ts +++ b/src/participant/streamParsing.ts @@ -74,6 +74,13 @@ class StreamingKMP { } } +// This class is essentially a state machine that processes a stream of text fragments +// and emitting a callback with the content between each start and end identifier. The +// two states we have are: +// 1. "waiting for start identifier" - `_matchedContent === undefined` +// 2. "waiting for end identifier" - `_matchedContent !== undefined` +// with the state transitioning from one to the other when the corresponding identifier +// is matched in the fragment stream. class FragmentMatcher { private _startMatcher: StreamingKMP; private _endMatcher: StreamingKMP; From 0b1c66a213f3d551c3e557c9c8bcfd73fffb1c06 Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Tue, 22 Oct 2024 14:10:26 +0200 Subject: [PATCH 4/4] Handle multiple code blocks per fragment --- src/participant/streamParsing.ts | 23 +++++- .../suite/participant/streamParsing.test.ts | 76 +++++++++++++++++++ 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/src/participant/streamParsing.ts b/src/participant/streamParsing.ts index 6f1f9987f..d8131a759 100644 --- a/src/participant/streamParsing.ts +++ b/src/participant/streamParsing.ts @@ -86,20 +86,24 @@ class FragmentMatcher { private _endMatcher: StreamingKMP; private _matchedContent?: string; private _onContentMatched: (content: string) => void; + private _onFragmentProcessed: (content: string) => void; constructor({ identifier, onContentMatched, + onFragmentProcessed, }: { identifier: { start: string; end: string; }; onContentMatched: (content: string) => void; + onFragmentProcessed: (content: string) => void; }) { this._startMatcher = new StreamingKMP(identifier.start); this._endMatcher = new StreamingKMP(identifier.end); this._onContentMatched = onContentMatched; + this._onFragmentProcessed = onFragmentProcessed; } private _contentMatched(): void { @@ -116,6 +120,18 @@ class FragmentMatcher { this._endMatcher.reset(); } + // This needs to be invoked every time before we call `process` recursively or when `process` + // completes processing the fragment. It will emit a notification to subscribers with the partial + // fragment we've processed, regardless of whether there's a match or not. + private _partialFragmentProcessed( + fragment: string, + index: number | undefined = undefined + ): void { + this._onFragmentProcessed( + index === undefined ? fragment : fragment.slice(0, index) + ); + } + public process(fragment: string): void { if (this._matchedContent === undefined) { // We haven't matched the start identifier yet, so try and do that @@ -124,19 +140,24 @@ class FragmentMatcher { // We found a match for the start identifier - update `_matchedContent` to an empty string // and recursively call `process` with the remainder of the fragment. this._matchedContent = ''; + this._partialFragmentProcessed(fragment, startIndex); this.process(fragment.slice(startIndex)); + } else { + this._partialFragmentProcessed(fragment); } } else { const endIndex = this._endMatcher.match(fragment); if (endIndex !== -1) { // We've matched the end - emit the matched content and continue processing the partial fragment this._matchedContent += fragment.slice(0, endIndex); + this._partialFragmentProcessed(fragment, endIndex); this._contentMatched(); this.process(fragment.slice(endIndex)); } else { // We haven't matched the end yet - append the fragment to the matched content and wait // for a future fragment to contain the end identifier. this._matchedContent += fragment; + this._partialFragmentProcessed(fragment); } } } @@ -165,10 +186,10 @@ export async function processStreamWithIdentifiers({ const fragmentMatcher = new FragmentMatcher({ identifier, onContentMatched: onStreamIdentifier, + onFragmentProcessed: processStreamFragment, }); for await (const fragment of inputIterable) { - processStreamFragment(fragment); fragmentMatcher.process(fragment); } } diff --git a/src/test/suite/participant/streamParsing.test.ts b/src/test/suite/participant/streamParsing.test.ts index 66208ecdd..96d274d00 100644 --- a/src/test/suite/participant/streamParsing.test.ts +++ b/src/test/suite/participant/streamParsing.test.ts @@ -216,4 +216,80 @@ suite('processStreamWithIdentifiers', () => { expect(fragmentsProcessed.join('')).to.deep.equal(inputFragments.join('')); expect(identifiersStreamed).to.deep.equal(['\ncode1\n', '\ncode2\n']); }); + + test('one fragment containing multiple code blocks emits event in correct order', async () => { + // In case we have one fragment containing multiple code blocks, we want to make sure that + // fragment notifications and identifier notifications arrive in the right order so that we're + // adding code actions after the correct subfragment. + // For example: + // 'Text before code.\n```js\ncode1\n```\nText between code.\n```js\ncode2\n```\nText after code.' + // + // should emit: + // + // processStreamFragment: 'Text before code.\n```js\ncode1\n```' + // onStreamIdentifier: '\ncode1\n' + // processStreamFragment: '\nText between code.\n```js\ncode2\n```' + // onStreamIdentifier: '\ncode2\n' + // processStreamFragment: '\nText after code.' + // + // in that order to ensure we add each code action immediately after the code block + // rather than add both at the end. + + const inputFragments = [ + 'Text before code.\n```js\ncode1\n```\nText between code.\n```js\ncode2\n```\nText after code.', + ]; + + const inputIterable = asyncIterableFromArray(inputFragments); + const identifier = { start: '```js', end: '```' }; + + const fragmentsEmitted: { + source: 'processStreamFragment' | 'onStreamIdentifier'; + content: string; + }[] = []; + + const getFragmentHandler = ( + source: 'processStreamFragment' | 'onStreamIdentifier' + ): ((fragment: string) => void) => { + return (fragment: string): void => { + // It's an implementation detail, but the way the code is structured today, we're splitting the emitted fragments + // whenever we find either a start or end identifier. This is irrelevant as long as we're emitting the entirety of + // the text until the end of the code block in `processStreamFragment` and then the code block itself in `onStreamIdentifier`. + // With the code below, we're combining all subfragments with the same source to make the test verify the desired + // behavior rather than the actual implementation. + const lastFragment = fragmentsEmitted[fragmentsEmitted.length - 1]; + if (lastFragment?.source === source) { + lastFragment.content += fragment; + } else { + fragmentsEmitted.push({ source, content: fragment }); + } + }; + }; + + await processStreamWithIdentifiers({ + processStreamFragment: getFragmentHandler('processStreamFragment'), + onStreamIdentifier: getFragmentHandler('onStreamIdentifier'), + inputIterable, + identifier, + }); + + expect(fragmentsEmitted).to.have.length(5); + expect(fragmentsEmitted[0].source).to.equal('processStreamFragment'); + expect(fragmentsEmitted[0].content).to.equal( + 'Text before code.\n```js\ncode1\n```' + ); + + expect(fragmentsEmitted[1].source).to.equal('onStreamIdentifier'); + expect(fragmentsEmitted[1].content).to.equal('\ncode1\n'); + + expect(fragmentsEmitted[2].source).to.equal('processStreamFragment'); + expect(fragmentsEmitted[2].content).to.equal( + '\nText between code.\n```js\ncode2\n```' + ); + + expect(fragmentsEmitted[3].source).to.equal('onStreamIdentifier'); + expect(fragmentsEmitted[3].content).to.equal('\ncode2\n'); + + expect(fragmentsEmitted[4].source).to.equal('processStreamFragment'); + expect(fragmentsEmitted[4].content).to.equal('\nText after code.'); + }); });