Public
Edited
Mar 10, 2023
Importers
Insert cell
Insert cell
Insert cell
duckdb = import("https://cdn.jsdelivr.net/npm/@duckdb/duckdb-wasm@1.24.0/+esm")
Insert cell
arrow = require("apache-arrow@11.0.0")
Insert cell
// Version value exposed by the loaded duckdb-wasm module through its PACKAGE_VERSION export.
libraryVersion = duckdb.PACKAGE_VERSION
Insert cell
Insert cell
// Bundle descriptors returned by duckdb-wasm's getJsDelivrBundles().
// Presumably these point at the jsDelivr-hosted builds — confirm against the duckdb-wasm docs.
bundles = duckdb.getJsDelivrBundles()
Insert cell
// The bundle picked by selectBundle from the candidates above; makeDB reads its
// `mainWorker` and `mainModule` fields.
bundle = duckdb.selectBundle(bundles)
Insert cell
// Creates a fresh AsyncDuckDB: builds a console logger and a worker from the
// selected bundle's `mainWorker`, then instantiates the database with the
// bundle's `mainModule`. Resolves to the instantiated database.
async function makeDB() {
  const database = new duckdb.AsyncDuckDB(
    new duckdb.ConsoleLogger(),
    await duckdb.createWorker(bundle.mainWorker)
  );
  await database.instantiate(bundle.mainModule);
  return database;
}
Insert cell
// Database instance produced by makeDB(); the standalone demo cells further down
// call db.connect() on it directly.
db = makeDB()
Insert cell
// Shows the version reported by that database instance.
db.getVersion()
Insert cell
// DuckDB client similar to the SQLite client: https://observablehq.com/@observablehq/sqlite
// Follows the DatabaseClient specification: https://observablehq.com/@observablehq/database-client-specification
class DuckDBClient {
  // `_db` is an already-instantiated database (as produced by makeDB). When it
  // is omitted, `db()` creates and opens one lazily on first use.
  constructor(_db) {
    this._db = _db;
    this._counter = 0;
  }

  // Runs `query` on the shared connection. When `params` is given the query is
  // prepared, executed with the bound values, and the statement is closed
  // again so prepared statements do not pile up on the connection.
  async _execute(query, params) {
    const conn = await this.connection();
    if (!params) {
      return conn.query(query);
    }
    const stmt = await conn.prepare(query);
    try {
      return await stmt.query(...params);
    } finally {
      await stmt.close();
    }
  }

  // Opens a dedicated connection on `db`, hands it to `callback`, and closes
  // it even when the callback throws.
  static async _withConnection(db, callback) {
    const conn = await db.connect();
    try {
      return await callback(conn);
    } finally {
      await conn.close();
    }
  }

  // DatabaseClient entry point used by SQL and Table cells.
  // Returns `{ schema, readRows }`, where `schema` describes each result column
  // and `readRows` is an async generator yielding one batch with every row
  // converted to a plain object.
  async queryStream(query, params) {
    const result = await this._execute(query, params);

    // Populate the schema of the results
    const schema = result.schema.fields.map(({ name, type }) => ({
      name,
      type: getType(String(type)),
      databaseType: String(type)
    }));

    return {
      schema,
      async *readRows() {
        yield result.toArray().map((row) => Object.fromEntries(row));
      }
    };
  }

  // This function gets called to prepare the `query` parameter of the `queryStream` method
  queryTag(strings, ...params) {
    return [strings.join("?"), params];
  }

  // Quotes `name` as an SQL identifier. Embedded double quotes are doubled so
  // the result is always a single, well-formed identifier.
  escape(name) {
    return `"${String(name).replace(/"/g, '""')}"`;
  }

  // Lists the tables reported by `SHOW TABLES` as `{ name }` objects.
  async describeTables() {
    const conn = await this.connection();
    const tables = (await conn.query(`SHOW TABLES`)).toArray();
    return tables.map(({ name }) => ({ name }));
  }

  // Describes the columns of `table` as `{ name, type, databaseType }` objects.
  // The table name is quoted so names containing spaces or punctuation work.
  async describeColumns({ table } = {}) {
    const conn = await this.connection();
    const columns = (
      await conn.query(`DESCRIBE ${this.escape(table)}`)
    ).toArray();
    return columns.map(({ column_name, column_type }) => ({
      name: column_name,
      type: getType(column_type),
      databaseType: column_type
    }));
  }

  // Returns the underlying database, creating and opening it on first use.
  async db() {
    if (!this._db) {
      this._db = await makeDB();
      await this._db.open({
        query: {
          castTimestampToDate: true
        }
      });
    }
    return this._db;
  }

  // Returns the shared connection, connecting on first use.
  async connection() {
    if (!this._conn) {
      const db = await this.db();
      this._conn = await db.connect();
    }
    return this._conn;
  }

  // Drops the shared connection; the next call to `connection()` opens a new one.
  async reconnect() {
    const conn = this._conn;
    // Forget the handle first so a failing close cannot leave a dead
    // connection cached on the client.
    delete this._conn;
    if (conn) {
      await conn.close();
    }
  }

  // The `.queryStream` function will supersede this for SQL and Table cells
  // Keeping this for backwards compatibility
  // Runs `query` (optionally with bound `params`), logs its duration with
  // console.time/timeEnd, and resolves to the raw result.
  async query(query, params) {
    const key = `Query ${this._counter++}: ${query}`;
    console.time(key);
    try {
      return await this._execute(query, params);
    } finally {
      // Stop the timer on failure too, so a failed query does not leave it running.
      console.timeEnd(key);
    }
  }

  // The `.queryStream` function will supersede this for SQL and Table cells
  // Keeping this for backwards compatibility
  // Tagged-template form, used like db.sql`select * from table where foo = ${param}`.
  // Resolves to an array of plain row objects with a `columns` property.
  async sql(strings, ...args) {
    const results = await this.query(strings.join("?"), args);

    // return rows as a JavaScript array of objects for now
    const rows = results.toArray().map((row) => Object.fromEntries(row));
    rows.columns = results.schema.fields.map((d) => d.name);
    return rows;
  }

  // Runs the query and renders the result with Inputs.table.
  async table(query, params, opts) {
    const result = await this.query(query, params);
    return Inputs.table(result, { layout: "auto", ...opts });
  }

  // get the client after the query ran
  async client(query, params) {
    await this.query(query, params);
    return this;
  }

  // query a single row
  // Resolves to the first row of the first result batch, or undefined when
  // there is no batch.
  async queryRow(query, params) {
    const key = `Query ${this._counter++}: ${query}`;

    console.time(key);
    try {
      const conn = await this.connection();
      // use send as we can stop iterating after we get the first batch
      let batch;
      if (params) {
        // Parameters can only be bound through a prepared statement.
        const stmt = await conn.prepare(query);
        try {
          const reader = await stmt.send(...params);
          batch = (await reader.next()).value;
        } finally {
          await stmt.close();
        }
      } else {
        const reader = await conn.send(query);
        batch = (await reader.next()).value;
      }
      return batch?.get(0);
    } finally {
      console.timeEnd(key);
    }
  }

  // Renders the `explain_value` of `EXPLAIN <query>` inside a <pre> element.
  async explain(query, params) {
    const row = await this.queryRow(`EXPLAIN ${query}`, params);
    return element("pre", { className: "observablehq--inspect" }, [
      text(row["explain_value"])
    ]);
  }

  // Describe the database (no arg) or a table
  async describe(object) {
    const result = await (object === undefined
      ? this.query(`SHOW TABLES`)
      : this.query(`DESCRIBE ${object}`));
    return Inputs.table(result);
  }

  // Summarize a query result
  async summarize(query) {
    const result = await this.query(`SUMMARIZE ${query}`);
    return Inputs.table(result);
  }

  // Registers `buffer` as file `name` and loads it as JSON into table `name`.
  // Resolves to this client.
  async insertJSON(name, buffer, options) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    await DuckDBClient._withConnection(db, (conn) =>
      conn.insertJSONFromPath(name, { name, schema: "main", ...options })
    );

    return this;
  }

  // Registers `buffer` as file `name` and loads it as CSV into table `name`.
  // Resolves to this client.
  async insertCSV(name, buffer, options) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    await DuckDBClient._withConnection(db, (conn) =>
      conn.insertCSVFromPath(name, { name, schema: "main", ...options })
    );

    return this;
  }

  // Registers `buffer` as file `name` and creates a view `name` that scans it
  // as Parquet. Resolves to this client.
  async insertParquet(name, buffer) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    await DuckDBClient._withConnection(db, (conn) =>
      conn.query(
        `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${name}')`
      )
    );

    return this;
  }

  // Serializes an Arrow table to IPC and loads it as table `name`.
  async insertArrowTable(name, table, options) {
    const buffer = arrow.tableToIPC(table);
    return this.insertArrowFromIPCStream(name, buffer, options);
  }

  // Loads an Arrow IPC stream `buffer` as table `name`. Resolves to this client.
  async insertArrowFromIPCStream(name, buffer, options) {
    const db = await this.db();
    await DuckDBClient._withConnection(db, (conn) =>
      conn.insertArrowFromIPCStream(buffer, {
        name,
        schema: "main",
        ...options
      })
    );

    return this;
  }

  // Create a database from FileAttachments
  //
  // `files` may be a single FileAttachment, an array of entries, or an object
  // mapping table names to files. Each entry is one of:
  //   - [name, file] or [name, { file, ...options }]
  //   - a FileAttachment (the table name is derived from the file name)
  //   - { file, name, ...options }
  // where `file` is either a FileAttachment-like object or an array of row
  // objects (loaded through Arrow). Resolves to a new DuckDBClient.
  static async of(files = []) {
    const db = await makeDB();
    await db.open({
      query: {
        castTimestampToDate: true
      }
    });

    // Drop the extension, then the whole "@X" version suffix Observable adds
    // to file names (the version may have more than one character).
    const toName = (file) =>
      file.name.split(".").slice(0, -1).join(".").replace(/@.+$/, "");

    let entries;
    if (files.constructor.name === "FileAttachment") {
      entries = [[toName(files), files]];
    } else if (Array.isArray(files)) {
      entries = files;
    } else {
      entries = Object.entries(files);
    }

    // Add all files to the database. Import JSON and CSV. Create view for Parquet.
    await Promise.all(
      entries.map(async (entry) => {
        let file;
        let name;
        let options = {};

        if (Array.isArray(entry)) {
          [name, file] = entry;
          if (Object.prototype.hasOwnProperty.call(file, "file")) {
            ({ file, ...options } = file);
          }
        } else if (entry.constructor.name === "FileAttachment") {
          [name, file] = [toName(entry), entry];
        } else if (typeof entry === "object") {
          ({ file, name, ...options } = entry);
          name = name ?? toName(file);
        } else {
          // Fail with a clear message instead of crashing later on `file.url`.
          throw new TypeError(`Unrecognized entry: ${String(entry)}`);
        }

        // In-memory rows: convert to Arrow and insert directly.
        if (!file.url && Array.isArray(file)) {
          const buffer = arrow.tableToIPC(arrow.tableFromJSON(file));
          await DuckDBClient._withConnection(db, (conn) =>
            conn.insertArrowFromIPCStream(buffer, {
              name,
              schema: "main",
              ...options
            })
          );
          return;
        }

        // Blob URLs are read here and registered as a buffer; any other URL
        // is registered as-is.
        const url = await file.url();
        if (url.startsWith("blob:")) {
          const buffer = await file.arrayBuffer();
          await db.registerFileBuffer(file.name, new Uint8Array(buffer));
        } else {
          await db.registerFileURL(file.name, url);
        }

        if (file.name.endsWith(".csv")) {
          await DuckDBClient._withConnection(db, (conn) =>
            conn.insertCSVFromPath(file.name, {
              name,
              schema: "main",
              ...options
            })
          );
        } else if (file.name.endsWith(".json")) {
          await DuckDBClient._withConnection(db, (conn) =>
            conn.insertJSONFromPath(file.name, {
              name,
              schema: "main",
              ...options
            })
          );
        } else if (file.name.endsWith(".parquet")) {
          await DuckDBClient._withConnection(db, (conn) =>
            conn.query(
              `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')`
            )
          );
        } else {
          console.warn(`Don't know how to handle file type of ${file.name}`);
        }
      })
    );

    return new DuckDBClient(db);
  }
}
Insert cell
// Maps a database type name to one of the DatabaseClient column types:
// "bigint", "number", "integer", "boolean", "date", "string" or "other".
// Accepts both DuckDB type names (e.g. `DECIMAL(18,3)`, `VARCHAR`) and the
// strings Arrow types stringify to (e.g. `Int32`, `Utf8`, `Timestamp<MICROSECOND>`).
// See https://duckdb.org/docs/sql/data_types/overview
getType = (type) => {
  const base = String(type)
    .trim()
    .toLowerCase()
    // DuckDB parameter lists: "decimal(18,3)" -> "decimal", "varchar(10)" -> "varchar".
    // Anchored at the end so list types such as "decimal(18,3)[]" stay "other".
    .replace(/\s*\(.*\)$/, "")
    // Arrow unit suffixes: "timestamp<microsecond>" -> "timestamp".
    .replace(/<.*>$/, "")
    // Arrow decimals stringify as "decimal[<precision>e<scale>]".
    .replace(/^decimal\[.*\]$/, "decimal");

  switch (base) {
    case "bigint":
    case "int8":
    case "long":
      return "bigint";

    case "double":
    case "float8":
    case "numeric":
    case "decimal":
    case "real":
    case "float4":
    case "float":
    case "float16":
    case "float32":
    case "float64":
      return "number";

    case "hugeint":
    case "integer":
    case "smallint":
    case "tinyint":
    case "ubigint":
    case "uinteger":
    case "usmallint":
    case "utinyint":
    case "int4":
    case "int":
    case "signed":
    case "int2":
    case "short":
    case "int1":
    case "int64":
    case "int32":
    case "int16":
    case "uint64":
    case "uint32":
    case "uint16":
    case "uint8":
      return "integer";

    case "boolean":
    case "bool":
    case "logical":
      return "boolean";

    case "date":
    case "date32":
    case "date64":
    case "interval": // date or time delta
    case "time":
    case "time32":
    case "time64":
    case "timestamp":
    case "timestamp with time zone":
    case "datetime":
    case "timestamptz":
      return "date";

    case "uuid":
    case "varchar":
    case "char":
    case "bpchar":
    case "text":
    case "string":
    case "utf8": // this type is unlisted in the `types`, but is returned by the db as `column_type`...
      return "string";

    default:
      return "other";
  }
}
Insert cell
// Creates a DOM element with tag `name`, copies every enumerable key of
// `props` onto it as a property, and appends each node in `children`.
// When called with exactly two arguments, the second one is taken as
// `children` and no properties are set.
function element(name, props, children) {
  if (arguments.length === 2) {
    children = props;
    props = undefined;
  }

  const node = document.createElement(name);

  if (props !== undefined) {
    for (const key in props) {
      node[key] = props[key];
    }
  }

  if (children !== undefined) {
    for (const child of children) {
      node.appendChild(child);
    }
  }

  return node;
}
Insert cell
// Wraps `value` in a DOM text node.
function text(value) {
  const node = document.createTextNode(value);
  return node;
}
Insert cell
{
const conn = await db.connect();
const result = await conn.query(` SELECT
v::INTEGER AS x,
(sin(v/50.0) * 100 + 100)::INTEGER AS y
FROM generate_series(0, 100) AS t(v)`);
await conn.close();
return result;
}
Insert cell
Insert cell
client = {
const c = new DuckDBClient();
await c.query(`CREATE TABLE dt(u STRING, x INTEGER, y FLOAT)`);
await c.query(
`INSERT INTO dt VALUES ('a', 1, 5), ('b', 2, 6), ('c', 3, 7), ('d', 4, 8);`
);
return c;
}
Insert cell
// Parameterized query rendered as a table: the two `?` placeholders are bound to "2" and "3".
client.table(`SELECT ? as a, ? as b`, ["2", "3"])
Insert cell
// Value interpolated into the tagged-template query in the following cell.
a = 1
Insert cell
// Tagged-template form: DuckDBClient.sql joins the literal parts with "?" and
// passes `a` along as a query parameter.
client.sql`SELECT ${a}, 2`
Insert cell
// The client's cached connection (DuckDBClient.connection opens it on first use).
c = client.connection()
Insert cell
// Issues the query through the connection's `send` method.
r = await c.send("select 1,2")
Insert cell
// NOTE(review): reads index 0 directly from what `send` resolved to, whereas
// DuckDBClient.queryRow calls `.next()` on a `send` result before `.get(0)` —
// confirm `r` supports `.get`.
r.get(0)
Insert cell
// Fetches row 0 of the first result batch via DuckDBClient.queryRow.
client.queryRow(`SELECT 1`)
Insert cell
client
SELECT ${a}, 2
Insert cell
client
Type Table, then Shift-Enter. Ctrl-space for more options.

Insert cell
Insert cell
{
const conn = await db.connect();
const result = await conn.query(`SELECT 1 AS 'Result'
UNION SELECT 2
UNION SELECT 3`);
await conn.close();
return Inputs.table(result);
}
Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more