Public
Edited
Aug 6, 2024
Importers
Insert cell
Insert cell
Insert cell
// Load DuckDB-WASM as an ES module from the jsDelivr CDN (pinned to 1.17.0).
duckdb = import("https://cdn.jsdelivr.net/npm/@duckdb/duckdb-wasm@1.17.0/+esm")
Insert cell
// Apache Arrow, used to build IPC streams when inserting plain JS data.
arrow = require("apache-arrow@8")
Insert cell
// Version string of the loaded duckdb-wasm package, for display.
libraryVersion = duckdb.PACKAGE_VERSION
Insert cell
Insert cell
// The set of available duckdb-wasm bundles hosted on jsDelivr.
bundles = duckdb.getJsDelivrBundles()
Insert cell
// Pick the bundle best suited to this browser (e.g. with/without exception handling).
bundle = duckdb.selectBundle(bundles)
Insert cell
// Instantiate an AsyncDuckDB backed by a web worker, using the bundle
// selected in an earlier cell. Logs to the browser console.
async function makeDB() {
  const consoleLogger = new duckdb.ConsoleLogger();
  const dbWorker = await duckdb.createWorker(bundle.mainWorker);
  const database = new duckdb.AsyncDuckDB(consoleLogger, dbWorker);
  await database.instantiate(bundle.mainModule);
  return database;
}
Insert cell
// A shared AsyncDuckDB instance used by the raw-connection demos below.
db = makeDB()
Insert cell
// Sanity check: the DuckDB engine version reported by the instance.
db.getVersion()
Insert cell
// Character-encoding detector, used by DuckDBClient.insertCSV.
chardet = import("https://cdn.skypack.dev/chardet@2.0.0?min")
Insert cell
// db client similar to the SQLite client: https://observablehq.com/@observablehq/sqlite
// Follows the DatabaseClient specification: https://observablehq.com/@observablehq/database-client-specification
class DuckDBClient {
  // `_db` may be an existing AsyncDuckDB instance; if omitted, one is created
  // lazily on first use (see `db()`).
  constructor(_db) {
    this._db = _db;
    this._counter = 0; // labels per-query console.time timers
  }

  // Run `query` (optionally prepared with `params`) and return
  // { schema, readRows } per the DatabaseClient specification.
  async queryStream(query, params) {
    const conn = await this.connection();
    let result;

    if (params) {
      const stmt = await conn.prepare(query);
      result = await stmt.query(...params);
    } else {
      result = await conn.query(query);
    }
    // Populate the schema of the results
    const schema = result.schema.fields.map(({ name, type }) => ({
      name,
      type: getType(String(type)),
      databaseType: String(type)
    }));
    return {
      schema,
      async *readRows() {
        // Arrow rows -> plain objects, yielded as a single batch.
        let rows = result.toArray().map((r) => Object.fromEntries(r));
        yield rows;
      }
    };
  }

  // This function gets called to prepare the `query` parameter of the `queryStream` method
  queryTag(strings, ...params) {
    return [strings.join("?"), params];
  }

  // Quote an identifier for interpolation into SQL.
  escape(name) {
    return `"${name}"`;
  }

  async describeTables() {
    const conn = await this.connection();
    const tables = (await conn.query(`SHOW TABLES`)).toArray();
    return tables.map(({ name }) => ({ name }));
  }

  async describeColumns({ table } = {}) {
    const conn = await this.connection();
    const columns = (await conn.query(`DESCRIBE ${table}`)).toArray();
    return columns.map(({ column_name, column_type }) => {
      return {
        name: column_name,
        type: getType(column_type),
        databaseType: column_type
      };
    });
  }

  // Lazily create and configure the underlying AsyncDuckDB.
  async db() {
    if (!this._db) {
      this._db = await makeDB();
      await this._db.open({
        query: {
          castTimestampToDate: true
        }
      });
    }
    return this._db;
  }

  // Lazily open (and cache) a single connection to the database.
  async connection() {
    if (!this._conn) {
      const db = await this.db();
      this._conn = await db.connect();
    }
    return this._conn;
  }

  // Close and discard the cached connection; the next query reopens one.
  async reconnect() {
    if (this._conn) {
      // Was fire-and-forget; await so close errors surface to the caller.
      await this._conn.close();
    }
    delete this._conn;
  }

  // The `.queryStream` function will supersede this for SQL and Table cells
  // Keeping this for backwards compatibility
  async query(query, params) {
    const key = `Query ${this._counter++}: ${query}`;
    console.time(key);
    const conn = await this.connection();
    let result;

    if (params) {
      const stmt = await conn.prepare(query);
      // Was missing `await`, which returned a pending Promise to callers.
      result = await stmt.query(...params);
    } else {
      result = await conn.query(query);
    }

    console.timeEnd(key);
    return result;
  }

  // The `.queryStream` function will supersede this for SQL and Table cells
  // Keeping this for backwards compatibility
  async sql(strings, ...args) {
    // expected to be used like db.sql`select * from table where foo = ${param}`
    const results = await this.query(strings.join("?"), args);

    // return rows as a JavaScript array of objects for now
    let rows = results.toArray().map(Object.fromEntries);
    rows.columns = results.schema.fields.map((d) => d.name);
    return rows;
  }

  // Run a query and render the result with Inputs.table.
  async table(query, params, opts) {
    const result = await this.query(query, params);
    return Inputs.table(result, { layout: "auto", ...(opts || {}) });
  }

  // get the client after the query ran
  async client(query, params) {
    await this.query(query, params);
    return this;
  }

  // query a single row
  async queryRow(query, params) {
    const key = `Query ${this._counter++}: ${query}`;

    console.time(key);
    const conn = await this.connection();
    // use send as we can stop iterating after we get the first batch
    const result = await conn.send(query, params);
    const batch = (await result.next()).value;
    console.timeEnd(key);

    return batch?.get(0);
  }

  // Render DuckDB's EXPLAIN output for `query` as a <pre> element.
  async explain(query, params) {
    const row = await this.queryRow(`EXPLAIN ${query}`, params);
    return element("pre", { className: "observablehq--inspect" }, [
      text(row["explain_value"])
    ]);
  }

  // Describe the database (no arg) or a table
  async describe(object) {
    const result = await (object === undefined
      ? this.query(`SHOW TABLES`)
      : this.query(`DESCRIBE ${object}`));
    return Inputs.table(result);
  }

  // Summarize a query result
  async summarize(query) {
    const result = await this.query(`SUMMARIZE ${query}`);
    return Inputs.table(result);
  }

  // Register a JSON buffer under `name` and import it as a table.
  async insertJSON(name, buffer, options) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    const conn = await db.connect();
    try {
      await conn.insertJSONFromPath(name, { name, schema: "main", ...options });
    } finally {
      // Previously leaked the connection when the insert threw.
      await conn.close();
    }

    return this;
  }

  // Insert a CSV FileAttachment, retrying across encodings and, as a last
  // resort, cleaning malformed rows (unpaired quotes, ragged columns,
  // over-long lines) before re-inserting as UTF-8.
  async insertCSV(name, file, options) {
    const db = await this.db();

    // Read a Blob/File slice into an ArrayBuffer (FileReader adapter).
    async function fileToArrayBuffer(file) {
      return new Promise((resolve, reject) => {
        const reader = new FileReader();
        reader.onload = () => resolve(reader.result);
        reader.onerror = () => reject(reader.error);
        reader.readAsArrayBuffer(file);
      });
    }

    // Re-encode `buffer` from `encoding` to UTF-8 and attempt the insert.
    // Throws on failure so the caller can try the next encoding.
    async function tryInsert(buffer, encoding, options) {
      const decoder = new TextDecoder(encoding);
      const decodedText = decoder.decode(buffer);
      const encodedBuffer = new TextEncoder().encode(decodedText);

      try {
        await db.registerFileBuffer(name, encodedBuffer);
        const conn = await db.connect();
        try {
          await conn.insertCSVFromPath(name, {
            name,
            schema: "main",
            ...options
          });
        } finally {
          // Previously leaked the connection when the insert threw.
          await conn.close();
        }
      } catch (e) {
        console.error(`Failed to insert CSV with encoding ${encoding}:`, e);
        throw e;
      }
    }

    // Best-effort repair of malformed CSV text (assumed UTF-8): closes
    // unpaired quotes, pads/trims rows to the header's column count, and
    // hard-splits lines longer than `maxLineLength`.
    // NOTE(review): assumes `;` as the delimiter — confirm against the files
    // this notebook actually ingests.
    async function cleanCSV(buffer, delimiter = ";", maxLineLength = 2097152) {
      const decoder = new TextDecoder("utf-8");
      const decodedText = decoder.decode(buffer);

      const lines = decodedText.split("\n");
      const header = lines[0].split(delimiter);
      const expectedColumns = header.length;
      const cleanedLines = [];

      lines.forEach((line) => {
        // Fix unpaired quotes
        let quoteCount = 0;
        for (let char of line) {
          if (char === '"') {
            quoteCount++;
          }
        }
        if (quoteCount % 2 !== 0) {
          line += '"';
        }

        let values = line.split(delimiter);

        if (values.length < expectedColumns) {
          // Add missing values
          values = values.concat(
            new Array(expectedColumns - values.length).fill("")
          );
        } else if (values.length > expectedColumns) {
          // Trim excess values
          values = values.slice(0, expectedColumns);
        }

        let cleanedLine = values.join(delimiter);

        // Split lines that exceed the maximum length
        while (cleanedLine.length > maxLineLength) {
          cleanedLines.push(cleanedLine.slice(0, maxLineLength));
          cleanedLine = cleanedLine.slice(maxLineLength);
        }
        cleanedLines.push(cleanedLine);
      });

      const cleanedText = cleanedLines.join("\n");
      const encoder = new TextEncoder();
      return encoder.encode(cleanedText);
    }

    // Read the file in 1MB chunks to avoid one giant FileReader call.
    async function readFileInChunks(file, chunkSize = 1024 * 1024) {
      const fileSize = file.size;
      let offset = 0;
      const chunks = [];

      while (offset < fileSize) {
        const slice = file.slice(offset, offset + chunkSize);
        const buffer = await fileToArrayBuffer(slice);
        chunks.push(buffer);
        offset += chunkSize;
      }

      return chunks;
    }

    const fileChunks = await readFileInChunks(file);
    let combinedBuffer = new Uint8Array(file.size);
    let position = 0;

    for (const chunk of fileChunks) {
      combinedBuffer.set(new Uint8Array(chunk), position);
      position += chunk.byteLength;
    }

    // Detect the likely encoding from the first KB and try it first.
    // (Previously the detected encoding was logged but never used.)
    const initialBuffer = combinedBuffer.slice(0, 1024);
    const initialEncoding = chardet.detect(new Uint8Array(initialBuffer));
    console.log(`Detected initial encoding: ${initialEncoding}`);

    const encodingsToTry = [
      ...new Set(
        [initialEncoding, "latin1", "windows-1252", "UTF-16", "ASCII"].filter(
          Boolean
        )
      )
    ];
    let successfulInsert = false;
    let encounteredError = false;

    for (const encoding of encodingsToTry) {
      try {
        await tryInsert(combinedBuffer, encoding, options);
        successfulInsert = true;
        break;
      } catch (e) {
        if (
          e.message.includes("Invalid Input Error") ||
          e.message.includes("Maximum line size")
        ) {
          // Structural problems won't be fixed by another encoding; clean instead.
          encounteredError = true;
          console.warn(`Error with encoding ${encoding}, will try cleaning...`);
          break;
        }
        console.warn(`Failed with encoding ${encoding}, trying next...`);
      }
    }

    if (!successfulInsert && encounteredError) {
      console.warn("Cleaning CSV data and retrying...");
      const cleanedBuffer = await cleanCSV(combinedBuffer);
      await tryInsert(cleanedBuffer, "utf-8", options);
    }

    return this;
  }

  // Register a Parquet buffer and expose it as a view named `name`.
  // NOTE: `name` is interpolated into SQL — pass trusted names only.
  async insertParquet(name, buffer) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    const conn = await db.connect();
    try {
      await conn.query(
        `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${name}')`
      );
    } finally {
      // Previously leaked the connection when the query threw.
      await conn.close();
    }

    return this;
  }

  // Insert an Arrow table by serializing it to an IPC stream.
  async insertArrowTable(name, table, options) {
    const buffer = arrow.tableToIPC(table);
    return this.insertArrowFromIPCStream(name, buffer, options);
  }

  async insertArrowFromIPCStream(name, buffer, options) {
    const db = await this.db();
    const conn = await db.connect();
    try {
      await conn.insertArrowFromIPCStream(buffer, {
        name,
        schema: "main",
        ...options
      });
    } finally {
      // Previously leaked the connection when the insert threw.
      await conn.close();
    }

    return this;
  }

  // Create a database from FileAttachments. Accepts a single FileAttachment,
  // an array of [name, file] pairs, an object map of name -> file, or
  // { file, name, ...options } entries. Imports JSON and CSV; creates views
  // for Parquet; converts plain JS arrays via Arrow IPC.
  static async of(files = []) {
    const db = await makeDB();
    await db.open({
      query: {
        castTimestampToDate: true
      }
    });

    // Strip the extension and the "@X" version suffix Observable adds to
    // duplicated file names. (The old /\@.+?/ regex removed only a single
    // character after the "@", turning "data@12" into "data2".)
    const toName = (file) =>
      file.name.split(".").slice(0, -1).join(".").replace(/@\d+$/, "");

    if (files.constructor.name === "FileAttachment") {
      files = [[toName(files), files]];
    } else if (!Array.isArray(files)) {
      files = Object.entries(files);
    }

    // Add all files to the database. Import JSON and CSV. Create view for Parquet.
    await Promise.all(
      files.map(async (entry) => {
        let file;
        let name;
        let options = {};

        if (Array.isArray(entry)) {
          [name, file] = entry;
          if (file.hasOwnProperty("file")) {
            ({ file, ...options } = file);
          }
        } else if (entry.constructor.name === "FileAttachment") {
          [name, file] = [toName(entry), entry];
        } else if (typeof entry === "object") {
          ({ file, name, ...options } = entry);
          name = name ?? toName(file);
        } else {
          console.error("Unrecognized entry", entry);
        }

        if (!file.url && Array.isArray(file)) {
          // Plain JS data: convert to an Arrow table and insert via IPC.
          const data = file;
          const table = arrow.tableFromJSON(data);
          const buffer = arrow.tableToIPC(table);

          const conn = await db.connect();
          try {
            await conn.insertArrowFromIPCStream(buffer, {
              name,
              schema: "main",
              ...options
            });
          } finally {
            await conn.close();
          }
          return;
        } else {
          const url = await file.url();
          if (url.indexOf("blob:") === 0) {
            // Blob URLs can't be fetched by the worker; register the bytes.
            const buffer = await file.arrayBuffer();
            await db.registerFileBuffer(file.name, new Uint8Array(buffer));
          } else {
            await db.registerFileURL(file.name, url);
          }
        }

        const conn = await db.connect();
        try {
          if (file.name.endsWith(".csv")) {
            await conn.insertCSVFromPath(file.name, {
              name,
              schema: "main",
              ...options
            });
          } else if (file.name.endsWith(".json")) {
            await conn.insertJSONFromPath(file.name, {
              name,
              schema: "main",
              ...options
            });
          } else if (file.name.endsWith(".parquet")) {
            await conn.query(
              `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')`
            );
          } else {
            console.warn(`Don't know how to handle file type of ${file.name}`);
          }
        } finally {
          // Previously leaked the connection when an insert threw.
          await conn.close();
        }
      })
    );

    return new DuckDBClient(db);
  }
}
Insert cell
// See https://duckdb.org/docs/sql/data_types/overview
// Map a DuckDB column type name to a JavaScript-friendly category:
// "bigint" | "number" | "integer" | "boolean" | "date" | "string" | "other".
// See https://duckdb.org/docs/sql/data_types/overview
function getType(type) {
  const typeLower = type.toLowerCase();

  // Parameterized decimals arrive as e.g. "decimal(18,3)"; the old literal
  // case "decimal(s, p)" never matched them, so match by prefix instead.
  if (typeLower.startsWith("decimal")) return "number";

  switch (typeLower) {
    case "bigint":
    case "int8":
    case "long":
      return "bigint";

    case "double":
    case "float8":
    case "numeric":
    case "real":
    case "float4":
    case "float":
    case "float32":
    case "float64":
      return "number";

    // (duplicate case labels in the original — smallint, tinyint, ubigint,
    // uinteger, usmallint, utinyint were each listed twice — removed.)
    case "hugeint":
    case "integer":
    case "smallint":
    case "tinyint":
    case "ubigint":
    case "uinteger":
    case "usmallint":
    case "utinyint":
    case "int4":
    case "int":
    case "signed":
    case "int2":
    case "short":
    case "int1":
    case "int64":
    case "int32":
      return "integer";

    case "boolean":
    case "bool":
    case "logical":
      return "boolean";

    case "date":
    case "interval": // date or time delta
    case "time":
    case "timestamp":
    case "timestamp with time zone":
    case "datetime":
    case "timestamptz":
      return "date";

    case "uuid":
    case "varchar":
    case "char":
    case "bpchar":
    case "text":
    case "string":
    case "utf8": // this type is unlisted in the `types`, but is returned by the db as `column_type`...
      return "string";

    default:
      return "other";
  }
}
Insert cell
// Create a DOM element `name`, optionally assigning properties and appending
// child nodes. May be called as element(name, children): with two arguments
// the second is treated as children, and props defaults to undefined.
// (Rewrote the comma-operator argument shuffle and renamed the local that
// shadowed the function's own name.)
function element(name, props, children) {
  if (arguments.length === 2) {
    children = props;
    props = undefined;
  }
  const node = document.createElement(name);
  if (props !== undefined) {
    for (const p in props) node[p] = props[p];
  }
  if (children !== undefined) {
    for (const c of children) node.appendChild(c);
  }
  return node;
}
Insert cell
// Wrap a value in a DOM text node (used by DuckDBClient.explain).
function text(value) {
  const node = document.createTextNode(value);
  return node;
}
Insert cell
// Demo: query a generated sine-wave series (x, y pairs) through a raw
// connection on the shared `db`, closing the connection afterwards.
{
const conn = await db.connect();
const result = await conn.query(` SELECT
v::INTEGER AS x,
(sin(v/50.0) * 100 + 100)::INTEGER AS y
FROM generate_series(0, 100) AS t(v)`);
await conn.close();
return result;
}
Insert cell
Insert cell
// A DuckDBClient seeded with a small in-memory table `dt` for the demos below.
client = {
const c = new DuckDBClient();
await c.query(`CREATE TABLE dt(u STRING, x INTEGER, y FLOAT)`);
await c.query(
`INSERT INTO dt VALUES ('a', 1, 5), ('b', 2, 6), ('c', 3, 7), ('d', 4, 8);`
);
return c;
}
Insert cell
// Demo: parameterized query rendered as a table.
client.table(`SELECT ? as a, ? as b`, ["2", "3"])
Insert cell
// Parameter interpolated by the tagged-template demo below.
a = 1
Insert cell
// Demo: tagged-template query; `${a}` becomes a bound `?` parameter.
client.sql`SELECT ${a}, 2`
Insert cell
// Grab the client's cached raw connection for the send() demo below.
c = client.connection()
Insert cell
// Demo: send() streams results as record batches rather than one table.
r = await c.send("select 1,2")
Insert cell
// First row of the first streamed batch.
r.get(0)
Insert cell
// Demo: fetch a single row without materializing the full result.
client.queryRow(`SELECT 1`)
Insert cell
client
SELECT ${a}, 2
Insert cell
client
Type Table, then Shift-Enter. Ctrl-space for more options.

Insert cell
Insert cell
// Demo: a UNION query over the shared `db`, rendered with Inputs.table.
{
const conn = await db.connect();
const result = await conn.query(`SELECT 1 AS 'Result'
UNION SELECT 2
UNION SELECT 3`);
await conn.close();
return Inputs.table(result);
}
Insert cell

Purpose-built for displaying data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more