Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 170 additions & 70 deletions src/participant/streamParsing.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,166 @@
function escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
// Streaming substring search built on the Knuth-Morris-Pratt algorithm.
// The haystack may be supplied piece by piece through repeated `match` calls;
// the progress made into the needle is remembered between calls, so a needle
// that straddles two or more fragments is still found.
class StreamingKMP {
  public needle: string;

  // KMP failure table: entry `k` holds the length of the longest proper prefix
  // of `needle.slice(0, k + 1)` that is also a suffix of it.
  private _lookupVector: number[];

  // Number of needle characters already matched by the tail of the fragments
  // seen so far. The next call to `match` resumes from this needle position.
  private _lastMatchingIndex = 0;

  constructor(needle: string) {
    this.needle = needle;
    this._lookupVector = this._createLookupVector();
  }

  // Builds the failure table for `needle`.
  private _createLookupVector(): number[] {
    const pattern = this.needle;
    const table = new Array<number>(pattern.length);
    table[0] = 0;

    let prefixLength = 0;
    for (let position = 1; position < pattern.length; position++) {
      // Shrink the candidate prefix until it can be extended by the current
      // character, or until there is nothing left to shrink.
      while (
        prefixLength > 0 &&
        pattern[position] !== pattern[prefixLength]
      ) {
        prefixLength = table[prefixLength - 1];
      }

      if (pattern[position] === pattern[prefixLength]) {
        prefixLength++;
      }

      table[position] = prefixLength;
    }

    return table;
  }

  // Feeds the next haystack fragment to the matcher.
  //
  // Returns the index in `haystackFragment` just **past** the first completed
  // occurrence of the needle, or -1 when the needle has not been completed yet.
  // The end index is reported (rather than the start) because the needle may
  // have begun in an earlier fragment, where its start index would be negative.
  //
  // Scanning stops at the first completed occurrence and the remembered
  // progress is cleared; the rest of the fragment is not examined.
  public match(haystackFragment: string): number {
    const pattern = this.needle;
    const fragmentLength = haystackFragment.length;
    let matched = this._lastMatchingIndex; // position in the needle
    let cursor = 0; // position in the fragment

    while (cursor < fragmentLength) {
      const isHit = haystackFragment[cursor] === pattern[matched];
      if (isHit) {
        cursor += 1;
        matched += 1;
      }

      if (matched === pattern.length) {
        this._lastMatchingIndex = 0;
        return cursor;
      }

      if (isHit) {
        continue;
      }

      // Mismatch: fall back to the longest prefix that is still viable, or
      // skip the character when no part of the needle is currently matched.
      if (matched > 0) {
        matched = this._lookupVector[matched - 1];
      } else {
        cursor += 1;
      }
    }

    this._lastMatchingIndex = matched;
    return -1;
  }

  // Forgets any partial match carried over from previous fragments.
  public reset(): void {
    this._lastMatchingIndex = 0;
  }
}

// A small state machine that consumes a stream of text fragments and reports
// the content found between each start identifier and the end identifier that
// follows it. It is always in one of two states:
//   1. "waiting for start identifier" - `_matchedContent === undefined`
//   2. "waiting for end identifier"   - `_matchedContent !== undefined`
// and it switches state whenever the identifier it is waiting for is completed
// somewhere in the fragment stream.
//
// Every piece of text passed to `process` is forwarded to `onFragmentProcessed`
// (identifiers included), split at the positions where an identifier ends.
// `onContentMatched` receives the text between the identifiers, without the
// identifiers themselves.
class FragmentMatcher {
  private _startMatcher: StreamingKMP;
  private _endMatcher: StreamingKMP;

  // Text collected since the last start identifier, or `undefined` while no
  // start identifier is open.
  private _matchedContent?: string;
  private _onContentMatched: (content: string) => void;
  private _onFragmentProcessed: (content: string) => void;

  constructor(options: {
    identifier: {
      start: string;
      end: string;
    };
    onContentMatched: (content: string) => void;
    onFragmentProcessed: (content: string) => void;
  }) {
    const { identifier, onContentMatched, onFragmentProcessed } = options;
    this._startMatcher = new StreamingKMP(identifier.start);
    this._endMatcher = new StreamingKMP(identifier.end);
    this._onContentMatched = onContentMatched;
    this._onFragmentProcessed = onFragmentProcessed;
  }

  // Reports the collected content (if any) and returns to the
  // "waiting for start identifier" state.
  private _emitMatchedContent(): void {
    const collected = this._matchedContent;
    if (collected !== undefined) {
      // The collected text ends with the end identifier - drop it.
      const endIdentifierLength = this._endMatcher.needle.length;
      this._onContentMatched(
        collected.slice(0, collected.length - endIdentifierLength)
      );
    }

    this._matchedContent = undefined;
    this._startMatcher.reset();
    this._endMatcher.reset();
  }

  // Processes the next fragment of the stream. Subscribers are notified about
  // each processed part of the fragment before the remainder is handled, so
  // that fragment and content notifications arrive in stream order.
  public process(fragment: string): void {
    const collected = this._matchedContent;

    if (collected === undefined) {
      // Waiting for the start identifier.
      const afterStart = this._startMatcher.match(fragment);
      if (afterStart === -1) {
        this._onFragmentProcessed(fragment);
        return;
      }

      // Start identifier completed: begin collecting, report everything up to
      // and including the identifier, then continue with what is left.
      this._matchedContent = '';
      this._onFragmentProcessed(fragment.slice(0, afterStart));
      this.process(fragment.slice(afterStart));
      return;
    }

    // Waiting for the end identifier.
    const afterEnd = this._endMatcher.match(fragment);
    if (afterEnd === -1) {
      // Not closed yet: keep the whole fragment and wait for a later fragment
      // to complete the end identifier.
      this._matchedContent = collected + fragment;
      this._onFragmentProcessed(fragment);
      return;
    }

    // End identifier completed: report the processed part first, then the
    // matched content, and finally carry on with the rest of the fragment.
    const consumed = fragment.slice(0, afterEnd);
    this._matchedContent = collected + consumed;
    this._onFragmentProcessed(consumed);
    this._emitMatchedContent();
    this.process(fragment.slice(afterEnd));
  }
}

/**
Expand All @@ -22,74 +183,13 @@ export async function processStreamWithIdentifiers({
end: string;
};
}): Promise<void> {
const escapedIdentifierStart = escapeRegex(identifier.start);
const escapedIdentifierEnd = escapeRegex(identifier.end);
const regex = new RegExp(
`${escapedIdentifierStart}([\\s\\S]*?)${escapedIdentifierEnd}`,
'g'
);

let contentSinceLastIdentifier = '';
for await (const fragment of inputIterable) {
contentSinceLastIdentifier += fragment;
const fragmentMatcher = new FragmentMatcher({
identifier,
onContentMatched: onStreamIdentifier,
onFragmentProcessed: processStreamFragment,
});

let lastIndex = 0;
let match: RegExpExecArray | null;
while ((match = regex.exec(contentSinceLastIdentifier)) !== null) {
const endIndex = regex.lastIndex;

// Stream content up to the end of the identifier.
const contentToStream = contentSinceLastIdentifier.slice(
lastIndex,
endIndex
);
processStreamFragment(contentToStream);

const identifierContent = match[1];
onStreamIdentifier(identifierContent);

lastIndex = endIndex;
}

if (lastIndex > 0) {
// Remove all of the processed content.
contentSinceLastIdentifier = contentSinceLastIdentifier.slice(lastIndex);
// Reset the regex.
regex.lastIndex = 0;
} else {
// Clear as much of the content as we can safely.
const maxUnprocessedLength = identifier.start.length - 1;
if (contentSinceLastIdentifier.length > maxUnprocessedLength) {
const identifierIndex = contentSinceLastIdentifier.indexOf(
identifier.start
);
if (identifierIndex > -1) {
// We have an identifier, so clear up until the identifier.
const contentToStream = contentSinceLastIdentifier.slice(
0,
identifierIndex
);
processStreamFragment(contentToStream);
contentSinceLastIdentifier =
contentSinceLastIdentifier.slice(identifierIndex);
} else {
// No identifier, so clear up until the last maxUnprocessedLength.
const processUpTo =
contentSinceLastIdentifier.length - maxUnprocessedLength;
const contentToStream = contentSinceLastIdentifier.slice(
0,
processUpTo
);
processStreamFragment(contentToStream);
contentSinceLastIdentifier =
contentSinceLastIdentifier.slice(processUpTo);
}
}
}
}

// Finish up anything not streamed yet.
if (contentSinceLastIdentifier.length > 0) {
processStreamFragment(contentSinceLastIdentifier);
for await (const fragment of inputIterable) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One behavior change to be aware of: when a single fragment is very large and contains two code blocks, the whole fragment is now printed before any of the actions are added.
This won't affect our usage, since the fragments are almost always 2-4 characters long.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for spotting that. The fix for it was simple enough, so I added it anyway just to be on the safe side.

fragmentMatcher.process(fragment);
}
}
76 changes: 76 additions & 0 deletions src/test/suite/participant/streamParsing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,80 @@ suite('processStreamWithIdentifiers', () => {
expect(fragmentsProcessed.join('')).to.deep.equal(inputFragments.join(''));
expect(identifiersStreamed).to.deep.equal(['\ncode1\n', '\ncode2\n']);
});

test('one fragment containing multiple code blocks emits event in correct order', async () => {
  // When a single fragment holds several code blocks, fragment notifications and
  // identifier notifications must be interleaved in stream order, so that each
  // code action is attached right after the sub-fragment it belongs to instead
  // of all actions being added at the very end.
  //
  // For the input
  //   'Text before code.\n```js\ncode1\n```\nText between code.\n```js\ncode2\n```\nText after code.'
  // the expected sequence is:
  //   processStreamFragment: 'Text before code.\n```js\ncode1\n```'
  //   onStreamIdentifier:    '\ncode1\n'
  //   processStreamFragment: '\nText between code.\n```js\ncode2\n```'
  //   onStreamIdentifier:    '\ncode2\n'
  //   processStreamFragment: '\nText after code.'
  type EmitSource = 'processStreamFragment' | 'onStreamIdentifier';
  type Emission = { source: EmitSource; content: string };

  const inputFragments = [
    'Text before code.\n```js\ncode1\n```\nText between code.\n```js\ncode2\n```\nText after code.',
  ];
  const identifier = { start: '```js', end: '```' };
  const inputIterable = asyncIterableFromArray<string>(inputFragments);

  const fragmentsEmitted: Emission[] = [];

  // The implementation currently splits the emitted fragments at every start
  // and end identifier. That is an implementation detail: what matters is that
  // all text up to the end of a code block reaches `processStreamFragment`
  // before the block itself reaches `onStreamIdentifier`. Consecutive emissions
  // from the same source are therefore merged, so the test checks the desired
  // behavior rather than the exact splitting.
  const recordFrom =
    (source: EmitSource) =>
    (fragment: string): void => {
      const previous = fragmentsEmitted[fragmentsEmitted.length - 1];
      if (previous !== undefined && previous.source === source) {
        previous.content += fragment;
        return;
      }
      fragmentsEmitted.push({ source, content: fragment });
    };

  await processStreamWithIdentifiers({
    processStreamFragment: recordFrom('processStreamFragment'),
    onStreamIdentifier: recordFrom('onStreamIdentifier'),
    inputIterable,
    identifier,
  });

  const expectedEmissions: Emission[] = [
    {
      source: 'processStreamFragment',
      content: 'Text before code.\n```js\ncode1\n```',
    },
    { source: 'onStreamIdentifier', content: '\ncode1\n' },
    {
      source: 'processStreamFragment',
      content: '\nText between code.\n```js\ncode2\n```',
    },
    { source: 'onStreamIdentifier', content: '\ncode2\n' },
    { source: 'processStreamFragment', content: '\nText after code.' },
  ];

  expect(fragmentsEmitted).to.have.length(5);
  expectedEmissions.forEach((expected, position) => {
    expect(fragmentsEmitted[position].source).to.equal(expected.source);
    expect(fragmentsEmitted[position].content).to.equal(expected.content);
  });
});
});
Loading