Published unlisted
Edited
Nov 8, 2020
1 star
Insert cell
md`# Radix sort with Indices`
Insert cell
/**
* Concept: flag sort, with a whole bunch of small optimizations:
*
* 1) We revert to insertion sort for small ranges (standard optimization, really)
*
* 2) Assume numbers fit in the smallest bit-width (8, 16, 24, 32, in that order)
* until proven otherwise. Using this we can skip doing multiple passes on arrays
* of (mostly) small numbers or sparse data. We also repeat this on sub-passes
* so in the unlikely case that we have values where only the upper and lower
* bits are set (that is, are like 0xFF0000FF), the middle passes are also skipped
*
* [3) The last pass is optimized by overwriting the last eight bits. Together with
* the previous point this makes sorting values < 256 much faster than native sorts
* for all arrays] - this is removed in the indirect indices version, obviously
*
* 4) With a bit of prep-work we can avoid swaps and directly write the values to
* the correct bucket, saving on precious memory bandwidth.
*
* 5) We check if a range is already sorted and bail out early - this should also
* benefit nearly sorted data, because one would expect many buckets to not need
* sorting, which allows us to bail out early on these buckets.
*
* 6) If one bucket contains all values in a single pass, we immediately recurse
* to the next pass (speeds up cases where all values are large but only the last
* bits have different values - similar but not quite the same as point 2)
*
* I won't bother with implementing a version that handles the floating point or 64 bit case,
* but in theory they could be radix sorted too (requires a copy to and from a typed array though)
*/
radixsort = {
// Make the backing array a continguous piece of memory for cache purposes
const countArrayBackingArray = new Uint32Array(256 * 5);
const countingArrays = [0, 1, 2, 3, 4].map(i =>
countArrayBackingArray.subarray(i * 256, (i + 1) * 256)
);

function _radixsort(
array,
indices,
left = 0,
right = array.length,
pass = 0
) {
// used to mask out the higher bits for later passes
// pass == 0 for the full 32 bits, pass == 3 for 8 bits
const mask = 0xFFFFFFFF >>> (8 * pass);
let val = (array[indices[left]] & mask) >>> 0;
let maxVal = val | 0;
let i = left;
// It's practically free to check if the array is already sorted
// because we spend most time waiting for reading values of the
// input array and updating the counting arrays
let sorted = true,
prev = val;

if (maxVal < 0x100) {
// 8 bit values
const countingArray = countingArrays[3];
countingArray.fill(0);
for (; i < right; i++) {
// We really only care about whether or not the max value
// requires 8, 16, 24 or 32 bit, so "maxVal" is a bit
//misleading here, but bitmasking should be faster than testing.
val = (array[indices[i]] & mask) >>> 0;
maxVal = (maxVal | val) >>> 0;
// if we are bigger than maxVal, the current index has to
// fall through to the next appropriate bit width
if (maxVal > 0xFF) break;
// can only reach this point if val < 256
countingArray[val]++;
// abusing shortcircuiting to avoid assignment
// in the more common unsorted case
sorted = sorted && prev <= (prev = val);
}
}
if (maxVal < 0x10000 && i < right) {
// 16 bit values
const countingArray = countingArrays[2];
countingArray.fill(0);
// if we already advanced a few points, we know that
// these will all go into the first bucket
countingArray[0] = i - left;
for (; i < right; i++) {
val = (array[indices[i]] & mask) >>> 0;
maxVal = (maxVal | (val & mask)) >>> 0;
if (maxVal > 0xFFFF) break;
countingArray[(val >>> 8) & 0xFF]++;
sorted = sorted && prev <= (prev = val);
}
}
if (maxVal < 0x1000000 && i < right) {
// 24 bit values
const countingArray = countingArrays[1];
countingArray.fill(0);
countingArray[0] = i - left;
for (; i < right; i++) {
val = (array[indices[i]] & mask) >>> 0;
maxVal = (maxVal | (val & mask)) >>> 0;
if (maxVal > 0xFFFFFF) break;
countingArray[(val >>> 16) & 0xFF]++;
sorted = sorted && prev <= (prev = val);
}
}
if (i < right) {
// remaining 32 bit values
const countingArray = countingArrays[0];
countingArray.fill(0);
countingArray[0] = i - left;
while (i < right) {
val = array[indices[i++]];
countingArray[val >>> 24]++;
sorted = sorted && prev <= (prev = val);
}
}

// Fast path for the sorted (sub)array case
if (sorted) return indices;

// Now we do the actual sorting. Again, based on maxVal
// we determine which approach we need. Regardless of
// the pass we originally had, we can skipp to the
// furthes one that can contain max value

if (maxVal < 0x100) {
pass = 3;
} else if (maxVal < 0x10000) {
pass = 2;
} else if (maxVal < 0x1000000) {
pass = 1;
}

const countingArray = countingArrays[pass];
const shift = 24 - pass * 8;

// Half-swap trick. We lift the first value of every non-empty bucket
// This leaves "holes" in every bucket to be filled. Then we move the
// first lifted value to the correct bucket, advance the idx of that
// bucket to its next value, then move that value, and repeat until
// we reach the end of a bucket. Then we repeat this with the next lifted
// until we run out of lifted values to move.
// This way we can avoid swaps: we can directly move values to the
// correct bucket instead.
let liftedValues = [];

// Array use for storing the starting indices each bucket,
// and convert the regular count array into marking the end-points
// of every bucket (which effectively are the starting indices of
// the next bucket).
const indexArray = countingArrays[4];

// Create indices array + lift values
if (countingArray[0] === right - left) {
// If one bucket contains all values in the current range then
// this pass will effectively do nothing. So in that case we
// can immediately recurse to the next pass (or return from the
// range if pass === 3 because that means it is already sorted
return _radixsort(array, indices, left, right, ++pass);
} else if (countingArray[0] > 0) {
liftedValues.push(indices[left]);
}
indexArray[0] = left;
for (let i = 1, prevCount = (countingArray[0] += left); i < 256; i++) {
let count = countingArray[i];
if (count === right - left)
return _radixsort(array, indices, left, right, ++pass);
else if (count > 0) liftedValues.push(indices[prevCount]);
indexArray[i] = prevCount;
prevCount += count;
countingArray[i] = prevCount;
}

// The part where we actually move the values to the correct bucket
for (let i = 0; i < liftedValues.length - 1; i++) {
// We put one of the values in the hole in the
// bucket it belongs, pick the next value from
// that bucket (leaving a hole), put that value
// into the hole in the bucket where it belongs,
// and repeat this process until we reach the end
// of a bucket. At that point we move onto the next
// value in the "hole" aray,
let index = liftedValues[i];
let val = array[index];
let bucket = (val >>> shift) & 0xFF;
let bIdx = indexArray[bucket]++;
let end = countingArray[bucket];
indices[bIdx++] = index;
while (bIdx < end) {
index = indices[bIdx];
val = array[index];
bucket = (val >>> shift) & 0xFF;
bIdx = indexArray[bucket]++;
end = countingArray[bucket];
indices[bIdx++] = index;
}
}
// Clean-up: plug the last value back into the last hole
// Here's a fun mini-optimization: if out of a total of N
// buckets N-1 are "sorted", then the last bucket can only
// contain values that should be there and therefore must
// already be sorted as well.
let index = liftedValues[liftedValues.length - 1];
val = array[index];
indices[indexArray[(val >>> shift) & 0xFF]] = index;

// Move on to the next pass
pass++;
for (let i = 0, sum = left; i < 256; i++) {
const nextSum = countingArray[i];
if (sum < nextSum) {
// insertion sort fallback for small ranges
if (nextSum - sum <= 40) {
for (let j = sum + 1, k; j < nextSum; j++) {
index = indices[j];
val = array[index];
for (k = j - 1; k >= sum && array[indices[k]] > val; k--) {
indices[k + 1] = indices[k];
}
if (++k < j) indices[k] = index;
}
} else {
_radixsort(array, indices, sum, nextSum, pass);
}
}
sum = nextSum;
}
return indices;
}

return function radixsort(array, indices) {
// All benchmarks so far suggest that for arrays
// smaller than this the native sorts definitely win
if (array.length <= 200) {
indices.sort((a, b) => array[a] - array[b]);
return indices;
}
return _radixsort(array, indices, 0, array.length, 0);
};
}
Insert cell
/*
Michael Herf has also discovered a good way to generalize this to floating point numbers: Reinterpret cast the float to a uint32, then flip every bit if the float was negative, but flip only the sign bit if the float was positive. The same trick works for doubles and uint64s. Michael Herf explains why this works in the linked piece, but the short version of it is this: Positive floating point numbers already sort correctly if we just reinterpret cast them to a uint32. The exponent comes before the mantissa, so we would sort by the exponent first, then by the mantissa. Everything works out. Negative floating point numbers however would sort the wrong way. Flipping all the bits on them fixes that. The final remaining problem is that positive floating point numbers need to sort as bigger than negative numbers, and the easiest way to do that is to flip the sign bit since it’s the most significant bit.

https://probablydance.com/2016/12/27/i-wrote-a-faster-sorting-algorithm/
*/
function radixF32(F32, indices) {
const U32 = new Uint32Array(F32.buffer);
for (let i = 0; i < U32.length; i++) {
const v = U32[i];
U32[i] = v & 0x80000000 ? v ^ 0xFFFFFFFF : v ^ 0x80000000;
}

radixsort(U32, indices);

for (let i = 0; i < U32.length; i++) {
const v = U32[i];
U32[i] = v & 0x80000000 ? v ^ 0x80000000 : v ^ 0xFFFFFFFF;
}
}
Insert cell
test_sort = {
if (true) {
const size = 0x10000;
const testInput = new Float32Array(size);
const original = testInput.slice();
const verification = testInput.slice();
const testIndices = new Uint32Array(size);
for (let i = 0; i < testIndices.length; i++) testIndices[i] = i;
let error;
const samples = 1000;
for (let i = 0; i < samples; i++) {
xorShiftF32Array(testInput);
original.set(testInput);
original.sort();
radixF32(testInput, testIndices);
verification.set(testInput);
verification.sort();
for (let j = 0; j < testInput.length; j++) {
if (testInput[testIndices[j]] !== original[j]) {
error = {
idx: j,
o: original[j],
t: testInput[testIndices[j]],
v: verification[j],
i: testIndices[j],
original,
testInput,
testIndices,
verification
};
break;
}
}
if (error) break;
yield `"${i + 1}/${samples} - no errors yet!`;
}
yield error || "no errors!";
} else {
yield "enable debugging to test";
}
}
Insert cell
xorShiftF32Array(new Float32Array(100))
Insert cell
// Since we're probably want to have lots of random values in typed arrays,
// we might as well operate directly on said arrays
xorShiftF32Array = {
let x = (Math.random() * 0x100000000) | 0;
return function xorShiftF32Array(f) {
const u = new Uint32Array(f.buffer);
let _x = x;
for (let i = 0; i < f.length; i++) {
_x ^= _x << 13;
_x ^= _x >> 17;
_x ^= _x << 5;
u[i] = (_x & 0x7fffff) | 0b111111100000000000000000000000;
f[i] = (f[i] - 1) * 255;
}
x = _x;
return f;
};
}
Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more