A data processing stream is a network of data processing nodes connected by data pipes. There are several data processing node types: source nodes, which provide data from data sources; processing nodes, which operate on the records flowing through them; and target nodes, which consume the resulting data.
See also: the Stream class documentation (brewery.pipes.Stream).
import brewery.ds as ds
import brewery.pipes as pipes

# Prepare nodes
nodes = {
    "source": pipes.YamlDirectorySourceNode(path="data/donations"),
    "strip": pipes.StringStripNode(),
    "distinct": pipes.DistinctNode(keys=["year", "receiver", "project"]),
    "target": pipes.SQLTableTarget(url="postgres://localhost/data", table="donations"),
    "audit": pipes.AuditNode(),
    "print": pipes.FormattedPrinterNode(output="audit.txt")
}
# Configure nodes
nodes["source"].fields = ds.fieldlist([("year", "integer"),
                                       ("receiver", "string"),
                                       ("project", "string"),
                                       ("requested_amount", "float"),
                                       ("received_amount", "float"),
                                       ("source_comment", "string")])
nodes["print"].header = u"field nulls empty\n" \
"-----------------------------------------------"
nodes["print"].format = u"{field_name:<30.30} {null_record_ratio:3.2%} {empty_string_count:>10}"
connections = [ ("source", "strip"),
("strip", "distinct"),
("distinct", "target"),
("strip", "audit"),
("audit", "print")
]
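# The connection list above describes a branched topology: records flow
# from the source through strip to distinct and the SQL target, while a
# second branch taps the stripped records for auditing:
#
#     source -> strip -> distinct -> target
#                  \
#                   -> audit -> print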
# Create and run stream
stream = pipes.Stream(nodes, connections)
stream.initialize()
stream.run()
stream.finalize()
The created audit.txt file will contain:
field                          nulls      empty
-----------------------------------------------
year                           0.00%          0
receiver                       0.00%          5
project                        0.51%          0
requested_amount               0.70%          0
received_amount                6.40%          0
source_comment                 99.97%          0
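Each line of the report comes from the print node's format attribute, which is a plain Python format string; field_name, null_record_ratio and empty_string_count are the audit measures it references. As an illustration, the received_amount row of the output above would be rendered like this:

# Illustration: rendering one row of the audit report by hand, using the
# format string configured above and the values from the sample output.
template = u"{field_name:<30.30} {null_record_ratio:3.2%} {empty_string_count:>10}"
print(template.format(field_name="received_amount",
                      null_record_ratio=0.064,
                      empty_string_count=0))
# received_amount                6.40%          0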
See also: the Node Reference, for more information about the available nodes.
Streams are run using Stream.run(). A stream has to be initialized before it is run and finalized afterwards. The stream runs in parallel: each node runs in a separate thread.
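A minimal sketch of that life cycle, reusing the nodes and connections from the example above (wrapping run() in try/finally is only a suggestion to guarantee clean-up; the API does not require it):

stream = pipes.Stream(nodes, connections)
stream.initialize()       # has to be called before run()
try:
    stream.run()          # nodes execute in parallel, one thread per node
finally:
    stream.finalize()     # clean up, even if a node raised an error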