// StackGenVis: Alignment of Data, Algorithms, and Models for Stacking Ensemble Learning Using Performance Metrics
/**
 * Build a composite group-by key function from several accessors.
 * The returned function evaluates every accessor on a datum and joins the
 * results with '|'. The first value is explicitly stringified, so the key is
 * always a string even when only one accessor is given.
 * @param {Array<function(object): *>} f - Non-empty array of accessors.
 * @return {function(object): string} The composite key function.
 */
function multikey(f) {
  return function(x) {
    var n = f.length,
        i = 1,
        k = String(f[0](x));

    for (; i < n; ++i) {
      k += '|' + f[i](x);
    }

    return k;
  };
}
/**
 * Return a group-by key function for the given accessors:
 * - no accessors: a function that always returns the empty string;
 * - one accessor: that accessor itself (its raw value is the key);
 * - several accessors: a '|'-joined composite string key (via multikey).
 * @param {Array<function(object): *>} [fields] - Group-by accessors.
 * @return {function(object): *} The key function.
 */
function groupkey(fields) {
  return !fields || !fields.length ? function() { return ''; }
    : fields.length === 1 ? fields[0]
    : multikey(fields);
}
/**
 * Compute the output field name for an aggregate measure.
 * @param {string} op - The aggregate operation name (e.g. 'sum').
 * @param {string} [field] - The input field name, if any.
 * @param {string} [as] - An explicit output name; used when truthy.
 * @return {string} `as` if provided, otherwise `op` or `op + '_' + field`.
 */
function measureName(op, field, as) {
  return as || (op + (!field ? '' : '_' + field));
}
/**
 * Catalog of aggregate operations. Each entry is a measure factory (see
 * `measure`) whose template holds JavaScript code fragments that
 * `compileMeasures` concatenates into per-cell methods:
 *   init - runs when the per-cell state object is constructed;
 *   add  - runs for each added value `v` (tuple `t`);
 *   rem  - runs for each removed value `v` (tuple `t`);
 *   set  - expression whose value is written to the output field.
 * `req` lists measures whose state this one depends on, `str` lists measures
 * additionally required when streaming removals may occur, and `idx` orders
 * code emission (lower idx first, so e.g. `mean` defines `d` before
 * `variance` uses it).
 */
var AggregateOps = {
  'values': measure({
    name: 'values',
    init: 'cell.store = true;',
    set: 'cell.data.values()', idx: -1
  }),
  'count': measure({
    name: 'count',
    set: 'cell.num'
  }),
  '__count__': measure({
    name: 'count',
    set: 'this.missing + this.valid'
  }),
  'missing': measure({
    name: 'missing',
    set: 'this.missing'
  }),
  'valid': measure({
    name: 'valid',
    set: 'this.valid'
  }),
  'sum': measure({
    name: 'sum',
    init: 'this.sum = 0;',
    add: 'this.sum += +v;',
    rem: 'this.sum -= v;',
    set: 'this.sum'
  }),
  'mean': measure({
    name: 'mean',
    init: 'this.mean = 0;',
    add: 'var d = v - this.mean; this.mean += d / this.valid;',
    rem: 'var d = v - this.mean; this.mean -= this.valid ? d / this.valid : this.mean;',
    set: 'this.valid ? this.mean : undefined'
  }),
  'average': measure({
    name: 'average',
    set: 'this.valid ? this.mean : undefined',
    req: ['mean'], idx: 1
  }),
  'variance': measure({
    name: 'variance',
    init: 'this.dev = 0;',
    add: 'this.dev += d * (v - this.mean);',
    rem: 'this.dev -= d * (v - this.mean);',
    set: 'this.valid > 1 ? this.dev / (this.valid-1) : undefined',
    req: ['mean'], idx: 1
  }),
  'variancep': measure({
    name: 'variancep',
    set: 'this.valid > 1 ? this.dev / this.valid : undefined',
    req: ['variance'], idx: 2
  }),
  'stdev': measure({
    name: 'stdev',
    set: 'this.valid > 1 ? Math.sqrt(this.dev / (this.valid-1)) : undefined',
    req: ['variance'], idx: 2
  }),
  'stdevp': measure({
    name: 'stdevp',
    set: 'this.valid > 1 ? Math.sqrt(this.dev / this.valid) : undefined',
    req: ['variance'], idx: 2
  }),
  'stderr': measure({
    name: 'stderr',
    set: 'this.valid > 1 ? Math.sqrt(this.dev / (this.valid * (this.valid-1))) : undefined',
    req: ['variance'], idx: 2
  }),
  'distinct': measure({
    name: 'distinct',
    set: 'cell.data.distinct(this.get)',
    req: ['values'], idx: 3
  }),
  'ci0': measure({
    name: 'ci0',
    set: 'cell.data.ci0(this.get)',
    req: ['values'], idx: 3
  }),
  'ci1': measure({
    name: 'ci1',
    set: 'cell.data.ci1(this.get)',
    req: ['values'], idx: 3
  }),
  'median': measure({
    name: 'median',
    set: 'cell.data.q2(this.get)',
    req: ['values'], idx: 3
  }),
  'q1': measure({
    name: 'q1',
    set: 'cell.data.q1(this.get)',
    req: ['values'], idx: 3
  }),
  'q3': measure({
    name: 'q3',
    set: 'cell.data.q3(this.get)',
    req: ['values'], idx: 3
  }),
  'argmin': measure({
    name: 'argmin',
    init: 'this.argmin = undefined;',
    add: 'if (v < this.min) this.argmin = t;',
    rem: 'if (v <= this.min) this.argmin = undefined;',
    // fall back to a scan of the stored tuples when the tracked argmin was invalidated
    set: 'this.argmin || cell.data.argmin(this.get)',
    req: ['min'], str: ['values'], idx: 3
  }),
  'argmax': measure({
    name: 'argmax',
    init: 'this.argmax = undefined;',
    add: 'if (v > this.max) this.argmax = t;',
    rem: 'if (v >= this.max) this.argmax = undefined;',
    // fall back to a scan of the stored tuples when the tracked argmax was invalidated
    set: 'this.argmax || cell.data.argmax(this.get)',
    req: ['max'], str: ['values'], idx: 3
  }),
  'min': measure({
    name: 'min',
    init: 'this.min = undefined;',
    add: 'if (v < this.min || this.min === undefined) this.min = v;',
    // removing the current minimum marks it unknown (NaN) until recomputed from stored values
    rem: 'if (v <= this.min) this.min = NaN;',
    set: 'this.min = (Number.isNaN(this.min) ? cell.data.min(this.get) : this.min)',
    str: ['values'], idx: 4
  }),
  'max': measure({
    name: 'max',
    init: 'this.max = undefined;',
    add: 'if (v > this.max || this.max === undefined) this.max = v;',
    // removing the current maximum marks it unknown (NaN) until recomputed from stored values
    rem: 'if (v >= this.max) this.max = NaN;',
    set: 'this.max = (Number.isNaN(this.max) ? cell.data.max(this.get) : this.max)',
    str: ['values'], idx: 4
  })
};

// Names of all supported aggregate operations (used by Aggregate.Definition).
var ValidAggregateOps = Object.keys(AggregateOps);
/**
 * Instantiate a measure descriptor for an aggregate operation.
 * @param {string} op - An aggregate operation name (a key of AggregateOps).
 * @param {string} name - The output field name for the measure.
 * @return {object} The measure descriptor.
 * @throws via vegaUtil.error if `op` is not a known aggregate operation.
 */
function createMeasure(op, name) {
  // own-property check so inherited keys (e.g. 'constructor') are rejected too
  if (!vegaUtil.hasOwnProperty(AggregateOps, op)) {
    vegaUtil.error('Unrecognized aggregate operation: ' + op);
  }
  return AggregateOps[op](name);
}
/**
 * Create a factory for measure descriptors.
 * @param {object} base - Measure template: `name` plus optional code fragments
 *   (`init`, `add`, `rem`, `set`), dependencies (`req`, `str`) and `idx`.
 * @return {function(string=): object} A factory that copies the template over
 *   defaults (empty code fragments, idx 0) and sets `out`, the output field
 *   name, falling back to the template's `name`.
 */
function measure(base) {
  return function(out) {
    var m = vegaUtil.extend({init: '', add: '', rem: '', idx: 0}, base);
    m.out = out || base.name;
    return m;
  };
}
/**
 * Comparator ordering measure descriptors by ascending `idx`.
 * @param {{idx: number}} a
 * @param {{idx: number}} b
 * @return {number}
 */
function compareIndex(a, b) {
  return a.idx - b.idx;
}
/**
 * Expand a list of measure descriptors with all measures they depend on.
 * @param {Array<object>} agg - Requested measure descriptors.
 * @param {boolean} stream - If true, also include the `str` dependencies
 *   needed to support streaming removals.
 * @return {Array<object>} Unique descriptors (keyed by measure name) sorted by
 *   `idx`, so that code fragments defining shared state are emitted first.
 */
function resolve(agg, stream) {
  // Recursively add every measure that `a` depends on into the map `m`.
  function collect(m, a) {
    function helper(r) { if (!m[r]) collect(m, m[r] = AggregateOps[r]()); }
    if (a.req) a.req.forEach(helper);
    if (stream && a.str) a.str.forEach(helper);
    return m;
  }

  var map = agg.reduce(
    collect,
    agg.reduce(function(m, a) {
      m[a.name] = a;
      return m;
    }, {})
  );

  return Object.values(map).sort(compareIndex);
}
/**
 * Compile measure descriptors that share one input field into a constructor
 * for per-cell aggregation state. The code fragments of every resolved measure
 * (including dependencies) are concatenated into generated init/add/rem
 * methods; set(t) writes each requested measure's value onto tuple `t`.
 * @param {Array<object>} agg - Measure descriptors created by createMeasure.
 * @param {function(object): *} [field] - Accessor for the measured value;
 *   defaults to vegaUtil.identity.
 * @return {function} A constructor taking a cell object; `ctr.fields` lists
 *   the output field names of `agg`.
 */
function compileMeasures(agg, field) {
  var get = field || vegaUtil.identity,
      all = resolve(agg, true), // assume streaming removes may occur
      init = 'var cell = this.cell; this.valid = 0; this.missing = 0;',
      ctr = 'this.cell = cell; this.init();',
      // null/undefined values count as missing; NaN (v !== v) is skipped entirely
      add = 'if(v==null){++this.missing; return;} if(v!==v) return; ++this.valid;',
      rem = 'if(v==null){--this.missing; return;} if(v!==v) return; --this.valid;',
      set = 'var cell = this.cell;';

  all.forEach(function(a) {
    init += a.init;
    add += a.add;
    rem += a.rem;
  });

  // only the requested measures are written to output, in idx order
  agg.slice().sort(compareIndex).forEach(function(a) {
    // output names may come from the user-supplied `as` parameter, so they
    // are emitted as quoted string literals
    set += 't[' + vegaUtil.stringValue(a.out) + ']=' + a.set + ';';
  });
  set += 'return t;';

  // Code bodies are assembled from the built-in AggregateOps fragments above.
  ctr = Function('cell', ctr);
  ctr.prototype.init = Function(init);
  ctr.prototype.add = Function('v', 't', add);
  ctr.prototype.rem = Function('v', 't', rem);
  ctr.prototype.set = Function('t', set);
  ctr.prototype.get = get;
  ctr.fields = agg.map(function(_) { return _.out; });
  return ctr;
}
/**
 * Stores the tuples of a single aggregation cell and memoizes statistics
 * (distinct count, extent, quartiles, bootstrap confidence interval) computed
 * over them. Additions and removals are buffered; removals are reconciled
 * lazily when values() is called.
 * @constructor
 * @param {string|function} [key] - Field that uniquely identifies a tuple;
 *   defaults to the dataflow tuple id.
 */
function TupleStore(key) {
  this._key = key ? vegaUtil.field(key) : vegaDataflow.tupleid;
  this.reset();
}

var prototype = TupleStore.prototype;

/** Clear all stored tuples and cached statistics. */
prototype.reset = function() {
  this._add = [];
  this._rem = [];
  this._ext = null;
  this._get = null;
  this._q = null;
  this._ci = null;
};

/** Buffer a tuple addition. */
prototype.add = function(v) {
  this._add.push(v);
};

/** Buffer a tuple removal; it is applied on the next values() call. */
prototype.rem = function(v) {
  this._rem.push(v);
};

/**
 * Return the current tuples, first applying any pending removals.
 * Clears the cached accessor, which forces memoized statistics to recompute.
 * @return {Array<object>}
 */
prototype.values = function() {
  this._get = null;
  if (this._rem.length === 0) return this._add;

  var a = this._add,
      r = this._rem,
      k = this._key,
      n = a.length,
      m = r.length,
      x = Array(n - m),
      // null prototype: keys such as "__proto__" behave as ordinary keys
      map = Object.create(null), i, j, v;

  // use unique key field to clear removed values
  for (i = 0; i < m; ++i) {
    map[k(r[i])] = 1;
  }
  for (i = 0, j = 0; i < n; ++i) {
    if (map[k(v = a[i])]) {
      // reset the flag so only one tuple per removed key is dropped
      map[k(v)] = 0;
    } else {
      x[j++] = v;
    }
  }

  this._rem = [];
  return (this._add = x);
};

// memoizing statistics methods

/**
 * Count distinct values of `get` over the stored tuples (compared as strings).
 * @param {function(object): *} get - Value accessor.
 * @return {number}
 */
prototype.distinct = function(get) {
  var v = this.values(),
      n = v.length,
      map = Object.create(null),
      count = 0, s;

  while (--n >= 0) {
    s = get(v[n]) + '';
    if (!vegaUtil.hasOwnProperty(map, s)) {
      map[s] = 1;
      ++count;
    }
  }

  return count;
};

/**
 * Return the [min tuple, max tuple] pair for accessor `get`, cached until the
 * accessor changes or values() is called again.
 */
prototype.extent = function(get) {
  if (this._get !== get || !this._ext) {
    var v = this.values(),
        i = vegaUtil.extentIndex(v, get);
    this._ext = [v[i[0]], v[i[1]]];
    this._get = get;
  }
  return this._ext;
};

/** Tuple with the minimum value of `get`, or an empty object if none. */
prototype.argmin = function(get) {
  return this.extent(get)[0] || {};
};

/** Tuple with the maximum value of `get`, or an empty object if none. */
prototype.argmax = function(get) {
  return this.extent(get)[1] || {};
};

/** Minimum value of `get`, or undefined if there is no minimum tuple. */
prototype.min = function(get) {
  var m = this.extent(get)[0];
  return m != null ? get(m) : undefined;
};

/** Maximum value of `get`, or undefined if there is no maximum tuple. */
prototype.max = function(get) {
  var m = this.extent(get)[1];
  return m != null ? get(m) : undefined;
};

/** Quartiles of `get` via vegaStatistics.quartiles, memoized per accessor. */
prototype.quartile = function(get) {
  if (this._get !== get || !this._q) {
    this._q = vegaStatistics.quartiles(this.values(), get);
    this._get = get;
  }
  return this._q;
};

prototype.q1 = function(get) {
  return this.quartile(get)[0];
};

prototype.q2 = function(get) {
  return this.quartile(get)[1];
};

prototype.q3 = function(get) {
  return this.quartile(get)[2];
};

/**
 * Bootstrap confidence interval of `get` (1000 samples, alpha 0.05) via
 * vegaStatistics.bootstrapCI, memoized per accessor.
 */
prototype.ci = function(get) {
  if (this._get !== get || !this._ci) {
    this._ci = vegaStatistics.bootstrapCI(this.values(), 1000, 0.05, get);
    this._get = get;
  }
  return this._ci;
};

prototype.ci0 = function(get) {
  return this.ci(get)[0];
};

prototype.ci1 = function(get) {
  return this.ci(get)[1];
};
/**
 * Group-by aggregation operator.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} [params.groupby] - An array of accessors to groupby.
 * @param {Array<function(object): *>} [params.fields] - An array of accessors to aggregate.
 * @param {Array<string>} [params.ops] - An array of strings indicating aggregation operations.
 * @param {Array<string>} [params.as] - An array of output field names for aggregated values.
 * @param {boolean} [params.cross=false] - A flag indicating that the full
 *   cross-product of groupby values should be generated, including empty cells.
 *   If true, the drop parameter is ignored and empty cells are retained.
 * @param {boolean} [params.drop=true] - A flag indicating if empty cells should be removed.
 */
function Aggregate(params) {
  vegaDataflow.Transform.call(this, null, params);

  this._adds = [];  // array of added output tuples
  this._mods = [];  // array of modified output tuples
  this._alen = 0;   // number of active added tuples
  this._mlen = 0;   // number of active modified tuples
  this._drop = true;   // should empty aggregation cells be removed
  this._cross = false; // produce full cross-product of group-by values

  this._dims = [];   // group-by dimension accessors
  this._dnames = []; // group-by dimension names

  this._measures = [];     // collection of aggregation monoids
  this._countOnly = false; // flag indicating only count aggregation
  this._counts = null;     // collection of count fields
  this._prev = null;       // previous aggregation cells

  this._inputs = null;  // array of dependent input tuple field names
  this._outputs = null; // array of output tuple field names
}

Aggregate.Definition = {
  "type": "Aggregate",
  "metadata": {"generates": true, "changes": true},
  "params": [
    { "name": "groupby", "type": "field", "array": true },
    { "name": "ops", "type": "enum", "array": true, "values": ValidAggregateOps },
    { "name": "fields", "type": "field", "null": true, "array": true },
    { "name": "as", "type": "string", "null": true, "array": true },
    { "name": "drop", "type": "boolean", "default": true },
    { "name": "cross", "type": "boolean", "default": false },
    { "name": "key", "type": "field" }
  ]
};

var prototype$1 = vegaUtil.inherits(Aggregate, vegaDataflow.Transform);

/**
 * Process a pulse. If the parameters or dependent input fields changed, all
 * cells are rebuilt from the source tuples; otherwise removed and added tuples
 * are applied incrementally.
 * @return {Pulse} Output pulse with added, removed and modified cell tuples.
 */
prototype$1.transform = function(_, pulse) {
  var aggr = this,
      out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS),
      mod = _.modified();

  aggr.stamp = out.stamp;

  if (aggr.value && (mod || pulse.modified(aggr._inputs, true))) {
    // full rebuild: keep the old cells so their output tuples can be
    // removed (see changes) or reused (see newcell)
    aggr._prev = aggr.value;
    aggr.value = mod ? aggr.init(_) : {};
    pulse.visit(pulse.SOURCE, t => aggr.add(t));
  } else {
    aggr.value = aggr.value || aggr.init(_);
    pulse.visit(pulse.REM, t => aggr.rem(t));
    pulse.visit(pulse.ADD, t => aggr.add(t));
  }

  // Indicate output fields and return aggregate tuples.
  out.modifies(aggr._outputs);

  // Should empty cells be dropped?
  aggr._drop = _.drop !== false;

  // If domain cross-product requested, generate empty cells as needed
  // and ensure that empty cells are not dropped
  if (_.cross && aggr._dims.length > 1) {
    aggr._drop = false;
    aggr.cross();
  }

  return aggr.changes(out);
};

/**
 * Create (empty) cells for every combination of observed group-by values
 * that does not yet have a cell.
 */
prototype$1.cross = function() {
  var aggr = this,
      curr = aggr.value,
      dims = aggr._dnames,
      vals = dims.map(function() { return {}; }),
      n = dims.length;

  // collect all group-by domain values
  function collect(cells) {
    var key, i, t, v;
    for (key in cells) {
      t = cells[key].tuple;
      for (i = 0; i < n; ++i) {
        vals[i][(v = t[dims[i]])] = v;
      }
    }
  }
  collect(aggr._prev);
  collect(curr);

  // iterate over key cross-product, create cells as needed
  function generate(base, tuple, index) {
    var name = dims[index],
        v = vals[index++],
        k, key;

    for (k in v) {
      tuple[name] = v[k];
      key = base ? base + '|' + k : k;
      if (index < n) generate(key, tuple, index);
      else if (!curr[key]) aggr.cell(key, tuple);
    }
  }
  generate('', {}, 0);
};

/**
 * (Re)initialize group-by dimensions, output/input field lists and compiled
 * measures from the operator parameters.
 * @return {object} A fresh, empty map of aggregation cells.
 */
prototype$1.init = function(_) {
  // initialize input and output fields
  var inputs = (this._inputs = []),
      outputs = (this._outputs = []),
      inputMap = {};

  // record the (deduplicated) input fields an accessor depends on
  function inputVisit(get) {
    var fields = vegaUtil.array(vegaUtil.accessorFields(get)),
        i = 0, n = fields.length, f;
    for (; i < n; ++i) {
      if (!inputMap[f = fields[i]]) {
        inputMap[f] = 1;
        inputs.push(f);
      }
    }
  }

  // initialize group-by dimensions
  this._dims = vegaUtil.array(_.groupby);
  this._dnames = this._dims.map(function(d) {
    var dname = vegaUtil.accessorName(d);
    inputVisit(d);
    outputs.push(dname);
    return dname;
  });
  this.cellkey = _.key ? _.key : groupkey(this._dims);

  // initialize aggregate measures
  this._countOnly = true;
  this._counts = [];
  this._measures = [];

  var fields = _.fields || [null],
      ops = _.ops || ['count'],
      as = _.as || [],
      n = fields.length,
      map = {},
      field, op, m, mname, outname, i;

  if (n !== ops.length) {
    vegaUtil.error('Unmatched number of fields and aggregate ops.');
  }

  for (i = 0; i < n; ++i) {
    field = fields[i];
    op = ops[i];

    if (field == null && op !== 'count') {
      vegaUtil.error('Null aggregate field specified.');
    }
    mname = vegaUtil.accessorName(field);
    outname = measureName(op, mname, as[i]);
    outputs.push(outname);

    // counts are read directly from cell.num; no compiled measure needed
    if (op === 'count') {
      this._counts.push(outname);
      continue;
    }

    // group measures by input field so each field is compiled once
    m = map[mname];
    if (!m) {
      inputVisit(field);
      m = (map[mname] = []);
      m.field = field;
      this._measures.push(m);
    }

    this._countOnly = false;
    m.push(createMeasure(op, outname));
  }

  this._measures = this._measures.map(function(m) {
    return compileMeasures(m, m.field);
  });

  return {}; // aggregation cells (this.value)
};

// -- Cell Management -----

// default key function (empty group-by): every tuple maps to key ''
prototype$1.cellkey = groupkey();

/**
 * Look up or create the cell for `key`. A cell is queued at most once per
 * stamp: as an add if it is new (or is an emptied cell re-appearing while
 * dropping empty cells), otherwise as a mod.
 */
prototype$1.cell = function(key, t) {
  var cell = this.value[key];
  if (!cell) {
    cell = this.value[key] = this.newcell(key, t);
    this._adds[this._alen++] = cell;
  } else if (cell.num === 0 && this._drop && cell.stamp < this.stamp) {
    cell.stamp = this.stamp;
    this._adds[this._alen++] = cell;
  } else if (cell.stamp < this.stamp) {
    cell.stamp = this.stamp;
    this._mods[this._mlen++] = cell;
  }
  return cell;
};

/**
 * Create a new cell. Its output tuple reuses the previous tuple for the same
 * key when rebuilding. Measure state objects are created unless only counts
 * are requested; a measure may set `cell.store` (the 'values' op does), in
 * which case a TupleStore is attached as `cell.data`.
 */
prototype$1.newcell = function(key, t) {
  var cell = {
    key: key,
    num: 0,
    agg: null,
    tuple: this.newtuple(t, this._prev && this._prev[key]),
    stamp: this.stamp,
    store: false
  };

  if (!this._countOnly) {
    var measures = this._measures,
        n = measures.length, i;

    cell.agg = Array(n);
    for (i = 0; i < n; ++i) {
      cell.agg[i] = new measures[i](cell);
    }
  }

  if (cell.store) {
    cell.data = new TupleStore();
  }

  return cell;
};

/**
 * Build the output tuple holding the group-by values of `t`; if a previous
 * cell `p` exists, its tuple identity is preserved via vegaDataflow.replace.
 */
prototype$1.newtuple = function(t, p) {
  var names = this._dnames,
      dims = this._dims,
      x = {}, i, n;

  for (i = 0, n = dims.length; i < n; ++i) {
    x[names[i]] = dims[i](t);
  }

  return p ? vegaDataflow.replace(p.tuple, x) : vegaDataflow.ingest(x);
};

// -- Process Tuples -----

/** Add an input tuple to its cell and update all measures. */
prototype$1.add = function(t) {
  var key = this.cellkey(t),
      cell = this.cell(key, t),
      agg, i, n;

  cell.num += 1;
  if (this._countOnly) return;

  if (cell.store) cell.data.add(t);

  agg = cell.agg;
  for (i = 0, n = agg.length; i < n; ++i) {
    agg[i].add(agg[i].get(t), t);
  }
};

/** Remove an input tuple from its cell and update all measures. */
prototype$1.rem = function(t) {
  var key = this.cellkey(t),
      cell = this.cell(key, t),
      agg, i, n;

  cell.num -= 1;
  if (this._countOnly) return;

  if (cell.store) cell.data.rem(t);

  agg = cell.agg;
  for (i = 0, n = agg.length; i < n; ++i) {
    agg[i].rem(agg[i].get(t), t);
  }
};

/** Write current count and measure values onto the cell's output tuple. */
prototype$1.celltuple = function(cell) {
  var tuple = cell.tuple,
      counts = this._counts,
      agg, i, n;

  // consolidate stored values
  if (cell.store) {
    cell.data.values();
  }

  // update tuple properties
  for (i = 0, n = counts.length; i < n; ++i) {
    tuple[counts[i]] = cell.num;
  }
  if (!this._countOnly) {
    agg = cell.agg;
    for (i = 0, n = agg.length; i < n; ++i) {
      agg[i].set(tuple);
    }
  }

  return tuple;
};

/**
 * Populate the output pulse: remove tuples of previous cells (after a
 * rebuild), add tuples of new cells, and emit modified cells as mods (or as
 * removals when they became empty and empty cells are dropped).
 */
prototype$1.changes = function(out) {
  var adds = this._adds,
      mods = this._mods,
      prev = this._prev,
      drop = this._drop,
      add = out.add,
      rem = out.rem,
      mod = out.mod,
      cell, key, i, n;

  if (prev) for (key in prev) {
    cell = prev[key];
    if (!drop || cell.num) rem.push(cell.tuple);
  }

  for (i = 0, n = this._alen; i < n; ++i) {
    add.push(this.celltuple(adds[i]));
    adds[i] = null; // for garbage collection
  }

  for (i = 0, n = this._mlen; i < n; ++i) {
    cell = mods[i];
    (cell.num === 0 && drop ? rem : mod).push(this.celltuple(cell));
    mods[i] = null; // for garbage collection
  }

  this._alen = this._mlen = 0; // reset list of active cells
  this._prev = null;
  return out;
};
// epsilon bias to offset floating point error (#1737)
const EPSILON = 1e-14;

/**
 * Generates a binning function for discretizing data.
 * @constructor
 * @param {object} params - The parameters for this operator. The
 *   provided values should be valid options for the {@link bin} function.
 * @param {function(object): *} params.field - The data field to bin.
 */
function Bin(params) {
  vegaDataflow.Transform.call(this, null, params);
}

Bin.Definition = {
  "type": "Bin",
  "metadata": {"modifies": true},
  "params": [
    { "name": "field", "type": "field", "required": true },
    { "name": "interval", "type": "boolean", "default": true },
    { "name": "anchor", "type": "number" },
    { "name": "maxbins", "type": "number", "default": 20 },
    { "name": "base", "type": "number", "default": 10 },
    { "name": "divide", "type": "number", "array": true, "default": [5, 2] },
    { "name": "extent", "type": "number", "array": true, "length": 2, "required": true },
    { "name": "span", "type": "number" },
    { "name": "step", "type": "number" },
    { "name": "steps", "type": "number", "array": true },
    { "name": "minstep", "type": "number", "default": 0 },
    { "name": "nice", "type": "boolean", "default": true },
    { "name": "name", "type": "string" },
    { "name": "as", "type": "string", "array": true, "length": 2, "default": ["bin0", "bin1"] }
  ]
};

var prototype$2 = vegaUtil.inherits(Bin, vegaDataflow.Transform);

/**
 * Write bin boundaries onto visited tuples: `as[0]` receives the bin start and,
 * unless `interval` is false, `as[1]` receives the bin end.
 */
prototype$2.transform = function(_, pulse) {
  var band = _.interval !== false,
      bins = this._bins(_),
      start = bins.start,
      step = bins.step,
      as = _.as || ['bin0', 'bin1'],
      b0 = as[0],
      b1 = as[1],
      flag;

  if (_.modified()) {
    // parameters changed: rebin every source tuple
    pulse = pulse.reflow(true);
    flag = pulse.SOURCE;
  } else {
    flag = pulse.modified(vegaUtil.accessorFields(_.field)) ? pulse.ADD_MOD : pulse.ADD;
  }

  pulse.visit(flag, band
    ? function(t) {
        var v = bins(t);
        // minimum bin value (inclusive)
        t[b0] = v;
        // maximum bin value (exclusive)
        // use convoluted math for better floating point agreement
        // infinite values propagate through this formula! #2227
        t[b1] = v == null ? null : start + step * (1 + (v - start) / step);
      }
    : function(t) { t[b0] = bins(t); }
  );

  return pulse.modifies(band ? as : b0);
};

/**
 * Build (or reuse, if parameters are unchanged) the binning accessor.
 * Values below the (anchored) start map to -Infinity, values above the stop
 * map to +Infinity, and null/undefined map to null.
 * @return {function(object): number} Accessor with `start`, `stop`, `step`.
 */
prototype$2._bins = function(_) {
  if (this.value && !_.modified()) {
    return this.value;
  }

  var field = _.field,
      bins = vegaStatistics.bin(_),
      step = bins.step,
      start = bins.start,
      stop = start + Math.ceil((bins.stop - start) / step) * step,
      a, d;

  if ((a = _.anchor) != null) {
    // shift bin boundaries so that one of them coincides with the anchor
    d = a - (start + step * Math.floor((a - start) / step));
    start += d;
    stop += d;
  }

  var f = function(t) {
    var v = field(t);
    return v == null ? null
      : v < start ? -Infinity
      : v > stop ? +Infinity
      : (
          // clamp into the last bin so v === stop is not put in a new bin
          v = Math.max(start, Math.min(+v, stop - step)),
          start + step * Math.floor(EPSILON + (v - start) / step)
        );
  };

  f.start = start;
  f.stop = bins.stop;
  f.step = step;

  return this.value = vegaUtil.accessor(
    f,
    vegaUtil.accessorFields(field),
    _.name || 'bin_' + vegaUtil.accessorName(field)
  );
};
/**
 * A list of tuples with buffered additions and removals.
 * @param {function(object): *} idFunc - Returns a unique id for a tuple.
 * @param {Array<object>} [source] - Initial (already ordered) tuples; this
 *   array may be filtered or re-sorted in place by data().
 * @param {Array<object>} [input] - Initial pending additions.
 * @return {{add: function, remove: function, size: function, data: function}}
 *   data(compare, resort) applies pending removals, re-sorts existing data when
 *   `resort` and `compare` are set, merges pending additions (sorted by
 *   `compare` if given, else appended), and returns the resulting array.
 *   size() reports the length of the consolidated data only.
 */
function SortedList(idFunc, source, input) {
  var $ = idFunc,
      data = source || [],
      add = input || [],
      rem = {},
      cnt = 0;

  return {
    add: function(t) { add.push(t); },
    remove: function(t) { rem[$(t)] = ++cnt; },
    size: function() { return data.length; },
    data: function(compare, resort) {
      if (cnt) {
        data = data.filter(function(t) { return !rem[$(t)]; });
        rem = {};
        cnt = 0;
      }
      if (resort && compare) {
        data.sort(compare);
      }
      if (add.length) {
        data = compare
          ? vegaUtil.merge(compare, data, add.sort(compare))
          : data.concat(add);
        add = [];
      }
      return data;
    }
  };
}
/**
 * Collects all data tuples that pass through this operator.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(*,*): number} [params.sort] - An optional
 *   comparator function for additionally sorting the collected tuples.
 */
function Collect(params) {
  vegaDataflow.Transform.call(this, [], params);
}

Collect.Definition = {
  "type": "Collect",
  "metadata": {"source": true},
  "params": [
    { "name": "sort", "type": "compare" }
  ]
};

var prototype$3 = vegaUtil.inherits(Collect, vegaDataflow.Transform);

/**
 * Maintain the collected tuple array: drop removed tuples, merge in added
 * ones, and re-sort when the data or the sort criteria changed.
 */
prototype$3.transform = function(_, pulse) {
  var out = pulse.fork(pulse.ALL),
      list = SortedList(vegaDataflow.tupleid, this.value, out.materialize(out.ADD).add),
      sort = _.sort,
      mod = pulse.changed() || (sort &&
            (_.modified('sort') || pulse.modified(sort.fields)));

  out.visit(out.REM, list.remove);

  this.modified(mod);
  this.value = out.source = list.data(vegaDataflow.stableCompare(sort), mod);

  // propagate tree root if defined
  if (pulse.source && pulse.source.root) {
    this.value.root = pulse.source.root;
  }

  return out;
};
/**
 * Generates a comparator function.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<string|function>} params.fields - The fields to compare.
 * @param {Array<string>} [params.orders] - The sort orders.
 *   Each entry should be one of "ascending" (default) or "descending".
 */
function Compare(params) {
  vegaDataflow.Operator.call(this, null, update, params);
}

vegaUtil.inherits(Compare, vegaDataflow.Operator);

/** Operator update: rebuild the comparator only when parameters change. */
function update(_) {
  return (this.value && !_.modified())
    ? this.value
    : vegaUtil.compare(_.fields, _.orders);
}
/**
 * Count regexp-defined pattern occurrences in a text field.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - An accessor for the text field.
 * @param {string} [params.pattern] - RegExp string defining the text pattern.
 * @param {string} [params.case] - One of 'lower', 'upper' or null (mixed) case.
 * @param {string} [params.stopwords] - RegExp string of words to ignore.
 */
function CountPattern(params) {
  vegaDataflow.Transform.call(this, null, params);
}

CountPattern.Definition = {
  "type": "CountPattern",
  "metadata": {"generates": true, "changes": true},
  "params": [
    { "name": "field", "type": "field", "required": true },
    { "name": "case", "type": "enum", "values": ["upper", "lower", "mixed"], "default": "mixed" },
    // NOTE(review): this default allows double quotes inside tokens, while the
    // fallback used in _parameterCheck allows apostrophes ('[\\w\']+') — confirm intent.
    { "name": "pattern", "type": "string", "default": "[\\w\"]+" },
    { "name": "stopwords", "type": "string", "default": "" },
    { "name": "as", "type": "string", "array": true, "length": 2, "default": ["text", "count"] }
  ]
};
/**
 * Split text into tokens with a RegExp, optionally changing case first.
 * @param {*} text - Text to tokenize. null/undefined yields null; other
 *   non-string values are converted with String().
 * @param {string} tcase - 'upper' or 'lower' to change case; any other value
 *   leaves the text as-is.
 * @param {RegExp} match - Token pattern; with the 'g' flag every match is
 *   returned.
 * @return {Array<string>|null} The matched tokens, or null if there are none.
 */
function tokenize(text, tcase, match) {
  if (text == null) return null;

  var s = String(text);
  switch (tcase) {
    case 'upper': s = s.toUpperCase(); break;
    case 'lower': s = s.toLowerCase(); break;
  }
  return s.match(match);
}
var prototype$4 = vegaUtil.inherits(CountPattern, vegaDataflow.Transform);

/**
 * Update token counts from the pulse (all source tuples after a parameter
 * change, otherwise only added/removed tuples) and emit count tuples.
 */
prototype$4.transform = function(_, pulse) {
  // Build a tuple visitor that tokenizes the text field and passes every
  // token that does not match the stopword pattern to `update`.
  function process(update) {
    return function(tuple) {
      var tokens = tokenize(get(tuple), _.case, match) || [], t;
      for (var i = 0, n = tokens.length; i < n; ++i) {
        if (!stop.test(t = tokens[i])) update(t);
      }
    };
  }

  var init = this._parameterCheck(_, pulse),
      counts = this._counts,
      match = this._match,
      stop = this._stop,
      get = _.field,
      as = _.as || ['text', 'count'],
      add = process(function(t) { counts[t] = 1 + (counts[t] || 0); }),
      rem = process(function(t) { counts[t] -= 1; });

  if (init) {
    pulse.visit(pulse.SOURCE, add);
  } else {
    pulse.visit(pulse.ADD, add);
    pulse.visit(pulse.REM, rem);
  }

  return this._finish(pulse, as); // generate output tuples
};

/**
 * Rebuild the stopword and token RegExps when their parameters change and
 * reset the counts whenever a recount from all source tuples is needed.
 * @return {boolean} True if counts were reset.
 */
prototype$4._parameterCheck = function(_, pulse) {
  var init = false;

  if (_.modified('stopwords') || !this._stop) {
    this._stop = new RegExp('^' + (_.stopwords || '') + '$', 'i');
    init = true;
  }

  if (_.modified('pattern') || !this._match) {
    this._match = new RegExp((_.pattern || '[\\w\']+'), 'g');
    init = true;
  }

  if (_.modified('field') || pulse.modified(_.field.fields)) {
    init = true;
  }

  // null prototype: tokens such as "constructor" or "__proto__" must not
  // resolve to inherited Object.prototype members
  if (init) this._counts = Object.create(null);
  return init;
};

/**
 * Synchronize one output tuple per token with the current counts: ingest new
 * tokens, remove tokens whose count reached zero, and modify changed counts.
 */
prototype$4._finish = function(pulse, as) {
  var counts = this._counts,
      tuples = this._tuples || (this._tuples = Object.create(null)),
      text = as[0],
      count = as[1],
      out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS),
      w, t, c;

  for (w in counts) {
    t = tuples[w];
    c = counts[w] || 0;
    if (!t && c) {
      tuples[w] = (t = vegaDataflow.ingest({}));
      t[text] = w;
      t[count] = c;
      out.add.push(t);
    } else if (c === 0) {
      if (t) out.rem.push(t);
      counts[w] = null;
      tuples[w] = null;
    } else if (t[count] !== c) {
      t[count] = c;
      out.mod.push(t);
    }
  }

  return out.modifies(as);
};
/**
 * Perform a cross-product of a tuple stream with itself.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object):boolean} [params.filter] - An optional filter
 *   function for selectively including tuples in the cross product.
 * @param {Array<string>} [params.as] - The names of the output fields.
 */
function Cross(params) {
  vegaDataflow.Transform.call(this, null, params);
}

Cross.Definition = {
  "type": "Cross",
  "metadata": {"generates": true},
  "params": [
    { "name": "filter", "type": "expr" },
    { "name": "as", "type": "string", "array": true, "length": 2, "default": ["a", "b"] }
  ]
};

var prototype$5 = vegaUtil.inherits(Cross, vegaDataflow.Transform);

/**
 * Recompute the full cross product when tuples were added/removed or the
 * parameters changed; otherwise report all existing pairs as modified.
 */
prototype$5.transform = function(_, pulse) {
  var out = pulse.fork(pulse.NO_SOURCE),
      data = this.value,
      as = _.as || ['a', 'b'],
      a = as[0], b = as[1],
      reset = !data
          || pulse.changed(pulse.ADD_REM)
          || _.modified('as')
          || _.modified('filter');

  if (reset) {
    if (data) out.rem = data;
    data = pulse.materialize(pulse.SOURCE).source;
    out.add = this.value = cross(data, a, b, _.filter || vegaUtil.truthy);
  } else {
    out.mod = data;
  }

  out.source = this.value;
  return out.modifies(as);
};

/**
 * Build all ordered pairs (including self-pairs) of `input` that pass
 * `filter`. The candidate object is only replaced after it is accepted,
 * so rejected candidates are reused.
 * @return {Array<object>} Ingested pair tuples with fields `a` and `b`.
 */
function cross(input, a, b, filter) {
  var data = [],
      t = {},
      n = input.length,
      i = 0,
      j, left;

  for (; i < n; ++i) {
    t[a] = left = input[i];
    for (j = 0; j < n; ++j) {
      t[b] = input[j];
      if (filter(t)) {
        data.push(vegaDataflow.ingest(t));
        t = {};
        t[a] = left;
      }
    }
  }

  return data;
}
// Factories for the supported probability distributions, keyed by the
// value of a definition's 'function' property.
var Distributions = {
  kde: vegaStatistics.randomKDE,
  mixture: vegaStatistics.randomMixture,
  normal: vegaStatistics.randomNormal,
  lognormal: vegaStatistics.randomLogNormal,
  uniform: vegaStatistics.randomUniform
};

var DISTRIBUTIONS = 'distributions',
    FUNCTION = 'function',
    FIELD = 'field';

/**
 * Parse a parameter object for a probability distribution.
 * @param {object} def - The distribution parameter object.
 * @param {function():Array<object>} data - A method for requesting
 *   source data. Used for distributions (such as KDE) that
 *   require sample data points. This method will only be
 *   invoked if the 'from' parameter for a target data source
 *   is not provided. Typically this method returns backing
 *   source data for a Pulse object.
 * @return {object} - The output distribution object.
 */
function parse(def, data) {
  var func = def[FUNCTION];
  if (!vegaUtil.hasOwnProperty(Distributions, func)) {
    vegaUtil.error('Unknown distribution function: ' + func);
  }

  var d = Distributions[func]();

  for (var name in def) {
    // if data field, extract values
    if (name === FIELD) {
      d.data((def.from || data()).map(def[name]));
    }
    // if distribution mixture, recurse to parse each definition
    else if (name === DISTRIBUTIONS) {
      d[name](def[name].map(function(_) { return parse(_, data); }));
    }
    // otherwise, simply set the parameter
    else if (typeof d[name] === FUNCTION) {
      d[name](def[name]);
    }
  }

  return d;
}
/**
 * Grid sample points for a probability density. Given a distribution and
 * a sampling extent, will generate points suitable for plotting either
 * PDF (probability density function) or CDF (cumulative distribution
 * function) curves.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {object} params.distribution - The probability distribution. This
 *   is an object parameter dependent on the distribution type.
 * @param {string} [params.method='pdf'] - The distribution method to sample.
 *   One of 'pdf' or 'cdf'.
 * @param {Array<number>} [params.extent] - The [min, max] extent over which
 *   to sample the distribution. This argument is required in most cases, but
 *   can be omitted if the distribution (e.g., 'kde') supports a 'data' method
 *   that returns numerical sample points from which the extent can be deduced.
 * @param {number} [params.minsteps=25] - The minimum number of curve samples
 *   for plotting the density.
 * @param {number} [params.maxsteps=200] - The maximum number of curve samples
 *   for plotting the density.
 * @param {number} [params.steps] - The exact number of curve samples for
 *   plotting the density. If specified, overrides both minsteps and maxsteps
 *   to set an exact number of uniform samples. Useful in conjunction with
 *   a fixed extent to ensure consistent sample points for stacked densities.
 */
function Density(params) {
  vegaDataflow.Transform.call(this, null, params);
}

var distributions = [
  {
    "key": {"function": "normal"},
    "params": [
      { "name": "mean", "type": "number", "default": 0 },
      { "name": "stdev", "type": "number", "default": 1 }
    ]
  },
  {
    "key": {"function": "lognormal"},
    "params": [
      { "name": "mean", "type": "number", "default": 0 },
      { "name": "stdev", "type": "number", "default": 1 }
    ]
  },
  {
    "key": {"function": "uniform"},
    "params": [
      { "name": "min", "type": "number", "default": 0 },
      { "name": "max", "type": "number", "default": 1 }
    ]
  },
  {
    "key": {"function": "kde"},
    "params": [
      { "name": "field", "type": "field", "required": true },
      { "name": "from", "type": "data" },
      { "name": "bandwidth", "type": "number", "default": 0 }
    ]
  }
];

var mixture = {
  "key": {"function": "mixture"},
  "params": [
    { "name": "distributions", "type": "param", "array": true,
      "params": distributions },
    { "name": "weights", "type": "number", "array": true }
  ]
};

Density.Definition = {
  "type": "Density",
  "metadata": {"generates": true},
  "params": [
    { "name": "extent", "type": "number", "array": true, "length": 2 },
    { "name": "steps", "type": "number" },
    { "name": "minsteps", "type": "number", "default": 25 },
    { "name": "maxsteps", "type": "number", "default": 200 },
    { "name": "method", "type": "string", "default": "pdf",
      "values": ["pdf", "cdf"] },
    { "name": "distribution", "type": "param",
      "params": distributions.concat(mixture) },
    { "name": "as", "type": "string", "array": true,
      "default": ["value", "density"] }
  ]
};

var prototype$6 = vegaUtil.inherits(Density, vegaDataflow.Transform);

/**
 * Re-sample the density curve whenever the input data or parameters change,
 * replacing all previously generated tuples.
 */
prototype$6.transform = function(_, pulse) {
  var out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS);

  if (!this.value || pulse.changed() || _.modified()) {
    var dist = parse(_.distribution, source(pulse)),
        minsteps = _.steps || _.minsteps || 25,
        maxsteps = _.steps || _.maxsteps || 200,
        method = _.method || 'pdf';

    if (method !== 'pdf' && method !== 'cdf') {
      vegaUtil.error('Invalid density method: ' + method);
    }
    if (!_.extent && !dist.data) {
      vegaUtil.error('Missing density extent parameter.');
    }
    method = dist[method];

    var as = _.as || ['value', 'density'],
        domain = _.extent || vegaUtil.extent(dist.data()),
        values = vegaStatistics.sampleCurve(method, domain, minsteps, maxsteps).map(v => {
          var tuple = {};
          tuple[as[0]] = v[0];
          tuple[as[1]] = v[1];
          return vegaDataflow.ingest(tuple);
        });

    if (this.value) out.rem = this.value;
    this.value = out.add = out.source = values;
  }

  return out;
};

// Lazily materialize and return the pulse's source tuples (used by KDE).
function source(pulse) {
  return function() { return pulse.materialize(pulse.SOURCE).source; };
}
/**
 * Compute output names for field accessors: use either the provided alias
 * or the accessor field name.
 * @param {Array<function>} [fields] - Field accessors; a falsy value yields null.
 * @param {Array<string>} [as] - Optional aliases matched by index; a missing
 *   array or a falsy entry falls back to vegaUtil.accessorName.
 * @return {Array<string>|null}
 */
function fieldNames(fields, as) {
  if (!fields) return null;
  return fields.map(function(f, i) {
    return (as && as[i]) || vegaUtil.accessorName(f);
  });
}
/**
 * Partition data values into groups.
 * @param {Array<object>} data - Input tuples.
 * @param {Array<function(object): *>} [groupby] - Group-by accessors; if null
 *   or undefined, all values form a single group.
 * @param {function(object): *} field - Value accessor applied to each tuple.
 * @return {Array<Array<*>>} Groups of `field` values in first-seen order. When
 *   grouping, each group array carries a `dims` property with its group-by values.
 */
function partition(data, groupby, field) {
  var groups = [],
      get = function(f) { return f(t); },
      map, i, n, t, k, g;

  // partition data points into groups
  if (groupby == null) {
    groups.push(data.map(field));
  } else {
    // null prototype: group values like "toString" or "__proto__" must not
    // resolve to inherited Object.prototype members
    for (map = Object.create(null), i = 0, n = data.length; i < n; ++i) {
      t = data[i];
      k = groupby.map(get);
      g = map[k];
      if (!g) {
        map[k] = (g = []);
        g.dims = k;
        groups.push(g);
      }
      g.push(field(t));
    }
  }

  return groups;
}
const Output = 'bin';

/**
 * Dot density binning for dot plot construction.
 * Based on Leland Wilkinson, Dot Plots, The American Statistician, 1999.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - The value field to bin.
 * @param {Array<function(object): *>} [params.groupby] - An array of accessors to groupby.
 * @param {number} [params.step] - The step size (bin width) within which dots should be
 *   stacked. Defaults to 1/30 of the extent of the data *field*.
 * @param {boolean} [params.smooth=false] - A boolean flag indicating if dot density
 *   stacks should be smoothed to reduce variance.
 */
function DotBin(params) {
  vegaDataflow.Transform.call(this, null, params);
}

DotBin.Definition = {
  "type": "DotBin",
  "metadata": {"modifies": true},
  "params": [
    { "name": "field", "type": "field", "required": true },
    { "name": "groupby", "type": "field", "array": true },
    { "name": "step", "type": "number" },
    { "name": "smooth", "type": "boolean", "default": false },
    { "name": "as", "type": "string", "default": Output }
  ]
};

const prototype$7 = vegaUtil.inherits(DotBin, vegaDataflow.Transform);

/**
 * Assign a dot-plot bin value (field `as`) to every source tuple, per group.
 * The operator value records the overall {start, stop, step} of the bins.
 */
prototype$7.transform = function(_, pulse) {
  if (this.value && !(_.modified() || pulse.changed())) {
    return pulse; // early exit
  }

  const source = pulse.materialize(pulse.SOURCE).source,
        groups = partition(pulse.source, _.groupby, vegaUtil.identity),
        smooth = _.smooth || false,
        field = _.field,
        step = _.step || autostep(source, field),
        sort = vegaDataflow.stableCompare((a, b) => field(a) - field(b)),
        as = _.as || Output,
        n = groups.length;

  // compute dotplot bins per group
  let min = Infinity, max = -Infinity, i = 0, j;
  for (; i < n; ++i) {
    const g = groups[i].sort(sort);
    j = -1;
    for (const v of vegaStatistics.dotbin(g, step, smooth, field)) {
      if (v < min) min = v;
      if (v > max) max = v;
      g[++j][as] = v;
    }
  }

  this.value = {
    start: min,
    stop: max,
    step: step
  };
  return pulse.reflow(true).modifies(as);
};

// Default bin width: 1/30 of the span of the field's extent.
function autostep(data, field) {
  return vegaUtil.span(vegaUtil.extent(data, field)) / 30;
}
/**
 * Wraps an expression function with access to external parameters.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function} params.expr - The expression function. The
 *   function should accept both a datum and a parameter object.
 *   This operator's value will be a new function that wraps the
 *   expression function with access to this operator's parameters.
 */
function Expression(params) {
  vegaDataflow.Operator.call(this, null, update$1, params);
  // NOTE(review): restored from the upstream operator; flags the initial
  // value as modified — confirm against the original bundle.
  this.modified(true);
}

vegaUtil.inherits(Expression, vegaDataflow.Operator);

/**
 * Operator update: rebuild the wrapping accessor only when the expression
 * changes. The accessor forwards this operator's parameters `_` and keeps the
 * expression's field dependencies and name.
 */
function update$1(_) {
  var expr = _.expr;
  return this.value && !_.modified('expr')
    ? this.value
    : vegaUtil.accessor(
        datum => expr(datum, _),
        vegaUtil.accessorFields(expr),
        vegaUtil.accessorName(expr)
      );
}
/**
 * Computes extents (min/max) for a data field.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - The field over which to compute extents.
 */
function Extent(params) {
  vegaDataflow.Transform.call(this, [undefined, undefined], params);
}

Extent.Definition = {
  "type": "Extent",
  "metadata": {},
  "params": [
    { "name": "field", "type": "field", "required": true }
  ]
};

var prototype$8 = vegaUtil.inherits(Extent, vegaDataflow.Transform);

/**
 * Update the [min, max] extent. After any data/field change the extent is
 * recomputed from all source tuples; otherwise only added tuples are folded
 * in. A non-finite result triggers a warning and resets to [undefined, undefined].
 */
prototype$8.transform = function(_, pulse) {
  var extent = this.value,
      field = _.field,
      min = extent[0],
      max = extent[1],
      mod = pulse.changed()
          || pulse.modified(field.fields)
          || _.modified('field');

  if (mod || min == null) {
    min = +Infinity;
    max = -Infinity;
  }

  pulse.visit(mod ? pulse.SOURCE : pulse.ADD, function(t) {
    var v = field(t);
    if (v != null) {
      // coerce to number
      v = +v;
      // NaNs will fail all comparisons!
      if (v < min) min = v;
      if (v > max) max = v;
    }
  });

  if (!Number.isFinite(min) || !Number.isFinite(max)) {
    const name = vegaUtil.accessorName(field);
    // avoid "Infinite extentundefined" when the accessor has no name
    const label = name ? ` for field "${name}"` : '';
    pulse.dataflow.warn(`Infinite extent${label}: [${min}, ${max}]`);
    min = max = undefined;
  }
  this.value = [min, max];
};
/**
 * Provides a bridge between a parent transform and a target subflow that
 * consumes only a subset of the tuples that pass through the parent.
 * @constructor
 * @param {Pulse} pulse - A pulse to use as the value of this operator.
 * @param {Transform} parent - The parent transform (typically a Facet instance).
 */
function Subflow(pulse, parent) {
  vegaDataflow.Operator.call(this, pulse);
  this.parent = parent;
}

var prototype$9 = vegaUtil.inherits(Subflow, vegaDataflow.Operator);

/**
 * Connect this subflow to the operator that consumes its pulse.
 * @param {Transform} target - A transform that receives the subflow of tuples.
 * @return {Subflow} This subflow (the new `source` of the target).
 */
prototype$9.connect = function(target) {
  this.targets().add(target);
  return (target.source = this);
};

/**
 * Add an 'add' tuple to the subflow pulse.
 * @param {Tuple} t - The tuple being added.
 */
prototype$9.add = function(t) {
  this.value.add.push(t);
};

/**
 * Add a 'rem' tuple to the subflow pulse.
 * @param {Tuple} t - The tuple being removed.
 */
prototype$9.rem = function(t) {
  this.value.rem.push(t);
};

/**
 * Add a 'mod' tuple to the subflow pulse.
 * @param {Tuple} t - The tuple being modified.
 */
prototype$9.mod = function(t) {
  this.value.mod.push(t);
};

/**
 * Re-initialize this operator's pulse value.
 * @param {Pulse} pulse - The pulse to copy from.
 * @see Pulse.init
 */
prototype$9.init = function(pulse) {
  this.value.init(pulse, pulse.NO_SOURCE);
};

/**
 * Evaluate this operator. This method overrides the
 * default behavior to simply return the contained pulse value.
 * @return {Pulse}
 */
prototype$9.evaluate = function() {
  // assert: this.value.stamp === pulse.stamp
  return this.value;
};
/**
 * Facets a dataflow into a set of subflows based on a key.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(Dataflow, string): Operator} params.subflow - A function
 *   that generates a subflow of operators and returns its root operator.
 * @param {function(object): *} params.key - The key field to facet by.
 */
function Facet(params) {
  vegaDataflow.Transform.call(this, {}, params);
  this._keys = vegaUtil.fastmap(); // cache previously calculated key values

  // keep track of active subflows, use as targets array for listeners
  // this allows us to limit propagation to only updated subflows
  var a = this._targets = [];
  a.active = 0;
  a.forEach = function(f) {
    for (var i=0, n=a.active; i<n; ++i) f(a[i], i, a);
  };
}

var prototype$a = vegaUtil.inherits(Facet, vegaDataflow.Transform);

/**
 * Mark a subflow as active for the current pulse. Only the first
 * `_targets.active` entries are visited by the targets' forEach.
 * @param {Subflow} flow - The subflow to activate.
 */
prototype$a.activate = function(flow) {
  this._targets[this._targets.active++] = flow;
};

/**
 * Look up (or lazily create) the subflow for a facet key. An existing
 * subflow is re-initialized and activated at most once per pulse stamp.
 * @param {*} key - The facet key.
 * @param {function(Dataflow, string, object): Operator} flow - The subflow generator.
 * @param {Pulse} pulse - The current pulse.
 * @param {object} [parent] - Parent tuple, provided by the PreFacet subclass;
 *   otherwise it is looked up from the `_group` map set by transform.
 * @return {Subflow} The subflow for the key.
 */
prototype$a.subflow = function(key, flow, pulse, parent) {
  var flows = this.value,
      sf = vegaUtil.hasOwnProperty(flows, key) && flows[key],
      df, p;

  if (!sf) {
    p = parent || (p = this._group[key]) && p.tuple;
    df = pulse.dataflow;
    sf = df.add(new Subflow(pulse.fork(pulse.NO_SOURCE), this))
      .connect(flow(df, key, p));
    flows[key] = sf;
    this.activate(sf);
  } else if (sf.value.stamp < pulse.stamp) {
    sf.init(pulse);
    this.activate(sf);
  }

  return sf;
};

/**
 * Route added, removed and modified tuples to the subflow of their key.
 * Tuples whose key changes are removed from the old subflow and added
 * to the new one.
 */
prototype$a.transform = function(_, pulse) {
  var df = pulse.dataflow,
      self = this,
      key = _.key,
      flow = _.subflow,
      cache = this._keys,
      rekey = _.modified('key');

  function subflow(key) {
    return self.subflow(key, flow, pulse);
  }

  this._group = _.group || {};
  this._targets.active = 0; // reset list of active subflows

  pulse.visit(pulse.REM, function(t) {
    var id = vegaDataflow.tupleid(t),
        k = cache.get(id);
    if (k !== undefined) {
      cache.delete(id);
      subflow(k).rem(t);
    }
  });

  pulse.visit(pulse.ADD, function(t) {
    var k = key(t);
    cache.set(vegaDataflow.tupleid(t), k);
    subflow(k).add(t);
  });

  if (rekey || pulse.modified(key.fields)) {
    pulse.visit(pulse.MOD, function(t) {
      var id = vegaDataflow.tupleid(t),
          k0 = cache.get(id),
          k1 = key(t);
      if (k0 === k1) {
        subflow(k1).mod(t);
      } else {
        cache.set(id, k1);
        subflow(k0).rem(t);
        subflow(k1).add(t);
      }
    });
  } else if (pulse.changed(pulse.MOD)) {
    pulse.visit(pulse.MOD, function(t) {
      subflow(cache.get(vegaDataflow.tupleid(t))).mod(t);
    });
  }

  if (rekey) {
    // the key accessor itself changed: re-partition unchanged tuples too
    pulse.visit(pulse.REFLOW, function(t) {
      var id = vegaDataflow.tupleid(t),
          k0 = cache.get(id),
          k1 = key(t);
      if (k0 !== k1) {
        cache.set(id, k1);
        subflow(k0).rem(t);
        subflow(k1).add(t);
      }
    });
  }

  if (cache.empty > df.cleanThreshold) df.runAfter(cache.clean);
  return pulse;
};
/**
 * Generates one or more field accessor functions.
 * If the 'name' parameter is an array, an array of field accessors
 * will be created and the 'as' parameter will be ignored.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {string|Array<string>} params.name - The field name(s) to access.
 * @param {string} [params.as] - The accessor function name.
 */
function Field(params) {
  vegaDataflow.Operator.call(this, null, update$2, params);
}

vegaUtil.inherits(Field, vegaDataflow.Operator);

/**
 * Operator update function: rebuild the accessor(s) only when a
 * parameter changed.
 */
function update$2(_) {
  return (this.value && !_.modified()) ? this.value
    // wrap vegaUtil.field so the map index is not passed as the accessor name
    : vegaUtil.isArray(_.name) ? vegaUtil.array(_.name).map(function(f) { return vegaUtil.field(f); })
    : vegaUtil.field(_.name, _.as);
}
/**
 * Filters data tuples according to a predicate function.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.expr - The predicate expression function
 *   that determines a tuple's filter status. Truthy values pass the filter.
 */
function Filter(params) {
  vegaDataflow.Transform.call(this, vegaUtil.fastmap(), params);
}

Filter.Definition = {
  "type": "Filter",
  "metadata": {"changes": true},
  "params": [
    { "name": "expr", "type": "expr", "required": true }
  ]
};

var prototype$b = vegaUtil.inherits(Filter, vegaDataflow.Transform);

/**
 * Incrementally filter the pulse. The operator value caches the ids of
 * tuples that currently FAIL the predicate, so state changes of modified
 * (or, after a parameter change, reflowed) tuples can be turned into
 * add/rem output.
 */
prototype$b.transform = function(_, pulse) {
  var df = pulse.dataflow,
      cache = this.value, // cache ids of filtered tuples
      output = pulse.fork(),
      add = output.add,
      rem = output.rem,
      mod = output.mod,
      test = _.expr,
      isMod = true;

  pulse.visit(pulse.REM, function(t) {
    var id = vegaDataflow.tupleid(t);
    if (!cache.has(id)) rem.push(t);
    else cache.delete(id);
  });

  pulse.visit(pulse.ADD, function(t) {
    if (test(t, _)) add.push(t);
    else cache.set(vegaDataflow.tupleid(t), 1);
  });

  // b: tuple passes now; s: tuple was previously filtered out
  function revisit(t) {
    var id = vegaDataflow.tupleid(t),
        b = test(t, _),
        s = cache.get(id);
    if (b && s) {
      cache.delete(id);
      add.push(t);
    } else if (!b && !s) {
      cache.set(id, 1);
      rem.push(t);
    } else if (isMod && b && !s) {
      mod.push(t);
    }
  }

  pulse.visit(pulse.MOD, revisit);

  if (_.modified()) {
    // predicate changed: re-test unchanged tuples, but do not report them as mods
    isMod = false;
    pulse.visit(pulse.REFLOW, revisit);
  }

  if (cache.empty > df.cleanThreshold) df.runAfter(cache.clean);
  return output;
};
/**
 * Flattens array-typed field values into new data objects.
 * If multiple fields are specified, they are treated as parallel arrays,
 * with output values included for each matching index (or null if missing).
 * A null or undefined field value is treated as an empty array.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} params.fields - An array of field
 *   accessors for the tuple fields that should be flattened.
 * @param {string} [params.index] - Optional output field name for index
 *   value. If unspecified, no index field is included in the output.
 * @param {Array<string>} [params.as] - Output field names for flattened
 *   array fields. Any unspecified fields will use the field name provided
 *   by the fields accessors.
 */
function Flatten(params) {
  vegaDataflow.Transform.call(this, [], params);
}

Flatten.Definition = {
  "type": "Flatten",
  "metadata": {"generates": true},
  "params": [
    { "name": "fields", "type": "field", "array": true, "required": true },
    { "name": "index", "type": "string" },
    { "name": "as", "type": "string", "array": true }
  ]
};

var prototype$c = vegaUtil.inherits(Flatten, vegaDataflow.Transform);

prototype$c.transform = function(_, pulse) {
  var out = pulse.fork(pulse.NO_SOURCE),
      fields = _.fields,
      as = fieldNames(fields, _.as || []),
      index = _.index || null,
      m = as.length;

  // remove any previous results
  out.rem = this.value;

  // generate flattened tuples
  pulse.visit(pulse.SOURCE, function(t) {
    var arrays = fields.map(f => f(t)),
        // a missing (null/undefined) array contributes no rows
        maxlen = arrays.reduce((l, arr) => Math.max(l, arr == null ? 0 : arr.length), 0),
        i = 0, j, d, arr, v;

    for (; i<maxlen; ++i) {
      d = vegaDataflow.derive(t);
      for (j=0; j<m; ++j) {
        arr = arrays[j];
        d[as[j]] = arr == null || (v = arr[i]) == null ? null : v;
      }
      if (index) {
        d[index] = i;
      }
      out.add.push(d);
    }
  });

  this.value = out.source = out.add;
  if (index) out.modifies(index);
  return out.modifies(as);
};
/**
 * Folds one or more tuple fields into multiple tuples in which the field
 * name and values are available under new 'key' and 'value' fields.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} params.fields - An array of field accessors
 *   for the tuple fields that should be folded.
 * @param {Array<string>} [params.as] - Output field names for folded key
 *   and value fields, defaults to ['key', 'value'].
 */
function Fold(params) {
  vegaDataflow.Transform.call(this, [], params);
}

Fold.Definition = {
  "type": "Fold",
  "metadata": {"generates": true},
  "params": [
    { "name": "fields", "type": "field", "array": true, "required": true },
    { "name": "as", "type": "string", "array": true, "length": 2, "default": ["key", "value"] }
  ]
};

var prototype$d = vegaUtil.inherits(Fold, vegaDataflow.Transform);

/**
 * Regenerate all folded tuples: each source tuple yields one derived tuple
 * per field, and the previous output is removed.
 */
prototype$d.transform = function(_, pulse) {
  var out = pulse.fork(pulse.NO_SOURCE),
      fields = _.fields,
      fnames = fields.map(vegaUtil.accessorName),
      as = _.as || ['key', 'value'],
      k = as[0],
      v = as[1],
      n = fields.length;

  out.rem = this.value;

  pulse.visit(pulse.SOURCE, function(t) {
    for (var i=0, d; i<n; ++i) {
      d = vegaDataflow.derive(t);
      d[k] = fnames[i];
      d[v] = fields[i](t);
      out.add.push(d);
    }
  });

  this.value = out.source = out.add;
  return out.modifies(as);
};
/**
 * Invokes a function for each data tuple and saves the results as a new field.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.expr - The formula function to invoke for each tuple.
 * @param {string} params.as - The field name under which to save the result.
 * @param {boolean} [params.initonly=false] - If true, the formula is applied to
 *   added tuples only, and does not update in response to modifications.
 */
function Formula(params) {
  vegaDataflow.Transform.call(this, null, params);
}

Formula.Definition = {
  "type": "Formula",
  "metadata": {"modifies": true},
  "params": [
    { "name": "expr", "type": "expr", "required": true },
    { "name": "as", "type": "string", "required": true },
    { "name": "initonly", "type": "boolean" }
  ]
};

var prototype$e = vegaUtil.inherits(Formula, vegaDataflow.Transform);

prototype$e.transform = function(_, pulse) {
  var func = _.expr,
      as = _.as,
      mod = _.modified(),
      // initonly: new tuples only; parameter change: everything;
      // input fields (or the output field) modified: adds and mods
      flag = _.initonly ? pulse.ADD
        : mod ? pulse.SOURCE
        : pulse.modified(func.fields) || pulse.modified(as) ? pulse.ADD_MOD
        : pulse.ADD;

  if (mod) {
    // parameters updated, need to reflow
    pulse = pulse.materialize().reflow(true);
  }

  if (!_.initonly) {
    pulse.modifies(as);
  }

  return pulse.visit(flag, t => t[as] = func(t, _));
};
/**
 * Generates data tuples using a provided generator function.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(Parameters): object} params.generator - A tuple generator
 *   function. This function is given the operator parameters as input.
 *   Changes to any additional parameters will not trigger re-calculation
 *   of previously generated tuples. Only future tuples are affected.
 * @param {number} params.size - The number of tuples to produce.
 */
function Generate(params) {
  vegaDataflow.Transform.call(this, [], params);
}

var prototype$f = vegaUtil.inherits(Generate, vegaDataflow.Transform);

/**
 * Grow or shrink the generated data to `_.size` tuples. New tuples are
 * appended; surplus tuples are removed from the front.
 */
prototype$f.transform = function(_, pulse) {
  var data = this.value,
      out = pulse.fork(pulse.ALL),
      num = _.size - data.length,
      gen = _.generator,
      add, rem, t;

  if (num > 0) {
    // need more tuples, generate and add
    for (add=[]; --num >= 0;) {
      add.push(t = vegaDataflow.ingest(gen(_)));
      data.push(t);
    }
    out.add = out.add.length
      ? out.materialize(out.ADD).add.concat(add)
      : add;
  } else {
    // need fewer tuples, remove
    rem = data.slice(0, -num);
    out.rem = out.rem.length
      ? out.materialize(out.REM).rem.concat(rem)
      : rem;
    data = data.slice(-num);
  }

  out.source = this.value = data;
  return out;
};
/**
 * Imputation methods supported by the Impute transform. 'value' imputes a
 * constant; the others are d3-array statistics applied to a group.
 */
var Methods = {
  value: 'value',
  median: d3Array.median,
  mean: d3Array.mean,
  min: d3Array.min,
  max: d3Array.max
};

// Shared groupby-values array used when no groupby fields are specified.
var Empty = [];
/**
 * Impute missing values.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - The value field to impute.
 * @param {Array<function(object): *>} [params.groupby] - An array of
 *   accessors to determine series within which to perform imputation.
 * @param {function(object): *} params.key - An accessor for a key value.
 *   Each key value should be unique within a group. New tuples will be
 *   imputed for any key values that are not found within a group.
 * @param {Array<*>} [params.keyvals] - Optional array of required key
 *   values. New tuples will be imputed for any key values that are not
 *   found within a group. In addition, these values will be automatically
 *   augmented with the key values observed in the input data.
 * @param {string} [params.method='value'] - The imputation method to use. One of
 *   'value', 'mean', 'median', 'max', 'min'.
 * @param {*} [params.value=0] - The constant value to use for imputation
 *   when using method 'value'.
 */
function Impute(params) {
  vegaDataflow.Transform.call(this, [], params);
}

Impute.Definition = {
  "type": "Impute",
  "metadata": {"changes": true},
  "params": [
    { "name": "field", "type": "field", "required": true },
    { "name": "key", "type": "field", "required": true },
    { "name": "keyvals", "array": true },
    { "name": "groupby", "type": "field", "array": true },
    { "name": "method", "type": "enum", "default": "value",
      "values": ["value", "mean", "median", "max", "min"] },
    { "name": "value", "default": 0 }
  ]
};

var prototype$g = vegaUtil.inherits(Impute, vegaDataflow.Transform);
/**
 * Resolve the imputation function for the configured method.
 * For 'value', returns a function producing the constant `_.value`
 * (default 0); otherwise returns the matching statistic from `Methods`.
 * @param {object} _ - The operator parameters (reads `method`, `value`).
 * @return {function} The imputation function.
 * @throws {Error} If the method name is not recognized.
 */
function getValue(_) {
  var m = _.method || Methods.value, v;

  // own-property check: names such as "toString" or "constructor" must be
  // rejected instead of resolving to inherited Object.prototype functions
  if (!vegaUtil.hasOwnProperty(Methods, m) || Methods[m] == null) {
    vegaUtil.error('Unrecognized imputation method: ' + m);
  } else if (m === Methods.value) {
    v = _.value !== undefined ? _.value : 0;
    return function() { return v; };
  } else {
    return Methods[m];
  }
}
/**
 * Wrap the Impute value field accessor so it tolerates missing tuples.
 * Imputation groups are sparse arrays (one slot per key value), so empty
 * slots map to NaN instead of invoking the field accessor on undefined.
 * @param {object} _ - The operator parameters (reads `_.field`).
 * @return {function(object): *} A missing-tuple-safe field accessor.
 */
function getField(_) {
  var f = _.field;
  return function(t) { return t ? f(t) : NaN; };
}
/**
 * Impute tuples for every (group, key) combination missing from the data.
 * Previously imputed tuples are always removed and a fresh set is added.
 * The imputed value is computed lazily, once per group.
 */
prototype$g.transform = function(_, pulse) {
  var out = pulse.fork(pulse.ALL),
      impute = getValue(_),
      field = getField(_),
      fName = vegaUtil.accessorName(_.field),
      kName = vegaUtil.accessorName(_.key),
      gNames = (_.groupby || []).map(vegaUtil.accessorName),
      groups = partition$1(pulse.source, _.groupby, _.key, _.keyvals),
      curr = [],
      prev = this.value,
      m = groups.domain.length,
      group, value, gVals, kVal, g, i, j, l, n, t;

  for (g=0, l=groups.length; g<l; ++g) {
    group = groups[g];
    gVals = group.values;
    value = NaN;

    // add tuples for missing values
    for (j=0; j<m; ++j) {
      if (group[j] != null) continue;
      kVal = groups.domain[j];

      t = {_impute: true};
      for (i=0, n=gVals.length; i<n; ++i) t[gNames[i]] = gVals[i];
      t[kName] = kVal;
      t[fName] = Number.isNaN(value) ? (value = impute(group, field)) : value;

      curr.push(vegaDataflow.ingest(t));
    }
  }

  // update pulse with imputed tuples
  if (curr.length) out.add = out.materialize(out.ADD).add.concat(curr);
  if (prev.length) out.rem = out.materialize(out.REM).rem.concat(prev);
  this.value = curr;

  return out;
};
/**
 * Partition tuples into groups (by groupby values) indexed by key position.
 * Each group is a sparse array whose slot j holds the tuple with key
 * `groups.domain[j]`, and which carries its groupby values as `.values`.
 * The returned array also exposes the full key domain as `.domain`:
 * `keyvals` first, followed by newly observed keys in encounter order.
 * @param {Array<object>} data - The input tuples.
 * @param {Array<function(object): *>} [groupby] - Groupby accessors.
 * @param {function(object): *} key - The key accessor.
 * @param {Array<*>} [keyvals] - Required key values.
 * @return {Array<Array<object>>} The groups, with a `domain` property.
 */
function partition$1(data, groupby, key, keyvals) {
  var get = function(f) { return f(t); },
      groups = [],
      domain = keyvals ? keyvals.slice() : [],
      // null-prototype lookup tables: key or group values such as
      // "constructor" must not resolve to Object.prototype members
      kMap = Object.create(null),
      gMap = Object.create(null),
      gVals, gKey, group, i, j, k, n, t;

  domain.forEach(function(k, i) { kMap[k] = i + 1; });

  for (i=0, n=data.length; i<n; ++i) {
    t = data[i];
    k = key(t);
    // 1-based positions so that an unseen key (undefined) is falsy
    j = kMap[k] || (kMap[k] = domain.push(k));

    gKey = (gVals = groupby ? groupby.map(get) : Empty) + '';
    if (!(group = gMap[gKey])) {
      group = (gMap[gKey] = []);
      groups.push(group);
      group.values = gVals;
    }
    group[j-1] = t;
  }

  groups.domain = domain;
  return groups;
}
/**
 * Extend input tuples with aggregate values.
 * Calculates aggregate values and joins them with the input stream.
 * @constructor
 * @param {object} params - The parameters for this operator (see Aggregate).
 */
function JoinAggregate(params) {
  Aggregate.call(this, params);
}

JoinAggregate.Definition = {
  "type": "JoinAggregate",
  "metadata": {"modifies": true},
  "params": [
    { "name": "groupby", "type": "field", "array": true },
    { "name": "fields", "type": "field", "null": true, "array": true },
    { "name": "ops", "type": "enum", "array": true, "values": ValidAggregateOps },
    { "name": "as", "type": "string", "null": true, "array": true },
    { "name": "key", "type": "field" }
  ]
};

var prototype$h = vegaUtil.inherits(JoinAggregate, Aggregate);

prototype$h.transform = function(_, pulse) {
  var aggr = this,
      mod = _.modified(),
      cells;

  // process all input tuples to calculate aggregates
  if (aggr.value && (mod || pulse.modified(aggr._inputs, true))) {
    cells = aggr.value = mod ? aggr.init(_) : {};
    pulse.visit(pulse.SOURCE, function(t) { aggr.add(t); });
  } else {
    cells = aggr.value = aggr.value || this.init(_);
    pulse.visit(pulse.REM, function(t) { aggr.rem(t); });
    pulse.visit(pulse.ADD, function(t) { aggr.add(t); });
  }

  // update aggregation cells
  aggr.changes();

  // write aggregate values to input tuples
  pulse.visit(pulse.SOURCE, function(t) {
    vegaUtil.extend(t, cells[aggr.cellkey(t)].tuple);
  });

  return pulse.reflow(mod).modifies(this._outputs);
};

/**
 * Refresh the output tuple of every added or modified cell, then reset the
 * active-cell lists. Unlike Aggregate, no output pulse is produced here.
 */
prototype$h.changes = function() {
  var adds = this._adds,
      mods = this._mods,
      i, n;

  for (i=0, n=this._alen; i<n; ++i) {
    this.celltuple(adds[i]);
    adds[i] = null; // for garbage collection
  }

  for (i=0, n=this._mlen; i<n; ++i) {
    this.celltuple(mods[i]);
    mods[i] = null; // for garbage collection
  }

  this._alen = this._mlen = 0; // reset list of active cells
};
/**
 * Compute kernel density estimates (KDE) for one or more data groups.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} [params.groupby] - An array of accessors
 *   to groupby.
 * @param {function(object): *} params.field - An accessor for the data field
 *   to estimate.
 * @param {number} [params.bandwidth=0] - The KDE kernel bandwidth.
 *   If zero or unspecified, the bandwidth is automatically determined.
 * @param {boolean} [params.counts=false] - A boolean flag indicating if the
 *   output values should be probability estimates (false, default) or
 *   smoothed counts (true).
 * @param {boolean} [params.cumulative=false] - A boolean flag indicating if a
 *   density (false) or cumulative distribution (true) should be generated.
 * @param {Array<number>} [params.extent] - The domain extent over which to
 *   plot the density. If unspecified, the [min, max] data extent is used.
 * @param {string} [params.resolve='independent'] - Indicates how parameters for
 *   multiple densities should be resolved. If "independent" (the default), each
 *   density may have its own domain extent and dynamic number of curve sample
 *   steps. If "shared", the KDE transform will ensure that all densities are
 *   defined over a shared domain and curve steps, enabling stacking.
 * @param {number} [params.minsteps=25] - The minimum number of curve samples
 *   for plotting the density.
 * @param {number} [params.maxsteps=200] - The maximum number of curve samples
 *   for plotting the density.
 * @param {number} [params.steps] - The exact number of curve samples for
 *   plotting the density. If specified, overrides both minsteps and maxsteps
 *   to set an exact number of uniform samples. Useful in conjunction with
 *   a fixed extent to ensure consistent sample points for stacked densities.
 * @param {Array<string>} [params.as=['value', 'density']] - Output field
 *   names for the sample point and the density value.
 */
function KDE(params) {
  vegaDataflow.Transform.call(this, null, params);
}

KDE.Definition = {
  "type": "KDE",
  "metadata": {"generates": true},
  "params": [
    { "name": "groupby", "type": "field", "array": true },
    { "name": "field", "type": "field", "required": true },
    { "name": "cumulative", "type": "boolean", "default": false },
    { "name": "counts", "type": "boolean", "default": false },
    { "name": "bandwidth", "type": "number", "default": 0 },
    { "name": "extent", "type": "number", "array": true, "length": 2 },
    { "name": "resolve", "type": "enum", "values": ["shared", "independent"], "default": "independent" },
    { "name": "steps", "type": "number" },
    { "name": "minsteps", "type": "number", "default": 25 },
    { "name": "maxsteps", "type": "number", "default": 200 },
    { "name": "as", "type": "string", "array": true, "default": ["value", "density"] }
  ]
};

var prototype$i = vegaUtil.inherits(KDE, vegaDataflow.Transform);

/**
 * Regenerate the density curve tuples when the data or parameters change;
 * otherwise emit an empty pulse.
 */
prototype$i.transform = function(_, pulse) {
  var out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS);

  if (!this.value || pulse.changed() || _.modified()) {
    const source = pulse.materialize(pulse.SOURCE).source,
          groups = partition(source, _.groupby, _.field),
          names = (_.groupby || []).map(vegaUtil.accessorName),
          bandwidth = _.bandwidth,
          // always 'pdf' or 'cdf', so no further validation is needed
          method = _.cumulative ? 'cdf' : 'pdf',
          as = _.as || ['value', 'density'],
          values = [];

    let domain = _.extent,
        minsteps = _.steps || _.minsteps || 25,
        maxsteps = _.steps || _.maxsteps || 200;

    if (_.resolve === 'shared') {
      // a common domain and a fixed sample count make curves stackable
      if (!domain) domain = vegaUtil.extent(source, _.field);
      minsteps = maxsteps = _.steps || maxsteps;
    }

    groups.forEach(g => {
      const density = vegaStatistics.randomKDE(g, bandwidth)[method],
            scale = _.counts ? g.length : 1,
            local = domain || vegaUtil.extent(g);

      vegaStatistics.sampleCurve(density, local, minsteps, maxsteps).forEach(v => {
        const t = {};
        for (let i=0; i<names.length; ++i) {
          t[names[i]] = g.dims[i];
        }
        t[as[0]] = v[0];
        t[as[1]] = v[1] * scale;
        values.push(vegaDataflow.ingest(t));
      });
    });

    if (this.value) out.rem = this.value;
    this.value = out.add = out.source = values;
  }

  return out;
};
/**
 * Generates a key function.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<string>} params.fields - The field name(s) for the key function.
 * @param {boolean} params.flat - A boolean flag indicating if the field names
 *   should be treated as flat property names, side-stepping nested field
 *   lookups normally indicated by dot or bracket notation.
 */
function Key(params) {
  vegaDataflow.Operator.call(this, null, update$3, params);
}

vegaUtil.inherits(Key, vegaDataflow.Operator);

/**
 * Operator update function: rebuild the key function only when a
 * parameter changed.
 */
function update$3(_) {
  return (this.value && !_.modified()) ? this.value : vegaUtil.key(_.fields, _.flat);
}
/**
 * Load and parse data from an external source. Marshals parameter
 * values and then invokes the Dataflow request method.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {string} params.url - The URL to load from.
 * @param {object} params.format - The data format options.
 * @param {*} [params.values] - Inline values to parse instead of loading a URL.
 * @param {boolean} [params.async] - If true, load without blocking the
 *   current evaluation; results are applied on a later pulse.
 */
function Load(params) {
  vegaDataflow.Transform.call(this, [], params);
  this._pending = null;
}

var prototype$j = vegaUtil.inherits(Load, vegaDataflow.Transform);

prototype$j.transform = function(_, pulse) {
  const df = pulse.dataflow;

  if (this._pending) {
    // update state and return pulse
    return output(this, pulse, this._pending);
  }

  if (stop(_)) return pulse.StopPropagation;

  if (_.values) {
    // parse and ingest values, return output pulse
    return output(this, pulse, df.parse(_.values, _.format));
  } else if (_.async) {
    // return promise for non-blocking async loading: stash the result and
    // touch this operator so the next evaluation emits it
    const p = df.request(_.url, _.format).then(res => {
      this._pending = vegaUtil.array(res.data);
      return df => df.touch(this);
    });
    return {async: p};
  } else {
    // return promise for synchronous loading
    return df.request(_.url, _.format)
      .then(res => output(this, pulse, vegaUtil.array(res.data)));
  }
};
/**
 * True when the only relevant change is the `async` flag, in which case
 * there is nothing new to load and propagation can stop.
 * @param {object} _ - The operator parameters.
 * @return {boolean}
 */
function stop(_) {
  return _.modified('async') && !(
    _.modified('values') || _.modified('url') || _.modified('format')
  );
}
/**
 * Build the output pulse for a Load operator: remove the previous data,
 * add the new data, and clear any pending async result.
 * @param {Load} op - The Load operator.
 * @param {Pulse} pulse - The input pulse.
 * @param {Array<object>} data - The newly loaded data.
 * @return {Pulse} The output pulse.
 */
function output(op, pulse, data) {
  // fork flags are bit masks and must be OR-ed; `NO_FIELDS & NO_SOURCE`
  // evaluated to 0 and silently dropped both flags
  const out = pulse.fork(pulse.NO_FIELDS | pulse.NO_SOURCE);
  out.rem = op.value;
  op.value = out.source = out.add = data;
  op._pending = null;
  return out;
}
/**
 * Extend tuples by joining them with values from a lookup table.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Map} params.index - The lookup table map.
 * @param {Array<function(object): *>} params.fields - The fields to lookup.
 * @param {Array<function(object): *>} [params.values] - Optional accessors
 *   for values to copy from each matched lookup object.
 * @param {Array<string>} [params.as] - Output field names for each lookup value.
 * @param {*} [params.default] - A default value to use if lookup fails.
 */
function Lookup(params) {
  vegaDataflow.Transform.call(this, {}, params);
}

Lookup.Definition = {
  "type": "Lookup",
  "metadata": {"modifies": true},
  "params": [
    { "name": "index", "type": "index", "params": [
        {"name": "from", "type": "data", "required": true },
        {"name": "key", "type": "field", "required": true }
      ] },
    { "name": "values", "type": "field", "array": true },
    { "name": "fields", "type": "field", "array": true, "required": true },
    { "name": "as", "type": "string", "array": true },
    { "name": "default", "default": null }
  ]
};

var prototype$k = vegaUtil.inherits(Lookup, vegaDataflow.Transform);

prototype$k.transform = function(_, pulse) {
  var out = pulse,
      as = _.as,
      keys = _.fields,
      index = _.index,
      values = _.values,
      defaultValue = _.default==null ? null : _.default,
      reset = _.modified(),
      flag = reset ? pulse.SOURCE : pulse.ADD,
      n = keys.length,
      set, m, mods;

  if (values) {
    m = values.length;

    if (n > 1 && !as) {
      vegaUtil.error('Multi-field lookup requires explicit "as" parameter.');
    }
    if (as && as.length !== n * m) {
      vegaUtil.error('The "as" parameter has too few output field names.');
    }
    as = as || values.map(vegaUtil.accessorName);

    // output fields are laid out key-major: n lookups x m values
    set = function(t) {
      for (var i=0, k=0, j, v; i<n; ++i) {
        v = index.get(keys[i](t));
        if (v == null) for (j=0; j<m; ++j, ++k) t[as[k]] = defaultValue;
        else for (j=0; j<m; ++j, ++k) t[as[k]] = values[j](v);
      }
    };
  } else {
    if (!as) {
      vegaUtil.error('Missing output field names.');
    }

    set = function(t) {
      for (var i=0, v; i<n; ++i) {
        v = index.get(keys[i](t));
        t[as[i]] = v==null ? defaultValue : v;
      }
    };
  }

  if (reset) {
    out = pulse.reflow(true);
  } else {
    // re-run lookups for modified tuples only if a lookup key field changed
    mods = keys.some(function(k) { return pulse.modified(k.fields); });
    flag |= (mods ? pulse.MOD : 0);
  }

  pulse.visit(flag, set);

  return out.modifies(as);
};
/**
 * Computes global min/max extents over a collection of extents.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<Array<number>>} params.extents - The input extents.
 */
function MultiExtent(params) {
  vegaDataflow.Operator.call(this, null, update$4, params);
}

vegaUtil.inherits(MultiExtent, vegaDataflow.Operator);
/**
 * MultiExtent update function: combine [min, max] pairs into one extent.
 * Null/undefined entries are skipped. With no usable entries the result
 * is [Infinity, -Infinity].
 * @param {object} _ - The operator parameters (reads `_.extents`).
 * @return {Array<number>} The combined [min, max] extent.
 */
function update$4(_) {
  if (this.value && !_.modified()) {
    return this.value;
  }

  var min = +Infinity,
      max = -Infinity,
      ext = _.extents,
      i, n, e;

  for (i=0, n=ext.length; i<n; ++i) {
    e = ext[i];
    if (!e) continue; // missing extent: nothing to merge
    if (e[0] < min) min = e[0];
    if (e[1] > max) max = e[1];
  }
  return [min, max];
}
/**
 * Merge a collection of value arrays.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<Array<*>>} params.values - The input value arrays.
 */
function MultiValues(params) {
  vegaDataflow.Operator.call(this, null, update$5, params);
}

vegaUtil.inherits(MultiValues, vegaDataflow.Operator);
/**
 * MultiValues update function: concatenate the input value arrays.
 * Uses concat semantics, so array entries are flattened one level and
 * non-array entries are appended as-is.
 * @param {object} _ - The operator parameters (reads `_.values`).
 * @return {Array<*>} The merged values.
 */
function update$5(_) {
  // a single concat copies each element once, unlike a reduce over concat
  // which re-copies the accumulated array on every step
  return (this.value && !_.modified())
    ? this.value
    : [].concat(..._.values);
}
/**
 * Operator whose value is simply its parameter hash. This operator is
 * useful for enabling reactive updates to values of nested objects.
 * @constructor
 * @param {object} params - The parameters for this operator.
 */
function Params(params) {
  vegaDataflow.Transform.call(this, null, params);
}

vegaUtil.inherits(Params, vegaDataflow.Transform);

Params.prototype.transform = function(_, pulse) {
  this.value = _;
  return pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS); // do not pass tuples
};
/**
 * Aggregate and pivot selected field values to become new fields.
 * This operator is useful for constructing cross-tabulations.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} [params.groupby] - An array of accessors
 *   to groupby. These fields act just like groupby fields of an Aggregate transform.
 * @param {function(object): *} params.field - The field to pivot on. The unique
 *   values of this field become new field names in the output stream.
 * @param {function(object): *} params.value - The field to populate pivoted fields.
 *   The aggregate values of this field become the values of the new pivoted fields.
 * @param {string} [params.op] - The aggregation operation for the value field,
 *   applied per cell in the output stream. The default is "sum".
 * @param {number} [params.limit] - An optional parameter indicating the maximum
 *   number of pivoted fields to generate. The pivoted field names are sorted in
 *   ascending order prior to enforcing the limit.
 * @param {function(object): *} [params.key] - Passed through as the `key`
 *   parameter of the underlying Aggregate transform.
 */
function Pivot(params) {
  Aggregate.call(this, params);
}

Pivot.Definition = {
  "type": "Pivot",
  "metadata": {"generates": true, "changes": true},
  "params": [
    { "name": "groupby", "type": "field", "array": true },
    { "name": "field", "type": "field", "required": true },
    { "name": "value", "type": "field", "required": true },
    { "name": "op", "type": "enum", "values": ValidAggregateOps, "default": "sum" },
    { "name": "limit", "type": "number", "default": 0 },
    { "name": "key", "type": "field" }
  ]
};

var prototype$l = vegaUtil.inherits(Pivot, Aggregate);

// keep the Aggregate implementation and feed it rewritten parameters
prototype$l._transform = prototype$l.transform;

prototype$l.transform = function(_, pulse) {
  return this._transform(aggregateParams(_, pulse), pulse);
};
// Shoehorn a pivot transform into an aggregate transform!
// First collect all unique pivot field values.
// Then generate aggregate fields for each output pivot field.
function aggregateParams(_, pulse) {
  var key = _.field,
      value = _.value,
      op = (_.op === 'count' ? '__count__' : _.op) || 'sum',
      fields = vegaUtil.accessorFields(key).concat(vegaUtil.accessorFields(value)),
      keys = pivotKeys(key, _.limit || 0, pulse);

  // if data stream content changes, pivot fields may change
  // flag parameter modification to ensure re-initialization
  if (pulse.changed()) _.set('__pivot__', null, null, true);

  return {
    key:      _.key,
    groupby:  _.groupby,
    ops:      keys.map(function() { return op; }),
    fields:   keys.map(function(k) { return get(k, key, value, fields); }),
    as:       keys.map(function(k) { return k + ''; }),
    modified: _.modified.bind(_)
  };
}

// Generate aggregate field accessor.
// Output NaN for non-existent values; aggregator will ignore!
function get(k, key, value, fields) {
  return vegaUtil.accessor(
    function(d) { return key(d) === k ? value(d) : NaN; },
    fields,
    k + ''
  );
}
/**
 * Collect (and optionally limit) all unique pivot values, sorted ascending.
 * Values are de-duplicated by their string form.
 * @param {function(object): *} key - The pivot field accessor.
 * @param {number} limit - Maximum number of values to keep (0 = no limit).
 * @param {Pulse} pulse - The pulse whose source tuples are scanned.
 * @return {Array<*>} The sorted unique pivot values.
 */
function pivotKeys(key, limit, pulse) {
  // null-prototype map: values such as "constructor" or "toString" must
  // not look "already seen" via inherited Object.prototype members
  var map = Object.create(null),
      list = [];

  pulse.visit(pulse.SOURCE, function(t) {
    var k = key(t);
    if (!map[k]) {
      map[k] = 1;
      list.push(k);
    }
  });

  // TODO? Move this comparator to vega-util?
  list.sort(function(u, v) {
    return (u<v||u==null) && v!=null ? -1
      : (u>v||v==null) && u!=null ? 1
      : ((v=v instanceof Date?+v:v),(u=u instanceof Date?+u:u))!==u && v===v ? -1
      : v!==v && u===u ? 1 : 0;
  });

  return limit ? list.slice(0, limit) : list;
}
/**
 * Partitions pre-faceted data into tuple subflows.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(Dataflow, string): Operator} params.subflow - A function
 *   that generates a subflow of operators and returns its root operator.
 * @param {function(object): Array<object>} params.field - The field
 *   accessor for an array of subflow tuple objects.
 */
function PreFacet(params) {
  Facet.call(this, params);
}

var prototype$m = vegaUtil.inherits(PreFacet, Facet);

/**
 * Route each input tuple (or, if `field` is set, each element of its
 * nested array) to a subflow keyed by the parent tuple id.
 */
prototype$m.transform = function(_, pulse) {
  var self = this,
      flow = _.subflow,
      field = _.field;

  if (_.modified('field') || field && pulse.modified(vegaUtil.accessorFields(field))) {
    vegaUtil.error('PreFacet does not support field modification.');
  }

  this._targets.active = 0; // reset list of active subflows

  pulse.visit(pulse.MOD, function(t) {
    var sf = self.subflow(vegaDataflow.tupleid(t), flow, pulse, t);
    field ? field(t).forEach(function(_) { sf.mod(_); }) : sf.mod(t);
  });

  pulse.visit(pulse.ADD, function(t) {
    var sf = self.subflow(vegaDataflow.tupleid(t), flow, pulse, t);
    // nested objects are not yet tuples, so ingest them on add
    field ? field(t).forEach(function(_) { sf.add(vegaDataflow.ingest(_)); }) : sf.add(t);
  });

  pulse.visit(pulse.REM, function(t) {
    var sf = self.subflow(vegaDataflow.tupleid(t), flow, pulse, t);
    field ? field(t).forEach(function(_) { sf.rem(_); }) : sf.rem(t);
  });

  return pulse;
};
/**
 * Performs a relational projection, copying selected fields from source
 * tuples to a new set of derived tuples.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {Array<function(object): *>} params.fields - The fields to project,
 *   as an array of field accessors. If unspecified, all fields will be
 *   copied with names unchanged.
 * @param {Array<string>} [params.as] - Output field names for each projected
 *   field. Any unspecified fields will use the field name provided by
 *   the field accessor.
 */
function Project(params) {
  vegaDataflow.Transform.call(this, null, params);
}

Project.Definition = {
  "type": "Project",
  "metadata": {"generates": true, "changes": true},
  "params": [
    { "name": "fields", "type": "field", "array": true },
    { "name": "as", "type": "string", "null": true, "array": true }
  ]
};

var prototype$n = vegaUtil.inherits(Project, vegaDataflow.Transform);

/**
 * Maintain one derived tuple per source tuple. The operator value is a
 * lookup table from source tuple id to derived tuple.
 */
prototype$n.transform = function(_, pulse) {
  var fields = _.fields,
      as = fieldNames(_.fields, _.as || []),
      derive = fields
        ? function(s, t) { return project(s, t, fields, as); }
        : vegaDataflow.rederive,
      out, lut;

  if (this.value) {
    lut = this.value;
  } else {
    // first run: treat every source tuple as added
    pulse = pulse.addAll();
    lut = this.value = {};
  }

  out = pulse.fork(pulse.NO_SOURCE);

  pulse.visit(pulse.REM, function(t) {
    var id = vegaDataflow.tupleid(t);
    out.rem.push(lut[id]);
    lut[id] = null;
  });

  pulse.visit(pulse.ADD, function(t) {
    var dt = derive(t, vegaDataflow.ingest({}));
    lut[vegaDataflow.tupleid(t)] = dt;
    out.add.push(dt);
  });

  pulse.visit(pulse.MOD, function(t) {
    out.mod.push(derive(t, lut[vegaDataflow.tupleid(t)]));
  });

  return out;
};
/**
 * Copy projected values from source tuple s onto target tuple t.
 * @param {object} s - The source tuple.
 * @param {object} t - The target tuple (mutated and returned).
 * @param {Array<function(object): *>} fields - Field accessors to apply to s.
 * @param {Array<string>} as - Output field names, parallel to fields.
 * @return {object} The target tuple t.
 */
function project(s, t, fields, as) {
  for (var i=0, n=fields.length; i<n; ++i) {
    t[as[i]] = fields[i](s);
  }
  return t;
}
/**
 * Proxy the value of another operator as a pure signal value.
 * Ensures no tuples are propagated.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {*} params.value - The value to proxy, becomes the value of this operator.
 */
function Proxy(params) {
  vegaDataflow.Transform.call(this, null, params);
}

var prototype$o = vegaUtil.inherits(Proxy, vegaDataflow.Transform);

/**
 * Adopt the proxied value. Propagate an empty (no source, no fields) pulse
 * only when the value parameter changed; otherwise stop propagation.
 */
prototype$o.transform = function(_, pulse) {
  this.value = _.value;
  return _.modified('value')
    ? pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS)
    : pulse.StopPropagation;
};
/**
 * Generates sample quantile values from an input data stream.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - An accessor for the data field
 *   over which to calculate quantile values.
 * @param {Array<function(object): *>} [params.groupby] - An array of accessors
 *   to groupby.
 * @param {Array<number>} [params.probs] - An array of probabilities in
 *   the range (0, 1) for which to compute quantile values. If not specified,
 *   the *step* parameter will be used.
 * @param {number} [params.step=0.01] - A probability step size for
 *   sampling quantile values. All values from one-half the step size up to
 *   1 (exclusive) will be sampled. This parameter is only used if the
 *   *probs* parameter is not provided.
 * @param {Array<string>} [params.as=['prob', 'value']] - Output field names
 *   for the probability and the corresponding quantile value.
 */
function Quantile(params) {
  vegaDataflow.Transform.call(this, null, params);
}

Quantile.Definition = {
  "type": "Quantile",
  "metadata": {"generates": true, "changes": true},
  "params": [
    { "name": "groupby", "type": "field", "array": true },
    { "name": "field", "type": "field", "required": true },
    { "name": "probs", "type": "number", "array": true },
    { "name": "step", "type": "number", "default": 0.01 },
    { "name": "as", "type": "string", "array": true, "default": ["prob", "value"] }
  ]
};

var prototype$p = vegaUtil.inherits(Quantile, vegaDataflow.Transform);

// keeps the generated probability range strictly below 1
var EPSILON$1 = 1e-14;

prototype$p.transform = function(_, pulse) {
  var out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS),
      as = _.as || ['prob', 'value'];

  // nothing changed: re-emit the cached output tuples as the source
  if (this.value && !_.modified() && !pulse.changed()) {
    out.source = this.value;
    return out;
  }

  const source = pulse.materialize(pulse.SOURCE).source,
        groups = partition(source, _.groupby, _.field),
        names = (_.groupby || []).map(vegaUtil.accessorName),
        values = [],
        step = _.step || 0.01,
        p = _.probs || d3Array.range(step/2, 1 - EPSILON$1, step),
        n = p.length;

  groups.forEach(g => {
    const q = vegaStatistics.quantiles(g, p);
    for (let i=0; i<n; ++i) {
      const t = {};
      for (let j=0; j<names.length; ++j) {
        t[names[j]] = g.dims[j];
      }
      t[as[0]] = p[i];
      t[as[1]] = q[i];
      values.push(vegaDataflow.ingest(t));
    }
  });

  // replace any previously generated tuples wholesale
  if (this.value) out.rem = this.value;
  this.value = out.add = out.source = values;

  return out;
};
/**
 * Relays a data stream between data processing pipelines.
 * If the derive parameter is set, this transform will create derived
 * copies of observed tuples. This provides derived data streams in which
 * modifications to the tuples do not pollute an upstream data source.
 * @param {object} params - The parameters for this operator.
 * @param {boolean} [params.derive=false] - Boolean flag indicating if
 *   the transform should make derived copies of incoming tuples.
 * @constructor
 */
function Relay(params) {
  vegaDataflow.Transform.call(this, null, params);
}

var prototype$q = vegaUtil.inherits(Relay, vegaDataflow.Transform);

prototype$q.transform = function(_, pulse) {
  var out, lut;

  if (this.value) {
    lut = this.value;
  } else {
    out = pulse = pulse.addAll();
    lut = this.value = {};
  }

  if (_.derive) {
    out = pulse.fork(pulse.NO_SOURCE);

    pulse.visit(pulse.REM, t => {
      var id = vegaDataflow.tupleid(t);
      out.rem.push(lut[id]);
      lut[id] = null;
    });

    pulse.visit(pulse.ADD, t => {
      var dt = vegaDataflow.derive(t);
      lut[vegaDataflow.tupleid(t)] = dt;
      out.add.push(dt);
    });

    pulse.visit(pulse.MOD, t => {
      var dt = lut[vegaDataflow.tupleid(t)], k;
      for (k in t) {
        dt[k] = t[k];
        // down stream writes may overwrite re-derived tuples
        // conservatively mark all source fields as modified
        out.modifies(k);
      }
      out.mod.push(dt);
    });
  }

  // NOTE: when derive is off and this.value is already set, out is undefined
  return out;
};
/**
 * Samples tuples passing through this operator.
 * Uses reservoir sampling to maintain a representative sample.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {number} [params.size=1000] - The maximum number of samples.
 */
function Sample(params) {
  vegaDataflow.Transform.call(this, [], params);
  this.count = 0;
}

Sample.Definition = {
  "type": "Sample",
  "metadata": {},
  "params": [
    { "name": "size", "type": "number", "default": 1000 }
  ]
};

var prototype$r = vegaUtil.inherits(Sample, vegaDataflow.Transform);

prototype$r.transform = function(_, pulse) {
  var out = pulse.fork(pulse.NO_SOURCE),
      mod = _.modified('size'),
      num = _.size,
      res = this.value,
      cnt = this.count,
      cap = 0,
      // id -> 1 for tuples already in the reservoir before this pulse;
      // set to -1 when such a tuple is removed. Newly sampled tuples are absent.
      map = res.reduce(function(m, t) {
        m[vegaDataflow.tupleid(t)] = 1;
        return m;
      }, {});

  // sample reservoir update function
  function update(t) {
    var p, idx;

    if (res.length < num) {
      res.push(t);
    } else {
      idx = ~~((cnt + 1) * vegaStatistics.random());
      if (idx < res.length && idx >= cap) {
        p = res[idx];
        if (map[vegaDataflow.tupleid(p)]) out.rem.push(p); // eviction
        res[idx] = t;
      }
    }
    ++cnt;
  }

  if (pulse.rem.length) {
    // find all tuples that should be removed, add to output
    pulse.visit(pulse.REM, function(t) {
      var id = vegaDataflow.tupleid(t);
      if (map[id]) {
        map[id] = -1;
        out.rem.push(t);
      }
      --cnt;
    });

    // filter removed tuples out of the sample reservoir
    res = res.filter(function(t) { return map[vegaDataflow.tupleid(t)] !== -1; });
  }

  if ((pulse.rem.length || mod) && res.length < num && pulse.source) {
    // replenish sample if backing data source is available
    cap = cnt = res.length;
    pulse.visit(pulse.SOURCE, function(t) {
      // update, but skip previously sampled tuples
      if (!map[vegaDataflow.tupleid(t)]) update(t);
    });
    cap = -1;
  }

  if (mod && res.length > num) {
    // sample size shrank: drop the oldest entries from the front
    for (var i=0, n=res.length-num; i<n; ++i) {
      map[vegaDataflow.tupleid(res[i])] = -1;
      out.rem.push(res[i]);
    }
    res = res.slice(n);
  }

  if (pulse.mod.length) {
    // propagate modified tuples in the sample reservoir
    pulse.visit(pulse.MOD, function(t) {
      if (map[vegaDataflow.tupleid(t)]) out.mod.push(t);
    });
  }

  if (pulse.add.length) {
    // update sample reservoir
    pulse.visit(pulse.ADD, update);
  }

  if (pulse.add.length || cap < 0) {
    // output newly added tuples
    out.add = res.filter(function(t) { return !map[vegaDataflow.tupleid(t)]; });
  }

  this.count = cnt;
  this.value = out.source = res;
  return out;
};
/**
 * Generates data tuples for a specified sequence range of numbers.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {number} params.start - The first number in the sequence.
 * @param {number} params.stop - The last number (exclusive) in the sequence.
 * @param {number} [params.step=1] - The step size between numbers in the sequence.
 * @param {string} [params.as='data'] - The output field name for sequence values.
 */
function Sequence(params) {
  vegaDataflow.Transform.call(this, null, params);
}

Sequence.Definition = {
  "type": "Sequence",
  "metadata": {"generates": true, "changes": true},
  "params": [
    { "name": "start", "type": "number", "required": true },
    { "name": "stop", "type": "number", "required": true },
    { "name": "step", "type": "number", "default": 1 },
    { "name": "as", "type": "string", "default": "data" }
  ]
};

var prototype$s = vegaUtil.inherits(Sequence, vegaDataflow.Transform);

prototype$s.transform = function(_, pulse) {
  // parameters unchanged since the last run: nothing to regenerate
  if (this.value && !_.modified()) return;

  var out = pulse.materialize().fork(pulse.MOD),
      as = _.as || 'data';

  // retract the previously generated tuples, if any
  out.rem = this.value ? pulse.rem.concat(this.value) : pulse.rem;

  this.value = d3Array.range(_.start, _.stop, _.step || 1).map(function(v) {
    var t = {};
    t[as] = v;
    return vegaDataflow.ingest(t);
  });

  out.add = pulse.add.concat(this.value);

  return out;
};
/**
 * Propagates a new pulse without any tuples so long as the input
 * pulse contains some added, removed or modified tuples.
 * @param {object} params - The parameters for this operator.
 * @constructor
 */
function Sieve(params) {
  vegaDataflow.Transform.call(this, null, params);
  this.modified(true); // always treat as modified
}

var prototype$t = vegaUtil.inherits(Sieve, vegaDataflow.Transform);

prototype$t.transform = function(_, pulse) {
  this.value = pulse.source;
  return pulse.changed()
    ? pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS)
    : pulse.StopPropagation;
};
/**
 * Discretize dates to specific time units.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - The data field containing date/time values.
 * @param {boolean} [params.interval=true] - If true, also write the end of each
 *   time unit interval to the second output field.
 * @param {Array<string>} [params.units] - The time units to floor to. If not
 *   specified, units are chosen via timeBin over the data extent.
 * @param {number} [params.step=1] - The number of units per step (used with params.units).
 * @param {string} [params.timezone='local'] - Either 'local' or 'utc'.
 * @param {Array<string>} [params.as=['unit0', 'unit1']] - Output field names.
 */
function TimeUnit(params) {
  vegaDataflow.Transform.call(this, null, params);
}

const OUTPUT = ['unit0', 'unit1'];

TimeUnit.Definition = {
  "type": "TimeUnit",
  "metadata": {"modifies": true},
  "params": [
    { "name": "field", "type": "field", "required": true },
    { "name": "interval", "type": "boolean", "default": true },
    { "name": "units", "type": "string", "array": true },
    { "name": "step", "type": "number", "default": 1 },
    { "name": "timezone", "type": "enum", "default": "local", "values": ["local", "utc"] },
    { "name": "as", "type": "string", "array": true, "length": 2, "default": OUTPUT }
  ]
};

var prototype$u = vegaUtil.inherits(TimeUnit, vegaDataflow.Transform);

prototype$u.transform = function(_, pulse) {
  var field = _.field,
      band = _.interval !== false,
      utc = _.timezone === 'utc',
      floor = this._floor(_, pulse),
      offset = (utc ? vegaTime.utcInterval : vegaTime.timeInterval)(floor.unit).offset,
      as = _.as || OUTPUT,
      u0 = as[0],
      u1 = as[1],
      min = floor.start || Infinity,
      max = floor.stop || -Infinity,
      step = floor.step,
      flag = pulse.ADD;

  // parameters or the input field changed: recompute over all source tuples
  if (_.modified() || pulse.modified(vegaUtil.accessorFields(_.field))) {
    pulse = pulse.reflow(true);
    flag = pulse.SOURCE;
    min = Infinity;
    max = -Infinity;
  }

  pulse.visit(flag, function(t) {
    var v = field(t), a, b;
    if (v == null) {
      t[u0] = null;
      if (band) t[u1] = null;
    } else {
      t[u0] = a = b = floor(v);
      if (band) t[u1] = b = offset(a, step);
      if (a < min) min = a;
      if (b > max) max = b;
    }
  });

  // remember the covered extent on the floor function
  floor.start = min;
  floor.stop = max;

  return pulse.modifies(band ? as : u0);
};

/**
 * Build (and cache as this.value) the flooring function for the current
 * parameters, carrying over the start/stop extent of the previous one.
 */
prototype$u._floor = function(_, pulse) {
  const utc = _.timezone === 'utc';

  // get parameters
  // NOTE(review): _.maxbins is read here but is not declared in TimeUnit.Definition
  let {units, step} = _.units
    ? {units: _.units, step: _.step || 1}
    : vegaTime.timeBin({
        extent: vegaUtil.extent(pulse.materialize(pulse.SOURCE).source, _.field),
        maxbins: _.maxbins
      });

  // check / standardize time units
  units = vegaTime.timeUnits(units);

  const prev = this.value || {},
        floor = (utc ? vegaTime.utcFloor : vegaTime.timeFloor)(units, step);

  floor.unit = vegaUtil.peek(units);
  floor.units = units;
  floor.step = step;
  floor.start = prev.start;
  floor.stop = prev.stop;
  return this.value = floor;
};
/**
 * An index that maps from unique, string-coerced, field values to tuples.
 * Assumes that the field serves as a unique key with no duplicate values.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - The field accessor to index.
 */
function TupleIndex(params) {
  vegaDataflow.Transform.call(this, vegaUtil.fastmap(), params);
}

var prototype$v = vegaUtil.inherits(TupleIndex, vegaDataflow.Transform);

prototype$v.transform = function(_, pulse) {
  var df = pulse.dataflow,
      field = _.field,
      index = this.value,
      mod = true;

  function set(t) { index.set(field(t), t); }

  if (_.modified('field') || pulse.modified(field.fields)) {
    // key definition changed: rebuild the index from scratch
    index.clear();
    pulse.visit(pulse.SOURCE, set);
  } else if (pulse.changed()) {
    pulse.visit(pulse.REM, function(t) { index.delete(field(t)); });
    pulse.visit(pulse.ADD, set);
  } else {
    mod = false;
  }

  // only flag the index value as modified when it actually changed
  this.modified(mod);
  if (index.empty > df.cleanThreshold) df.runAfter(index.clean);
  return pulse.fork();
};
/**
 * Extracts an array of values. Assumes the source data has already been
 * reduced as needed (e.g., by an upstream Aggregate transform).
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(object): *} params.field - The domain field to extract.
 * @param {function(*,*): number} [params.sort] - An optional
 *   comparator function for sorting the values. The comparator will be
 *   applied to backing tuples prior to value extraction.
 */
function Values(params) {
  vegaDataflow.Transform.call(this, null, params);
}

var prototype$w = vegaUtil.inherits(Values, vegaDataflow.Transform);

prototype$w.transform = function(_, pulse) {
  var run = !this.value
    || _.modified('field')
    || _.modified('sort')
    || pulse.changed()
    || (_.sort && pulse.modified(_.sort.fields));

  if (run) {
    // slice before sorting so the pulse source array is not reordered
    this.value = (_.sort
      ? pulse.source.slice().sort(vegaDataflow.stableCompare(_.sort))
      : pulse.source).map(_.field);
  }
};
/**
 * Instantiate a window operation and adapt it to the WindowState interface.
 * @param {string} op - A key of WindowOps.
 * @param {function(object): *} field - Input field accessor (op-dependent).
 * @param {*} param - Extra op parameter (e.g., ntile count, lag offset).
 * @param {string} as - Output field name written on each tuple.
 * @return {{init: function(), update: function(object, object)}}
 */
function WindowOp(op, field, param, as) {
  let fn = WindowOps[op](field, param);
  return {
    init: fn.init || vegaUtil.zero,
    update: function(w, t) { t[as] = fn.next(w); }
  };
}
/**
 * Factories for window-only operations. Each returns an object with an
 * optional init() (called before each partition) and next(w), which computes
 * the value for row w.index given the window state w
 * ({data, compare, index, i0, i1, ...}).
 */
const WindowOps = {
  row_number: function() {
    return {
      next: w => w.index + 1
    };
  },
  rank: function() {
    let rank;
    return {
      init: () => rank = 1,
      next: w => {
        let i = w.index,
            data = w.data;
        return (i && w.compare(data[i - 1], data[i])) ? (rank = i + 1) : rank;
      }
    };
  },
  dense_rank: function() {
    let drank;
    return {
      init: () => drank = 1,
      next: w => {
        let i = w.index,
            d = w.data;
        return (i && w.compare(d[i - 1], d[i])) ? ++drank : drank;
      }
    };
  },
  percent_rank: function() {
    let rank = WindowOps.rank(),
        next = rank.next;
    return {
      init: rank.init,
      next: w => {
        let n = w.data.length;
        // a single-row partition has percent rank 0 (instead of 0/0 = NaN)
        return n > 1 ? (next(w) - 1) / (n - 1) : 0;
      }
    };
  },
  cume_dist: function() {
    let cume;
    return {
      // start before the first row so row 0 also absorbs its peers
      init: () => cume = -1,
      next: w => {
        let i = w.index,
            d = w.data,
            c = w.compare;
        if (cume < i) {
          // advance to the last peer of the current row
          while (i + 1 < d.length && !c(d[i], d[i + 1])) ++i;
          cume = i;
        }
        return (1 + cume) / d.length;
      }
    };
  },
  ntile: function(field, num) {
    num = +num;
    if (!(num > 0)) vegaUtil.error('ntile num must be greater than zero.');
    let cume = WindowOps.cume_dist(),
        next = cume.next;
    return {
      init: cume.init,
      next: w => Math.ceil(num * next(w))
    };
  },
  lag: function(field, offset) {
    offset = +offset || 1;
    return {
      next: w => {
        let i = w.index - offset;
        return i >= 0 ? field(w.data[i]) : null;
      }
    };
  },
  lead: function(field, offset) {
    offset = +offset || 1;
    return {
      next: w => {
        let i = w.index + offset,
            d = w.data;
        return i < d.length ? field(d[i]) : null;
      }
    };
  },
  first_value: function(field) {
    return {
      next: w => field(w.data[w.i0])
    };
  },
  last_value: function(field) {
    return {
      next: w => field(w.data[w.i1 - 1])
    };
  },
  nth_value: function(field, nth) {
    nth = +nth;
    if (!(nth > 0)) vegaUtil.error('nth_value nth must be greater than zero.');
    return {
      next: w => {
        let i = w.i0 + (nth - 1);
        return i < w.i1 ? field(w.data[i]) : null;
      }
    };
  },
  prev_value: function(field) {
    let prev = null;
    return {
      // reset per partition so values do not leak across partitions
      init: () => { prev = null; },
      next: w => {
        let v = field(w.data[w.index]);
        return v != null ? (prev = v) : prev;
      }
    };
  },
  next_value: function(field) {
    let v = null,
        i = -1;
    return {
      // reset per partition so a stale lookahead index is not reused
      init: () => { v = null; i = -1; },
      next: w => {
        let d = w.data;
        return w.index <= i ? v
          : (i = find(field, d, w.index)) < 0
            ? (i = d.length, v = null)
            : (v = field(d[i]));
      }
    };
  }
};

/**
 * Return the first index >= index whose field value is non-null, or -1.
 */
function find(field, data, index) {
  for (let n = data.length; index < n; ++index) {
    let v = field(data[index]);
    if (v != null) return index;
  }
  return -1;
}
// Names of the supported window-only operations (the keys of WindowOps);
// Window.Definition concatenates these with ValidAggregateOps for "ops".
var ValidWindowOps = Object.keys(WindowOps);
/**
 * Compiles the operations of a Window transform into runnable state.
 * Window-only ops become WindowOp instances (self.windows); aggregate ops
 * are grouped per input field into a shared aggregation cell (self.cell).
 * Also records the output field names (self.outputs) and the input field
 * names read by the sort comparator and op fields (self.inputs).
 * @param {object} _ - The Window transform parameters.
 */
function WindowState(_) {
  let self = this,
      ops = vegaUtil.array(_.ops),
      fields = vegaUtil.array(_.fields),
      params = vegaUtil.array(_.params),
      as = vegaUtil.array(_.as),
      outputs = self.outputs = [],
      windows = self.windows = [],
      inputs = {},
      map = {},
      countOnly = true,
      counts = [],
      measures = [];

  // record the data fields read by an accessor or comparator
  function visitInputs(f) {
    vegaUtil.array(vegaUtil.accessorFields(f)).forEach(_ => inputs[_] = 1);
  }
  visitInputs(_.sort);

  ops.forEach(function(op, i) {
    let field = fields[i],
        mname = vegaUtil.accessorName(field),
        name = measureName(op, mname, as[i]);

    visitInputs(field);
    outputs.push(name);

    // Window operation
    if (vegaUtil.hasOwnProperty(WindowOps, op)) {
      windows.push(WindowOp(op, fields[i], params[i], name));
    }

    // Aggregate operation
    else {
      if (field == null && op !== 'count') {
        vegaUtil.error('Null aggregate field specified.');
      }
      if (op === 'count') {
        // counts only need the running row count, not a measure
        counts.push(name);
        return;
      }

      countOnly = false;
      let m = map[mname];
      if (!m) {
        m = (map[mname] = []);
        m.field = field;
        measures.push(m);
      }
      m.push(createMeasure(op, name));
    }
  });

  if (counts.length || measures.length) {
    self.cell = cell(measures, counts, countOnly);
  }

  self.inputs = Object.keys(inputs);
}
const prototype$x = WindowState.prototype;

/** Reset every window operation and the aggregate cell (if any). */
prototype$x.init = function() {
  this.windows.forEach(_ => _.init());
  if (this.cell) this.cell.init();
};

/**
 * Slide to the window described by w and write all outputs onto tuple t.
 * Rows in [p0, i0) leave the aggregate cell and rows in [p1, i1) enter it.
 */
prototype$x.update = function(w, t) {
  let self = this,
      cell = self.cell,
      wind = self.windows,
      data = w.data,
      m = wind && wind.length,
      j;

  if (cell) {
    for (j=w.p0; j<w.i0; ++j) cell.rem(data[j]);
    for (j=w.p1; j<w.i1; ++j) cell.add(data[j]);
    cell.set(t);
  }
  for (j=0; j<m; ++j) wind[j].update(w, t);
};
/**
 * Create the incremental aggregation cell shared by a Window's aggregate ops.
 * @param {Array<Array>} measures - Per-field measure lists (each with .field).
 * @param {Array<string>} counts - Output names for 'count' ops.
 * @param {boolean} countOnly - True if only counts are requested.
 * @return {object} A cell with add/rem/set/init methods.
 */
function cell(measures, counts, countOnly) {
  measures = measures.map(m => compileMeasures(m, m.field));

  let cell = {
    num:   0,
    agg:   null,
    store: false,
    count: counts
  };

  if (!countOnly) {
    var n = measures.length,
        a = cell.agg = Array(n),
        i = 0;
    for (; i<n; ++i) a[i] = new measures[i](cell);
  }

  // presumably set by measure constructors that need the raw values — confirm
  if (cell.store) {
    var store = cell.data = new TupleStore();
  }

  cell.add = function(t) {
    cell.num += 1;
    if (countOnly) return;
    if (store) store.add(t);
    for (let i=0; i<n; ++i) {
      a[i].add(a[i].get(t), t);
    }
  };

  cell.rem = function(t) {
    cell.num -= 1;
    if (countOnly) return;
    if (store) store.rem(t);
    for (let i=0; i<n; ++i) {
      a[i].rem(a[i].get(t), t);
    }
  };

  cell.set = function(t) {
    let i, n;

    // consolidate stored values
    if (store) store.values();

    // update tuple properties
    for (i=0, n=counts.length; i<n; ++i) t[counts[i]] = cell.num;
    if (!countOnly) for (i=0, n=a.length; i<n; ++i) a[i].set(t);
  };

  cell.init = function() {
    cell.num = 0;
    if (store) store.reset();
    for (let i=0; i<n; ++i) a[i].init();
  };

  return cell;
}
/**
 * Perform window calculations and write results to the input stream.
 * @constructor
 * @param {object} params - The parameters for this operator.
 * @param {function(*,*): number} [params.sort] - A comparator function for sorting tuples within a window.
 * @param {Array<function(object): *>} [params.groupby] - An array of accessors by which to partition tuples into separate windows.
 * @param {Array<string>} params.ops - An array of strings indicating window operations to perform.
 * @param {Array<function(object): *>} [params.fields] - An array of accessors
 *   for data fields to use as inputs to window operations.
 * @param {Array<*>} [params.params] - An array of parameter values for window operations.
 * @param {Array<string>} [params.as] - An array of output field names for window operations.
 * @param {Array<number>} [params.frame] - Window frame definition as two-element array.
 * @param {boolean} [params.ignorePeers=false] - If true, base window frame boundaries on row
 *   number alone, ignoring peers with identical sort values. If false (default),
 *   the window boundaries will be adjusted to include peer values.
 */
function Window(params) {
  vegaDataflow.Transform.call(this, {}, params);
  this._mlen = 0;
  this._mods = [];
}

Window.Definition = {
  "type": "Window",
  "metadata": {"modifies": true},
  "params": [
    { "name": "sort", "type": "compare" },
    { "name": "groupby", "type": "field", "array": true },
    { "name": "ops", "type": "enum", "array": true, "values": ValidWindowOps.concat(ValidAggregateOps) },
    { "name": "params", "type": "number", "null": true, "array": true },
    { "name": "fields", "type": "field", "null": true, "array": true },
    { "name": "as", "type": "string", "null": true, "array": true },
    { "name": "frame", "type": "number", "null": true, "array": true, "length": 2, "default": [null, 0] },
    { "name": "ignorePeers", "type": "boolean", "default": false }
  ]
};

var prototype$y = vegaUtil.inherits(Window, vegaDataflow.Transform);

prototype$y.transform = function(_, pulse) {
  var self = this,
      state = self.state,
      mod = _.modified(),
      cmp = vegaDataflow.stableCompare(_.sort),
      i, n;

  // used by group() to mark each partition touched in this pulse once
  this.stamp = pulse.stamp;

  // initialize window state
  if (!state || mod) {
    state = self.state = new WindowState(_);
  }

  // retrieve group for a tuple
  var key = groupkey(_.groupby);
  function group(t) { return self.group(key(t)); }

  // partition input tuples
  if (mod || pulse.modified(state.inputs)) {
    self.value = {};
    pulse.visit(pulse.SOURCE, function(t) { group(t).add(t); });
  } else {
    pulse.visit(pulse.REM, function(t) { group(t).remove(t); });
    pulse.visit(pulse.ADD, function(t) { group(t).add(t); });
  }

  // perform window calculations for each modified partition
  for (i=0, n=self._mlen; i<n; ++i) {
    processPartition(self._mods[i], state, cmp, _);
  }
  self._mlen = 0;
  self._mods = [];

  // TODO don't reflow everything?
  return pulse.reflow(mod).modifies(state.outputs);
};
/**
 * Look up (creating on demand) the partition for a group key. The first time
 * a partition is touched for the current stamp, it is queued in _mods for
 * recalculation.
 * @param {string} key - The group key.
 * @return {object} The partition (a SortedList keyed by tuple id).
 */
prototype$y.group = function(key) {
  var self = this,
      group = self.value[key];

  if (!group) {
    group = self.value[key] = SortedList(vegaDataflow.tupleid);
    group.stamp = -1;
  }

  if (group.stamp < self.stamp) {
    group.stamp = self.stamp;
    self._mods[self._mlen++] = group;
  }

  return group;
};
/**
 * Run all window calculations over one partition, row by row.
 * @param {object} list - The partition; list.data(cmp) yields its sorted rows.
 * @param {WindowState} state - The compiled window operations.
 * @param {function(*,*): number} cmp - Stable comparator used for sorting.
 * @param {object} _ - The Window transform parameters.
 */
function processPartition(list, state, cmp, _) {
  var sort = _.sort,
      range = sort && !_.ignorePeers,
      frame = _.frame || [null, 0],
      data = list.data(cmp), // use cmp for stable sort
      n = data.length,
      i = 0,
      b = range ? d3Array.bisector(sort) : null,
      w = {
        i0: 0, i1: 0, p0: 0, p1: 0, index: 0,
        data: data, compare: sort || vegaUtil.constant(-1)
      };

  for (state.init(); i<n; ++i) {
    setWindow(w, frame, i, n);
    if (range) adjustRange(w, b);
    state.update(w, data[i]);
  }
}
/**
 * Compute the frame [i0, i1) for row i, saving the previous frame in p0/p1.
 * A null frame bound means unbounded in that direction.
 * @param {object} w - The window state (mutated).
 * @param {Array<number|null>} f - Frame [preceding, following] offsets.
 * @param {number} i - The current row index.
 * @param {number} n - The partition size.
 */
function setWindow(w, f, i, n) {
  w.p0 = w.i0;
  w.p1 = w.i1;
  w.i0 = f[0] == null ? 0 : Math.max(0, i - Math.abs(f[0]));
  w.i1 = f[1] == null ? n : Math.min(n, i + Math.abs(f[1]) + 1);
  w.index = i;
}
/**
 * If the frame type is 'range', widen the window so it includes all peer
 * values (rows that compare equal) at both of its ends.
 * @param {object} w - The window state with data, compare, i0, i1 (mutated).
 * @param {{left: Function, right: Function}} bisect - A bisector over w.data.
 */
function adjustRange(w, bisect) {
  var r0 = w.i0,
      r1 = w.i1 - 1,
      c = w.compare,
      d = w.data,
      n = d.length - 1;

  if (r0 > 0 && !c(d[r0], d[r0-1])) w.i0 = bisect.left(d, d[r0]);
  if (r1 < n && !c(d[r1], d[r1+1])) w.i1 = bisect.right(d, d[r1]);
}
