Streaming Execution (Acero)#
Creating and running execution plans#
-
enum class UnalignedBufferHandling#
How to handle unaligned buffers.
Values:
-
enumerator kWarn#
-
enumerator kIgnore#
-
enumerator kReallocate#
-
enumerator kError#
-
UnalignedBufferHandling GetDefaultUnalignedBufferHandling()#
get the default behavior of unaligned buffer handling
This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable, which can be set to “warn”, “ignore”, “reallocate”, or “error”. If the environment variable is not set, or is set to an invalid value, this will return kWarn.
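The fallback rule above can be modeled in a few lines. This is a minimal sketch, not Arrow's implementation: `ParseAlignmentHandling` and the local enum are hypothetical stand-ins, and the sketch assumes exact, case-sensitive matching of the variable's value.

```cpp
#include <cassert>
#include <string>

// Hypothetical local mirror of acero::UnalignedBufferHandling (illustration only).
enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kError };

// Maps the text of ACERO_ALIGNMENT_HANDLING to a policy.
// A null pointer stands for "variable not set"; unknown strings also fall back to kWarn.
// Exact, case-sensitive matching is an assumption of this sketch.
UnalignedBufferHandling ParseAlignmentHandling(const char* env_value) {
  if (env_value == nullptr) return UnalignedBufferHandling::kWarn;
  const std::string value(env_value);
  if (value == "warn") return UnalignedBufferHandling::kWarn;
  if (value == "ignore") return UnalignedBufferHandling::kIgnore;
  if (value == "reallocate") return UnalignedBufferHandling::kReallocate;
  if (value == "error") return UnalignedBufferHandling::kError;
  return UnalignedBufferHandling::kWarn;  // invalid value
}
```

In a program the input would come from `std::getenv("ACERO_ALIGNMENT_HANDLING")` (declared in `<cstdlib>`), which returns a null pointer when the variable is unset.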
-
Result<std::shared_ptr<Schema>> DeclarationToSchema(const Declaration &declaration, FunctionRegistry *function_registry = NULLPTR)#
Calculate the output schema of a declaration.
This does not actually execute the plan. This operation may fail if the declaration represents an invalid plan (e.g. a project node with multiple inputs).
- Parameters:
declaration – A declaration describing an execution plan
function_registry – The function registry to use for function execution. If null then the default function registry will be used.
- Returns:
the schema that batches would have after going through the execution plan
-
Result<std::string> DeclarationToString(const Declaration &declaration, FunctionRegistry *function_registry = NULLPTR)#
Create a string representation of a plan.
This representation is for debug purposes only.
Conversion to a string may fail if the declaration represents an invalid plan.
Use Substrait for complete serialization of plans.
- Parameters:
declaration – A declaration describing an execution plan
function_registry – The function registry to use for function execution. If null then the default function registry will be used.
- Returns:
a string representation of the plan suitable for debugging output
-
Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)#
Utility method to run a declaration and collect the results into a table.
This method will add a sink node to the declaration to collect results into a table. It will then create an ExecPlan from the declaration, start the exec plan, block until the plan has finished, and return the created table.
- Parameters:
declaration – A declaration describing the plan to run
use_threads – If use_threads is false then all CPU work will be done on the calling thread. I/O tasks will still happen on the I/O executor and may be multi-threaded (but should not use significant CPU resources).
memory_pool – The memory pool to use for allocations made while running the plan.
function_registry – The function registry to use for function execution. If null then the default function registry will be used.
-
Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration, QueryOptions query_options)#
-
Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)#
Asynchronous version of DeclarationToTable.
- Parameters:
declaration – A declaration describing the plan to run
use_threads – The behavior of use_threads is slightly different than the synchronous version since we cannot run synchronously on the calling thread. Instead, if use_threads=false then a new thread pool will be created with a single thread and this will be used for all compute work.
memory_pool – The memory pool to use for allocations made while running the plan.
function_registry – The function registry to use for function execution. If null then the default function registry will be used.
-
Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration, ExecContext custom_exec_context)#
Overload of DeclarationToTableAsync accepting a custom exec context.
The executor must be specified (cannot be null) and must be kept alive until the returned future finishes.
-
Result<BatchesWithCommonSchema> DeclarationToExecBatches(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)#
Utility method to run a declaration and collect the results into an ExecBatch vector.
See also
DeclarationToTable for details on threading & execution
-
Result<BatchesWithCommonSchema> DeclarationToExecBatches(Declaration declaration, QueryOptions query_options)#
-
Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)#
Asynchronous version of DeclarationToExecBatches.
See also
DeclarationToTableAsync for details on threading & execution
-
Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(Declaration declaration, ExecContext custom_exec_context)#
Overload of DeclarationToExecBatchesAsync accepting a custom exec context.
See also
DeclarationToTableAsync for details on threading & execution
-
Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)#
Utility method to run a declaration and collect the results into a vector of record batches.
See also
DeclarationToTable for details on threading & execution
-
Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(Declaration declaration, QueryOptions query_options)#
-
Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)#
Asynchronous version of DeclarationToBatches.
See also
DeclarationToTableAsync for details on threading & execution
-
Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(Declaration declaration, ExecContext exec_context)#
Overload of DeclarationToBatchesAsync accepting a custom exec context.
See also
DeclarationToTableAsync for details on threading & execution
-
Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)#
Utility method to run a declaration and return results as a RecordBatchReader.
If an exec context is not provided then a default exec context will be used based on the value of use_threads. If use_threads is false then the CPU executor will be a serial executor and all CPU work will be done on the calling thread. I/O tasks will still happen on the I/O executor and may be multi-threaded.
If use_threads is false then all CPU work will happen during the calls to RecordBatchReader::Next and no CPU work will happen in the background. If use_threads is true then CPU work will happen on the CPU thread pool and tasks may run in between calls to RecordBatchReader::Next. If the returned reader is not consumed quickly enough then the plan will eventually pause as the backpressure queue fills up.
If a custom exec context is provided then the value of use_threads will be ignored.
The returned RecordBatchReader can be closed early to cancel the computation of record batches. In this case, only errors encountered by the computation may be reported. In particular, no cancellation error may be reported.
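The serial case can be pictured as a pull-based reader in which all work happens inside `Next()`. `PullReader` below is a hypothetical model (a counter stands in for CPU work), not `RecordBatchReader`; it only shows that nothing is computed before the first call and that closing the reader stops further work.

```cpp
#include <cassert>
#include <optional>

// Hypothetical model of use_threads = false: no background work at all.
// Each Next() call performs exactly the work for one batch on the caller's thread.
class PullReader {
 public:
  explicit PullReader(int num_batches) : num_batches_(num_batches) { assert(num_batches >= 0); }

  // Returns the next batch id, or std::nullopt once exhausted or closed.
  std::optional<int> Next() {
    if (closed_ || produced_ == num_batches_) return std::nullopt;
    ++work_units_;  // the "CPU work" for this batch happens here, lazily
    return produced_++;
  }

  // Early close: later Next() calls do no work and return std::nullopt.
  void Close() { closed_ = true; }

  int work_units() const { return work_units_; }

 private:
  int num_batches_;
  int produced_ = 0;
  int work_units_ = 0;
  bool closed_ = false;
};
```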
-
Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration, QueryOptions query_options)#
-
Status DeclarationToStatus(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)#
Utility method to run a declaration and ignore results.
This can be useful when the data are consumed as part of the plan itself, for example, when the plan ends with a write node.
See also
DeclarationToTable for details on threading & execution
-
Status DeclarationToStatus(Declaration declaration, QueryOptions query_options)#
-
Future<> DeclarationToStatusAsync(Declaration declaration, bool use_threads = true, MemoryPool *memory_pool = default_memory_pool(), FunctionRegistry *function_registry = NULLPTR)#
Asynchronous version of DeclarationToStatus.
This can be useful when the data are consumed as part of the plan itself, for example, when the plan ends with a write node.
See also
DeclarationToTableAsync for details on threading & execution
-
Future<> DeclarationToStatusAsync(Declaration declaration, ExecContext exec_context)#
Overload of DeclarationToStatusAsync accepting a custom exec context.
See also
DeclarationToTableAsync for details on threading & execution
-
struct Declaration#
- #include <arrow/acero/exec_plan.h>
Helper class for declaring execution nodes.
A Declaration represents an unconstructed ExecNode (and potentially an entire graph since its inputs may also be Declarations)
A Declaration can be converted to a plan and executed using one of the DeclarationToXyz methods.
For more direct control, a Declaration can be added to an existing execution plan with Declaration::AddToPlan, which will recursively construct any inputs as necessary.
Public Types
-
using Input = std::variant<ExecNode*, Declaration>#
Public Functions
-
inline Declaration()#
-
template<typename Options>
inline Declaration(std::string factory_name, std::vector<Input> inputs, Options options, std::string label)#
construct a declaration
- Parameters:
factory_name – the name of the exec node to construct. The node must have been added to the exec node registry with this name.
inputs – the inputs to the node, these should be other declarations
options – options that control the behavior of the node. You must use the appropriate subclass. For example, if factory_name is “project” then options should be ProjectNodeOptions.
label – a label to give the node. Can be used to distinguish it from other nodes of the same type in the plan.
-
template<typename Options>
inline Declaration(std::string factory_name, std::vector<Input> inputs, Options options)#
-
template<typename Options>
inline Declaration(std::string factory_name, Options options, std::string label)#
-
Result<ExecNode*> AddToPlan(ExecPlan *plan, ExecFactoryRegistry *registry = default_exec_factory_registry()) const#
add the declaration to an already created execution plan
This method will recursively call AddToPlan on all of the declaration’s inputs. This method is only for advanced use when the DeclarationToXyz methods are not sufficient.
- Parameters:
plan – the plan to add the node to
registry – the registry to use to look up the node factory
- Returns:
the instantiated execution node
-
bool IsValid(ExecFactoryRegistry *registry = default_exec_factory_registry()) const#
Public Members
-
std::string factory_name#
the name of the factory to use when creating a node
-
std::shared_ptr<ExecNodeOptions> options#
options to control the behavior of the node
-
std::string label#
a label to give the node in the plan
Public Static Functions
-
static Declaration Sequence(std::vector<Declaration> decls)#
Convenience factory for the common case of a simple sequence of nodes.
Each of decls will be appended to the inputs of the subsequent declaration, and the final modified declaration will be returned.
Without this convenience factory, constructing a sequence would require explicit, difficult-to-read nesting:
Declaration{"n3",
            {
                Declaration{"n2",
                            {
                                Declaration{"n1",
                                            {
                                                Declaration{"n0", N0Opts{}},
                                            },
                                            N1Opts{}},
                            },
                            N2Opts{}},
            },
            N3Opts{}};
An equivalent Declaration can be constructed more tersely using Sequence:
Declaration::Sequence({
    {"n0", N0Opts{}},
    {"n1", N1Opts{}},
    {"n2", N2Opts{}},
    {"n3", N3Opts{}},
});
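To make the nesting rule concrete, the sketch below reimplements it on a hypothetical `Decl` struct that holds only a name and its inputs (no options, no `ExecNode*` inputs). It models the documented behavior and is not the Acero source.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for Declaration: a node name plus its input declarations.
struct Decl {
  std::string name;
  std::vector<Decl> inputs;
};

// Each declaration is appended to the inputs of the one after it,
// and the last (outermost) declaration is returned.
Decl Sequence(std::vector<Decl> decls) {
  assert(!decls.empty());
  Decl current = std::move(decls.front());
  for (std::size_t i = 1; i < decls.size(); ++i) {
    Decl next = std::move(decls[i]);
    next.inputs.push_back(std::move(current));  // previous node becomes an input
    current = std::move(next);
  }
  return current;
}

// Renders a declaration tree as name(input, ...) for inspection.
std::string Render(const Decl& decl) {
  std::string out = decl.name;
  if (!decl.inputs.empty()) {
    out += "(";
    for (std::size_t i = 0; i < decl.inputs.size(); ++i) {
      if (i > 0) out += ", ";
      out += Render(decl.inputs[i]);
    }
    out += ")";
  }
  return out;
}
```

Rendering `Sequence({{"n0", {}}, {"n1", {}}, {"n2", {}}, {"n3", {}}})` gives `n3(n2(n1(n0)))`, the same shape as the hand-nested form.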
-
struct QueryOptions#
- #include <arrow/acero/exec_plan.h>
plan-wide options that can be specified when executing an execution plan
Public Members
-
bool use_legacy_batching = false#
Should the plan use a legacy batching strategy.
This is currently in place only to support the Scanner::ToTable method. This method relies on batch indices from the scanner remaining consistent. This is impractical in the ExecPlan, which might slice batches as needed (e.g. for a join).
However, it still works for simple plans and this is the only way we have at the moment for maintaining implicit order.
-
std::optional<bool> sequence_output = std::nullopt#
If the output has a meaningful order then sequence the output of the plan.
The default behavior (std::nullopt) will sequence output batches if there is a meaningful ordering in the final node and will emit batches immediately otherwise.
If explicitly set to true then plan execution will fail if there is no meaningful ordering. This can be useful to validate a query that should be emitting ordered results.
If explicitly set to false then batches will be emitted immediately even if there is a meaningful ordering. This could cause batches to be emitted out of order but may slightly reduce latency.
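Sequencing amounts to holding back batches that arrive ahead of their position in the ordering. The following hypothetical reorder buffer (not Acero's sequencer) assumes each batch carries a zero-based index in the implicit ordering; it shows where the extra latency comes from: a batch that arrives early waits until every earlier batch has been emitted.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical reorder buffer keyed by batch index.
class Sequencer {
 public:
  // Accepts one batch index and returns every index that can now be emitted in order.
  std::vector<int64_t> Push(int64_t index) {
    assert(index >= next_ && pending_.count(index) == 0);
    pending_.insert(index);
    std::vector<int64_t> ready;
    // Release the contiguous run starting at the next expected index.
    while (!pending_.empty() && *pending_.begin() == next_) {
      ready.push_back(next_);
      pending_.erase(pending_.begin());
      ++next_;
    }
    return ready;
  }

 private:
  int64_t next_ = 0;           // index of the next batch allowed out
  std::set<int64_t> pending_;  // batches that arrived early
};
```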
-
bool use_threads = true#
should the plan use multiple background threads for CPU-intensive work
If this is false then all CPU work will be done on the calling thread. I/O tasks will still happen on the I/O executor and may be multi-threaded (but should not use significant CPU resources).
Will be ignored if custom_cpu_executor is set.
-
::arrow::internal::Executor *custom_cpu_executor = NULLPTR#
custom executor to use for CPU-intensive work
Must be null or remain valid for the duration of the plan. If this is null then a default thread pool will be chosen whose behavior will be controlled by the use_threads option.
-
::arrow::internal::Executor *custom_io_executor = NULLPTR#
custom executor to use for IO work
Must be null or remain valid for the duration of the plan. If this is null then the global I/O thread pool will be chosen, whose behavior is controlled by the “ARROW_IO_THREADS” environment variable.
-
MemoryPool *memory_pool = default_memory_pool()#
a memory pool to use for allocations
Must remain valid for the duration of the plan.
-
FunctionRegistry *function_registry = GetFunctionRegistry()#
a function registry to use for the plan
Must remain valid for the duration of the plan.
-
std::vector<std::string> field_names#
the names of the output columns
If this is empty then names will be generated based on the input columns.
If set then the number of names must equal the number of output columns.
-
std::optional<UnalignedBufferHandling> unaligned_buffer_handling#
Policy for unaligned buffers in source data.
Various compute functions and Acero internals will type pun array buffers from uint8_t* to some kind of value type (e.g. we might cast to int32_t* to add two int32 arrays).
If the buffer is poorly aligned (e.g. an int32 array is not aligned on a 4-byte boundary) then this is technically undefined behavior in C++. However, most modern compilers and CPUs are fairly tolerant of this behavior and nothing bad (beyond a small hit to performance) is likely to happen.
Note that this only applies to source buffers. All buffers allocated internally by Acero will be suitably aligned.
If this field is set to kWarn then Acero will check if any buffers are unaligned and, if they are, will emit a warning.
If this field is set to kReallocate then Acero will allocate a new, suitably aligned buffer and copy the contents from the old buffer into this new buffer.
If this field is set to kError then Acero will gracefully abort the plan instead.
If this field is set to kIgnore then Acero will not even check if the buffers are unaligned.
If this field is not set then it will be treated as kWarn unless overridden by the ACERO_ALIGNMENT_HANDLING environment variable.
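The sketch below shows what "unaligned" means for an int32 buffer and gives a copy-based analogue of kReallocate. `IsAlignedFor` and `ReallocateInt32` are hypothetical helpers, not Acero functions; the copy relies on `std::memcpy`, which has no alignment requirement, so reading through it is well-defined where a type-punned `int32_t*` load would not be.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// True when `data` satisfies the alignment T requires (4 bytes for int32_t on common platforms).
template <typename T>
bool IsAlignedFor(const void* data) {
  return reinterpret_cast<std::uintptr_t>(data) % alignof(T) == 0;
}

// Rough analogue of kReallocate: copy `length` int32 values out of a possibly misaligned
// byte buffer into freshly allocated storage that is correctly aligned for int32_t.
std::vector<int32_t> ReallocateInt32(const uint8_t* data, std::size_t length) {
  std::vector<int32_t> out(length);
  if (length > 0) std::memcpy(out.data(), data, length * sizeof(int32_t));
  return out;
}
```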
-
struct BatchesWithCommonSchema#
- #include <arrow/acero/exec_plan.h>
a collection of exec batches with a common schema
Configuration for execution nodes#
-
enum class JoinType#
Values:
-
enumerator LEFT_SEMI#
-
enumerator RIGHT_SEMI#
-
enumerator LEFT_ANTI#
-
enumerator RIGHT_ANTI#
-
enumerator INNER#
-
enumerator LEFT_OUTER#
-
enumerator RIGHT_OUTER#
-
enumerator FULL_OUTER#
-
using ArrayVectorIteratorMaker = std::function<Iterator<std::shared_ptr<ArrayVector>>()>#
a source node that reads from an iterator of array vectors
-
using ExecBatchIteratorMaker = std::function<Iterator<std::shared_ptr<ExecBatch>>()>#
a source node that reads from an iterator of ExecBatch
-
using RecordBatchIteratorMaker = std::function<Iterator<std::shared_ptr<RecordBatch>>()>#
-
constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30#
a default value at which backpressure will be applied
-
constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28#
a default value at which backpressure will be removed
-
class ExecNodeOptions#
- #include <arrow/acero/options.h>
A base class for all options objects.
The only time this is used directly is when a node has no configuration.
Subclassed by arrow::acero::SchemaSourceNodeOptions< ArrayVectorIteratorMaker >, arrow::acero::SchemaSourceNodeOptions< ExecBatchIteratorMaker >, arrow::acero::SchemaSourceNodeOptions< RecordBatchIteratorMaker >, arrow::acero::AggregateNodeOptions, arrow::acero::AsofJoinNodeOptions, arrow::acero::ConsumingSinkNodeOptions, arrow::acero::FetchNodeOptions, arrow::acero::FilterNodeOptions, arrow::acero::HashJoinNodeOptions, arrow::acero::NamedTableNodeOptions, arrow::acero::OrderByNodeOptions, arrow::acero::PivotLongerNodeOptions, arrow::acero::ProjectNodeOptions, arrow::acero::RecordBatchReaderSourceNodeOptions, arrow::acero::SchemaSourceNodeOptions< ItMaker >, arrow::acero::SinkNodeOptions, arrow::acero::SourceNodeOptions, arrow::acero::TableSinkNodeOptions, arrow::acero::TableSourceNodeOptions, arrow::dataset::ScanNodeOptions, arrow::dataset::ScanV2Options, arrow::dataset::WriteNodeOptions
Public Functions
-
virtual ~ExecNodeOptions() = default#
Public Members
-
std::shared_ptr<DebugOptions> debug_opts#
This must not be used in release-mode.
-
class SourceNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
A node representing a generic source of data for Acero.
The source node will start calling generator during StartProducing. An initial task will be created that will call generator. It will not call generator reentrantly. If the source can be read in parallel then those details should be encapsulated within generator.
For each batch received a new task will be created to push that batch downstream. This task will slice smaller units of size ExecPlan::kMaxBatchSize from the parent batch and call InputReceived. Thus, if the generator yields a large batch it may result in several calls to InputReceived.
The SourceNode will, by default, assign an implicit ordering to outgoing batches. This is valid as long as the generator generates batches in a deterministic fashion. Currently, the only way to override this is to subclass the SourceNode.
This node is not generally used directly but can serve as the basis for various specialized nodes.
Public Functions
Create an instance from values.
-
class TableSourceNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
a node that generates data from a table already loaded in memory
The table source node will slice off chunks, defined by max_batch_size, for parallel processing. The table source node extends source node and so these chunks will be iteratively processed in small batches.
See also
SourceNodeOptions for details.
Public Functions
Create an instance from values.
Public Members
-
int64_t max_batch_size#
size of batches to emit from this node. If the table is larger, the node will emit multiple batches from the table to be processed in parallel.
Public Static Attributes
-
static constexpr int64_t kDefaultMaxBatchSize = 1 << 20#
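The slicing rule can be sketched as a pure function over row counts. `SliceRanges` is a hypothetical model that treats the table as one contiguous chunk; a real table may also be split at its existing chunk boundaries, which this sketch ignores.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Returns the (offset, length) slices a contiguous table of num_rows rows would be
// cut into when at most max_batch_size rows are emitted per batch.
std::vector<std::pair<int64_t, int64_t>> SliceRanges(int64_t num_rows, int64_t max_batch_size) {
  assert(num_rows >= 0 && max_batch_size > 0);
  std::vector<std::pair<int64_t, int64_t>> slices;
  for (int64_t offset = 0; offset < num_rows; offset += max_batch_size) {
    // The final slice holds whatever rows remain.
    slices.emplace_back(offset, std::min(max_batch_size, num_rows - offset));
  }
  return slices;
}
```

With max_batch_size equal to kDefaultMaxBatchSize (1 << 20, i.e. 1,048,576 rows), a 2,500,000-row table yields slices starting at rows 0, 1,048,576 and 2,097,152, the last one holding 402,848 rows.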
-
class NamedTableNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
define a lazily resolved Arrow table.
The table uniquely identified by the names can typically be resolved at the time when the plan is to be consumed.
This node is for serialization purposes only and can never be executed.
Public Functions
Create an instance from values.
-
template<typename ItMaker>
class SchemaSourceNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
a source node which feeds data from a synchronous iterator of batches
ItMaker is a maker of an iterator of tabular data.
The node can be configured to use an I/O executor. If set then each time the iterator is polled a new I/O thread task will be created to do the polling. This allows a blocking iterator to stay off the CPU thread pool.
Public Functions
Create an instance that will create a new task on io_executor for each iteration.
Create an instance that will either iterate synchronously or use the default I/O executor.
Public Members
-
arrow::internal::Executor *io_executor#
The executor to use for scanning the iterator.
Defaults to the default I/O executor. Only used if requires_io is true. If requires_io is false then this MUST be nullptr.
-
bool requires_io#
If true then items will be fetched from the iterator on a dedicated I/O thread to keep I/O off the CPU thread.
-
class RecordBatchReaderSourceNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
a source node that reads from a RecordBatchReader
Each iteration of the RecordBatchReader will be run on a new thread task created on the I/O thread pool.
Public Functions
Create an instance from values.
Public Members
-
std::shared_ptr<RecordBatchReader> reader#
The RecordBatchReader which acts as the data source.
-
arrow::internal::Executor *io_executor#
The executor to use for the reader.
Defaults to the default I/O executor.
-
class ArrayVectorSourceNodeOptions : public arrow::acero::SchemaSourceNodeOptions<ArrayVectorIteratorMaker>#
- #include <arrow/acero/options.h>
An extended Source node which accepts a schema and array-vectors.
-
class ExecBatchSourceNodeOptions : public arrow::acero::SchemaSourceNodeOptions<ExecBatchIteratorMaker>#
- #include <arrow/acero/options.h>
An extended Source node which accepts a schema and exec-batches.
Public Functions
Create an instance that will create a new task on io_executor for each iteration.
Create an instance that will either iterate synchronously or use the default I/O executor.
-
class RecordBatchSourceNodeOptions : public arrow::acero::SchemaSourceNodeOptions<RecordBatchIteratorMaker>#
- #include <arrow/acero/options.h>
a source node that reads from an iterator of RecordBatch
-
class FilterNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
a node which excludes some rows from batches passed through it
filter_expression will be evaluated against each batch which is pushed to this node. Any rows for which filter_expression does not evaluate to true will be excluded in the batch emitted by this node.
This node will emit empty batches if all rows are excluded. This is done to avoid gaps in the ordering.
Public Functions
-
inline explicit FilterNodeOptions(Expression filter_expression)#
create an instance from values
Public Members
-
Expression filter_expression#
the expression to filter batches
The return type of this expression must be boolean.
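The row-selection rule, including the treatment of null results, can be modeled with `std::optional<bool>` standing in for a nullable boolean. `FilterBatch` is a hypothetical helper over plain vectors, not the node's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Keeps a row only when the predicate result is exactly true;
// false and null (std::nullopt) both drop the row.
std::vector<int> FilterBatch(const std::vector<int>& values,
                             const std::vector<std::optional<bool>>& mask) {
  assert(values.size() == mask.size());
  std::vector<int> out;
  for (std::size_t i = 0; i < values.size(); ++i) {
    if (mask[i].has_value() && *mask[i]) out.push_back(values[i]);
  }
  return out;  // may be empty; the node would still emit an (empty) batch
}
```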
-
class FetchNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
a node which selects a specified subset from the input
Public Functions
-
inline FetchNodeOptions(int64_t offset, int64_t count)#
create an instance from values
Public Members
-
int64_t offset#
the number of rows to skip
-
int64_t count#
the number of rows to keep (not counting skipped rows)
Public Static Attributes
-
static constexpr std::string_view kName = "fetch"#
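The interaction of offset and count with a stream of batches can be sketched as follows. `FetchRanges` is hypothetical and assumes the batches are visited in their sequenced order; it reports, per input batch, the row range that would survive.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// For each input batch, returns the [begin, end) range of rows that would be
// forwarded after skipping `offset` rows and keeping at most `count` rows.
std::vector<std::pair<int64_t, int64_t>> FetchRanges(const std::vector<int64_t>& batch_lengths,
                                                    int64_t offset, int64_t count) {
  assert(offset >= 0 && count >= 0);
  std::vector<std::pair<int64_t, int64_t>> ranges;
  int64_t to_skip = offset;  // rows still to skip
  int64_t to_keep = count;   // rows still to keep
  for (int64_t length : batch_lengths) {
    const int64_t skip = std::min(to_skip, length);
    to_skip -= skip;
    const int64_t keep = std::min(to_keep, length - skip);
    to_keep -= keep;
    ranges.emplace_back(skip, skip + keep);  // empty range when nothing survives
  }
  return ranges;
}
```

For three 5-row batches with offset 7 and count 6, the first batch is skipped entirely, rows 2 to 4 of the second and rows 0 to 2 of the third are kept.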
-
class ProjectNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
a node which executes expressions on input batches, producing batches of the same length with new columns.
Each expression will be evaluated against each batch which is pushed to this node to produce a corresponding output column.
If names are not provided, the string representations of exprs will be used.
Public Functions
-
inline explicit ProjectNodeOptions(std::vector<Expression> expressions, std::vector<std::string> names = {})#
create an instance from values
Public Members
-
std::vector<Expression> expressions#
the expressions to run on the batches
The output will have one column for each expression. If you wish to keep any of the columns from the input then you should create a simple field_ref expression for that column.
-
std::vector<std::string> names#
the names of the output columns
If this is not specified then the result of calling ToString on the expression will be used instead.
This list should either be empty or have the same length as expressions.
-
class AggregateNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
a node which aggregates input batches and calculates summary statistics
The node can summarize the entire input or it can group the input with grouping keys and segment keys.
By default, the aggregate node is a pipeline breaker. It must accumulate all input before any output is produced. Segment keys are a performance optimization. If you know your input is already partitioned by one or more columns then you can specify these as segment keys. At each change in the segment keys the node will emit values for all data seen so far.
Segment keys are currently limited to single-threaded mode.
Both keys and segment keys determine the group. However, segment keys are also used for determining grouping segments, which should be large, and allow streaming a partial aggregation result after processing each segment. One common use case for segment keys is ordered aggregation, in which the segment-key attribute specifies a column with non-decreasing values or a lexicographically-ordered set of such columns.
If the keys attribute is a non-empty vector, then each aggregate in aggregates is expected to be a HashAggregate function. If the keys attribute is an empty vector, then each aggregate is assumed to be a ScalarAggregate function.
If the segment_keys attribute is a non-empty vector, then segmented aggregation, as described above, applies.
The keys and segment_keys vectors must be disjoint.
If no measures are provided then you will simply get the list of unique keys.
This node outputs segment keys first, followed by regular keys, followed by one column for each aggregate.
Public Functions
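A minimal model of segmented aggregation, assuming a single integer segment key, no other grouping keys, and one sum aggregate (`SegmentedSum` is hypothetical, not an Acero API). It emits a result each time the segment key changes and puts the segment key first, mirroring the column order described above.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Input rows are (segment key, value); output rows are (segment key, sum of values).
std::vector<std::pair<int, int64_t>> SegmentedSum(const std::vector<std::pair<int, int64_t>>& rows) {
  std::vector<std::pair<int, int64_t>> emitted;
  bool open = false;  // whether a segment is currently being accumulated
  int current_key = 0;
  int64_t sum = 0;
  for (const auto& [key, value] : rows) {
    if (open && key != current_key) {
      emitted.emplace_back(current_key, sum);  // key changed: previous segment is complete
      sum = 0;
    }
    current_key = key;
    open = true;
    sum += value;
  }
  if (open) emitted.emplace_back(current_key, sum);  // end of input closes the last segment
  return emitted;
}
```

Because each segment is flushed as soon as its key changes, only one running sum is held at a time instead of one per distinct key.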
-
class BackpressureMonitor#
- #include <arrow/acero/options.h>
an interface that can be queried for backpressure statistics
-
struct BackpressureOptions#
- #include <arrow/acero/options.h>
Options to control backpressure behavior.
Public Functions
-
inline BackpressureOptions()#
Create default options that perform no backpressure.
-
inline BackpressureOptions(uint64_t resume_if_below, uint64_t pause_if_above)#
Create options that will perform backpressure.
- Parameters:
resume_if_below – The producer should resume producing if the backpressure queue has fewer than resume_if_below items.
pause_if_above – The producer should pause producing if the backpressure queue has more than pause_if_above items.
-
inline bool should_apply_backpressure() const#
helper method to determine whether backpressure is enabled
- Returns:
true if pause_if_above is greater than zero, false otherwise
Public Members
-
uint64_t resume_if_below#
the number of bytes at which the producer should resume producing
-
uint64_t pause_if_above#
the number of bytes at which the producer should pause producing
If this is 0 then backpressure will be disabled.
Public Static Functions
-
static inline BackpressureOptions DefaultBackpressure()#
create an instance using default values for backpressure limits
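The two thresholds form a hysteresis band: once paused, the producer stays paused until the queue drains below resume_if_below. `BackpressureModel` is a hypothetical single-threaded model of that rule, not Acero's implementation; it treats both thresholds as byte counts, as the member descriptions do.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of the pause/resume rule described by BackpressureOptions.
class BackpressureModel {
 public:
  BackpressureModel(uint64_t resume_if_below, uint64_t pause_if_above)
      : resume_if_below_(resume_if_below), pause_if_above_(pause_if_above) {
    assert(resume_if_below <= pause_if_above);
  }

  // Feed the current queue size in bytes; returns true while the producer should stay paused.
  bool Update(uint64_t queued_bytes) {
    if (pause_if_above_ == 0) return false;  // mirrors should_apply_backpressure() == false
    if (!paused_ && queued_bytes > pause_if_above_) {
      paused_ = true;
    } else if (paused_ && queued_bytes < resume_if_below_) {
      paused_ = false;
    }
    return paused_;
  }

 private:
  uint64_t resume_if_below_;
  uint64_t pause_if_above_;
  bool paused_ = false;
};
```

Plugging in kDefaultBackpressureLowBytes (1 << 28, 256 MiB) and kDefaultBackpressureHighBytes (1 << 30, 1 GiB) as the two thresholds, a 512 MiB queue leaves a running producer running but keeps a paused producer paused.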
-
class SinkNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
a sink node which collects results in a queue
Emitted batches will only be ordered if there is a meaningful ordering and sequence_output is not set to false.
Subclassed by arrow::acero::OrderBySinkNodeOptions, arrow::acero::SelectKSinkNodeOptions
Public Functions
-
inline explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()> *generator, BackpressureOptions backpressure = {}, BackpressureMonitor **backpressure_monitor = NULLPTR, std::optional<bool> sequence_output = std::nullopt)#
Public Members
-
std::function<Future<std::optional<ExecBatch>>()> *generator#
A pointer to a generator of batches.
This will be set when the node is added to the plan and should be used to consume data from the plan. If this function is not called frequently enough then the sink node will start to accumulate data and may apply backpressure.
-
std::shared_ptr<Schema> *schema#
A pointer which will be set to the schema of the generated batches.
This is optional; if nullptr is passed in then it will be ignored. This will be set when the node is added to the plan, before StartProducing is called.
-
BackpressureOptions backpressure#
Options to control when to apply backpressure.
This is optional, the default is to never apply backpressure. If the plan is not consumed quickly enough the system may eventually run out of memory.
-
BackpressureMonitor **backpressure_monitor#
A pointer to a backpressure monitor.
This will be set when the node is added to the plan. This can be used to inspect the amount of data currently queued in the sink node. This is an optional utility and backpressure can be applied even if this is not used.
-
std::optional<bool> sequence_output#
Controls whether batches should be emitted immediately or sequenced in order.
See also
QueryOptions for more details
-
class BackpressureControl#
- #include <arrow/acero/options.h>
Control used by a SinkNodeConsumer to pause & resume.
Callers should ensure that they do not call Pause and Resume simultaneously and they should sequence things so that a call to Pause() is always followed by an eventual call to Resume().
-
class SinkNodeConsumer#
- #include <arrow/acero/options.h>
a sink node that consumes the data as part of the plan using callbacks
Subclassed by arrow::acero::NullSinkNodeConsumer, arrow::acero::TableSinkNodeConsumer
Public Functions
-
virtual ~SinkNodeConsumer() = default#
Prepare any consumer state.
This will be run once the schema is finalized as the plan is starting and before any calls to Consume. A common use is to save off the schema so that batches can be interpreted.
-
class ConsumingSinkNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
Add a sink node which consumes data within the exec plan run.
Public Functions
Public Members
-
std::shared_ptr<SinkNodeConsumer> consumer#
-
std::vector<std::string> names#
Names to rename the sink’s schema fields to.
If specified then names must be provided for all fields. Currently, only a flat schema is supported (see GH-31875).
If not specified then names will be generated based on the source data.
-
std::optional<bool> sequence_output#
Controls whether batches should be emitted immediately or sequenced in order.
See also
QueryOptions for more details
-
class OrderBySinkNodeOptions : public arrow::acero::SinkNodeOptions#
- #include <arrow/acero/options.h>
Make a node which sorts rows passed through it.
All batches pushed to this node are accumulated and then sorted by the given fields. The sorted batches are then forwarded to the generator in sorted order.
Public Functions
Public Members
-
SortOptions sort_options#
options describing which columns and direction to sort
-
class OrderByNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
Apply a new ordering to data.
Currently this node works by accumulating all data, sorting, and then emitting the new data with an updated batch index.
Larger-than-memory sort is not currently supported.
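The accumulate-sort-emit strategy can be modelled in a few lines of standalone C++. This is a simplified sketch, not the Arrow implementation: SimpleBatch and AccumulatingSorter are hypothetical, batches carry a single int64 column, and the output batch size is an arbitrary parameter. Input indices are ignored, and the emitted batches receive fresh indices 0, 1, 2, ... that follow the new ordering.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical simplified batch: one int64 column plus a sequence index.
struct SimpleBatch {
  std::vector<int64_t> x;
  int64_t index;
};

// Accumulate all input, sort it, then emit new batches with indices 0, 1, 2, ...
class AccumulatingSorter {
 public:
  explicit AccumulatingSorter(std::size_t out_batch_size) : out_batch_size_(out_batch_size) {
    assert(out_batch_size_ > 0);
  }

  // Input order and input indices do not matter; rows are only buffered here.
  void InputReceived(const SimpleBatch& batch) {
    rows_.insert(rows_.end(), batch.x.begin(), batch.x.end());
  }

  // Called once all input has arrived: sort ascending on x and re-chunk.
  std::vector<SimpleBatch> Finish() {
    std::sort(rows_.begin(), rows_.end());
    std::vector<SimpleBatch> out;
    for (std::size_t start = 0; start < rows_.size(); start += out_batch_size_) {
      const std::size_t end = std::min(rows_.size(), start + out_batch_size_);
      SimpleBatch batch{std::vector<int64_t>(rows_.begin() + static_cast<std::ptrdiff_t>(start),
                                             rows_.begin() + static_cast<std::ptrdiff_t>(end)),
                        static_cast<int64_t>(out.size())};
      out.push_back(std::move(batch));
    }
    return out;
  }

 private:
  std::size_t out_batch_size_;
  std::vector<int64_t> rows_;
};
```

Because nothing is emitted until Finish() runs, memory use grows with the whole input, which is why larger-than-memory sorting needs a different design.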
Public Functions
-
inline explicit OrderByNodeOptions(Ordering ordering)#
Public Members
-
Ordering ordering#
The new ordering to apply to outgoing data.
Public Static Attributes
-
static constexpr std::string_view kName = "order_by"#
-
class HashJoinNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
a node which implements a join operation using a hash table
Public Functions
-
inline HashJoinNodeOptions(JoinType in_join_type, std::vector<FieldRef> in_left_keys, std::vector<FieldRef> in_right_keys, Expression filter = literal(true), std::string output_suffix_for_left = default_output_suffix_for_left, std::string output_suffix_for_right = default_output_suffix_for_right, bool disable_bloom_filter = false)#
create an instance from values that outputs all columns
-
inline HashJoinNodeOptions(std::vector<FieldRef> in_left_keys, std::vector<FieldRef> in_right_keys)#
create an instance from keys
This will create an inner join that outputs all columns and has no post-join filter.
in_left_keys should have the same length and types as in_right_keys.
- Parameters:
in_left_keys – the keys in the left input
in_right_keys – the keys in the right input
-
inline HashJoinNodeOptions(JoinType join_type, std::vector<FieldRef> left_keys, std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output, std::vector<FieldRef> right_output, Expression filter = literal(true), std::string output_suffix_for_left = default_output_suffix_for_left, std::string output_suffix_for_right = default_output_suffix_for_right, bool disable_bloom_filter = false)#
create an instance from values using JoinKeyCmp::EQ for all comparisons
-
inline HashJoinNodeOptions(JoinType join_type, std::vector<FieldRef> left_keys, std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output, std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp, Expression filter = literal(true), std::string output_suffix_for_left = default_output_suffix_for_left, std::string output_suffix_for_right = default_output_suffix_for_right, bool disable_bloom_filter = false)#
create an instance from values
-
HashJoinNodeOptions() = default#
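For intuition about the key-based constructors above, here is a standalone sketch of an inner equi-join on a single int64 key (every comparison is equality, as with JoinKeyCmp::EQ). It is not Acero's hash join: the row types are hypothetical, there is no residual filter, Bloom filter, or output-suffix handling, and building the hash table on the right input is an assumption made for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical row types for a single-key inner join.
struct LeftRow { int64_t key; std::string left_val; };
struct RightRow { int64_t key; std::string right_val; };
struct JoinedRow { int64_t key; std::string left_val; std::string right_val; };

std::vector<JoinedRow> InnerHashJoin(const std::vector<LeftRow>& left,
                                     const std::vector<RightRow>& right) {
  // Build phase: hash every right row by its key (duplicate keys are allowed).
  std::unordered_multimap<int64_t, const RightRow*> table;
  for (const RightRow& r : right) table.emplace(r.key, &r);

  // Probe phase: each left row emits one output row per right row with an equal key.
  std::vector<JoinedRow> out;
  for (const LeftRow& l : left) {
    auto [first, last] = table.equal_range(l.key);
    for (auto it = first; it != last; ++it) {
      out.push_back({l.key, l.left_val, it->second->right_val});
    }
  }
  return out;
}
```

The order of matches for a duplicated key depends on the hash table, which mirrors the note elsewhere in this reference that a hash-join has no predictable output order.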
-
class AsofJoinNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
a node which implements the asof join operation
Note: this API is experimental and will change in the future.
This node takes one left table and any number of right tables, and asof joins them together. Batches produced by each input must be ordered by the “on” key. This node will output one row for each row in the left table.
Public Members
-
std::vector<Keys> input_keys#
AsofJoin keys per input table.
At least two keys must be given. The first key corresponds to the left table and all other keys correspond to right tables for the as-of-join.
See also: Keys for details.
-
int64_t tolerance#
Tolerance for inexact “on” key matching.
A right row is considered a match with the left row if right.on - left.on <= tolerance. The tolerance may be: negative, in which case a past-as-of-join occurs; positive, in which case a future-as-of-join occurs; or zero, in which case an exact-as-of-join occurs.
The tolerance is interpreted in the same units as the “on” key.
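The sketch below is one reading of the signed tolerance rule, written as standalone C++ rather than Arrow code. It assumes that a negative tolerance admits right rows in [left.on + tolerance, left.on], a positive tolerance admits right rows in [left.on, left.on + tolerance], and that among the admitted rows the one closest to left.on is chosen. Note that the on_key description below states the window as left_on - tolerance <= right_on <= left_on. The helper names IsAsofCandidate and FindAsofMatch are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// tolerance < 0: past-as-of join, right.on in [left.on + tolerance, left.on]
// tolerance > 0: future-as-of join, right.on in [left.on, left.on + tolerance]
// tolerance == 0: exact-as-of join, right.on == left.on
bool IsAsofCandidate(int64_t left_on, int64_t right_on, int64_t tolerance) {
  const int64_t diff = right_on - left_on;
  if (tolerance >= 0) return diff >= 0 && diff <= tolerance;
  return diff <= 0 && diff >= tolerance;
}

// Returns the position of the admitted right row closest to left_on, if any.
std::optional<std::size_t> FindAsofMatch(int64_t left_on, const std::vector<int64_t>& right_on,
                                         int64_t tolerance) {
  std::optional<std::size_t> best;
  int64_t best_distance = 0;
  for (std::size_t i = 0; i < right_on.size(); ++i) {
    if (!IsAsofCandidate(left_on, right_on[i], tolerance)) continue;
    // All candidates lie on one side of left_on, so the distance is |right - left|.
    const int64_t distance =
        right_on[i] > left_on ? right_on[i] - left_on : left_on - right_on[i];
    if (!best || distance < best_distance) {
      best = i;
      best_distance = distance;
    }
  }
  return best;
}
```

A real as-of join exploits the required sort order on the "on" key instead of scanning every right row.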
-
struct Keys#
- #include <arrow/acero/options.h>
Keys for one input table of the AsofJoin operation.
The keys must be consistent across the input tables: Each “on” key must refer to a field of the same type and units across the tables. Each “by” key must refer to a list of fields of the same types across the tables.
Public Members
-
FieldRef on_key#
“on” key for the join.
The input table must be sorted by the “on” key. Must be a single field of a common type. Inexact match is used on the “on” key. i.e., a row is considered a match iff left_on - tolerance <= right_on <= left_on. Currently, the “on” key must be of an integer, date, or timestamp type.
-
class SelectKSinkNodeOptions : public arrow::acero::SinkNodeOptions#
- #include <arrow/acero/options.h>
a node which selects top_k/bottom_k rows passed through it
All batches pushed to this node are accumulated and then selected by the given fields. The selected batches are then forwarded to the generator in sorted order.
Public Functions
Public Members
-
SelectKOptions select_k_options#
SelectK options.
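A standalone model of the accumulate-then-select behavior, assuming a single int64 column and top-k semantics. SelectTopK is a hypothetical helper, not Arrow's SelectKOptions machinery. std::partial_sort only fully orders the first k positions, which is cheaper than sorting everything.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Accumulates every batch, then keeps the k largest values in descending order.
// Bottom-k would use std::less<int64_t>() instead of std::greater<int64_t>().
std::vector<int64_t> SelectTopK(const std::vector<std::vector<int64_t>>& batches,
                                std::size_t k) {
  std::vector<int64_t> all;
  for (const auto& batch : batches) all.insert(all.end(), batch.begin(), batch.end());
  k = std::min(k, all.size());
  const auto middle = all.begin() + static_cast<std::ptrdiff_t>(k);
  // Only [begin, middle) ends up sorted; the remaining elements are left unordered.
  std::partial_sort(all.begin(), middle, all.end(), std::greater<int64_t>());
  all.resize(k);
  return all;
}
```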
-
class TableSinkNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
a sink node which accumulates all output into a table
Public Functions
create an instance from values
Public Members
-
std::shared_ptr<Table> *output_table#
an “out parameter” specifying the table that will be created
Must not be null and must remain valid for the entirety of the plan execution. After the plan has completed, this will be set to point to the result table.
-
std::optional<bool> sequence_output#
Controls whether batches should be emitted immediately or sequenced in order.
See also: QueryOptions for more details.
-
std::vector<std::string> names#
Custom names to use for the columns.
If specified then names must be provided for all fields. Currently, only a flat schema is supported (see GH-31875).
If not specified then names will be generated based on the source data.
-
struct PivotLongerRowTemplate#
- #include <arrow/acero/options.h>
a row template that describes one row that will be generated for each input row
Public Functions
Public Members
-
std::vector<std::string> feature_values#
A (typically unique) set of feature values for the template, usually derived from a column name.
These will be used to populate the feature columns
-
class PivotLongerNodeOptions : public arrow::acero::ExecNodeOptions#
- #include <arrow/acero/options.h>
Reshape a table by turning some columns into additional rows.
This operation is sometimes also referred to as UNPIVOT
This is typically done when there are multiple observations in each row in order to transform to a table containing a single observation per row.
For example:
| time | left_temp | right_temp |
|------|-----------|------------|
| 1    | 10        | 20         |
| 2    | 15        | 18         |
The above table contains two observations per row. There is an implicit feature “location” (left vs right) and a measurement “temp”. What we really want is:
| time | location | temp |
|------|----------|------|
| 1    | left     | 10   |
| 1    | right    | 20   |
| 2    | left     | 15   |
| 2    | right    | 18   |
For a more complex example consider:
| time | ax1 | ay1 | bx1 | ay2 |
|------|-----|-----|-----|-----|
| 0    | 1   | 2   | 3   | 4   |
We can pretend a vs b and x vs y are features while 1 and 2 are two different kinds of measurements. We thus want to pivot to:
| time | a/b | x/y | f1 | f2   |
|------|-----|-----|----|------|
| 0    | a   | x   | 1  | null |
| 0    | a   | y   | 2  | 4    |
| 0    | b   | x   | 3  | null |
To do this we create a row template for each combination of features. One should be able to do this purely by looking at the column names. For example, given the above columns “ax1”, “ay1”, “bx1”, and “ay2” we know we have three feature combinations (a, x), (a, y), and (b, x). Similarly, we know we have two possible measurements, “1” and “2”.
For each combination of features we create a row template. In each row template we describe the combination and then list which columns to use for the measurements. If a measurement doesn’t exist for a given combination then we use nullopt.
So, for our above example, we have:
(a, x): names={“a”, “x”}, values={“ax1”, nullopt}
(a, y): names={“a”, “y”}, values={“ay1”, “ay2”}
(b, x): names={“b”, “x”}, values={“bx1”, nullopt}
Finishing it off, we name our new columns:
feature_field_names={“a/b”, “x/y”}
measurement_field_names={“f1”, “f2”}
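The worked example can be checked with a small standalone C++ model. It is not the Acero node: RowTemplate, LongRow and PivotLonger are hypothetical, measurement columns are referred to by name instead of FieldRef, values are int64, and pass-through columns such as time are omitted for brevity. A std::nullopt entry in a template plays the role of the nullopt entries above and yields a null measurement.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical mirror of a row template: feature values, plus for each measurement the
// input column to read (std::nullopt means the measurement is null for this template).
struct RowTemplate {
  std::vector<std::string> feature_values;
  std::vector<std::optional<std::string>> measurement_columns;
};

struct LongRow {
  std::vector<std::string> features;
  std::vector<std::optional<int64_t>> measurements;
};

// Every input row produces one output row per template.
std::vector<LongRow> PivotLonger(const std::vector<std::map<std::string, int64_t>>& rows,
                                 const std::vector<RowTemplate>& templates) {
  std::vector<LongRow> out;
  for (const auto& row : rows) {
    for (const RowTemplate& tmpl : templates) {
      LongRow long_row{tmpl.feature_values, {}};
      for (const auto& column : tmpl.measurement_columns) {
        if (column) {
          long_row.measurements.push_back(row.at(*column));  // copy the measured value
        } else {
          long_row.measurements.push_back(std::nullopt);  // measurement missing: null
        }
      }
      out.push_back(std::move(long_row));
    }
  }
  return out;
}
```

With the three templates from the example, one input row yields three output rows, consistent with the rule that the output row count is the input row count multiplied by the number of templates.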
Public Members
-
std::vector<PivotLongerRowTemplate> row_templates#
One or more row templates to create new output rows.
Normally there are at least two row templates. The number of output rows will be the number of input rows multiplied by the number of row templates.
-
std::vector<std::string> feature_field_names#
The names of the columns which describe the new features.
-
std::vector<std::string> measurement_field_names#
The names of the columns which represent the measurements.
Public Static Attributes
-
static constexpr std::string_view kName = "pivot_longer"#
Internals for creating custom nodes#
-
ExecFactoryRegistry *default_exec_factory_registry()#
The default registry, which includes built-in factories.
-
inline Result<ExecNode*> MakeExecNode(const std::string &factory_name, ExecPlan *plan, std::vector<ExecNode*> inputs, const ExecNodeOptions &options, ExecFactoryRegistry *registry = default_exec_factory_registry())#
Construct an ExecNode using the named factory.
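Conceptually, MakeExecNode looks a factory up by name in a registry and invokes it. The standalone sketch below models that lookup with hypothetical types (FakeNode, FakeFactoryRegistry). The real registry works with an ExecPlan, inputs and ExecNodeOptions, and reports failures through Status/Result rather than exceptions.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for an ExecNode; only records which factory built it.
struct FakeNode {
  std::string kind;
};

// Hypothetical name -> factory registry.
class FakeFactoryRegistry {
 public:
  using Factory = std::function<std::unique_ptr<FakeNode>()>;

  // Returns false, leaving the registry unchanged, if the name is already taken.
  bool AddFactory(const std::string& name, Factory factory) {
    return factories_.emplace(name, std::move(factory)).second;
  }

  // Looks the factory up by name and invokes it.
  std::unique_ptr<FakeNode> Make(const std::string& name) const {
    auto it = factories_.find(name);
    if (it == factories_.end()) {
      throw std::invalid_argument("unknown factory: " + name);
    }
    return it->second();
  }

 private:
  std::map<std::string, Factory> factories_;
};
```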
-
inline bool operator==(const ExecBatch &l, const ExecBatch &r)#
-
inline bool operator!=(const ExecBatch &l, const ExecBatch &r)#
-
void PrintTo(const ExecBatch&, std::ostream*)#
-
class ExecPlan : public std::enable_shared_from_this<ExecPlan>#
- #include <arrow/acero/exec_plan.h>
Public Functions
-
virtual ~ExecPlan() = default#
-
QueryContext *query_context()#
-
const NodeVector &nodes() const#
retrieve the nodes in the plan
-
void StartProducing()#
Start producing on all nodes.
Nodes are started in reverse topological order, such that any node is started before all of its inputs.
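Reverse topological order can be computed with a depth-first search that emits a node only after its inputs, followed by a reversal. This is a standalone sketch over integer node ids (node i's inputs are inputs[i]), not ExecPlan's own code, and it assumes the graph is acyclic.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

std::vector<int> StartOrder(const std::vector<std::vector<int>>& inputs) {
  std::vector<bool> visited(inputs.size(), false);
  std::vector<int> topo;  // topological order: every input before its consumers
  std::function<void(int)> visit = [&](int node) {
    if (visited[node]) return;
    visited[node] = true;
    for (int input : inputs[node]) visit(input);
    topo.push_back(node);  // emitted only after all of its inputs
  };
  for (std::size_t i = 0; i < inputs.size(); ++i) visit(static_cast<int>(i));
  // Reversing gives the start order: every node before all of its inputs.
  return std::vector<int>(topo.rbegin(), topo.rend());
}
```

Starting consumers first means a source never pushes a batch into a node that has not been started yet.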
-
void StopProducing()#
Stop producing on all nodes.
Triggers all sources to stop producing new data. To stop cleanly, the plan will continue to run any tasks that are already in progress. The caller should still wait for finished() to complete before destroying the plan.
-
bool HasMetadata() const#
Return whether the plan has non-empty metadata.
-
std::shared_ptr<const KeyValueMetadata> metadata() const#
Return the plan’s attached metadata.
-
std::string ToString() const#
Public Static Functions
Make an empty exec plan.
Public Static Attributes
-
static const uint32_t kMaxBatchSize = 1 << 15#
-
class ExecNode#
- #include <arrow/acero/exec_plan.h>
Subclassed by arrow::acero::MapNode
Public Functions
-
virtual ~ExecNode() = default#
-
virtual const char *kind_name() const = 0#
-
inline int num_inputs() const#
-
inline const NodeVector &inputs() const#
This node’s predecessors in the exec plan.
-
inline bool is_sink() const#
True if the node has no output schema (is a sink)
-
inline const std::vector<std::string> &input_labels() const#
Labels identifying the function of each input.
-
inline const std::shared_ptr<Schema> &output_schema() const#
The datatypes for batches produced by this node.
-
inline const std::string &label() const#
An optional label, for display and debugging.
There is no guarantee that this value is non-empty or unique.
-
inline void SetLabel(std::string label)#
-
virtual const Ordering &ordering() const#
the ordering of the output batches
This does not guarantee the batches will be emitted by this node in order. Instead it guarantees that the batches will have their ExecBatch::index property set in a way that respects this ordering.
In other words, given the ordering {{“x”, SortOrder::Ascending}} we know that all values of x in a batch with index N will be less than or equal to all values of x in a batch with index N+k (assuming k > 0). Furthermore, values will be sorted within a batch: any row N will have a value of x that is less than or equal to the value for any row N+k.
Note that an ordering can also be one of the special values Ordering::Unordered or Ordering::Implicit. A node’s output should be marked Ordering::Unordered if the order is non-deterministic. For example, a hash-join has no predictable output order.
If the ordering is Ordering::Implicit then there is a meaningful order but that ordering is not represented by any column in the data. The most common case for this is when reading data from an in-memory table. The data has an implicit “row order” which is not necessarily represented in the data set.
A filter or project node will not modify the ordering. Nothing needs to be done other than ensure the index assigned to output batches is the same as the input batch that was mapped.
Other nodes may introduce order. For example, an order-by node will emit a brand new ordering independent of the input ordering.
Finally, as described above, nodes such as a hash-join or aggregation may destroy ordering (although these nodes could also choose to establish a new ordering based on the hash keys).
Some nodes will require an ordering. For example, a fetch node or an asof join node will only function if the input data is ordered (for fetch it is enough for the data to be implicitly ordered; for an asof join the ordering must be explicit and compatible with the on key).
Nodes that maintain ordering should be careful to avoid introducing gaps in the batch index. This may require emitting empty batches in order to maintain continuity.
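The guarantees above can be expressed as a checker. This standalone sketch assumes a single int64 column x with an ascending ordering; IndexedBatch and RespectsAscendingOrdering are hypothetical. It accepts batches in any arrival order, requires indices 0, 1, 2, ... without gaps, and tolerates empty batches, which a node may emit to keep the index contiguous.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical simplified batch: the ExecBatch-style index plus one column x.
struct IndexedBatch {
  int64_t index;
  std::vector<int64_t> x;
};

bool RespectsAscendingOrdering(std::vector<IndexedBatch> batches) {
  // Batches may arrive in any order; the index defines the logical order.
  std::sort(batches.begin(), batches.end(),
            [](const IndexedBatch& a, const IndexedBatch& b) { return a.index < b.index; });
  for (std::size_t i = 0; i < batches.size(); ++i) {
    if (batches[i].index != static_cast<int64_t>(i)) return false;  // gap or duplicate
    if (!std::is_sorted(batches[i].x.begin(), batches[i].x.end())) return false;
  }
  // Adjacent non-empty batches: max of batch N must not exceed min of the next batch.
  bool have_prev = false;
  int64_t prev_max = 0;
  for (const IndexedBatch& b : batches) {
    if (b.x.empty()) continue;
    if (have_prev && b.x.front() < prev_max) return false;
    prev_max = b.x.back();
    have_prev = true;
  }
  return true;
}
```

Checking adjacent batches is enough, because the "less than or equal" relation is transitive across N, N+1, ..., N+k.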
-
virtual Status InputReceived(ExecNode *input, ExecBatch batch) = 0#
Upstream API: These functions are called by input nodes that want to inform this node about an updated condition (a new input batch or an impending end of stream).
Implementation rules:
these may be called anytime after StartProducing() has succeeded (and even during or after StopProducing())
these may be called concurrently
these are allowed to call back into PauseProducing(), ResumeProducing() and StopProducing()
Transfer input batch to ExecNode
A node will typically perform some kind of operation on the batch and then call InputReceived on its outputs with the result.
Other nodes may need to accumulate some number of inputs before any output can be produced. These nodes will add the batch to some kind of in-memory accumulation queue and return.
-
virtual Status InputFinished(ExecNode *input, int total_batches) = 0#
Mark the inputs finished after the given number of batches.
This may be called before all inputs are received. This simply fixes the total number of incoming batches for an input, so that the ExecNode knows when it has received all input, regardless of order.
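Because InputFinished may arrive before, between, or after the batches, a node typically needs a small completion tracker. The following standalone model (CompletionTracker is hypothetical, not an Arrow class) signals completion exactly once, on whichever call makes the received count equal the announced total.

```cpp
#include <cassert>
#include <mutex>

class CompletionTracker {
 public:
  // Called once per received batch; returns true only on the call that completes the input.
  bool BatchReceived() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++received_;
    return CompleteIfDoneLocked();
  }

  // Called with the total batch count; may come first, last, or in between.
  bool SetTotal(int total_batches) {
    std::lock_guard<std::mutex> lock(mutex_);
    total_ = total_batches;
    return CompleteIfDoneLocked();
  }

 private:
  bool CompleteIfDoneLocked() {
    if (done_ || total_ < 0 || received_ != total_) return false;
    done_ = true;  // guarantees completion is reported exactly once
    return true;
  }

  std::mutex mutex_;
  int received_ = 0;
  int total_ = -1;  // -1: total not announced yet
  bool done_ = false;
};
```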
-
virtual Status Init()#
Perform any needed initialization.
This hook performs any actions in between creation of ExecPlan and the call to StartProducing. An example could be Bloom filter pushdown. The order in which ExecNodes execute this method is undefined, but the calls are made synchronously.
At this point a node can rely on all inputs & outputs (and the input schemas) being well defined.
-
virtual Status StartProducing() = 0#
Lifecycle API:
start / stop to initiate and terminate production
pause / resume to apply backpressure
Implementation rules:
StartProducing() should not recurse into the inputs, as it is handled by ExecPlan::StartProducing()
PauseProducing(), ResumeProducing(), StopProducing() may be called concurrently, potentially even before the call to StartProducing has finished.
PauseProducing(), ResumeProducing(), StopProducing() may be called by the downstream nodes’ InputReceived(), InputFinished() methods
StopProducing may be called due to an error, by the user (e.g. cancel), or because a node has all the data it needs (e.g. limit, top-k on sorted data). This means the method may be called multiple times, so the following additional rules apply:
StopProducing() must be idempotent
StopProducing() must be forwarded to inputs (this is needed for the limit/top-k case because we may not be stopping the entire plan)
Start producing
This must only be called once.
This is typically called automatically by ExecPlan::StartProducing().
-
virtual void PauseProducing(ExecNode *output, int32_t counter) = 0#
Pause producing temporarily.
This call is a hint that an output node is currently not willing to receive data.
This may be called any number of times. However, the node is still free to produce data (which may be difficult to prevent anyway if data is produced using multiple threads).
- Parameters:
output – Pointer to the output that is full
counter – Counter used to sequence calls to pause/resume
-
virtual void ResumeProducing(ExecNode *output, int32_t counter) = 0#
Resume producing after a temporary pause.
This call is a hint that an output node is willing to receive data again.
This may be called any number of times.
- Parameters:
output – Pointer to the output that is now free
counter – Counter used to sequence calls to pause/resume
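One way to use the counter is to remember the newest counter seen and ignore any hint that is older, since pause and resume calls issued from different threads can be delivered out of order. PauseState is a hypothetical standalone model of that policy, not Arrow code, and starting the counter at -1 is an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

class PauseState {
 public:
  void Pause(int32_t counter) { Apply(counter, /*should_pause=*/true); }
  void Resume(int32_t counter) { Apply(counter, /*should_pause=*/false); }

  bool paused() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return paused_;
  }

 private:
  void Apply(int32_t counter, bool should_pause) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (counter <= last_counter_) return;  // stale hint: a newer one was already applied
    last_counter_ = counter;
    paused_ = should_pause;
  }

  mutable std::mutex mutex_;
  int32_t last_counter_ = -1;  // assumption: counters are non-negative and increase
  bool paused_ = false;
};
```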
-
virtual Status StopProducing()#
Stop producing new data.
If this node is a source then the source should stop generating data as quickly as possible. If this node is not a source then there is typically nothing that needs to be done although a node may choose to start ignoring incoming data.
This method will be called when an error occurs in the plan. This method may also be called by the user if they wish to end a plan early. Finally, this method may be called if a node determines it no longer needs any more input (for example, a limit node).
This method may be called multiple times.
This is not a pause. There will be no way to start the source again after this has been called.
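The idempotency and forwarding rules from the lifecycle API can be modelled with an atomic flag: std::atomic<bool>::exchange returns the previous value, so only the first call has any effect. StoppableNode is a hypothetical standalone class, not ExecNode.

```cpp
#include <atomic>
#include <cassert>
#include <utility>
#include <vector>

class StoppableNode {
 public:
  explicit StoppableNode(std::vector<StoppableNode*> inputs = {}) : inputs_(std::move(inputs)) {}

  void StopProducing() {
    // exchange() returns the previous value, so only the first caller gets past here.
    if (stopped_.exchange(true)) return;
    ++stop_effects_;
    // Forward to inputs, e.g. a limit node stopping its upstream once it has enough rows.
    for (StoppableNode* input : inputs_) input->StopProducing();
  }

  int stop_effects() const { return stop_effects_; }

 private:
  std::vector<StoppableNode*> inputs_;
  std::atomic<bool> stopped_{false};
  int stop_effects_ = 0;
};
```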
-
std::string ToString(int indent = 0) const#
-
class ExecFactoryRegistry#
- #include <arrow/acero/exec_plan.h>
An extensible registry for factories of ExecNodes.
Public Types
-
struct ExecBatch#
- #include <arrow/compute/exec.h>
Public Functions
-
ExecBatch() = default#
-
explicit ExecBatch(const RecordBatch &batch)#
-
int64_t TotalBufferSize() const#
The sum of bytes in each buffer referenced by the batch.
Note: Scalars are not counted.
Note: Some values may reference only part of a buffer, for example, an array with an offset. The actual data visible to this batch will be smaller than the total buffer size in this case.
-
template<typename index_type>
inline const Datum &operator[](index_type i) const#
Return the value at the i-th index.
-
inline int num_values() const#
A convenience for the number of values / arguments.
-
inline std::vector<TypeHolder> GetTypes() const#
A convenience for returning the types from the batch.
-
std::string ToString() const#
Public Members
-
std::vector<Datum> values#
The values representing positional arguments to be passed to a kernel’s exec function for processing.
-
std::shared_ptr<SelectionVector> selection_vector#
A deferred filter represented as an array of indices into the values.
For example, the filter [true, true, false, true] would be represented as the selection vector [0, 1, 3]. When the selection vector is set, ExecBatch::length is equal to the length of this array.
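A standalone sketch of how a deferred filter relates to a selection vector. It is not Arrow's SelectionVector: the helper names MaskToSelection and ApplySelection are hypothetical, and the use of int32_t indices is an assumption. The mask [true, true, false, true] becomes [0, 1, 3], and the semantic length of the batch is the size of that list.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Turns a boolean filter into the list of row indices that pass it.
std::vector<int32_t> MaskToSelection(const std::vector<bool>& mask) {
  std::vector<int32_t> selection;
  for (std::size_t i = 0; i < mask.size(); ++i) {
    if (mask[i]) selection.push_back(static_cast<int32_t>(i));
  }
  return selection;
}

// Reads only the selected rows; the underlying column is never copied or filtered eagerly.
std::vector<int64_t> ApplySelection(const std::vector<int64_t>& column,
                                    const std::vector<int32_t>& selection) {
  std::vector<int64_t> out;
  out.reserve(selection.size());
  for (int32_t idx : selection) out.push_back(column.at(static_cast<std::size_t>(idx)));
  return out;
}
```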
-
Expression guarantee = literal(true)#
A predicate Expression guaranteed to evaluate to true for all rows in this batch.
-
int64_t length = 0#
The semantic length of the ExecBatch.
When the values are all scalars, the length should be set to 1 for non-aggregate kernels, otherwise the length is taken from the array values, except when there is a selection vector. When there is a selection vector set, the length of the batch is the length of the selection. Aggregate kernels can have an ExecBatch formed by projecting just the partition columns from a batch in which case, it would have scalar rows with length greater than 1.
If the array values are of length 0 then the length is 0 regardless of whether any values are Scalar.
-
int64_t index = kUnsequencedIndex#
index of this batch in a sorted stream of batches
This index must be strictly monotonic starting at 0 without gaps or it can be set to kUnsequencedIndex if there is no meaningful order
Public Static Functions
-
static Result<int64_t> InferLength(const std::vector<Datum> &values)#
Infer the ExecBatch length from values.
-
static Result<ExecBatch> Make(std::vector<Datum> values, int64_t length = -1)#
Creates an ExecBatch with length-validation.
If any value is given, then all values must have a common length. If the given length is negative, then the length of the ExecBatch is set to this common length, or to 1 if no values are given. Otherwise, the given length must equal the common length, if any value is given.
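The length rules for Make can be modelled as follows. In this hypothetical sketch (MakeLength, ScalarValue and SimpleDatum are not Arrow types) a Datum is either a scalar or an int64 array, scalars do not constrain the length (an assumption), and std::nullopt stands in for the error Status that the real function would return.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <variant>
#include <vector>

// Hypothetical simplified Datum: either a scalar or an array of int64 values.
struct ScalarValue { int64_t value; };
using SimpleDatum = std::variant<ScalarValue, std::vector<int64_t>>;

std::optional<int64_t> MakeLength(const std::vector<SimpleDatum>& values, int64_t length = -1) {
  std::optional<int64_t> common;  // common length of the array values, if any
  for (const SimpleDatum& value : values) {
    if (const auto* array = std::get_if<std::vector<int64_t>>(&value)) {
      const auto n = static_cast<int64_t>(array->size());
      if (common && *common != n) return std::nullopt;  // arrays disagree
      common = n;
    }
  }
  if (length < 0) return common.value_or(1);             // infer: 1 when there are no arrays
  if (common && *common != length) return std::nullopt;  // explicit length must match
  return length;
}
```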