class DuckDBClient {
  constructor(_db) {
    this._db = _db;
    this._counter = 0; // numbers queries for console.time labels
  }
  // Run a query and return {schema, readRows}, the streaming interface
  // consumed by Observable SQL and Table cells.
  async queryStream(query, params) {
const conn = await this.connection();
let result;
if (params) {
const stmt = await conn.prepare(query);
result = await stmt.query(...params);
} else {
result = await conn.query(query);
}
const schema = result.schema.fields.map(({ name, type }) => ({
name,
type: getType(String(type)),
databaseType: String(type)
}));
return {
schema,
      async *readRows() {
        // The full result is materialized and yielded as a single batch, so
        // this satisfies the streaming interface without true streaming.
        yield result.toArray().map((r) => Object.fromEntries(r));
      }
};
}
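  // Example (a sketch; assumes this client is bound to `db` and a table
  // named `mytable` has been registered):
  //   const {schema, readRows} = await db.queryStream(
  //     "SELECT * FROM mytable WHERE x > ?",
  //     [10]
  //   );
  //   for await (const rows of readRows()) console.log(rows);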
  // Build a [text, params] pair from a tagged template literal, replacing
  // each interpolation with a "?" placeholder.
  queryTag(strings, ...params) {
    return [strings.join("?"), params];
  }
  // Quote an identifier for interpolation into a query.
  escape(name) {
    return `"${name}"`;
  }
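  // Example (a sketch; the table and column names are hypothetical):
  //   const [text, params] = db.queryTag`SELECT * FROM mytable WHERE a = ${1}`;
  //   // text === "SELECT * FROM mytable WHERE a = ?", params === [1]
  //   db.escape("my column"); // => '"my column"'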
async describeTables() {
const conn = await this.connection();
const tables = (await conn.query(`SHOW TABLES`)).toArray();
return tables.map(({ name }) => ({ name }));
}
  async describeColumns({ table } = {}) {
    const conn = await this.connection();
    const columns = (
      await conn.query(`DESCRIBE ${this.escape(table)}`)
    ).toArray();
return columns.map(({ column_name, column_type }) => {
return {
name: column_name,
type: getType(column_type),
databaseType: column_type
};
});
}
  // Lazily create and open the database on first use.
  async db() {
if (!this._db) {
this._db = await makeDB();
await this._db.open({
query: {
castTimestampToDate: true
}
});
}
return this._db;
}
  // Lazily open and cache a single shared connection.
  async connection() {
if (!this._conn) {
const db = await this.db();
this._conn = await db.connect();
}
return this._conn;
}
  async reconnect() {
    if (this._conn) {
      await this._conn.close();
    }
    delete this._conn;
  }
  // The `.queryStream` function will supersede this for SQL and Table cells;
  // keeping this for backwards compatibility.
async query(query, params) {
const key = `Query ${this._counter++}: ${query}`;
console.time(key);
const conn = await this.connection();
let result;
if (params) {
const stmt = await conn.prepare(query);
      result = await stmt.query(...params);
} else {
result = await conn.query(query);
}
console.timeEnd(key);
return result;
}
  // The `.queryStream` function will supersede this for SQL and Table cells;
  // keeping this for backwards compatibility.
async sql(strings, ...args) {
// expected to be used like db.sql`select * from table where foo = ${param}`
const results = await this.query(strings.join("?"), args);
// return rows as a JavaScript array of objects for now
let rows = results.toArray().map(Object.fromEntries);
rows.columns = results.schema.fields.map((d) => d.name);
return rows;
}
async table(query, params, opts) {
const result = await this.query(query, params);
return Inputs.table(result, { layout: "auto", ...(opts || {}) });
}
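  // Example (a sketch; assumes an Observable notebook where `Inputs` is in
  // scope and `mytable` exists):
  //   const view = await db.table("SELECT * FROM mytable LIMIT 10");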
// get the client after the query ran
async client(query, params) {
await this.query(query, params);
return this;
}
// query a single row
async queryRow(query, params) {
const key = `Query ${this._counter++}: ${query}`;
console.time(key);
    const conn = await this.connection();
    // Use send so we can stop after the first batch; conn.send takes only the
    // query text, so bind parameters via a prepared statement when present.
    let result;
    if (params) {
      const stmt = await conn.prepare(query);
      result = await stmt.send(...params);
    } else {
      result = await conn.send(query);
    }
    const batch = (await result.next()).value;
console.timeEnd(key);
return batch?.get(0);
}
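  // Example (a sketch; assumes a table named `mytable`):
  //   const row = await db.queryRow("SELECT count(*) AS n FROM mytable");
  //   row.n; // the count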
async explain(query, params) {
const row = await this.queryRow(`EXPLAIN ${query}`, params);
return element("pre", { className: "observablehq--inspect" }, [
text(row["explain_value"])
]);
}
// Describe the database (no arg) or a table
async describe(object) {
const result = await (object === undefined
? this.query(`SHOW TABLES`)
: this.query(`DESCRIBE ${object}`));
return Inputs.table(result);
}
// Summarize a query result
async summarize(query) {
const result = await this.query(`SUMMARIZE ${query}`);
return Inputs.table(result);
}
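  // Example (a sketch; the table name is hypothetical):
  //   await db.describe();                        // list tables
  //   await db.describe("mytable");               // describe one table
  //   await db.summarize("SELECT * FROM mytable");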
async insertJSON(name, buffer, options) {
const db = await this.db();
await db.registerFileBuffer(name, new Uint8Array(buffer));
const conn = await db.connect();
await conn.insertJSONFromPath(name, { name, schema: "main", ...options });
await conn.close();
return this;
}
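  // Example (a sketch; assumes a JSON file attachment holding an array of
  // objects; the table name defaults to the registered file name unless
  // overridden in options):
  //   const buffer = await FileAttachment("rows.json").arrayBuffer();
  //   await db.insertJSON("rows.json", buffer, {name: "rows"});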
async insertCSV(name, file, options) {
const db = await this.db();
async function fileToArrayBuffer(file) {
return new Promise((resolve, reject) => {
const reader = new FileReader();
reader.onload = () => resolve(reader.result);
reader.onerror = () => reject(reader.error);
reader.readAsArrayBuffer(file);
});
}
    // Decode the buffer with the given encoding and re-encode it as UTF-8
    // before handing it to DuckDB, then attempt the insert.
    async function tryInsert(buffer, encoding, options) {
      const decoder = new TextDecoder(encoding);
      const decodedText = decoder.decode(buffer);
      const encodedBuffer = new TextEncoder().encode(decodedText);
try {
await db.registerFileBuffer(name, encodedBuffer);
const conn = await db.connect();
await conn.insertCSVFromPath(name, {
name,
schema: "main",
...options
});
await conn.close();
} catch (e) {
console.error(`Failed to insert CSV with encoding ${encoding}:`, e);
throw e;
}
}
    // Best-effort repair of malformed CSV: balance unpaired quotes, pad or
    // trim each row to the header's column count, and hard-wrap overlong
    // lines. Lossy, and it does not account for delimiters inside quotes.
    async function cleanCSV(buffer, delimiter = ";", maxLineLength = 2097152) {
const decoder = new TextDecoder("utf-8");
const decodedText = decoder.decode(buffer);
const lines = decodedText.split("\n");
const header = lines[0].split(delimiter);
const expectedColumns = header.length;
const cleanedLines = [];
lines.forEach((line) => {
// Fix unpaired quotes
let quoteCount = 0;
for (let char of line) {
if (char === '"') {
quoteCount++;
}
}
if (quoteCount % 2 !== 0) {
line += '"';
}
let values = line.split(delimiter);
if (values.length < expectedColumns) {
// Add missing values
values = values.concat(
new Array(expectedColumns - values.length).fill("")
);
} else if (values.length > expectedColumns) {
// Trim excess values
values = values.slice(0, expectedColumns);
}
let cleanedLine = values.join(delimiter);
// Split lines that exceed the maximum length
while (cleanedLine.length > maxLineLength) {
cleanedLines.push(cleanedLine.slice(0, maxLineLength));
cleanedLine = cleanedLine.slice(maxLineLength);
}
cleanedLines.push(cleanedLine);
});
const cleanedText = cleanedLines.join("\n");
const encoder = new TextEncoder();
return encoder.encode(cleanedText);
}
    // Read the file in 1MB chunks to avoid loading huge files in one call.
    async function readFileInChunks(file, chunkSize = 1024 * 1024) {
const fileSize = file.size;
let offset = 0;
const chunks = [];
while (offset < fileSize) {
const slice = file.slice(offset, offset + chunkSize);
const buffer = await fileToArrayBuffer(slice);
chunks.push(buffer);
offset += chunkSize;
}
return chunks;
}
const fileChunks = await readFileInChunks(file);
let combinedBuffer = new Uint8Array(file.size);
let position = 0;
for (const chunk of fileChunks) {
combinedBuffer.set(new Uint8Array(chunk), position);
position += chunk.byteLength;
}
    // Detect the encoding from an initial sample; try it first, then fall
    // back to a list of common encodings if it fails.
    const initialBuffer = combinedBuffer.slice(0, 1024);
    const initialEncoding = chardet.detect(new Uint8Array(initialBuffer));
    console.log(`Detected initial encoding: ${initialEncoding}`);
    const fallbackEncodings = [
      initialEncoding,
      "latin1",
      "windows-1252",
      "UTF-16",
      "ASCII"
    ].filter(Boolean);
    let successfulInsert = false;
    let encounteredError = false;
for (const encoding of fallbackEncodings) {
try {
await tryInsert(combinedBuffer, encoding, options);
successfulInsert = true;
break;
} catch (e) {
if (
e.message.includes("Invalid Input Error") ||
e.message.includes("Maximum line size")
) {
encounteredError = true;
console.warn(`Error with encoding ${encoding}, will try cleaning...`);
break;
}
console.warn(`Failed with encoding ${encoding}, trying next...`);
}
}
if (!successfulInsert && encounteredError) {
console.warn("Cleaning CSV data and retrying...");
const cleanedBuffer = await cleanCSV(combinedBuffer);
await tryInsert(cleanedBuffer, "utf-8", options);
}
return this;
}
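  // Example (a sketch; assumes a browser File or Blob, e.g. from an
  // <input type="file"> element; options pass through to DuckDB's CSV
  // reader):
  //   await db.insertCSV("data.csv", file, {delimiter: ","});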
async insertParquet(name, buffer) {
const db = await this.db();
await db.registerFileBuffer(name, new Uint8Array(buffer));
const conn = await db.connect();
await conn.query(
      `CREATE VIEW "${name}" AS SELECT * FROM parquet_scan('${name}')`
);
await conn.close();
return this;
}
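  // Example (a sketch; assumes a Parquet file as an ArrayBuffer; note the
  // view takes the same name as the registered file):
  //   const buffer = await FileAttachment("data.parquet").arrayBuffer();
  //   await db.insertParquet("data.parquet", buffer);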
async insertArrowTable(name, table, options) {
const buffer = arrow.tableToIPC(table);
return this.insertArrowFromIPCStream(name, buffer, options);
}
async insertArrowFromIPCStream(name, buffer, options) {
const db = await this.db();
const conn = await db.connect();
await conn.insertArrowFromIPCStream(buffer, {
name,
schema: "main",
...options
});
await conn.close();
return this;
}
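  // Example (a sketch; assumes the apache-arrow module is bound to `arrow`,
  // as elsewhere in this file):
  //   const table = arrow.tableFromJSON([{a: 1}, {a: 2}]);
  //   await db.insertArrowTable("mytable", table);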
  // Create a database from FileAttachments
static async of(files = []) {
const db = await makeDB();
await db.open({
query: {
castTimestampToDate: true
}
});
    const toName = (file) =>
      file.name.split(".").slice(0, -1).join(".").replace(/@\d+$/, ""); // strip the "@X" version suffix Observable appends to file names
if (files.constructor.name === "FileAttachment") {
files = [[toName(files), files]];
} else if (!Array.isArray(files)) {
files = Object.entries(files);
}
// Add all files to the database. Import JSON and CSV. Create view for Parquet.
await Promise.all(
files.map(async (entry) => {
let file;
let name;
let options = {};
if (Array.isArray(entry)) {
[name, file] = entry;
if (file.hasOwnProperty("file")) {
({ file, ...options } = file);
}
} else if (entry.constructor.name === "FileAttachment") {
[name, file] = [toName(entry), entry];
} else if (typeof entry === "object") {
({ file, name, ...options } = entry);
name = name ?? toName(file);
} else {
console.error("Unrecognized entry", entry);
}
console.log("entry", entry);
console.log("file", file);
console.log("name", name);
console.log("options", options);
        if (!file.url && Array.isArray(file)) {
          // A plain array of objects: build an Arrow table from it and
          // insert it via the IPC stream API.
          const data = file;
          const table = arrow.tableFromJSON(data);
const buffer = arrow.tableToIPC(table);
const conn = await db.connect();
await conn.insertArrowFromIPCStream(buffer, {
name,
schema: "main",
...options
});
await conn.close();
return;
} else {
const url = await file.url();
if (url.indexOf("blob:") === 0) {
const buffer = await file.arrayBuffer();
await db.registerFileBuffer(file.name, new Uint8Array(buffer));
} else {
await db.registerFileURL(file.name, url);
}
}
const conn = await db.connect();
if (file.name.endsWith(".csv")) {
await conn.insertCSVFromPath(file.name, {
name,
schema: "main",
...options
});
} else if (file.name.endsWith(".json")) {
await conn.insertJSONFromPath(file.name, {
name,
schema: "main",
...options
});
} else if (file.name.endsWith(".parquet")) {
await conn.query(
            `CREATE VIEW "${name}" AS SELECT * FROM parquet_scan('${file.name}')`
);
} else {
console.warn(`Don't know how to handle file type of ${file.name}`);
}
await conn.close();
})
);
return new DuckDBClient(db);
}
}
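
// Example (a sketch; assumes an Observable notebook with attached files):
//   const db = await DuckDBClient.of({
//     mytable: FileAttachment("data.csv"),
//     other: FileAttachment("data.parquet")
//   });
//   await db.sql`SELECT count(*) FROM mytable`;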