/**
 * Client for querying a DuckDB database and loading files into it.
 *
 * The wrapped database must provide `open`, `connect`, `registerFileBuffer`
 * and `registerFileURL` (the methods called below). Presumably it is a
 * duckdb-wasm `AsyncDuckDB` produced by `makeDB()`; confirm this. The query
 * helpers share one lazily created connection (`connection()`). The insert
 * helpers each open a short-lived connection and always close it.
 */
class DuckDBClient {
  /**
   * @param {object} [_db] - An already-opened database. When omitted, `db()`
   *   creates and opens one on first use.
   */
  constructor(_db) {
    this._db = _db;
    // Incremented per query so console.time labels stay unique.
    this._counter = 0;
  }

  /**
   * Runs a query and exposes its result as a schema plus a row reader.
   * @param {string} query - SQL text, with `?` placeholders when `params` is given.
   * @param {Array} [params] - Values bound to the placeholders.
   * @returns {Promise<{schema: Array<{name: string, type: *, databaseType: string}>,
   *   readRows: () => AsyncGenerator<Array<object>>}>} The column schema and a
   *   generator that yields every row, as plain objects, in a single array.
   */
  async queryStream(query, params) {
    const conn = await this.connection();
    const result = await DuckDBClient._execute(conn, query, params);
    const schema = result.schema.fields.map(({ name, type }) => ({
      name,
      type: getType(String(type)),
      databaseType: String(type)
    }));
    return {
      schema,
      async *readRows() {
        yield result.toArray().map((row) => Object.fromEntries(row));
      }
    };
  }

  /**
   * Tagged-template helper. Turns template `strings` and interpolated
   * `params` into `[sql, params]`, placing a `?` placeholder between the
   * literal parts.
   */
  queryTag(strings, ...params) {
    return [strings.join("?"), params];
  }

  /**
   * Quotes an identifier for SQL. Embedded double quotes are doubled so the
   * name cannot terminate the quoted identifier early.
   * @param {string} name
   * @returns {string}
   */
  escape(name) {
    return `"${String(name).replace(/"/g, '""')}"`;
  }

  /** Lists the tables visible to the shared connection, as `{ name }` objects. */
  async describeTables() {
    const conn = await this.connection();
    const tables = (await conn.query(`SHOW TABLES`)).toArray();
    return tables.map(({ name }) => ({ name }));
  }

  /**
   * Describes the columns of `table`.
   * `table` is interpolated into the SQL verbatim (it is not escaped), so
   * callers must pass a valid, trusted table reference.
   * @returns {Promise<Array<{name: string, type: *, databaseType: string}>>}
   */
  async describeColumns({ table } = {}) {
    const conn = await this.connection();
    const columns = (await conn.query(`DESCRIBE ${table}`)).toArray();
    return columns.map(({ column_name, column_type }) => ({
      name: column_name,
      type: getType(column_type),
      databaseType: column_type
    }));
  }

  /**
   * Returns the wrapped database. If none was supplied, creates one with
   * `makeDB()` and opens it with `castTimestampToDate` enabled.
   */
  async db() {
    if (!this._db) {
      this._db = await makeDB();
      await this._db.open({
        query: {
          castTimestampToDate: true
        }
      });
    }
    return this._db;
  }

  /** Returns the shared connection, creating it on first use. */
  async connection() {
    if (!this._conn) {
      const db = await this.db();
      this._conn = await db.connect();
    }
    return this._conn;
  }

  /**
   * Closes the shared connection (if any). The next query opens a fresh one.
   * The reference is dropped before closing, so later callers never receive
   * a connection that is being closed.
   */
  async reconnect() {
    const conn = this._conn;
    delete this._conn;
    if (conn) {
      await conn.close();
    }
  }

  /**
   * Runs `query` on the shared connection and logs its duration with
   * console.time. The timer is stopped even if the query fails.
   * @param {string} query - SQL text, with `?` placeholders when `params` is given.
   * @param {Array} [params] - Values bound through a prepared statement.
   * @returns {Promise<*>} The query result (an Arrow table, judging by how
   *   callers use `toArray()` and `schema`).
   */
  async query(query, params) {
    const key = `Query ${this._counter++}: ${query}`;
    console.time(key);
    try {
      const conn = await this.connection();
      return await DuckDBClient._execute(conn, query, params);
    } finally {
      console.timeEnd(key);
    }
  }

  /**
   * Tagged-template query. Interpolated values become bound parameters.
   * @returns {Promise<Array<object>>} Rows as plain objects. The array also
   *   carries a `columns` property listing the field names.
   */
  async sql(strings, ...args) {
    const results = await this.query(strings.join("?"), args);
    const rows = results.toArray().map((row) => Object.fromEntries(row));
    rows.columns = results.schema.fields.map((d) => d.name);
    return rows;
  }

  /** Runs `query` and renders the result with `Inputs.table`, merging `opts`. */
  async table(query, params, opts) {
    const result = await this.query(query, params);
    return Inputs.table(result, { layout: "auto", ...opts });
  }

  /** Runs `query` for its side effects and returns this client for chaining. */
  async client(query, params) {
    await this.query(query, params);
    return this;
  }

  /**
   * Runs `query` and returns only the first row of the first result batch,
   * or `undefined` when there is none. When `params` are given, they are
   * bound through a prepared statement, which is closed afterwards.
   */
  async queryRow(query, params) {
    const key = `Query ${this._counter++}: ${query}`;
    console.time(key);
    try {
      const conn = await this.connection();
      const stmt = DuckDBClient._hasParams(params)
        ? await conn.prepare(query)
        : null;
      try {
        const reader = stmt
          ? await stmt.send(...params)
          : await conn.send(query);
        const { value: batch } = await reader.next();
        return batch?.get(0);
      } finally {
        if (stmt) {
          await stmt.close();
        }
      }
    } finally {
      console.timeEnd(key);
    }
  }

  /**
   * Renders the `EXPLAIN` output for `query` as a `<pre>` element, built
   * with the externally defined `element`/`text` helpers.
   */
  async explain(query, params) {
    const row = await this.queryRow(`EXPLAIN ${query}`, params);
    return element("pre", { className: "observablehq--inspect" }, [
      text(row["explain_value"])
    ]);
  }

  /**
   * Renders `SHOW TABLES` when `object` is undefined. Otherwise renders
   * `DESCRIBE object` (interpolated verbatim).
   */
  async describe(object) {
    const result = await (object === undefined
      ? this.query(`SHOW TABLES`)
      : this.query(`DESCRIBE ${object}`));
    return Inputs.table(result);
  }

  /** Renders `SUMMARIZE query` with `Inputs.table`. */
  async summarize(query) {
    const result = await this.query(`SUMMARIZE ${query}`);
    return Inputs.table(result);
  }

  /**
   * Registers `buffer` under `name` and loads it as JSON into table
   * `main.<name>`. `options` are forwarded to `insertJSONFromPath`.
   */
  async insertJSON(name, buffer, options) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    await DuckDBClient._withConnection(db, (conn) =>
      conn.insertJSONFromPath(name, { name, schema: "main", ...options })
    );
    return this;
  }

  /**
   * Registers `buffer` under `name` and loads it as CSV into table
   * `main.<name>`. `options` are forwarded to `insertCSVFromPath`.
   */
  async insertCSV(name, buffer, options) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    await DuckDBClient._withConnection(db, (conn) =>
      conn.insertCSVFromPath(name, { name, schema: "main", ...options })
    );
    return this;
  }

  /** Registers `buffer` under `name` and exposes it as a view over `parquet_scan`. */
  async insertParquet(name, buffer) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    await DuckDBClient._withConnection(db, (conn) =>
      conn.query(
        `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${name}')`
      )
    );
    return this;
  }

  /** Serializes an Arrow `table` to IPC and inserts it as table `main.<name>`. */
  async insertArrowTable(name, table, options) {
    const buffer = arrow.tableToIPC(table);
    return this.insertArrowFromIPCStream(name, buffer, options);
  }

  /** Inserts an Arrow IPC `buffer` as table `main.<name>`, forwarding `options`. */
  async insertArrowFromIPCStream(name, buffer, options) {
    const db = await this.db();
    await DuckDBClient._withConnection(db, (conn) =>
      conn.insertArrowFromIPCStream(buffer, {
        name,
        schema: "main",
        ...options
      })
    );
    return this;
  }

  /**
   * Creates and opens a new database, loads `files` into it, and returns a
   * client for it.
   *
   * `files` may be a single FileAttachment (detected by constructor name),
   * an array of entries, or a plain object mapping table names to entries.
   * Each entry may be:
   * - a `[name, file]` pair, where `file` may also be `{ file, ...options }`;
   * - a FileAttachment, named via `_toName`;
   * - an object `{ file, name?, ...options }`.
   *
   * A `file` that is an array of row objects is loaded through Arrow IPC.
   * Any other file must provide `name`, `url()` and `arrayBuffer()`, and is
   * dispatched by extension (`.csv`, `.json`, `.parquet`).
   */
  static async of(files = []) {
    const db = await makeDB();
    await db.open({
      query: {
        castTimestampToDate: true
      }
    });
    let entries = files;
    if (files.constructor.name === "FileAttachment") {
      entries = [[DuckDBClient._toName(files), files]];
    } else if (!Array.isArray(files)) {
      entries = Object.entries(files);
    }
    await Promise.all(
      entries.map((entry) => DuckDBClient._insertEntry(db, entry))
    );
    return new DuckDBClient(db);
  }

  /**
   * Derives a table name from a file's name. It drops the final extension
   * (if any) and a trailing `@<digits>` suffix, e.g. `"data@12.csv"` becomes
   * `"data"`.
   */
  static _toName(file) {
    const { name } = file;
    const dot = name.lastIndexOf(".");
    const base = dot === -1 ? name : name.slice(0, dot);
    return base.replace(/@\d+$/, "");
  }

  /**
   * Normalizes one `of()` entry into `{ name, file, options }`. Logs and
   * returns null for entries of an unrecognized shape.
   */
  static _resolveEntry(entry) {
    if (Array.isArray(entry)) {
      const [name, value] = entry;
      if (Object.prototype.hasOwnProperty.call(value, "file")) {
        const { file, ...options } = value;
        return { name, file, options };
      }
      return { name, file: value, options: {} };
    }
    if (entry.constructor.name === "FileAttachment") {
      return { name: DuckDBClient._toName(entry), file: entry, options: {} };
    }
    if (typeof entry === "object") {
      const { file, name, ...options } = entry;
      return { name: name ?? DuckDBClient._toName(file), file, options };
    }
    console.error("Unrecognized entry", entry);
    return null;
  }

  /** Loads one `of()` entry into `db`. Unrecognized entries are skipped. */
  static async _insertEntry(db, entry) {
    const resolved = DuckDBClient._resolveEntry(entry);
    if (!resolved) {
      return;
    }
    const { name, file, options } = resolved;

    // Inline data: an array of row objects converted to Arrow IPC.
    if (!file.url && Array.isArray(file)) {
      const buffer = arrow.tableToIPC(arrow.tableFromJSON(file));
      await DuckDBClient._withConnection(db, (conn) =>
        conn.insertArrowFromIPCStream(buffer, {
          name,
          schema: "main",
          ...options
        })
      );
      return;
    }

    // blob: URLs are registered by content; other URLs are registered by reference.
    const url = await file.url();
    if (url.startsWith("blob:")) {
      const buffer = await file.arrayBuffer();
      await db.registerFileBuffer(file.name, new Uint8Array(buffer));
    } else {
      await db.registerFileURL(file.name, url);
    }

    await DuckDBClient._withConnection(db, async (conn) => {
      const target = { name, schema: "main", ...options };
      if (file.name.endsWith(".csv")) {
        await conn.insertCSVFromPath(file.name, target);
      } else if (file.name.endsWith(".json")) {
        await conn.insertJSONFromPath(file.name, target);
      } else if (file.name.endsWith(".parquet")) {
        await conn.query(
          `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')`
        );
      } else {
        console.warn(`Don't know how to handle file type of ${file.name}`);
      }
    });
  }

  /**
   * Opens a connection on `db` and passes it to `callback`. The connection
   * is always closed afterwards, even when `callback` throws.
   * @returns {Promise<*>} Whatever `callback` resolves to.
   */
  static async _withConnection(db, callback) {
    const conn = await db.connect();
    try {
      return await callback(conn);
    } finally {
      await conn.close();
    }
  }

  /**
   * Runs `query` on `conn`. With parameters, it binds them through a
   * prepared statement that is closed once the result has been returned.
   * Without parameters, it runs the SQL directly, which also permits
   * multi-statement SQL.
   */
  static async _execute(conn, query, params) {
    if (!DuckDBClient._hasParams(params)) {
      return conn.query(query);
    }
    const stmt = await conn.prepare(query);
    try {
      return await stmt.query(...params);
    } finally {
      await stmt.close();
    }
  }

  /**
   * Reports whether `params` carries values to bind. An empty array counts
   * as "no parameters".
   */
  static _hasParams(params) {
    return Array.isArray(params) ? params.length > 0 : Boolean(params);
  }
}