Public
Edited
Nov 20, 2023
3 stars
Insert cell
Insert cell
// 500px-tall container element that the deckglMap cell renders the map into.
mapContainer = html`<div style="height:500px"></div>`
Insert cell
// Remote Parquet file of US counties, opened by the parquetFile cell.
// TODO: switch back to the previous URL once the CORS cache expires (this path
// appears to be a temporary variant of the original object to dodge a stale
// CORS response — confirm before reverting).
url = "https://r2-public.kylebarron.dev/observable/2023/streaming-parquet-wasm/UScounties2.parquet/UScounties.parquet"
Insert cell
// Open the remote Parquet file at `url` with parquet-wasm's AsyncParquetFile.
// The constructor result is awaited, so presumably it resolves asynchronously
// in the parquet-wasm build loaded by the parquet cell — confirm.
parquetFile = await new parquet.AsyncParquetFile(url)
Insert cell
// Read only the first row group (index 0); any further row groups are not
// loaded. Result type comes from parquet-wasm (presumably an Arrow table — verify).
table = await parquetFile.read_row_group(0)
Insert cell
// recordBatches = {
// const stream = await parquet.readParquetStream(url);
// const layers = [];
// for await (const wasmRecordBatch of streamAsyncIterator(stream)) {
// // Note to self: using FFI here gave invalid top-level geometry offsets; there's a bug somewhere
// let ipcBatch = wasmRecordBatch.intoIPC();

// // Note: we take the first batch because this table is known to only have one batch
// const table = apacheArrow.tableFromIPC(ipcBatch);
// const recordBatch = table.batches[0];

// // // Parse GeoParquet metadata
// // const geoMetadata = JSON.parse(recordBatch.schema.metadata.get("geo"));
// // const geometryColumnName = geoMetadata.primary_column;
// // const geometryColumnIdx = recordBatch.schema.fields.findIndex(
// // (field) => field.name === geometryColumnName
// // );
// const geometryColumnIdx = 6;

// const geometryColumn = recordBatch.getChildAt(geometryColumnIdx);
// const geometryOffsets = geometryColumn.data[0].valueOffsets;
// const polygonOffsets = geometryColumn.getChildAt(0).data[0].valueOffsets;
// const ringOffsets = geometryColumn.getChildAt(0).getChildAt(0)
// .data[0].valueOffsets;
// const flatCoordinateArray = geometryColumn
// .getChildAt(0)
// .getChildAt(0)
// .getChildAt(0)
// .getChildAt(0).data[0].values;

// const resolvedPolygonOffsets = new Int32Array(polygonOffsets.length);
// for (let i = 0; i < resolvedPolygonOffsets.length; ++i) {
// // Perform the lookup into the ringOffsets array using the polygonOffsets array
// resolvedPolygonOffsets[i] = ringOffsets[polygonOffsets[i]];
// }

// const data = {
// // Number of geometries (here exploding multi polygons)
// length: polygonOffsets.length,
// // Indices into coordinateArray where each polygon starts
// startIndices: resolvedPolygonOffsets,
// // Flat coordinates array
// attributes: {
// getPolygon: { value: flatCoordinateArray, size: 2 }
// }
// };

// const layer = new deck.SolidPolygonLayer({
// // This is an Observable hack - changing the id will force the layer to refresh when the cell reevaluates
// id: `layer-${Date.now()}`,
// data,
// // Skip normalization for binary data
// _normalize: false,
// // Counter-clockwise winding order
// _windingOrder: "CCW",
// getFillColor: [0, 100, 60, 160],
// getLineColor: [0, 0, 0, 255]
// });

// const pathData = {
// length: ringOffsets.length,
// startIndices: ringOffsets,
// attributes: {
// getPath: { value: flatCoordinateArray, size: 2 }
// }
// };

// const pathLayer = new deck.PathLayer({
// // This is an Observable hack - changing the id will force the layer to refresh when the cell reevaluates
// id: `layer-${Date.now()}`,
// pathData,
// // _pathType: "open",
// getFillColor: [0, 100, 60, 160],
// getColor: [200, 0, 0, 150],
// getWidth: 300,
// widthMinPixels: 1000
// });

// layers.push(layer);
// // bug somewhere
// // layers.push(pathLayer);

// // Hack: I had to copy into a new layer or else `layers === layers` is true and the map never updates
// deckglMap.setProps({ layers: Array.from(layers) });
// }
// }
Insert cell
// batch = recordBatches[0]
Insert cell
// batch.schema.fields[6]
Insert cell
// geometryColumn = batch.getChildAt(6)
Insert cell
// geometryOffsets = geometryColumn.data[0].valueOffsets
Insert cell
// polygonOffsets = geometryColumn.getChildAt(0).data[0].valueOffsets
Insert cell
// ringOffsets = geometryColumn.getChildAt(0).getChildAt(0).data[0].valueOffsets
Insert cell
// flatCoordinateArray = geometryColumn
// .getChildAt(0)
// .getChildAt(0)
// .getChildAt(0)
// .getChildAt(0).data[0].values
Insert cell
// resolvedPolygonOffsets = {
// const resolvedOffsets = new Int32Array(polygonOffsets.length);
// for (let i = 0; i < resolvedOffsets.length; ++i) {
// // Perform the lookup into the ringOffsets array using the polygonOffsets array
// resolvedOffsets[i] = ringOffsets[polygonOffsets[i]];
// }
// return resolvedOffsets;
// }
Insert cell
// {
// let batch = recordBatches[0];

// const data = {
// // Number of geometries (here exploding multi polygons)
// length: polygonOffsets.length,
// // Indices into coordinateArray where each polygon starts
// startIndices: resolvedPolygonOffsets,
// // Flat coordinates array
// attributes: {
// getPolygon: { value: flatCoordinateArray, size: 2 }
// }
// };

// const layer = new deck.SolidPolygonLayer({
// // This is an Observable hack - changing the id will force the layer to refresh when the cell reevaluates
// id: `layer-${Date.now()}`,
// data,
// // Skip normalization for binary data
// _normalize: false,
// // Counter-clockwise winding order
// _windingOrder: "CCW",
// getFillColor: [0, 100, 60, 160]
// });

// deckglMap.setProps({ layers: [layer] });
// }
Insert cell
// Load the parquet-wasm library
parquet = {
const url = "http://localhost:8080/esm/arrow1.js";
const parquetModule = await import(url);
// Need to await the default export first to initialize the WebAssembly code
await parquetModule.default();
return parquetModule;
}
Insert cell
deckglMap = {
// This is an Observable hack: clear previously generated content
mapContainer.innerHTML = "";

return new deck.DeckGL({
// The HTML container to render into
container: mapContainer,
map: mapboxgl,
mapStyle:
"https://basemaps.cartocdn.com/gl/positron-nolabels-gl-style/style.json",

// Viewport settings
initialViewState: {
longitude: -99.5243335,
latitude: 38.8543844,
zoom: 2,
pitch: 0,
bearing: 0
},
controller: true
});
}
Insert cell
// deck.gl 8.8.6 UMD bundle; provides deck.DeckGL (deckglMap cell) and the
// layer classes used in the commented-out experiments. require.alias resolves
// the bundle's `h3` dependency to an empty object so it is not fetched —
// presumably because no H3 layers are used here; revisit before adding any.
deck = require.alias({
  h3: {}
})("deck.gl@8.8.6/dist.min.js")
Insert cell
// Handle to the initialized parquet-wasm module's memory via its private
// `_memory()` export (assumed to be the WebAssembly.Memory — confirm). Not
// referenced by any live cell here; presumably kept for arrow-js-ffi reads
// (see the FFI note in the commented-out recordBatches cell).
wasmMemory = parquet._memory()
Insert cell
// mapbox-gl v1.6.0, passed to deck.DeckGL as `map` in the deckglMap cell to
// render the Carto basemap given by `mapStyle`.
mapboxgl = require("mapbox-gl@1.6.0/dist/mapbox-gl.js")
Insert cell
{
  // Make ReadableStream async-iterable (`for await (const chunk of stream)`)
  // in browsers that lack native support. Install only when the method is
  // missing: assigning unconditionally would clobber a native implementation,
  // and the guard also makes re-running this cell a no-op.
  if (
    typeof ReadableStream !== "undefined" &&
    !(Symbol.asyncIterator in ReadableStream.prototype)
  ) {
    ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
      const reader = this.getReader();
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) return;
          yield value;
        }
      } finally {
        // Runs on normal completion, early break, or error, so the stream is
        // always unlocked afterwards.
        reader.releaseLock();
      }
    };
  }
}
Insert cell
// Apache Arrow JS, major version 12. This same instance is aliased into the
// arrowJsFfi cell so both libraries share one set of Arrow classes.
apacheArrow = require("apache-arrow@12")
Insert cell
// arrow-js-ffi 0.3.0, with its `apache-arrow` dependency resolved to the
// apacheArrow cell instead of a second copy — presumably so the Arrow objects
// it builds are instances of the same classes as apacheArrow's. Not used by
// any live cell here; the commented-out code notes the FFI path produced
// invalid geometry offsets.
arrowJsFfi = require.alias({
  "apache-arrow": apacheArrow
})("arrow-js-ffi@0.3.0")
Insert cell
// Adapted from https://jakearchibald.com/2017/async-iterators-and-generators/#a-shorter-implementation
/**
 * Wrap a ReadableStream in an async iterator so it can be consumed with
 * `for await (const chunk of streamAsyncIterator(stream))`.
 *
 * The stream's reader lock is taken immediately and released exactly once:
 * when the stream is exhausted, when a read fails, or when iteration is
 * stopped early (break/return/throw in a for-await body calls `return()`).
 * After that the iterator keeps reporting `{ done: true }`.
 *
 * @param {ReadableStream} stream - Stream to read; must not already be locked.
 * @returns {AsyncIterableIterator<any>} Iterator over the stream's chunks.
 */
function streamAsyncIterator(stream) {
  // Get a lock on the stream:
  const reader = stream.getReader();
  let finished = false;

  // Release the lock once; later calls are no-ops.
  const finish = () => {
    if (finished) return;
    finished = true;
    reader.releaseLock();
  };

  return {
    async next() {
      // Once released, the reader can no longer be read from; stay done.
      if (finished) return { done: true, value: undefined };
      try {
        // Stream reads already resolve with {done, value}.
        const result = await reader.read();
        // for-await does not call return() on normal completion, so unlock here.
        if (result.done) finish();
        return result;
      } catch (err) {
        // An errored stream would otherwise stay locked forever.
        finish();
        throw err;
      }
    },
    async return(value) {
      // Early termination: unlock the stream and resolve with a proper
      // IteratorResult, as the async iterator protocol requires.
      finish();
      return { done: true, value };
    },
    // for-await calls this on whatever it's passed, so
    // iterators tend to return themselves.
    [Symbol.asyncIterator]() {
      return this;
    }
  };
}
Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more