Data Pipes and Data Processing Streams

For an introduction to pipes, see Data Pipes and Data Processing Streams.

See also

For complete information about nodes, see Node Reference.

class brewery.pipes.Stream(nodes=None, connections=None)

Creates a data stream.

Parameters :
  • nodes - dictionary with keys as node names and values as nodes
  • connections - list of two-item (source, target) tuples. Each side may be given either as a node object or as a node name.
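Both constructor arguments are plain Python containers. The following minimal sketch shows their expected shapes. It uses placeholder objects instead of real brewery nodes, and `Placeholder`, the node names, and the `connection_names` helper are all hypothetical, not part of brewery. It also shows how connections that mix objects and names can be reduced to name pairs.

```python
class Placeholder:
    """Hypothetical stand-in for a node object (not a brewery class)."""

    def __init__(self, label):
        self.label = label


source = Placeholder("source")
target = Placeholder("target")

# nodes: dictionary mapping node names to node objects
nodes = {"source": source, "target": target}

# connections: two-item (source, target) tuples; either side may be
# a node object or a node name
connections = [(source, "target")]


def connection_names(nodes, connections):
    """Hypothetical helper: reduce every connection to a (name, name) pair."""
    name_of = {id(node): name for name, node in nodes.items()}

    def to_name(item):
        # Names pass through unchanged; objects are looked up by identity
        return item if isinstance(item, str) else name_of[id(item)]

    return [(to_name(s), to_name(t)) for s, t in connections]


print(connection_names(nodes, connections))  # [('source', 'target')]
```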
add(node, name=None)

Add a node into the stream.

connect(source, target)

Connects source node and target node. Nodes can be provided as objects or names.

initialize()

Initializes the data processing stream:

  • sorts nodes based on connection dependencies
  • creates pipes between nodes
  • initializes each node
  • initializes pipe fields
node(node)

Returns the given node object itself, or the stream node with the given name. This method is used for coalescing in other methods that accept either a node name or a node object.

Parameters :
  • node - node object or node name
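The coalescing idea can be sketched in a few lines. The function `coalesce_node` and the `Placeholder` class are hypothetical, and raising KeyError for an unknown name is an assumption rather than documented brewery behaviour.

```python
class Placeholder:
    """Hypothetical stand-in for a node object (not a brewery class)."""


def coalesce_node(nodes, node):
    """Return `node` if it is a node object, otherwise look it up by name.

    Hypothetical sketch of the coalescing described for Stream.node();
    raising KeyError for an unknown name is an assumption.
    """
    if isinstance(node, str):
        return nodes[node]  # KeyError if no node has this name
    return node


reader = Placeholder()
nodes = {"reader": reader}
print(coalesce_node(nodes, "reader") is reader)  # True: looked up by name
print(coalesce_node(nodes, reader) is reader)    # True: passed through
```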
node_sources(node)

Return nodes that provide data for node.

node_targets(node)

Return nodes that node passes data into.
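With connections stored as (source, target) pairs, sources and targets are simple filters over the connection list. In this sketch the names are assumptions: nodes are identified by name, and the explicit `connections` argument is hypothetical, since the documented methods take only the node and use the stream's own connections.

```python
# Hypothetical stream: connections as (source name, target name) pairs
connections = [("csv", "filter"), ("filter", "db"), ("filter", "audit")]


def node_sources(connections, node):
    """Nodes that provide data for `node` (sources of its incoming edges)."""
    return [source for source, target in connections if target == node]


def node_targets(connections, node):
    """Nodes that `node` passes data into (targets of its outgoing edges)."""
    return [target for source, target in connections if source == node]


print(node_sources(connections, "filter"))  # ['csv']
print(node_targets(connections, "filter"))  # ['db', 'audit']
```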

remove(node)

Remove a node from the stream. All connections to or from the node are removed as well.

remove_connection(source, target)

Remove the connection between the source and target nodes, if it exists.
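A minimal sketch of both removal operations on a name-keyed node dictionary and a list of (source, target) name pairs. The functions `remove_connection` and `remove_node` and their explicit arguments are hypothetical and are not brewery's implementation.

```python
def remove_connection(connections, source, target):
    """Remove the (source, target) edge if it exists; otherwise do nothing."""
    if (source, target) in connections:
        connections.remove((source, target))


def remove_node(nodes, connections, name):
    """Remove the named node and every connection that touches it."""
    del nodes[name]
    # Rebuild the list in place so callers holding a reference see the change
    connections[:] = [(s, t) for s, t in connections if name not in (s, t)]


nodes = {"csv": object(), "filter": object(), "db": object()}
connections = [("csv", "filter"), ("filter", "db")]

remove_connection(connections, "db", "csv")  # no such edge: no error
remove_node(nodes, connections, "filter")
print(sorted(nodes), connections)  # ['csv', 'db'] []
```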

run()

Run all nodes in the stream.

Each node is wrapped and run in a separate thread.

When an exception occurs, the stream is stopped and all caught exceptions are stored in the exceptions attribute.
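The thread-per-node pattern can be sketched with the standard threading module, assuming each node exposes a run() method. The names `run_nodes`, `OkNode`, and `FailingNode` are hypothetical. Unlike the documented behaviour, this sketch does not stop the remaining nodes on the first error. It only gathers the exceptions so the caller can inspect them, much like the exceptions attribute.

```python
import threading


def run_nodes(nodes):
    """Run each node's run() method in its own thread.

    Exceptions raised by a node are caught in the wrapper and returned as
    (node, exception) pairs. Simplification: this sketch waits for every
    thread to finish instead of stopping the stream on the first error.
    """
    exceptions = []
    lock = threading.Lock()

    def wrapper(node):
        try:
            node.run()
        except Exception as exc:  # keep it instead of losing it in the thread
            with lock:
                exceptions.append((node, exc))

    threads = [threading.Thread(target=wrapper, args=(node,)) for node in nodes]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return exceptions


class OkNode:  # hypothetical node
    def run(self):
        pass


class FailingNode:  # hypothetical node
    def run(self):
        raise ValueError("bad row")


errors = run_nodes([OkNode(), FailingNode()])
print([type(exc).__name__ for _, exc in errors])  # ['ValueError']
```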

sorted_nodes()

Return topologically sorted nodes.

Algorithm:

L = Empty list that will contain the sorted elements
S = Set of all nodes with no incoming edges
while S is non-empty do
    remove a node n from S
    insert n into L
    for each node m with an edge e from n to m do
        remove edge e from the graph
        if m has no other incoming edges then
            insert m into S
if graph has edges then
    raise exception: graph has at least one cycle
else 
    return proposed topologically sorted order: L
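The pseudocode above is Kahn's algorithm. The following minimal Python rendering assumes that nodes are identified by name and that connections are (source, target) name pairs. It follows the pseudocode but is not brewery's actual code. Instead of deleting edges from the graph, it decrements a per-node count of remaining incoming edges, which has the same effect.

```python
from collections import deque


def sorted_nodes(nodes, connections):
    """Return node names in topological order (Kahn's algorithm).

    nodes: iterable of node names; connections: (source, target) name pairs.
    Raises ValueError if the graph has at least one cycle.
    """
    nodes = list(nodes)
    incoming = {node: 0 for node in nodes}  # remaining incoming edges
    outgoing = {node: [] for node in nodes}
    for source, target in connections:
        outgoing[source].append(target)
        incoming[target] += 1

    ready = deque(node for node in nodes if incoming[node] == 0)  # S
    order = []                                                     # L
    while ready:
        node = ready.popleft()
        order.append(node)
        for target in outgoing[node]:
            incoming[target] -= 1  # "remove" the edge node -> target
            if incoming[target] == 0:
                ready.append(target)

    # Nodes never emitted still have incoming edges left: a cycle exists
    if len(order) != len(incoming):
        raise ValueError("graph has at least one cycle")
    return order


print(sorted_nodes(["db", "filter", "csv"], [("csv", "filter"), ("filter", "db")]))
# ['csv', 'filter', 'db']
```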
class brewery.pipes.Node

Creates a new data processing node.

Attributes :
  • inputs: input pipes
  • outputs: output pipes
  • description: custom node annotation
finalize()

Finalizes the node. Default implementation does nothing.

initialize()

Initializes the node. Initialization is separated from creation. Put any Node subclass initialization in this method. Default implementation does nothing.

input

Return the single node input, if it exists. Convenience property for nodes that process only one input. Raises an exception if there are no inputs or more than one input.

input_fields

Return fields from input pipe, if there is one and only one input pipe.
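The two single-input conveniences can be sketched as properties. `FieldPipe` and `SingleInputNode` are hypothetical. The assumptions are that a pipe carries a `fields` attribute and that the error raised is ValueError, since the documentation does not name the exception type.

```python
class FieldPipe:
    """Hypothetical pipe stub carrying a list of field names."""

    def __init__(self, fields):
        self.fields = fields


class SingleInputNode:
    """Hypothetical node showing the single-input convenience properties."""

    def __init__(self, inputs=None):
        self.inputs = list(inputs or [])

    @property
    def input(self):
        # Exactly one input pipe is required; the exception type is an assumption
        if len(self.inputs) != 1:
            raise ValueError(
                f"node has {len(self.inputs)} inputs, expected exactly one")
        return self.inputs[0]

    @property
    def input_fields(self):
        # Delegates to `input`, so it fails the same way without one input pipe
        return self.input.fields


node = SingleInputNode([FieldPipe(["id", "amount"])])
print(node.input_fields)  # ['id', 'amount']
```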

output_field_names

Convenience property for getting the names of the fields generated by the node. For more information, see brewery.pipes.Node.output_fields.

output_fields

Return fields passed to the output by the node. Subclasses should override this method. Default implementation raises a NotImplementedError.

put(obj)

Put a row into all output pipes. Convenience method.

put_record(obj)

Put a record into all output pipes. Convenience method; its use is not recommended.

run()

Main method for running the node code. Subclasses should implement this method.
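The node lifecycle documented above (initialize, run, finalize, plus output_fields and put) can be illustrated with a self-contained toy. `ToyNode`, `ListPipe`, `Field`, and `UppercaseNode` are all hypothetical stand-ins. This is not brewery's Node class, and real brewery field objects carry more than a name.

```python
from collections import namedtuple

Field = namedtuple("Field", "name")  # hypothetical field description


class ListPipe:
    """Hypothetical in-memory pipe: put() appends, rows() iterates."""

    def __init__(self, rows=None):
        self.data = list(rows or [])

    def put(self, row):
        self.data.append(row)

    def rows(self):
        return iter(self.data)


class ToyNode:
    """Hypothetical base class mirroring the documented Node hooks."""

    def __init__(self):
        self.inputs = []
        self.outputs = []
        self.description = None

    def initialize(self):
        pass  # default: nothing to do

    def finalize(self):
        pass  # default: nothing to do

    @property
    def output_fields(self):
        raise NotImplementedError("subclasses should override output_fields")

    @property
    def output_field_names(self):
        return [field.name for field in self.output_fields]

    def put(self, row):
        # Send the same row to every output pipe
        for pipe in self.outputs:
            pipe.put(row)

    def run(self):
        raise NotImplementedError("subclasses should implement run()")


class UppercaseNode(ToyNode):
    """Example subclass: upper-cases every value of every input row."""

    @property
    def output_fields(self):
        return [Field("name")]

    def initialize(self):
        self.count = 0  # subclass initialization goes here, not in __init__

    def run(self):
        for row in self.inputs[0].rows():
            self.put(tuple(value.upper() for value in row))
            self.count += 1


node = UppercaseNode()
node.inputs.append(ListPipe([("ada",), ("bob",)]))
output = ListPipe()
node.outputs.append(output)

node.initialize()
node.run()
node.finalize()
print(node.output_field_names, output.data)  # ['name'] [('ADA',), ('BOB',)]
```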

class brewery.pipes.SourceNode

Abstract class for all source nodes.

All source nodes should provide an attribute or implement a property (@property) called output_fields.
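The two permitted styles look like this. `StaticSource` and `HeaderSource` are hypothetical classes, and plain strings stand in for brewery's field descriptions. The first style suits sources whose fields are fixed in advance. The second suits sources whose fields are only known from the data, such as a header line.

```python
class StaticSource:
    """Hypothetical source whose fields are fixed: a plain class attribute."""

    output_fields = ["id", "name"]


class HeaderSource:
    """Hypothetical source whose fields come from data: a @property."""

    def __init__(self, header_line):
        self.header_line = header_line

    @property
    def output_fields(self):
        # Derive field names from a comma-separated header line
        return [name.strip() for name in self.header_line.split(",")]


print(StaticSource().output_fields)              # ['id', 'name']
print(HeaderSource("id, amount").output_fields)  # ['id', 'amount']
```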

class brewery.pipes.TargetNode

Abstract class for all target nodes.

class brewery.pipes.Pipe(buffer_size=1000, queue_size=1)

Create a pipe for transfer of structured data between processing nodes.

Pipe passes structured data between processing nodes and node threads by using a Queue object. Data are not sent as they arrive; they are buffered instead. When the buffer is full or when pipe flush() is requested, the buffer is sent through the queue.

If the receiving node is finished with the source data and does not want any more, it should call stop() on the pipe. In most cases, the stream runner calls stop() on all input pipes when the node's run() method finishes.

If the sending node is finished, it should call flush() on the pipe. In most cases this is not necessary, as the stream runner flushes outputs automatically when the node's run() method finishes.

Parameters :
  • buffer_size: number of data objects (rows or records) to be collected before they can be acquired by receiving object. Default is 1000.
  • queue_size: number of buffers in a processing queue. Default is 1. Set to 0 for unlimited.
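The buffering scheme described above can be sketched on top of Python's queue.Queue. `BufferedPipe` is hypothetical and is not brewery's Pipe class. The design choices are assumptions. A None item marks the end of the source. A blocked sender retries with a short timeout so that it notices stop() instead of waiting forever on a full queue. queue_size=0 maps to an unbounded queue because queue.Queue treats maxsize <= 0 as unlimited.

```python
import queue
import threading


class BufferedPipe:
    """Sketch of a buffered pipe built on queue.Queue (not brewery's code).

    Objects are collected in a local buffer and enqueued as one list when
    the buffer holds buffer_size items or when flush() is called. A None
    item in the queue marks the end of the source.
    """

    def __init__(self, buffer_size=1000, queue_size=1):
        self.buffer_size = buffer_size
        self.buffer = []
        # queue.Queue treats maxsize <= 0 as unbounded, so 0 means unlimited
        self.queue = queue.Queue(maxsize=queue_size)
        self._stopped = threading.Event()

    def _enqueue(self, item):
        # Retry with a timeout so a blocked sender notices stop()
        while not self._stopped.is_set():
            try:
                self.queue.put(item, timeout=0.1)
                return
            except queue.Full:
                pass

    def put(self, obj):
        if self._stopped.is_set():
            return  # the receiver no longer wants data
        self.buffer.append(obj)
        if len(self.buffer) >= self.buffer_size:
            self._enqueue(self.buffer)
            self.buffer = []

    def flush(self):
        if self.buffer:
            self._enqueue(self.buffer)
            self.buffer = []
        self._enqueue(None)  # signal the end of the source

    def rows(self):
        # Block until a buffer is ready, then yield its objects one by one
        while True:
            batch = self.queue.get()
            if batch is None:
                return
            yield from batch

    def stop(self):
        self._stopped.set()  # called by the receiving node


pipe = BufferedPipe(buffer_size=2, queue_size=1)


def produce():
    for number in range(5):
        pipe.put(number)
    pipe.flush()


producer = threading.Thread(target=produce)
producer.start()
result = list(pipe.rows())
producer.join()
print(result)  # [0, 1, 2, 3, 4]
```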
flush()

Send all remaining data objects into the pipe buffer and signal the end of the source.

put(obj)

Put a data object into the pipe buffer. When the buffer is full, it is enqueued and the receiving node can get all buffered data objects.

records()

Get data objects from the pipe as records (dict objects). This is a convenience method with a performance cost; nodes are recommended to process rows instead.
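The following sketch shows one way rows can be turned into records. It assumes that rows are tuples aligned with a known list of field names, and `rows_to_records` is a hypothetical helper, not brewery's implementation. Building a new dict for every row is a plausible source of the performance cost mentioned above.

```python
def rows_to_records(rows, field_names):
    """Turn each row (a sequence) into a dict keyed by field name.

    One extra dict is built per row: a likely source of the overhead
    compared with processing plain rows.
    """
    for row in rows:
        yield dict(zip(field_names, row))


rows = [(1, "ada"), (2, "bob")]
print(list(rows_to_records(rows, ["id", "name"])))
# [{'id': 1, 'name': 'ada'}, {'id': 2, 'name': 'bob'}]
```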

rows()

Get data objects from the pipe. If no buffer is ready, wait until the source object sends some data.

stop()

Close the pipe from the target node's side: no more data is needed.
