Skip to content

Commit e85bbf7

Browse files
committed
Ensure existing usages in DF are calling state_mut and execute_mut
1 parent 6ced235 commit e85bbf7

File tree

5 files changed

+9
-9
lines changed

5 files changed

+9
-9
lines changed

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -325,9 +325,9 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
325325

326326
let results: Vec<ScalarValue> = states
327327
.into_iter()
328-
.map(|state| {
328+
.map(|mut state| {
329329
self.free_allocation(state.size());
330-
state.accumulator.evaluate()
330+
state.accumulator.evaluate_mut()
331331
})
332332
.collect::<Result<_>>()?;
333333

@@ -408,7 +408,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
408408
// If there are no rows, return empty arrays
409409
if num_rows == 0 {
410410
// create empty accumulator to get the state types
411-
let empty_state = (self.factory)()?.state()?;
411+
let empty_state = (self.factory)()?.state_mut()?;
412412
let empty_arrays = empty_state
413413
.into_iter()
414414
.map(|state_val| new_empty_array(&state_val.data_type()))
@@ -427,7 +427,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
427427
let values_to_accumulate =
428428
slice_and_maybe_filter(values, opt_filter, &[row_idx, row_idx + 1])?;
429429
converted_accumulator.update_batch(&values_to_accumulate)?;
430-
let states = converted_accumulator.state()?;
430+
let states = converted_accumulator.state_mut()?;
431431

432432
// Resize results to have enough columns according to the converted states
433433
results.resize_with(states.len(), || Vec::with_capacity(num_rows));

datafusion/functions-aggregate-common/src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub fn get_accum_scalar_values_as_arrays(
3737
accum: &mut dyn Accumulator,
3838
) -> Result<Vec<ArrayRef>> {
3939
accum
40-
.state()?
40+
.state_mut()?
4141
.iter()
4242
.map(|s| s.to_array_of_size(1))
4343
.collect()

datafusion/physical-expr/src/window/aggregate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr {
210210
.collect();
211211
accumulator.update_batch(&update)?
212212
}
213-
accumulator.evaluate()
213+
accumulator.evaluate_mut()
214214
}
215215
}
216216
}

datafusion/physical-expr/src/window/sliding_aggregate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr {
207207
.collect();
208208
accumulator.retract_batch(&retract)?
209209
}
210-
accumulator.evaluate()
210+
accumulator.evaluate_mut()
211211
}
212212
}
213213
}

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,7 +1308,7 @@ pub fn finalize_aggregation(
13081308
accumulators
13091309
.iter_mut()
13101310
.map(|accumulator| {
1311-
accumulator.state().and_then(|e| {
1311+
accumulator.state_mut().and_then(|e| {
13121312
e.iter()
13131313
.map(|v| v.to_array())
13141314
.collect::<Result<Vec<ArrayRef>>>()
@@ -1324,7 +1324,7 @@ pub fn finalize_aggregation(
13241324
// Merge the state to the final value
13251325
accumulators
13261326
.iter_mut()
1327-
.map(|accumulator| accumulator.evaluate().and_then(|v| v.to_array()))
1327+
.map(|accumulator| accumulator.evaluate_mut().and_then(|v| v.to_array()))
13281328
.collect()
13291329
}
13301330
}

0 commit comments

Comments
 (0)