Skip to content

Commit aa338f8

Browse files
authored
[components] move cache code to worker + to types.d.ts (#31)
* move cache code to worker + to types.d.ts * remove unused headers and depend on hyparquet
1 parent c536f49 commit aa338f8

File tree

9 files changed

+83
-196
lines changed

9 files changed

+83
-196
lines changed

packages/components/src/components/Cell.tsx

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import { asyncRows } from 'hightable'
2-
import { parquetMetadataAsync } from 'hyparquet'
2+
import { asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet'
33
import { useEffect, useState } from 'react'
44
import { FileKey, UrlKey } from '../lib/key.ts'
55
import { parquetDataFrame } from '../lib/tableProvider.js'
6-
import { asyncBufferFromUrl } from '../lib/utils.ts'
76
import Breadcrumb from './Breadcrumb.tsx'
87
import Layout from './Layout.tsx'
98

@@ -37,15 +36,8 @@ export default function CellView({ parsedKey, row, col }: CellProps) {
3736
try {
3837
// TODO: handle first row > 100kb
3938
setProgress(0.25)
40-
const asyncBuffer = await asyncBufferFromUrl({
41-
url: resolveUrl,
42-
headers: {},
43-
})
44-
const from = {
45-
url: resolveUrl,
46-
byteLength: asyncBuffer.byteLength,
47-
headers: {},
48-
}
39+
const asyncBuffer = await asyncBufferFromUrl(resolveUrl)
40+
const from = { url: resolveUrl, byteLength: asyncBuffer.byteLength }
4941
setProgress(0.5)
5042
const metadata = await parquetMetadataAsync(asyncBuffer)
5143
setProgress(0.75)

packages/components/src/components/viewers/ParquetView.tsx

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import HighTable, { DataFrame, rowCache } from 'hightable'
2-
import { parquetMetadataAsync } from 'hyparquet'
2+
import { asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet'
33
import React, { useCallback, useEffect, useState } from 'react'
44
import { FileKey, UrlKey } from '../../lib/key.ts'
55
import { parquetDataFrame } from '../../lib/tableProvider.ts'
6-
import { asyncBufferFromUrl } from '../../lib/utils.ts'
76
import { Spinner } from '../Layout.tsx'
87
import ContentHeader, { ContentSize } from './ContentHeader.tsx'
98

@@ -35,8 +34,8 @@ export default function ParquetView({ parsedKey, setProgress, setError }: Viewer
3534
async function loadParquetDataFrame() {
3635
try {
3736
setProgress(0.33)
38-
const asyncBuffer = await asyncBufferFromUrl({ url: resolveUrl, headers: {} })
39-
const from = { url: resolveUrl, byteLength: asyncBuffer.byteLength, headers: {} }
37+
const asyncBuffer = await asyncBufferFromUrl(resolveUrl)
38+
const from = { url: resolveUrl, byteLength: asyncBuffer.byteLength }
4039
setProgress(0.66)
4140
const metadata = await parquetMetadataAsync(asyncBuffer)
4241
let dataframe = parquetDataFrame(from, metadata)

packages/components/src/lib/tableProvider.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import { DataFrame, ResolvablePromise, resolvablePromise } from 'hightable'
22
import { FileMetaData, parquetSchema } from 'hyparquet'
3-
import { AsyncBufferFrom, parquetQueryWorker, parquetSortIndexWorker } from '../workers/parquetWorkerClient.ts'
3+
import { parquetQueryWorker, parquetSortIndexWorker } from '../workers/parquetWorkerClient.ts'
4+
import type { AsyncBufferFromUrl } from '../workers/types.d.ts'
45

56
type ResolvableRow = Record<string, ResolvablePromise<unknown>>;
67

78
/**
89
* Convert a parquet file into a dataframe.
910
*/
10-
export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): DataFrame {
11+
export function parquetDataFrame(from: AsyncBufferFromUrl, metadata: FileMetaData): DataFrame {
1112
const { children } = parquetSchema(metadata)
1213
const header = children.map(child => child.element.name)
1314
const sortCache = new Map<string, Promise<number[]>>()
Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,7 @@
1-
import { AsyncBuffer } from 'hyparquet'
21

32
/**
 * Build a single class-name string from a list of candidates.
 *
 * Falsy entries (`undefined`, `false` and the empty string) are dropped and
 * the remaining names are joined with a single space.
 */
export function cn(...names: (string | undefined | false)[]): string {
  const kept: string[] = []
  for (const candidate of names) {
    if (candidate) kept.push(candidate)
  }
  return kept.join(' ')
}
9-
10-
interface AsyncBufferFromUrlOptions {
11-
url: string;
12-
byteLength?: number;
13-
headers?: Record<string, string>;
14-
}
15-
16-
/**
 * Get the byte length of a URL using a HEAD request.
 *
 * @param url resource to measure
 * @param init optional fetch options (e.g. headers); `method` is always overridden with 'HEAD'
 * @returns the size in bytes reported by the Content-Length response header
 * @throws if the request is not ok, or if Content-Length is missing or is not
 *   a non-negative integer
 */
export async function byteLengthFromUrl(
  url: globalThis.RequestInfo | URL,
  init?: globalThis.RequestInit,
): Promise<number> {
  const res = await fetch(url, { ...init, method: 'HEAD' })
  if (!res.ok) throw new Error(`fetch head failed ${res.status.toString()}`)
  const length = res.headers.get('Content-Length')
  if (!length) throw new Error('missing content length')
  // Reject malformed values instead of letting NaN (or a partially parsed
  // number such as parseInt('12abc') === 12) flow into range computations.
  const digits = length.trim()
  const byteLength = Number(digits)
  if (!/^\d+$/.test(digits) || !Number.isSafeInteger(byteLength)) {
    throw new Error(`invalid content length ${length}`)
  }
  return byteLength
}
33-
34-
/**
 * Create an AsyncBuffer whose slices are fetched from a URL with HTTP range
 * requests.
 *
 * When `byteLength` is not supplied it is obtained with a HEAD request via
 * byteLengthFromUrl. The given `headers` are sent with the HEAD request and
 * with every range request.
 */
export async function asyncBufferFromUrl(
  { url, byteLength, headers }: AsyncBufferFromUrlOptions,
): Promise<AsyncBuffer> {
  // byte length from HEAD request, only when the caller did not provide it
  const size = byteLength ?? await byteLengthFromUrl(url, { headers })

  const slice = async (start: number, end?: number): Promise<ArrayBuffer> => {
    // HTTP ranges are inclusive, slice ends are exclusive; an undefined end
    // produces an open-ended range
    const last = end === undefined ? '' : (end - 1).toString()
    const response = await fetch(url, {
      headers: {
        ...headers,
        range: `bytes=${start.toString()}-${last}`,
      },
    })
    if (!(response.ok && response.body)) {
      throw new Error(`fetch failed ${response.status.toString()}`)
    }
    return response.arrayBuffer()
  }

  return { byteLength: size, slice }
}

packages/components/src/workers/parquetWorker.ts

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import { ColumnData, parquetQuery } from 'hyparquet'
1+
import { AsyncBuffer, ColumnData, asyncBufferFromUrl, cachedAsyncBuffer, parquetQuery } from 'hyparquet'
22
import { compressors } from 'hyparquet-compressors'
3-
import {
3+
// import { asyncBufferFromUrl } from '../lib/utils.ts'
4+
import type {
5+
AsyncBufferFromUrl,
46
ChunkMessage,
57
ErrorMessage,
68
IndicesMessage,
79
ParquetReadWorkerOptions,
810
ResultMessage,
9-
asyncBufferFrom,
10-
compare,
11-
} from './parquetWorkerClient.ts'
11+
} from './types.d.ts'
1212

1313
function postChunkMessage ({ chunk, queryId }: ChunkMessage) {
1414
self.postMessage({ chunk, queryId })
@@ -23,6 +23,9 @@ function postIndicesMessage ({ indices, queryId }: IndicesMessage) {
2323
self.postMessage({ indices, queryId })
2424
}
2525

26+
// Cache for AsyncBuffers
27+
const cache = new Map<string, Promise<AsyncBuffer>>()
28+
2629
self.onmessage = async ({
2730
data,
2831
}: {
@@ -83,3 +86,23 @@ self.onmessage = async ({
8386
}
8487
}
8588
}
89+
90+
/**
 * Three-way comparator based on the `<` and `>` operators.
 *
 * @returns -1 if `a < b`, 1 if `a > b`, and 0 otherwise
 *
 * Returning 0 for the "otherwise" case keeps the comparator consistent
 * (`compare(a, b)` and `compare(b, a)` can no longer both be 1), which a
 * sort needs in order to be stable and well-defined.
 */
function compare<T>(a: T, b: T): number {
  if (a < b) return -1
  if (a > b) return 1
  // Equal, or not orderable with < and > (e.g. undefined, NaN).
  // TODO: define an explicit ordering for nulls
  return 0
}
95+
96+
/**
 * Convert a serializable AsyncBufferFromUrl descriptor to an AsyncBuffer and
 * cache the result.
 *
 * The cache key is the JSON serialization of `from`. The pending promise is
 * cached, so concurrent calls for the same descriptor share one request.
 * A rejected promise is evicted so that a later call can retry instead of
 * receiving the same failure forever.
 */
function asyncBufferFrom(
  from: AsyncBufferFromUrl,
): Promise<AsyncBuffer> {
  const key = JSON.stringify(from)
  const cached = cache.get(key)
  if (cached) return cached
  const asyncBuffer = asyncBufferFromUrl(from.url, from.byteLength).then(cachedAsyncBuffer)
  cache.set(key, asyncBuffer)
  // The caller still observes the rejection through the returned promise;
  // this handler only removes the failed entry (if it is still the current one).
  void asyncBuffer.catch(() => {
    if (cache.get(key) === asyncBuffer) cache.delete(key)
  })
  return asyncBuffer
}
Lines changed: 2 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,8 @@
11
import ParquetWorker from './parquetWorker?worker&inline'
22
/// ^ the worker is bundled with the main thread code (inline) which is easier for users to import
33
/// (no need to copy the worker file to the right place)
4-
import { AsyncBuffer, ColumnData, FileMetaData, ParquetReadOptions } from 'hyparquet'
5-
import { asyncBufferFromUrl } from '../lib/utils.ts'
6-
7-
// Serializable constructor for AsyncBuffers
8-
export interface AsyncBufferFrom {
9-
url: string
10-
byteLength: number
11-
headers?: Record<string, string>
12-
}
13-
// Same as ParquetReadOptions, but AsyncBufferFrom instead of AsyncBuffer
14-
export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'file'> {
15-
from: AsyncBufferFrom
16-
orderBy?: string
17-
sortIndex?: boolean
18-
}
19-
// Row is defined in hightable, but not exported + we change any to unknown
20-
export type Row = Record<string, unknown>;
21-
22-
interface Message {
23-
queryId: number
24-
}
25-
export interface ChunkMessage extends Message {
26-
chunk: ColumnData
27-
}
28-
export interface ResultMessage extends Message {
29-
result: Row[]
30-
}
31-
export interface IndicesMessage extends Message {
32-
indices: number[]
33-
}
34-
export interface ErrorMessage extends Message {
35-
error: Error
36-
}
37-
38-
export type ParquetMessage = ChunkMessage | ResultMessage | ErrorMessage
39-
export type SortParquetMessage = IndicesMessage | ErrorMessage
40-
41-
export interface ParquetSortIndexOptions {
42-
metadata: FileMetaData
43-
from: AsyncBufferFrom
44-
orderBy: string
45-
}
46-
4+
import { ColumnData } from 'hyparquet'
5+
import type { ParquetMessage, ParquetReadWorkerOptions, ParquetSortIndexOptions, Row, SortParquetMessage } from './types.d.ts'
476

487
let worker: Worker | undefined
498
let nextQueryId = 0
@@ -99,7 +58,6 @@ function getWorker() {
9958
return worker
10059
}
10160

102-
10361
/**
10462
* Presents almost the same interface as parquetRead, but runs in a worker.
10563
* This is useful for reading large parquet files without blocking the main thread.
@@ -143,79 +101,3 @@ export function parquetSortIndexWorker({ metadata, from, orderBy }: ParquetSortI
143101
})
144102
})
145103
}
146-
147-
/**
 * Convert a serializable AsyncBufferFrom descriptor to an AsyncBuffer and
 * cache the result.
 *
 * The cache key is the JSON serialization of `from`. The pending promise is
 * cached, so concurrent calls for the same descriptor share one request.
 * A rejected promise is evicted so that a later call can retry instead of
 * receiving the same failure forever.
 */
export function asyncBufferFrom(
  from: AsyncBufferFrom,
): Promise<AsyncBuffer> {
  const key = JSON.stringify(from)
  const cached = cache.get(key)
  if (cached) return cached
  const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer)
  cache.set(key, asyncBuffer)
  // The caller still observes the rejection through the returned promise;
  // this handler only removes the failed entry (if it is still the current one).
  void asyncBuffer.catch(() => {
    if (cache.get(key) === asyncBuffer) cache.delete(key)
  })
  return asyncBuffer
}
160-
const cache = new Map<string, Promise<AsyncBuffer>>()
161-
162-
/**
 * Three-way comparator based on the `<` and `>` operators.
 *
 * @returns -1 if `a < b`, 1 if `a > b`, and 0 otherwise
 *
 * Returning 0 for the "otherwise" case keeps the comparator consistent
 * (`compare(a, b)` and `compare(b, a)` can no longer both be 1), which a
 * sort needs in order to be stable and well-defined.
 */
export function compare<T>(a: T, b: T): number {
  if (a < b) return -1
  if (a > b) return 1
  // Equal, or not orderable with < and > (e.g. undefined, NaN).
  // TODO: define an explicit ordering for nulls
  return 0
}
167-
168-
// TODO(SL): once the types in cachedAsyncBuffer are fixed, import all the following from hyparquet
169-
type Awaitable<T> = T | Promise<T>;
170-
171-
/**
 * Wrap an AsyncBuffer so that repeated requests for the same byte range are
 * served from an in-memory map instead of reading the underlying buffer again.
 *
 * Ranges are keyed with cacheKey, and the value returned by the underlying
 * `slice` (a promise or a resolved buffer) is stored as-is.
 */
function cachedAsyncBuffer(asyncBuffer: AsyncBuffer): AsyncBuffer {
  const { byteLength } = asyncBuffer
  const slices = new Map<string, Awaitable<ArrayBuffer>>()

  const slice = (start: number, end?: number): Awaitable<ArrayBuffer> => {
    const key = cacheKey(start, end, byteLength)
    let entry = slices.get(key)
    if (!entry) {
      // cache miss, read from file
      entry = asyncBuffer.slice(start, end)
      slices.set(key, entry)
    }
    return entry
  }

  return { byteLength, slice }
}
192-
193-
/**
 * Compute the canonical cache key 'start,end' for a byte range.
 *
 * A negative `start` denotes a suffix range (the last `-start` bytes). When
 * `size` is known, suffix ranges and open-ended ranges are rewritten with
 * explicit bounds so that equivalent requests share one key.
 *
 * @param start start byte of range, negative for a suffix range
 * @param end end byte of range, or undefined for an open-ended/suffix range
 * @param size size of the file, if known
 * @returns the cache key
 * @throws if a suffix range is given an `end`, or if `start > end`
 */
function cacheKey(start: number, end?: number, size?: number): string {
  const isSuffix = start < 0

  if (end !== undefined) {
    if (isSuffix) {
      throw new Error(
        `invalid suffix range [${start.toString()}, ${end.toString()}]`,
      )
    }
    if (start > end) {
      throw new Error(
        `invalid empty range [${start.toString()}, ${end.toString()}]`,
      )
    }
    return `${start.toString()},${end.toString()}`
  }

  // No explicit end: without a known size the range cannot be normalized.
  if (size === undefined) return `${start.toString()},`

  const first = isSuffix ? size + start : start
  return `${first.toString()},${size.toString()}`
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { ColumnData, FileMetaData, ParquetReadOptions } from 'hyparquet'
2+
3+
// Serializable constructor for AsyncBuffers
4+
export interface AsyncBufferFromUrl {
5+
url: string
6+
byteLength: number
7+
}
8+
// Same as ParquetReadOptions, but with a serializable `from: AsyncBufferFromUrl` instead of `file: AsyncBuffer`
9+
export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'file'> {
10+
from: AsyncBufferFromUrl
11+
orderBy?: string
12+
sortIndex?: boolean
13+
}
14+
// Row is defined in hightable, but not exported + we change any to unknown
15+
export type Row = Record<string, unknown> ;
16+
17+
interface Message {
18+
queryId: number
19+
}
20+
export interface ChunkMessage extends Message {
21+
chunk: ColumnData
22+
}
23+
export interface ResultMessage extends Message {
24+
result: Row[]
25+
}
26+
export interface IndicesMessage extends Message {
27+
indices: number[]
28+
}
29+
export interface ErrorMessage extends Message {
30+
error: Error
31+
}
32+
33+
export type ParquetMessage = ChunkMessage | ResultMessage | ErrorMessage
34+
export type SortParquetMessage = IndicesMessage | ErrorMessage
35+
36+
export interface ParquetSortIndexOptions {
37+
metadata: FileMetaData
38+
from: AsyncBufferFromUrl
39+
orderBy: string
40+
}

0 commit comments

Comments
 (0)