For an introduction to pipes, see Data Pipes and Data Processing Streams.
See also
For complete information about nodes see Node Reference
Creates a data stream.
Add a node into the stream.
Connects source node and target node. Nodes can be provided as objects or names.
Initializes the data processing stream:
Returns the given node if it is a node object in the stream, or the node with the given name. Other methods use this for coalescing their arguments, so you can pass either a node name or a node object.
Return nodes that provide data for node.
Return nodes that node passes data into.
Remove a node from the stream, along with all of its connections.
Remove the connection between source and target nodes, if one exists.
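The graph operations described above (adding nodes, connecting them by name or by object, querying sources and targets, and removing nodes or connections) can be sketched as follows. This is a minimal illustration only: `MiniStream` and all of its method names and signatures are hypothetical stand-ins chosen for this example, not the library's actual API.

```python
class MiniStream:
    """Hypothetical stand-in for a stream: named nodes plus directed connections."""

    def __init__(self):
        self.nodes = {}            # name -> node object
        self.connections = set()   # (source node, target node) pairs

    def add(self, node, name):
        self.nodes[name] = node

    def node(self, ref):
        # Coalesce: accept either a node name or a node object.
        return self.nodes[ref] if isinstance(ref, str) else ref

    def connect(self, source, target):
        self.connections.add((self.node(source), self.node(target)))

    def node_sources(self, ref):
        # Nodes that provide data for the given node.
        node = self.node(ref)
        return [s for (s, t) in self.connections if t is node]

    def node_targets(self, ref):
        # Nodes that the given node passes data into.
        node = self.node(ref)
        return [t for (s, t) in self.connections if s is node]

    def remove_connection(self, source, target):
        # discard() does nothing if the connection does not exist.
        self.connections.discard((self.node(source), self.node(target)))

    def remove(self, ref):
        # Drop the node and every connection that touches it.
        node = self.node(ref)
        self.nodes = {k: v for k, v in self.nodes.items() if v is not node}
        self.connections = {(s, t) for (s, t) in self.connections
                            if s is not node and t is not node}


# Usage: plain objects stand in for real nodes.
stream = MiniStream()
source, clean, target = object(), object(), object()
stream.add(source, "source")
stream.add(clean, "clean")
stream.add(target, "target")
stream.connect("source", "clean")   # by name
stream.connect(clean, target)       # by object
```

Because every method routes its arguments through `node()`, names and objects can be mixed freely in one call.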
Run all nodes in the stream.
Each node is wrapped and run in a separate thread.
When an exception occurs, the stream is stopped and all caught exceptions are stored in the attribute exceptions.
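The thread-per-node pattern with collected exceptions can be illustrated with the standard `threading` module. This is a rough sketch under stated assumptions: `run_all` and its wrapper are hypothetical names, plain callables stand in for nodes, and the sketch only collects exceptions; it does not interrupt other threads that are still running, which a real stream runner would also have to handle.

```python
import threading


def run_all(tasks):
    """Run each callable in its own thread and return the caught exceptions."""
    exceptions = []
    lock = threading.Lock()

    def wrapper(task):
        # Wrap the task so that an exception is recorded instead of lost.
        try:
            task()
        except Exception as exc:
            with lock:
                exceptions.append(exc)

    threads = [threading.Thread(target=wrapper, args=(task,)) for task in tasks]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return exceptions
```

A caller would inspect the returned list after the run, much as the stream exposes its exceptions attribute.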
Return topologically sorted nodes.
Algorithm:
    L = Empty list that will contain the sorted elements
    S = Set of all nodes with no incoming edges
    while S is non-empty do
        remove a node n from S
        insert n into L
        for each node m with an edge e from n to m do
            remove edge e from the graph
            if m has no other incoming edges then
                insert m into S
    if graph has edges then
        raise exception: graph has at least one cycle
    else
        return proposed topologically sorted order: L
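The pseudocode above can be sketched in Python as shown below. This is an illustration, not the library's implementation: the function name `sorted_nodes`, its arguments, and the use of `ValueError` are assumptions made for the example. Instead of deleting edges from the graph, the sketch decrements a per-node count of incoming edges, which has the same effect.

```python
from collections import deque


def sorted_nodes(nodes, connections):
    """Return nodes in topological order.

    nodes: iterable of hashable nodes
    connections: iterable of (source, target) pairs
    """
    nodes = list(nodes)
    incoming = {n: 0 for n in nodes}    # number of unprocessed incoming edges
    targets = {n: [] for n in nodes}    # outgoing edges per node
    for source, target in connections:
        targets[source].append(target)
        incoming[target] += 1

    ready = deque(n for n in nodes if incoming[n] == 0)   # S in the pseudocode
    ordered = []                                          # L in the pseudocode
    while ready:
        n = ready.popleft()
        ordered.append(n)
        for m in targets[n]:
            incoming[m] -= 1            # "remove edge e from the graph"
            if incoming[m] == 0:
                ready.append(m)

    # Any node left unprocessed still has incoming edges, so a cycle exists.
    if len(ordered) != len(nodes):
        raise ValueError("graph has at least one cycle")
    return ordered
```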
Creates a new data processing node.
Finalizes the node. Default implementation does nothing.
Initializes the node. Initialization is separated from creation. Put any Node subclass initialization in this method. Default implementation does nothing.
Return the single node input, if it exists. Convenience property for nodes that process only one input. Raises an exception if there are no inputs or more than one input.
Return fields from input pipe, if there is one and only one input pipe.
Convenience method for getting the names of fields generated by the node. For more information see brewery.pipes.output_fields()
Return fields passed to the output by the node. Subclasses should override this method. Default implementation raises a NotImplementedError.
Put row into all output pipes. Convenience method.
Put record into all output pipes. Convenience method. Its use is not recommended.
Main method for running the node code. Subclasses should implement this method.
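The node lifecycle described above (initialize, run, finalize, plus the single-input and put conveniences) might look like this in a subclass. Everything here is a hypothetical sketch: `SketchNode` and `UppercaseNode` are invented for illustration, and plain lists stand in for the input and output pipes.

```python
class SketchNode:
    """Hypothetical base class mirroring the lifecycle described above."""

    def __init__(self):
        self.inputs = []    # lists standing in for input pipes
        self.outputs = []   # lists standing in for output pipes

    def initialize(self):
        pass                # default implementation does nothing

    def finalize(self):
        pass                # default implementation does nothing

    @property
    def input(self):
        # Convenience for nodes that process exactly one input.
        if len(self.inputs) != 1:
            raise ValueError("expected exactly one input, got %d" % len(self.inputs))
        return self.inputs[0]

    @property
    def output_fields(self):
        raise NotImplementedError

    def put(self, row):
        # Put the row into all outputs.
        for output in self.outputs:
            output.append(row)

    def run(self):
        raise NotImplementedError


class UppercaseNode(SketchNode):
    """Example subclass: upper-cases the first value of every row."""

    @property
    def output_fields(self):
        return ["name"]

    def run(self):
        for row in self.input:
            self.put([row[0].upper()])


node = UppercaseNode()
node.inputs = [[["alice"], ["bob"]]]
node.outputs = [[], []]
node.initialize()
node.run()
node.finalize()
```

After the run, both outputs hold the same transformed rows, since `put` fans each row out to every output.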
Abstract class for all source nodes
All source nodes should provide an attribute or implement a property (@property) called output_fields.
Abstract class for all target nodes
Create a pipe for transfer of structured data between processing nodes.
A pipe passes structured data between processing nodes and node threads using a Queue object. Data are not sent as they arrive; they are buffered instead. When the buffer is full, or when the pipe's flush() is requested, the buffer is sent through the queue.
If the receiving node has finished with the source data and needs nothing more, it should send stop() to the pipe. In most cases, the stream runner sends stop() to all input pipes when the node's run() method finishes.
If the sending node is finished, it should send flush() to the pipe. In most cases this is not necessary, because the method that runs the stream flushes outputs automatically when the node's run() method finishes.
Send all remaining data objects in the pipe buffer and signal the end of the source.
Put data object into the pipe buffer. When buffer is full it is enqueued and receiving node can get all buffered data objects.
Get data objects from the pipe as records (dict objects). This is a convenience method with a performance cost. Nodes are recommended to process rows instead.
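The cost of records comes from building one dict per row. A minimal sketch of that conversion, assuming rows are sequences whose values line up with a list of field names (the helper `rows_to_records` is hypothetical, not part of the library):

```python
def rows_to_records(field_names, rows):
    """Yield one dict per row, pairing each field name with its value."""
    for row in rows:
        yield dict(zip(field_names, row))
```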
Get data object from pipe. If there is no buffer ready, wait until source object sends some data.
Close the pipe from target node: no more data needed.
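The buffering behavior described for the pipe can be sketched with the standard `queue` module. This is an illustrative approximation only: `SketchPipe`, its `buffer_size` parameter, the end-of-source sentinel, and the choice to silently ignore data put after stop() are all assumptions made for the example, not the library's actual implementation.

```python
import queue
import threading


class SketchPipe:
    """Hypothetical buffered pipe: objects travel through the queue in batches."""

    _END = object()   # sentinel marking the end of the source

    def __init__(self, buffer_size=3):
        self.buffer_size = buffer_size
        self._buffer = []
        self._queue = queue.Queue()
        self.stopped = False

    def put(self, obj):
        if self.stopped:
            return                      # receiver wants no more data
        self._buffer.append(obj)
        if len(self._buffer) >= self.buffer_size:
            self._send_buffer()

    def _send_buffer(self):
        if self._buffer:
            self._queue.put(self._buffer)
            self._buffer = []

    def flush(self):
        # Send whatever is still buffered, then signal the end of the source.
        self._send_buffer()
        self._queue.put(self._END)

    def rows(self):
        # Block until a batch is ready, then yield its objects one by one.
        while True:
            batch = self._queue.get()
            if batch is self._END:
                return
            for obj in batch:
                yield obj

    def stop(self):
        self.stopped = True


pipe = SketchPipe(buffer_size=2)


def producer():
    for i in range(5):
        pipe.put([i])
    pipe.flush()


thread = threading.Thread(target=producer)
thread.start()
received = list(pipe.rows())
thread.join()
```

Sending batches rather than single objects reduces the number of queue operations, which is the point of buffering between node threads.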