/**
 * Convenience client around a DuckDB-WASM database, aimed at Observable
 * notebooks.
 *
 * The database is created lazily via the externally defined `makeDB()` unless
 * one is passed to the constructor. A single shared connection is reused by
 * `query()` and every helper built on it. `insertJSON`, `insertCSV` and
 * `DuckDBClient.of` open short-lived connections of their own and close them
 * when done.
 */
class DuckDBClient {
  /**
   * @param {object} [_db] - An already-opened database. When omitted, one is
   *   created and opened on the first call to `db()`.
   */
  constructor(_db) {
    this._db = _db;
    // Only used to give every console timer label a unique prefix.
    this._counter = 0;
  }

  /**
   * Return the database, creating and opening it on first use.
   * @returns {Promise<object>} The opened database.
   */
  async db() {
    if (!this._db) {
      const db = await makeDB();
      await db.open({
        query: {
          castTimestampToDate: true,
        },
      });
      // Cache only after a successful open, so a failed open can be retried.
      this._db = db;
    }
    return this._db;
  }

  /**
   * Return the shared connection, connecting on first use.
   * @returns {Promise<object>} The connection used by `query()`.
   */
  async connection() {
    if (!this._conn) {
      const db = await this.db();
      this._conn = await db.connect();
    }
    return this._conn;
  }

  /**
   * Close the shared connection (if any) so the next query opens a fresh one.
   * @returns {Promise<void>}
   */
  async reconnect() {
    const conn = this._conn;
    // Forget the connection before closing it, so callers of connection()
    // never receive one that is in the middle of being closed.
    delete this._conn;
    if (conn) {
      await conn.close();
    }
  }

  /**
   * Run a SQL query on the shared connection, timing it with console.time.
   * @param {string} query - SQL text, with `?` placeholders when `params` is given.
   * @param {Array} [params] - Values bound to the placeholders. When truthy, the
   *   query runs as a prepared statement, which is closed afterwards.
   * @returns {Promise<object>} The result as returned by the connection
   *   (presumably an Arrow table, judging by how the other methods use it).
   */
  async query(query, params) {
    const key = `Query ${this._counter++}: ${query}`;
    console.time(key);
    try {
      const conn = await this.connection();
      if (params) {
        const stmt = await conn.prepare(query);
        try {
          // Await so the statement is closed, and the timer stopped, only
          // after the query has actually finished.
          return await stmt.query(...params);
        } finally {
          await stmt.close();
        }
      }
      return await conn.query(query);
    } finally {
      // Stop the timer even when the query throws.
      console.timeEnd(key);
    }
  }

  /**
   * Tagged-template query helper, used like
   *   db.sql`select * from table where foo = ${param}`
   * Interpolated values are bound as prepared-statement parameters. They are
   * never spliced into the SQL text.
   * @returns {Promise<Array<object>>} One plain object per row, plus a
   *   `columns` property listing the column names in schema order.
   */
  async sql(strings, ...args) {
    const results = await this.query(strings.join("?"), args);
    const rows = results.toArray().map((r) => Object.fromEntries(r));
    // Use the public `schema` accessor rather than the private `_schema` field.
    rows.columns = results.schema.fields.map((d) => d.name);
    return rows;
  }

  /**
   * Run a query and render the result with Observable's `Inputs.table`.
   * @param {object} [opts] - Extra `Inputs.table` options. They override the
   *   default `{layout: 'auto'}`.
   */
  async table(query, params, opts) {
    const result = await this.query(query, params);
    return Inputs.table(result, {layout: 'auto', ...opts});
  }

  /**
   * Run a query for its side effects and return this client, so the call can
   * be chained or used as a cell dependency.
   * @returns {Promise<DuckDBClient>}
   */
  async client(query, params) {
    await this.query(query, params);
    return this;
  }

  /**
   * Run a query and return its first row.
   * @returns {Promise<object|null>} The first row of the first result chunk, or
   *   null when that chunk is missing or empty.
   */
  async queryRow(query, params) {
    const result = await this.query(query, params);
    const chunk = result.chunks[0];
    if (!chunk || chunk.length === 0) {
      return null;
    }
    return chunk.get(0);
  }

  /**
   * Render DuckDB's `EXPLAIN` output for a query as a preformatted element.
   * Uses the `element` / `text` helpers that are defined outside this class.
   */
  async explain(query, params) {
    const row = await this.queryRow(`EXPLAIN ${query}`, params);
    return element("pre", {className: "observablehq--inspect"}, [
      text(row["explain_value"]),
    ]);
  }

  /**
   * Describe the database (no argument) or a single table.
   * @param {string} [object] - Table name. Single quotes in it are escaped so
   *   that the name cannot break out of the SQL string literal.
   */
  async describe(object) {
    const result = await (object === undefined
      ? this.query(`PRAGMA show_tables`)
      : this.query(`PRAGMA table_info('${String(object).replace(/'/g, "''")}')`));
    return Inputs.table(result);
  }

  /**
   * Summarize a query result with DuckDB's SUMMARIZE.
   * @param {string} query - Trusted SQL. It is interpolated verbatim.
   */
  async summarize(query) {
    const result = await this.query(`SUMMARIZE ${query}`);
    return Inputs.table(result);
  }

  /**
   * Register `buffer` under `name` and import it as a JSON table into `main`.
   * A dedicated connection is used and always closed, even if the import fails.
   */
  async insertJSON(name, buffer, options) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    const conn = await db.connect();
    try {
      await conn.insertJSONFromPath(name, {name, schema: 'main', ...options});
    } finally {
      await conn.close();
    }
  }

  /**
   * Register `buffer` under `name` and import it as a CSV table into `main`.
   * A dedicated connection is used and always closed, even if the import fails.
   */
  async insertCSV(name, buffer, options) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    const conn = await db.connect();
    try {
      await conn.insertCSVFromPath(name, {name, schema: 'main', ...options});
    } finally {
      await conn.close();
    }
  }

  /**
   * Create a database from FileAttachments.
   *
   * `files` may be a single FileAttachment, an array of FileAttachments and/or
   * `[name, file]` pairs, or an object mapping table names to files. CSV and
   * JSON files are imported as tables. Parquet files get a view. Anything else
   * is skipped with a warning.
   * @returns {Promise<DuckDBClient>}
   */
  static async of(files = []) {
    const db = await makeDB();
    await db.open({
      query: {
        castTimestampToDate: true,
      },
    });
    // Drop the extension and any trailing "@N" version suffix that Observable
    // adds to file names, e.g. "data@12.csv" -> "data".
    const toName = (file) => file.name.split('.').slice(0, -1).join('.')
      .replace(/@\d+$/, '');
    let entries = files;
    if (files.constructor.name === 'FileAttachment') {
      entries = [[toName(files), files]];
    } else if (!Array.isArray(files)) {
      entries = Object.entries(files);
    }
    // Add all files to the database. Import JSON and CSV. Create view for Parquet.
    await Promise.all(entries.map(async (entry) => {
      const [name, file] = Array.isArray(entry) ? entry : [toName(entry), entry];
      const url = await file.url();
      if (url.startsWith("blob:")) {
        const buffer = await file.arrayBuffer();
        await db.registerFileBuffer(file.name, new Uint8Array(buffer));
      } else {
        await db.registerFileURL(file.name, url);
      }
      const conn = await db.connect();
      try {
        if (file.name.endsWith('.csv')) {
          await conn.insertCSVFromPath(file.name, {name, schema: 'main'});
        } else if (file.name.endsWith('.json')) {
          await conn.insertJSONFromPath(file.name, {name, schema: 'main'});
        } else if (file.name.endsWith('.parquet')) {
          // Quote the view name as an identifier and the path as a string
          // literal, escaping embedded quotes in both.
          const viewName = `"${String(name).replace(/"/g, '""')}"`;
          const path = `'${file.name.replace(/'/g, "''")}'`;
          await conn.query(`CREATE VIEW ${viewName} AS SELECT * FROM parquet_scan(${path})`);
        } else {
          console.warn(`Don't know how to handle file type of ${file.name}`);
        }
      } finally {
        await conn.close();
      }
    }));
    return new DuckDBClient(db);
  }
}