import {groupkey} from './util/AggregateKeys'; import {createMeasure, compileMeasures, measureName, ValidAggregateOps} from './util/AggregateOps'; import TupleStore from './util/TupleStore'; import {ingest, replace, Transform} from 'vega-dataflow'; import {accessorFields, accessorName, array, error, inherits} from 'vega-util'; /** * Group-by aggregation operator. * @constructor * @param {object} params - The parameters for this operator. * @param {Array} [params.groupby] - An array of accessors to groupby. * @param {Array} [params.fields] - An array of accessors to aggregate. * @param {Array} [params.ops] - An array of strings indicating aggregation operations. * @param {Array} [params.as] - An array of output field names for aggregated values. * @param {boolean} [params.cross=false] - A flag indicating that the full * cross-product of groupby values should be generated, including empty cells. * If true, the drop parameter is ignored and empty cells are retained. * @param {boolean} [params.drop=true] - A flag indicating if empty cells should be removed. */ export default function Aggregate(params) { Transform.call(this, null, params); this._adds = []; // array of added output tuples this._mods = []; // array of modified output tuples this._alen = 0; // number of active added tuples this._mlen = 0; // number of active modified tuples this._drop = true; // should empty aggregation cells be removed this._cross = false; // produce full cross-product of group-by values this._dims = []; // group-by dimension accessors this._dnames = []; // group-by dimension names this._measures = []; // collection of aggregation monoids this._countOnly = false; // flag indicating only count aggregation this._counts = null; // collection of count fields this._prev = null; // previous aggregation cells this._inputs = null; // array of dependent input tuple field names this._outputs = null; // array of output tuple field names } Aggregate.Definition = { "type": "Aggregate", "metadata": {"generates": true, "changes": true}, "params": [ { "name": "groupby", "type": "field", "array": true }, { "name": "ops", "type": "enum", "array": true, "values": ValidAggregateOps }, { "name": "fields", "type": "field", "null": true, "array": true }, { "name": "as", "type": "string", "null": true, "array": true }, { "name": "drop", "type": "boolean", "default": true }, { "name": "cross", "type": "boolean", "default": false }, { "name": "key", "type": "field" } ] }; var prototype = inherits(Aggregate, Transform); prototype.transform = function(_, pulse) { var aggr = this, out = pulse.fork(pulse.NO_SOURCE | pulse.NO_FIELDS), mod = _.modified(); aggr.stamp = out.stamp; if (aggr.value && (mod || pulse.modified(aggr._inputs, true))) { aggr._prev = aggr.value; aggr.value = mod ? aggr.init(_) : {}; pulse.visit(pulse.SOURCE, t => aggr.add(t)); } else { aggr.value = aggr.value || aggr.init(_); pulse.visit(pulse.REM, t => aggr.rem(t)); pulse.visit(pulse.ADD, t => aggr.add(t)); } // Indicate output fields and return aggregate tuples. out.modifies(aggr._outputs); // Should empty cells be dropped? aggr._drop = _.drop !== false; // If domain cross-product requested, generate empty cells as needed // and ensure that empty cells are not dropped if (_.cross && aggr._dims.length > 1) { aggr._drop = false; aggr.cross(); } return aggr.changes(out); }; prototype.cross = function() { var aggr = this, curr = aggr.value, dims = aggr._dnames, vals = dims.map(function() { return {}; }), n = dims.length; // collect all group-by domain values function collect(cells) { var key, i, t, v; for (key in cells) { t = cells[key].tuple; for (i=0; i