Skip to content

Commit 5ea34b7

Browse files
committed
Improve streaming and progress bar
1 parent 605dc89 commit 5ea34b7

File tree

2 files changed

+71
-124
lines changed

2 files changed

+71
-124
lines changed

js/data/eurostat.js

Lines changed: 36 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,20 @@ export default class EurostatData extends FixData {
5353
aelm.innerText = 'Eurostat'
5454

5555
const loaderArea = document.createElement('div')
56-
this._loader = document.createElement('span')
56+
this._loader = document.createElement('div')
57+
this._loader.classList.add('loader')
58+
this._loader.style.display = 'none'
5759
loaderArea.appendChild(this._loader)
58-
this._progress = document.createElement('span')
59-
loaderArea.appendChild(this._progress)
60+
this._progressBar = document.createElement('div')
61+
this._progressBar.style.width = '100%'
62+
this._progressBar.style.fontSize = '50%'
63+
this._progressBar.style.textAlign = 'center'
64+
this._progressBar.style.backgroundColor = 'white'
65+
this._progressBar.style.display = 'none'
66+
this._progressBar.style.marginTop = '5px'
67+
loaderArea.appendChild(this._progressBar)
68+
this._errorMessage = document.createElement('div')
69+
loaderArea.appendChild(this._errorMessage)
6070
elm.appendChild(loaderArea)
6171

6272
this._filter = document.createElement('div')
@@ -230,7 +240,7 @@ export default class EurostatData extends FixData {
230240
...query,
231241
}
232242
const paramstr = datasetCode + '?' + new URLSearchParams(params).toString()
233-
this._progress.innerText = ''
243+
this._errorMessage.innerText = ''
234244

235245
const db = new EurostatDB()
236246
const storedData = await db.get('data', paramstr)
@@ -257,6 +267,10 @@ export default class EurostatData extends FixData {
257267
const total = Object.values(this._metabase[datasetCode]).reduce((s, v) => s * v.length, 1)
258268
const dates = []
259269
const pad0 = v => `${Math.floor(v)}`.padStart(2, '0')
270+
this._progressBar.innerText = '0 / 0'
271+
this._progressBar.style.display = 'block'
272+
this._progressBar.style.background = 'white'
273+
this._loader.style.display = 'none'
260274
let data
261275
try {
262276
data = await fetchProgress(url, {
@@ -267,6 +281,7 @@ export default class EurostatData extends FixData {
267281
}
268282
if (Date.now() - (dates.at(-1)?.t ?? 0) > 100) {
269283
dates.push({ c: loaded, t: Date.now() })
284+
const p = (100 * dates[dates.length - 1].c) / total
270285
const n = Math.max(1, dates.length - 100)
271286
const t =
272287
(dates[dates.length - 1].t - dates[n - 1].t) /
@@ -276,19 +291,19 @@ export default class EurostatData extends FixData {
276291
et >= 3600
277292
? `${Math.floor(et / 3600)}:${pad0((Math.floor(et) % 3600) / 60)}:${pad0(Math.floor(et) % 60)}`
278293
: `${Math.floor(et / 60)}:${pad0(Math.floor(et) % 60)}`
279-
this._progress.innerText = `${loaded} / ${total} (${etstr})`
294+
this._progressBar.innerText = `${loaded} / ${total} (${etstr})`
295+
this._progressBar.style.background = `linear-gradient(90deg, lightgray, ${p}%, gray, ${p}%, white)`
280296
}
281297
},
282298
})
283-
} catch {
284-
// ignore
299+
} catch (e) {
300+
console.warn(e)
285301
}
286-
this._progress.innerText = ''
287302
if (abortController.signal.aborted || !data) {
288303
data = null
289304
} else {
290305
if ((data.error?.length ?? 0) > 0) {
291-
this._progress.innerText = data.error[0].label
306+
this._errorMessage.innerText = data.error[0].label
292307
console.error(data.error)
293308
}
294309
data.fetchDate = new Date()
@@ -318,13 +333,14 @@ export default class EurostatData extends FixData {
318333

319334
const info = datasetInfos[this._code]
320335

321-
this._loader.classList.add('loader')
322-
this._loader.style.display = 'inline-block'
336+
this._loader.style.display = 'block'
323337

324338
const targetCode = this._code
325339
const data = await this._getData(this._code, info.query)
326-
this._loader.classList.remove('loader')
327-
this._loader.style.display = null
340+
this._loader.style.display = 'none'
341+
if (this._code === targetCode) {
342+
this._progressBar.style.display = 'none'
343+
}
328344
if (this._code !== targetCode || !data) {
329345
return
330346
}
@@ -518,50 +534,15 @@ const fetchProgress = async (input, init) => {
518534
if (obj.size) {
519535
total = obj.size.reduce((s, v) => s * v, 1)
520536
}
537+
const lastLoaded = loaded
521538
if (obj.value) {
522539
loaded = Object.keys(obj.value).length
523540
}
524-
init?.onprogress({ loaded, total })
541+
if (lastLoaded !== loaded) {
542+
init?.onprogress({ loaded, total })
543+
}
525544
}
526-
const bufferedStream = new TransformStream({
527-
start() {
528-
this.buf = []
529-
this.size = 0
530-
this.offset = 0
531-
},
532-
transform(chunk, controller) {
533-
this.buf.push(chunk)
534-
this.size += chunk.length
535-
536-
const chunkSize = 8192
537-
while (this.size >= chunkSize) {
538-
let o = 0
539-
const b = new Uint8Array(chunkSize)
540-
while (o < chunkSize) {
541-
if (this.buf[0].length - this.offset <= chunkSize - o) {
542-
const b0 = this.buf.shift()
543-
b.set(this.offset === 0 ? b0 : b0.slice(this.offset), o)
544-
o += b0.length - this.offset
545-
this.offset = 0
546-
} else {
547-
b.set(this.buf[0].slice(this.offset, this.offset + chunkSize - o), o)
548-
this.offset += chunkSize - o
549-
o = chunkSize
550-
}
551-
}
552-
controller.enqueue(b)
553-
this.size -= chunkSize
554-
}
555-
},
556-
flush(controller) {
557-
for (let i = 0; i < this.buf.length; i++) {
558-
controller.enqueue(this.offset === 0 ? this.buf[i] : this.buf[i].slice(this.offset))
559-
this.offset = 0
560-
}
561-
this.buf = []
562-
},
563-
})
564-
return parser.parse(response.body.pipeThrough(bufferedStream).pipeThrough(new TextDecoderStream(), {}))
545+
return parser.parse(response.body.pipeThrough(new TextDecoderStream(), {}))
565546
}
566547

567548
const numberPattern = /^[-+]?[0-9]+(\.[0-9]+)?$/
@@ -572,13 +553,8 @@ class JSONStreamParser {
572553
}
573554

574555
async parse(stream) {
575-
try {
576-
await this.construct(this.tokenize(stream))
577-
return this.obj._root
578-
} catch (e) {
579-
console.warn(e)
580-
throw e
581-
}
556+
await this.construct(this.tokenize(stream))
557+
return this.obj._root
582558
}
583559

584560
async *tokenize(stream) {

js/data/meta/process_eurostat_catalogue.js

Lines changed: 35 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@ import { XMLParser } from 'fast-xml-parser'
55
const basePath = import.meta.dirname
66

77
const fetchProgress = async (input, init) => {
8-
const response = await fetch(input, init)
9-
let loaded = 0
10-
let total = response.headers.get('content-length')
11-
let prevPrintTime = 0
8+
let waitTime = 0.01
129
const units = ['B', 'KiB', 'MiB', 'GiB']
10+
for (let i = 0; i < 100; i++) {
11+
try {
12+
const response = await fetch(input, init)
13+
const total = response.headers.get('content-length')
14+
let loaded = 0
15+
let prevPrintTime = 0
1316

14-
return new Response(
15-
new ReadableStream({
16-
pull: async controller => {
17-
for await (const chunk of response.body) {
17+
const progressdStream = new TransformStream({
18+
transform(chunk, controller) {
1819
controller.enqueue(chunk)
1920
loaded += chunk.byteLength
2021
const now = Date.now()
@@ -32,72 +33,52 @@ const fetchProgress = async (input, init) => {
3233
console.info(Math.floor(val * 100) / 100 + ' ' + units[uidx])
3334
}
3435
}
35-
}
36-
controller.close()
37-
},
38-
})
39-
)
36+
},
37+
})
38+
39+
return new Response(response.body.pipeThrough(progressdStream))
40+
} catch {
41+
await new Promise(resolve => setTimeout(resolve, waitTime * 1000))
42+
waitTime = Math.min(waitTime * 2, 5)
43+
}
44+
}
45+
return null
4046
}
4147

4248
const getCatalogue = async () => {
4349
console.log('Fetching catalogue...')
44-
let res = null
45-
let waitTime = 0.5
46-
for (let i = 0; i < 100; i++) {
47-
try {
48-
res = await fetchProgress('https://ec.europa.eu/eurostat/api/dissemination/catalogue/toc/xml')
49-
break
50-
} catch (error) {
51-
await new Promise(resolve => setTimeout(resolve, waitTime))
52-
waitTime = Math.min(waitTime * 2, 60)
53-
}
54-
}
50+
const res = await fetchProgress('https://ec.europa.eu/eurostat/api/dissemination/catalogue/toc/xml')
5551
if (!res) {
5652
throw new Error('Failed to fetch catalogue')
5753
}
5854
const catalogues = await res.text()
59-
const parser = new XMLParser({
60-
ignoreAttributes: false,
61-
})
55+
const parser = new XMLParser({ ignoreAttributes: false })
6256
const doc = parser.parse(catalogues, 'application/xml')
6357
const themes = doc['nt:tree']['nt:branch'][0]
6458

65-
const root = {}
66-
const getLeafs = (node, depth, obj) => {
67-
obj.title = node['nt:title'][0]['#text']
68-
obj.children = []
59+
const getLeafs = node => {
60+
const obj = {
61+
title: node['nt:title'][0]['#text'],
62+
children: [],
63+
}
6964
const branch = node['nt:children']['nt:branch']
7065
if (branch) {
71-
if (Array.isArray(branch)) {
72-
for (const b of branch) {
73-
const c = {}
74-
obj.children.push(c)
75-
getLeafs(b, depth + 1, c)
76-
}
77-
} else {
78-
const c = {}
79-
obj.children.push(c)
80-
getLeafs(branch, depth + 1, c)
66+
for (const b of Array.isArray(branch) ? branch : [branch]) {
67+
obj.children.push(getLeafs(b))
8168
}
8269
} else {
8370
const leafs = node['nt:children']['nt:leaf']
84-
if (Array.isArray(leafs)) {
85-
for (const leaf of leafs) {
86-
obj.children.push({
87-
title: leaf['nt:title'][0]['#text'],
88-
code: leaf['nt:code'],
89-
})
90-
}
91-
} else {
71+
for (const leaf of Array.isArray(leafs) ? leafs : [leafs]) {
9272
obj.children.push({
93-
title: leafs['nt:title'][0]['#text'],
94-
code: leafs['nt:code'],
73+
title: leaf['nt:title'][0]['#text'],
74+
code: leaf['nt:code'],
9575
})
9676
}
9777
}
78+
return obj
9879
}
9980

100-
getLeafs(themes, 0, root)
81+
const root = getLeafs(themes)
10182

10283
const readableStream = new Blob([JSON.stringify(root)]).stream()
10384
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'))
@@ -107,17 +88,7 @@ const getCatalogue = async () => {
10788

10889
const getMetabase = async () => {
10990
console.log('Fetching metabase...')
110-
let res = null
111-
let waitTime = 0.5
112-
for (let i = 0; i < 100; i++) {
113-
try {
114-
res = await fetchProgress('https://ec.europa.eu/eurostat/api/dissemination/catalogue/metabase.txt.gz')
115-
break
116-
} catch (error) {
117-
await new Promise(resolve => setTimeout(resolve, waitTime))
118-
waitTime = Math.min(waitTime * 2, 60)
119-
}
120-
}
91+
let res = await fetchProgress('https://ec.europa.eu/eurostat/api/dissemination/catalogue/metabase.txt.gz')
12192
if (!res) {
12293
throw new Error('Failed to fetch metadata')
12394
}
@@ -126,7 +97,7 @@ const getMetabase = async () => {
12697

12798
const data = {}
12899
for (const line of text.split('\n')) {
129-
if (line.trim() === '') {
100+
if (line.trim().length === 0) {
130101
continue
131102
}
132103
const [id, unit, value] = line.split('\t')

0 commit comments

Comments
 (0)