import { xxHash32 } from '../util/xxhash';
import { compressSigned, uncompressSigned } from '../util/FastIntegerCompression';
import {
  runLengthEncode,
  deltaEncode,
  quantize,
  deltaDecode,
  runLengthDecode,
  unquantize,
} from '../util/compression';

// `HistoricalObject`s require the developer to pass in the
// field names that'll be tracked and sent down to the client.
//
// By default, the historical tracking will round each floating point
// value to an integer. The developer can specify more or less precision
// via the `precision` parameter: the quantization will maintain
// less than `1 / 2^precision` error. Note that higher precision values
// imply less error.
export type FieldConfig = Array<string | { name: string; precision: number }>;

// `HistoricalObject`s support at most 16 fields.
const MAX_FIELDS = 16;

const PACKED_VERSION = 1;

type NormalizedFieldConfig = Array<{
  name: string;
  precision: number;
}>;

// The `History` structure represents the history of a continuous
// value over all of time. Each sample represents a line segment
// that starts at that sample's time (inclusive) and extends to the
// next sample's time (exclusive). We track an `initialValue` that
// extends back to `-\infty` up until the first sample, and the final
// sample's value persists out to `+\infty`.
// ```
//                        ^
//               position |
//                        |
//    samples[0].value -  |        x---------------o
//                        |
//    samples[1].value -  |                        x-------->
//                        |
//         initialValue - <--------o
//                        |
//                        ------------------------------> time
//                                 |               |
//                   samples[0].time samples[1].time
// ```
export type History = {
  initialValue: number;
  samples: Sample[];
};

export type Sample = {
  time: number;
  value: number;
};

// `HistoricalObject` tracks a set of numeric fields over time and
// supports compressing the fields' histories into a binary buffer.
// This can be useful for continuous properties like position, where
// we'd want to smoothly replay their tick-by-tick progress at a high
// frame rate on the client.
//
// `HistoricalObject`s have a few limitations:
// - Documents in a `HistoricalObject` can only have up to 16 fields.
// - The historical tracking only applies to a specified list of fields,
//   and these fields must match between the client and server.
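//
// As an illustrative sketch of typical usage (the field names, precision,
// and values here are made up for the example):
// ```
// const obj = new HistoricalObject(['x', { name: 'y', precision: 4 }], { x: 0, y: 0 });
// obj.update(Date.now(), { x: 1, y: 0.5 }); // record a new sample at the current time
// const buffer = obj.pack();                // null if nothing has changed yet
// ```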
export class HistoricalObject<T extends Record<string, number>> {
  startTs?: number;

  fieldConfig: NormalizedFieldConfig;

  data: T;
  history: Record<string, History> = {};

  constructor(fields: FieldConfig, initialValue: T) {
    if (fields.length > MAX_FIELDS) {
      throw new Error(`HistoricalObject can have at most ${MAX_FIELDS} fields.`);
    }
    this.fieldConfig = normalizeFieldConfig(fields);
    this.checkShape(initialValue);
    this.data = initialValue;
  }

  historyLength() {
    return Object.values(this.history)
      .map((h) => h.samples.length)
      .reduce((a, b) => a + b, 0);
  }

  checkShape(data: any) {
    for (const [key, value] of Object.entries(data)) {
      if (!this.fieldConfig.find((f) => f.name === key)) {
        throw new Error(`Cannot set undeclared field '${key}'`);
      }
      if (typeof value !== 'number') {
        throw new Error(
          `HistoricalObject only supports numeric values, found: ${JSON.stringify(value)}`,
        );
      }
    }
  }

  update(now: number, data: T) {
    this.checkShape(data);
    for (const [key, value] of Object.entries(data)) {
      const currentValue = this.data[key];
      if (currentValue !== value) {
        let history = this.history[key];
        if (!history) {
          this.history[key] = history = { initialValue: currentValue, samples: [] };
        }
        const { samples } = history;
        let inserted = false;
        if (samples.length > 0) {
          const last = samples[samples.length - 1];
          if (now < last.time) {
            throw new Error(`Server time moving backwards: ${now} < ${last.time}`);
          }
          if (now === last.time) {
            last.value = value;
            inserted = true;
          }
        }
        if (!inserted) {
          samples.push({ time: now, value });
        }
      }
    }
    this.data = data;
  }

  pack(): ArrayBuffer | null {
    if (this.historyLength() === 0) {
      return null;
    }
    return packSampleRecord(this.fieldConfig, this.history);
  }
}

// Pack (normalized) field configuration into a binary buffer.
//
// Format:
// ```
// [ u8 version ]
// for each field config:
// [ u8 field name length ]
// [ UTF8 encoded field name ]
// [ u8 precision ]
// ```
function packFieldConfig(fields: NormalizedFieldConfig) {
  const out = new ArrayBuffer(1024);
  const outView = new DataView(out);
  let pos = 0;

  outView.setUint8(pos, PACKED_VERSION);
  pos += 1;

  const encoder = new TextEncoder();
  for (const fieldConfig of fields) {
    const name = encoder.encode(fieldConfig.name);
    outView.setUint8(pos, name.length);
    pos += 1;
    new Uint8Array(out, pos, name.length).set(name);
    pos += name.length;
    outView.setUint8(pos, fieldConfig.precision);
    pos += 1;
  }
  return out.slice(0, pos);
}
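
// As a quick illustration of the field-config layout above (a worked example,
// not taken from any test): with PACKED_VERSION = 1,
// `packFieldConfig([{ name: 'x', precision: 4 }])` yields four bytes:
// ```
// [ 0x01,   // version
//   0x01,   // field name length
//   0x78,   // 'x' in UTF-8
//   0x04 ]  // precision
// ```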

// Pack a document's sample record into a binary buffer.
//
// We encode each field's history with a few layered forms of
// compression:
// 1. Quantization: Turn each floating point number into an integer
//    by multiplying by 2^precision and then `Math.floor()`.
// 2. Delta encoding: Assume that values are continuous and don't
//    abruptly change over time, so their differences will be small.
//    This step turns the large integers from (1) into small ones.
// 3. Run length encoding (optional): Assume that some quantities
//    in the system will have constant velocity, so encode `k`
//    repetitions of `n` as `[k, n]`. If run length encoding doesn't
//    make (2) smaller, we skip it.
// 4. Varint encoding: Using FastIntegerCompression.js, we use a
//    variable length integer encoding that uses fewer bytes for
//    smaller numbers.
//
// Format:
// ```
// [ 4 byte xxhash of packed field config ]
//
// for each set field:
// [ u8 field header: 0 0 0 useRLE? | u4 field number ]
//
// Sample timestamps:
// [ u64le initial timestamp ]
// [ u16le timestamp buffer length ]
// [ vint(RLE(delta(remaining timestamps))) ]
//
// Sample values:
// [ u16le value buffer length ]
// [ vint(RLE?(delta([initialValue, ...values]))) ]
// ```
export function packSampleRecord(
  fields: NormalizedFieldConfig,
  sampleRecord: Record<string, History>,
): ArrayBuffer {
  const out = new ArrayBuffer(65536);
  const outView = new DataView(out);
  let pos = 0;

  const configHash = xxHash32(new Uint8Array(packFieldConfig(fields)));
  outView.setUint32(pos, configHash, true);
  pos += 4;

  for (let fieldNumber = 0; fieldNumber < fields.length; fieldNumber += 1) {
    const { name, precision } = fields[fieldNumber];
    const history = sampleRecord[name];
    if (!history || history.samples.length === 0) {
      continue;
    }

    const timestamps = history.samples.map((s) => Math.floor(s.time));
    const initialTimestamp = timestamps[0];
    const encodedTimestamps = runLengthEncode(deltaEncode(timestamps.slice(1), initialTimestamp));
    const compressedTimestamps = compressSigned(encodedTimestamps);
    if (compressedTimestamps.byteLength >= 1 << 16) {
      throw new Error(`Compressed buffer too long: ${compressedTimestamps.byteLength}`);
    }

    const values = [history.initialValue, ...history.samples.map((s) => s.value)];
    const quantized = quantize(values, precision);
    const deltaEncoded = deltaEncode(quantized);
    const runLengthEncoded = runLengthEncode(deltaEncoded);

    // Decide if we're going to run length encode the values based on whether
    // it actually made the encoded buffer smaller.
    const useRLE = runLengthEncoded.length < deltaEncoded.length;

    let fieldHeader = fieldNumber;
    if (useRLE) {
      fieldHeader |= 1 << 4;
    }

    const encoded = useRLE ? runLengthEncoded : deltaEncoded;
    const compressed = compressSigned(encoded);
    if (compressed.byteLength >= 1 << 16) {
      throw new Error(`Compressed buffer too long: ${compressed.byteLength}`);
    }

    outView.setUint8(pos, fieldHeader);
    pos += 1;

    outView.setBigUint64(pos, BigInt(initialTimestamp), true);
    pos += 8;

    outView.setUint16(pos, compressedTimestamps.byteLength, true);
    pos += 2;
    new Uint8Array(out, pos, compressedTimestamps.byteLength).set(
      new Uint8Array(compressedTimestamps),
    );
    pos += compressedTimestamps.byteLength;

    outView.setUint16(pos, compressed.byteLength, true);
    pos += 2;
    new Uint8Array(out, pos, compressed.byteLength).set(new Uint8Array(compressed));
    pos += compressed.byteLength;
  }
  return out.slice(0, pos);
}
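
// To illustrate why the layering above helps, consider a field moving at
// constant velocity with precision 0. This is an illustrative trace only; it
// assumes `deltaEncode` with no explicit start value takes deltas relative to
// 0 and that `runLengthEncode` emits `[count, value]` pairs as described above:
// ```
// values:       [0, 3, 6, 9, 12]   // [initialValue, ...sample values]
// quantized:    [0, 3, 6, 9, 12]   // 2^0 = 1, so no scaling
// deltaEncoded: [0, 3, 3, 3, 3]    // small, mostly repeated integers
// runLength:    [1, 0, 4, 3]       // one 0, then four 3s; 4 entries < 5, so useRLE = true
// ```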

export function unpackSampleRecord(fields: FieldConfig, buffer: ArrayBuffer) {
  const view = new DataView(buffer);
  let pos = 0;

  const normalizedFields = normalizeFieldConfig(fields);
  const expectedConfigHash = xxHash32(new Uint8Array(packFieldConfig(normalizedFields)));

  const configHash = view.getUint32(pos, true);
  pos += 4;
  if (configHash !== expectedConfigHash) {
    throw new Error(`Config hash mismatch: ${configHash} !== ${expectedConfigHash}`);
  }

  const out = {} as Record<string, History>;
  while (pos < buffer.byteLength) {
    const fieldHeader = view.getUint8(pos);
    pos += 1;

    const fieldNumber = fieldHeader & 0b00001111;
    const useRLE = (fieldHeader & (1 << 4)) !== 0;
    const fieldConfig = normalizedFields[fieldNumber];
    if (!fieldConfig) {
      throw new Error(`Invalid field number: ${fieldNumber}`);
    }

    const initialTimestamp = Number(view.getBigUint64(pos, true));
    pos += 8;

    const compressedTimestampLength = view.getUint16(pos, true);
    pos += 2;
    const compressedTimestampBuffer = buffer.slice(pos, pos + compressedTimestampLength);
    pos += compressedTimestampLength;
    const timestamps = [
      initialTimestamp,
      ...deltaDecode(
        runLengthDecode(uncompressSigned(compressedTimestampBuffer)),
        initialTimestamp,
      ),
    ];

    const compressedLength = view.getUint16(pos, true);
    pos += 2;
    const compressedBuffer = buffer.slice(pos, pos + compressedLength);
    pos += compressedLength;

    const encoded = uncompressSigned(compressedBuffer);
    const deltaEncoded = useRLE ? runLengthDecode(encoded) : encoded;
    const quantized = deltaDecode(deltaEncoded);
    const values = unquantize(quantized, fieldConfig.precision);

    if (timestamps.length + 1 !== values.length) {
      throw new Error(`Invalid sample record: ${timestamps.length} + 1 !== ${values.length}`);
    }
    const initialValue = values[0];
    const samples = [];
    for (let i = 0; i < timestamps.length; i++) {
      const time = timestamps[i];
      const value = values[i + 1];
      samples.push({ value, time });
    }
    const history = { initialValue, samples };
    out[fieldConfig.name] = history;
  }
  return out;
}

function normalizeFieldConfig(fields: FieldConfig): NormalizedFieldConfig {
  return fields.map((f) => (typeof f === 'string' ? { name: f, precision: 0 } : f));
}
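
// Putting the two halves together: a server-side `HistoricalObject` is packed
// and then decoded on the client with the same `FieldConfig` (the xxhash check
// above enforces that they match). A small hypothetical round trip, assuming
// the compression helpers behave as documented above:
// ```
// const fields: FieldConfig = ['x'];
// const obj = new HistoricalObject(fields, { x: 0 });
// obj.update(1000, { x: 3 });
// obj.update(1016, { x: 6 });
// const buffer = obj.pack()!;
// // On the client:
// const histories = unpackSampleRecord(fields, buffer);
// // histories.x = { initialValue: 0,
// //                 samples: [{ time: 1000, value: 3 }, { time: 1016, value: 6 }] }
// // (up to quantization error, which is zero here at precision 0 with integer values)
// ```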