Feb 1, 2022
4 stars
// This is a generic function provding transformations of binary blocks to Json objects.
function compressedBinaryToJson(delimiter = ';') {
return agen.compose(
fromStream(), // Read binary blocks from the input stream
agen.inflate(), // Inflate the compressed stream
agen.decode(), // Decode binary blocks and transform them to text blocks
agen.lines(), // Split text blocks to individual lines
agen.arraysFromDsv(delimiter),// Split lines by the specified delimiter and transforms them to arrays
toJson(), // Transform arrays to JSON objects;
// Note: This function will be added in one of @agen modules
// This function returns a filter transforming binary blocks to GeoJson objects.
// It takes into account specificities of the attached file:
// - it knows that the delimiter is the ";" symbol
// - it knows also how to transform individual fields to get GeoJson
function compressedBinaryToGeoJson() {
return agen.compose(
compressedBinaryToJson(';'), // Decompress, decode, split lines, transforms lines to arrays...
// ...and arrays to objects => { // Transforms simple JSON objects to GeoJson
let geometry;
if (obj.coordonnees_ban) {
geometry = { // Fix geometry - split and parse point coordinates
type : 'Point',
coordinates : obj.coordonnees_ban.split(',').map(v => parseFloat(v))
return { // Returns the resulting GeoJSON object
id : obj.ref,
type : 'Feature',
properties : obj,
agen.filter(obj => !!obj.geometry), // Filter out objects without geometries
const f = agen.compose(
compressedBinaryToGeoJson(';'), // This transformation chain converts compressed binaries to GeoJSON
agen.batch(20) // This step packs returned objects in batches of 20 objects
for await (let batch of f(await openFileStream())) {
yield batch;
break; // Shows just the first batch
async function* readData(stream, delimiter = ';') {
const f = compressedBinaryToGeoJson(delimiter);
yield* f(stream);
async function openFileStream() {
// 44988 entries
return await FileAttachment("liste-des-immeubles-proteges-au-titre-des-monuments-historiques.csv.gz").stream()
visualBatchSize = 1000
loadedObjects = {
await start;

const map = newMap(mapContainer);
invalidation.then(() => map.remove());

// This transformation chain converts compressed binaries to GeoJSON
const f = compressedBinaryToGeoJson(';');

let counter = 0;
const bufferSize = visualBatchSize;
const N = 10; // Math.round(Math.min(20, visualBatchSize / 4));
const buffer = [];

/* Create a heatmap layer and add an utility method to add coordinates in batch */
// For colors see the "ColorHunt" section here:
// ['#6a2c70', '#b83b5e', '#f08a5d', '#f9ed69']
const heat = L.heatLayer([], {
gradient: {0.3: '#6a2c70', 0.5 : '#b83b5e', 0.75: '#f08a5d', 1: '#f9ed69'}
heat.setLatLngs = function (list) {
this._latlngs = [...list];
return this.redraw();
invalidation.then(() => heat.remove());

// Group of markers
const group = L.featureGroup().addTo(map);
invalidation.then(() => group.remove());

for await (let feature of f(await openFileStream())) {
const latlng = feature.geometry.coordinates;
const marker = L.circleMarker(latlng, {
stroke : true,
color: 'white',
opacity : 0.9,
weight : 1,
fillColor: 'red',
fillOpacity: 0.8,
radius: 3
while (buffer.length > bufferSize) {
const toRemove = buffer.shift();
if ((counter % N) !== 0) continue;
yield {
total : counter,
visible : [counter - buffer.length, counter]

let center = { lat : 0, lng : 0};
const coords = [];
let count = 0;
group.eachLayer(layer => {
const { lat, lng } = layer.getLatLng();
coords.push([lat, lng]); += lat;
center.lng += lng;
if (count) { /= count;
center.lng /= count;

await Promises.delay(100);
yield {
total : counter,
visible : [counter - buffer.length, counter]
function toJson() {
return async function*(it) {
let header;
for await (let array of it) {
if (!header) {
header = array;
} else {
const obj = {};
for (let i =0; i < header.length; i++) {
obj[header[i]] = array[i];
yield obj;

// This method returns a function transforming readable stream to AsyncGenerator
function fromStream() {
return async function*(stream) {
const reader = stream.getReader();
try {
let chunk;
while ((chunk = await && !chunk.done) {
yield chunk.value;
} finally {
reader.cancel && reader.cancel();
stream.cancel && stream.cancel();
md`## Map Utilities`
mapConfig = ({
center : [48.854, 2.348],
zoom : 6,
maxZoom: 10
tilesUrl = {
return 'https://{s}{z}/{x}/{y}.png';
// return '{z}/{x}/{y}@2x.png';
function newMap(mapContainer) {
const map =, mapConfig);
L.tileLayer(tilesUrl, {
attribution: '&copy; <a href="">OpenStreetMap</a> contributors'
return map;
function* listenMap(map, event, filter =(_)=>_) {
yield* Generators.observe((next) => {
let handler = (ev) => next(filter(ev));
map.on(event, handler);
return () =>, handler);
agen = require(
'@agen/utils@0.8', // Common utility methods - composition, mapping, transformation etc
'@agen/gzip', // GZip inflating utilities for streams
'@agen/encoding', // Text decoding, splitting to lines
'@agen/dsv', // Transformation lines to delimited separated values
L = {
const L = await require("leaflet");
L.cssUrl = await require.resolve("leaflet/dist/leaflet.css");
document.head.appendChild(html`<link href='${L.cssUrl}' rel='stylesheet' />`);
// See
await require("").catch(
(err) => {}
return L;
