Data::Tubes

The Missing Manual

by polettix


Data::Tubes helps you manage transformation steps on a stream of data that can be thought of as a sequence of records. It does this by passing records through tubes, usually arranged in a sequence.

This manual is a bit long… here's a table of contents to guide you:

- A Few Definitions
- Typical Use Case
- What You Need 99% Of The Times
- Managing Sources
- What Is A Record, Toolkit Style
- Reading
- Passing Options To Plugins
- Parsing
- Rendering
- Writing
- Dispatching, Reloaded
- Sequence
- Sequence Gates
- Process In Peace

A Few Definitions

A record can be whatever scalar you can think of. It might even be undef. What's in a record, and what it means, is completely up to you – and, of course, to the tube that is going to manage that record.

A record might even evolve through the pipeline to become something different, e.g. multiple records or no record at all. In other words, some tubes might take an input record of some nature, and emit output record(s) of a completely different one. Again, it’s up to the tube to decide this, which means that, eventually, it’s up to you.

So, a tube is a transformation function that turns one single input record into zero, one or more output records, according to the model drawn below:

           \________/
                       --X nothing
 input  --\            --> one output record
 record --/            ==> some output records (array reference)
            _________  ==> some output records (iterator)
           /         \

In more formal terms, a tube is a sub reference that:

- receives one single input record as its argument;
- returns either the empty list (no output record), one scalar (a single output record), or the pair records => $aref (multiple output records in an array reference) or records => $iterator (multiple output records through an iterator).

The iterator has some additional constraints:

- it is a sub reference itself;
- at each call it returns the next output record(s), and the empty list when it is exhausted.

So far so good with definitions, but let’s recap: a record is a scalar, a tube is a sub reference.
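
To make these definitions a bit more tangible, here are a few hand-rolled tubes, one for each return shape in the model above (names and logic are made up for illustration):

# one input record in, exactly one output record out
my $double = sub { return $_[0] * 2 };

# possibly no output record at all (a filter)
my $only_odd = sub { return ($_[0] % 2) ? $_[0] : () };

# several output records, packed in an array reference...
my $explode = sub { return records => [split /,/, $_[0]] };

# ... or produced lazily through an iterator
my $countdown = sub {
   my $n = $_[0];
   return records => sub {
      return unless $n > 0;   # empty list signals exhaustion
      return $n--;
   };
};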

Typical Use Case

The typical use case for transforming data (e.g. in some ETL) can be summarized as follows, at least for me:

- take a list of input sources (e.g. file names);
- read data from each source, splitting it into records (e.g. line by line);
- parse each record into some structured data;
- render each structured record into some output text (e.g. through a template);
- write the rendered data somewhere (e.g. to one or more output files).

All the above operations have to be performed in sequence, and repeated until the sources cannot emit any more data to be read.

Data::Tubes helps you manage the sequencing, makes sure that you exhaust your sources, and also provides tools for addressing each step.

Note that your steps might be different. For example, you might want to introduce an intermediate step between the parsing and the rendering, to perform some data validation. You might want to perform a different rendering based on what has been parsed, or validated. You might want to select a different output channel based on what was parsed. So, although it's a sequence, you might actually think of your pipeline as possibly dividing into multiple alternatives at some steps, using data that comes from any of the previous steps. Data::Tubes' tools help you with this too!

In the following sections, we will gradually introduce the different tools available from Data::Tubes and show you how they can help you address the use case above, or the variant you might have.

What You Need 99% Of The Times

In most of the cases, you will only need to use function pipeline from the main module Data::Tubes. It allows you to define a sequence of tubes, or things that can be turned into tubes, with minimal hassle.

To show you how to use it, we will replicate the behaviour of the following simple program:

my @names = qw< Foo Bar Baz >;

for my $name (@names) {
   print "Hey, $name!\n";
}

We can re-implement it with Data::Tubes using much, much more code! Here's one way to do it:

my @names = qw< Foo Bar Baz >;

use Data::Tubes qw< pipeline >;
pipeline(
   sub { return records => $_[0] }, # will iterate over items
   sub { return "Hey, $_[0]!\n"  }, # returns the string
   sub { print $_[0]; return },     # prints it, returns nothing
   { tap => 'sink' },               # makes sure the input is drained
)->(\@names);

Does not seem to be very exciting, huh? Whatever, it allows us to get our feet wet with pipeline:

- pipeline takes a list of tubes (or, as we will see, things that can be turned into tubes) and returns a tube, i.e. a sub reference;
- the returned tube is then called with the initial input record (here, the reference to the array of names);
- an optional trailing hash reference holds options; here, tap => 'sink' makes sure that all output records are pulled through the pipeline and discarded at the end.

Of course, we might have decided that the rendering step was not needed in our case, so we might have done something like this:

pipeline(
   sub { return records => $_[0] }, # will iterate over items
   sub { print "Hey, $_[0]!\n"   },
   { tap => 'sink' },               # makes sure the input is drained
)->(\@names);

It really depends on what you want to do. In general terms, it's still useful to think of the pipeline in terms of the "Typical Use Case", because each tool provided by Data::Tubes' plugins usually addresses only one of those steps – but, again, it's up to you.

You might be wondering at this point: is it worth the effort? How would complicating a simple operation like the initial loop lead to a benefit? Here are the few advantages that actually led to Data::Tubes:

- each step is a small, self-contained tube, so you can add, remove or substitute steps without touching the rest of the pipeline;
- the plugins provide pre-canned tubes for the most common steps (sources, readers, parsers, renderers, writers), sparing you boilerplate;
- sequencing, draining and dispatching across alternative sub-pipelines are managed for you.

Hence, if your program is going to remain a simple loop, there’s really no absolute reason for using Data::Tubes. If it’s just the initial proof of concept of something that you suspect might turn into a beast, you might want to read on.

Managing Sources

The initial tube of a sequence is, arguably, your source. From a philosophical point of view, you can view it as a transformation of the inputs you will eventually provide (as in the example we saw in the last section), or as a self-contained tube that is able to conjure things out of nothing. You decide.

If you happen to take your inputs from files, the toolbox provides you with a few tools inside Data::Tubes::Plugin::Source. One, in particular, will be your friend most of the time: iterate_files.

This function is a factory, in the sense that you give it some arguments and it gives you back a tube-compliant sub reference. All it does is transform an input array reference containing file names into records, each holding a filehandle suitable for reading from one of those files.
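
Being a factory also means you can summon it and call it yourself, outside pipeline. A minimal sketch (we will meet summon again later in this manual):

use Data::Tubes qw< pipeline summon >;

# import iterate_files() from Data::Tubes::Plugin::Source
summon('Source::iterate_files');

# the factory gives back a regular tube, i.e. a sub reference...
my $source = iterate_files();

# ... that can be placed at the head of a pipeline as-is
pipeline(
   $source,
   sub { print "got a new source to read from\n"; return },
   { tap => 'sink' },
)->(['mydata-01.txt', 'mydata-02.txt']);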

Suppose you have your names in two files instead of an array:

$ cat mydata-01.txt
Foo
Bar
$ cat mydata-02.txt
Baz

You can do it like this:

pipeline(
   '+Source::iterate_files',
   sub {
      my $fh = $_[0]->{source}{fh};
      chomp(my @names = <$fh>);
      return records => \@names;
   },                               # read records from one source
   sub { return "Hey, $_[0]!\n"  }, # returns the string
   sub { print $_[0]; return },     # prints it, returns nothing
   { tap => 'sink' },               # makes sure the input is drained
)->([qw< mydata-01.txt mydata-02.txt >]);

In other terms, you have substituted the input gathering process with different tubes, while keeping the rest of the pipeline as it was before.

We can notice one interesting thing about pipeline: in addition to real tubes, i.e. sub references, it also accepts simple strings, which it takes care to automatically transform into tubes. In this case, it first turns Source::iterate_files into Data::Tubes::Plugin::Source::iterate_files, then loads the function iterate_files from the relevant module and uses it as a factory to generate the real tube. We will see later how we can pass additional parameters to these factory functions.

It is worth mentioning how the automatic expansion of a string like Source::iterate_files is performed:

- a leading plus sign, if present, is stripped, and the rest of the name is prefixed with Data::Tubes::Plugin:: (so +Source::iterate_files becomes Data::Tubes::Plugin::Source::iterate_files);
- a plain Plugin::factory pair like Source::iterate_files undergoes the same expansion;
- the last part of the expanded name is the factory function, which is loaded from the module identified by the preceding parts (check the Data::Tubes documentation for the precise rules, including how to point at modules outside the Data::Tubes::Plugin namespace).

So, it tries to DWIM while still leaving space for being very precise.

There are also a few things that are interesting about iterate_files (you're encouraged to read the docs in Data::Tubes::Plugin::Source, of course):

- it opens files with the :encoding(UTF-8) layer by default (we will see in "Passing Options To Plugins" how to change this through open_file_args);
- each record it emits is a hash reference with a source field inside, which in turn contains the filehandle fh we used in the example above.

What Is A Record, Toolkit Style

The representation of the output record from iterate_files explained in the previous section does not come out of the blue: the whole plugins toolkit, with very few exceptions, works under the assumption that a record, in each step, is a hash reference where each tube takes and adds data. In particular, the different components in the toolkit make the following assumptions:

- Sources emit records containing a source field;
- Readers take source and add raw, with the data they read;
- Parsers take raw and add structured, with the parsed data structure;
- Renderers take structured and add rendered, with the rendered text;
- Writers take rendered and send it to the output channel.

So, basically, whatever is emitted by one plugin type is good as input for the following plugin type:

Source       Reader      Parser           Renderer       Writer
  |           ^  |        ^  |              ^  |            ^
  |           |  |        |  |              |  |            |
  +-> source -+  +-> raw -+  +- structured -+  +- rendered -+

with the notable addition that each step actually receives all the sub-fields populated by the previous tubes, which can be used to customize the behaviour depending on your actual use case.

This sequence is also useful to know if you want to insert some behaviour in between, like the sketch below; we will see full examples later.
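
For instance, a hypothetical validation tube could slot in between Parser and Renderer, reading structured and marking the record for later stages (the fields age and valid are invented for the example):

my $validator = sub {
   my $record = shift;
   my $age = $record->{structured}{age};

   # mark the record, leaving `structured` untouched; later tubes
   # can dispatch on `valid`, or just let the record through
   $record->{valid} = (defined($age) && $age =~ m{\A\d+\z}) ? 1 : 0;

   return $record;
};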

Reading

Remember the last example from “Managing Sources”? Here’s a refresher:

pipeline(
   '+Source::iterate_files',
   sub {
      my $fh = $_[0]->{source}{fh};
      chomp(my @names = <$fh>);
      return records => \@names;
   },                               # read records from one source
   sub { return "Hey, $_[0]!\n"  }, # returns the string
   sub { print $_[0]; return },     # prints it, returns nothing
   { tap => 'sink' },               # makes sure the input is drained
)->([qw< mydata-01.txt mydata-02.txt >]);

It turns out that you actually don't need to do the reading of the file line by line yourself, because there's a pre-canned function to do that: read_by_line (also known as by_line). The only thing that we have to consider is that the read line will be put in the raw field of a hash-based record:

pipeline(
   'Source::iterate_files',
   'Reader::by_line',
   sub { return "Hey, $_[0]->{raw}!\n"  }, # returns the string
   sub { print $_[0]; return },     # prints it, returns nothing
   { tap => 'sink' },               # makes sure the input is drained
)->([qw< mydata-01.txt mydata-02.txt >]);

Your eagle eye will surely have noticed that we got rid of the initial plus sign before the name of the plugin. As explained in "Managing Sources", when the string is just a plugin name (as a single keyword) and a factory name, the DWIMming takes care of the expansion, so we will not bother with the plus sign any more in the rest of this manual. Your approach might differ, of course.

A plugin to read by line might seem overkill, but it has already started sparing us a few lines of code, and I guess there are plenty of occasions where your input records are line-based. You're invited to take a look at Data::Tubes::Plugin::Reader, where you might find by_paragraph, which reads… by the paragraph, and other reading functions.

Before the end of this section, it’s also worth taking a look at the equivalent “straight code” in Perl:

for my $filename (qw< mydata-01.txt mydata-02.txt >) {
    open my $fh, '<', $filename or die "open(): $!\n";
    binmode $fh, ':encoding(UTF-8)';
    while (<$fh>) {
        chomp();
        print "Hey, $_!\n";
    }
}

We’re already at a comparable number of lines…

Passing Options To Plugins

All of a sudden, your greeting application starts to choke, and you eventually figure out that it depends on the encoding of the input files. In particular, we saw at the end of the previous section that iterate_files defaults to opening files as UTF-8 (look at the binmode in the equivalent code at the end), which is fine per se, but when you print things out you get strange messages, and unfortunately your boss stops you from setting the same encoding on STDOUT.

Don’t despair! You have a few arrows available. The first one is to just turn the input filehandle back to :raw, like this:

pipeline(
   'Source::iterate_files',
   sub { binmode $_[0]->{source}{fh}, ':raw'; return $_[0]; },
   'Reader::by_line',
   sub { return "Hey, $_[0]->{raw}!\n"  }, # returns the string
   sub { print $_[0]; return },     # prints it, returns nothing
   { tap => 'sink' },               # makes sure the input is drained
)->([qw< mydata-01.txt mydata-02.txt >]);

The saner alternative is to avoid setting the encoding in iterate_files in the first place, which can be obtained by passing options to the factory. Just substitute the simple string with an array reference, where the first item is the same as the string (i.e. a locator for the factory function), and the following ones are arguments for the factory itself:

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   sub { return "Hey, $_[0]->{raw}!\n"  }, # returns the string
   sub { print $_[0]; return },     # prints it, returns nothing
   { tap => 'sink' },               # makes sure the input is drained
)->([qw< mydata-01.txt mydata-02.txt >]);

Most functions (i.e. tube factories) in the distribution accept as input a hash (i.e. a list of key/value pairs) or a hash reference. In many cases, anyway, there is one parameter that somehow stands out; with such factory functions, you are allowed to pass that parameter first, without explicitly naming it. We will see an example with Parser::by_format below.

Parsing

So far, we relied upon the assumption that the whole input line is what we are really after, and we don't need to parse it any further. Alas, this is not the case most of the time.

If you have some complicated format, your best option is to just code a tube to deal with it. For example, the following code would turn a paragraph with HTTP headers into a hash:

sub parse_HTTP_headers {
   my $headers = shift;
   $headers =~ s{\n\s+}{ }gmxs; # remove multi-lines
   my %retval;
   for my $line (split /\n/mxs, $headers) {
      my ($name, $value) = split /\s*:\s*/, $line, 2;
      $value =~ s{\s+\z}{}mxs; # trailing whitespace
      $retval{$name} =
        exists($retval{$name})
        ? $retval{$name} . ',' . $value
        : $value;
   }
   return \%retval;
}
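
A quick check of what it does:

my $parsed = parse_HTTP_headers(<<'END');
X-Name: Bar
 Barious
Date: 1970-01-01
END

# $parsed is now:
# {
#    'X-Name' => 'Bar Barious',   # continuation line folded in
#    'Date'   => '1970-01-01',
# }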

Now, suppose your input changes to a sequence of header groups, divided into paragraphs, where you look for header X-Name:

$ cat mydata-03.txt
X-Name: Foo
Host: example.com

X-Name: Bar
 Barious
Date: 1970-01-01

Adapting to this input is quite easy now:

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],

   'Reader::by_paragraph',                  # change how we read!
   sub { parse_HTTP_headers($_[0]->{raw}) },   # wrap parse_HTTP_headers
   sub { return "Hey, $_[0]->{'X-Name'}!\n"  }, # use new field

   sub { print $_[0]; return },     # prints it, returns nothing
   { tap => 'sink' },               # makes sure the input is drained
)->([qw< mydata-03.txt >]);

From now on, anyway, we will stick to the convention described in "What Is A Record, Toolkit Style" about what's available at the different stages, which allows us to have stable inputs for tubes without having to worry too much about what comes before (within certain limits). Hence, here's how our example transforms:

pipeline(
   # Source management
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],

   # Reading, gets `source`, puts `raw`
   'Reader::by_paragraph',

   # Parsing, gets `raw`, puts `structured`
   sub {
      my $record = shift;
      $record->{structured} = parse_HTTP_headers($record->{raw});
      return $record;
   },

   # Rendering, gets `structured`, puts `rendered`
   sub {
      my $record = shift;
      $record->{rendered} = 
         "Hey, $record->{structured}{'X-Name'}!\n";
      return $record;
   },

   # Printing, gets `rendered`, returns input unchanged
   sub { print $_[0]{rendered}; return $_[0]; },

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-03.txt >]);

As anticipated, Data::Tubes::Plugin::Parser contains some pre-canned factories for generating common parsers, mostly geared to line-oriented inputs (which can be quite common, anyway). So, if your input is as simple as a sequence of fields separated by a character, without anything fancy like quoting or escaping, you can simply rely on a format. For example, suppose you have the following data, with a name (that we will assume does not contain a semicolon inside), the nickname (ditto) and the age:

$ cat mydata-04.txt
Foo;foorious;32
Bar;barious;19
Baz;bazzecolas;44

You might describe each line as being name;nick;age, and this is exactly what’s needed to use by_format:

pipeline(
   # Source management
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],

   # Reading, gets `source`, puts `raw`
   'Reader::by_line',

   # Parsing, gets `raw`, puts `structured`
   ['Parser::by_format', format => 'name;nick;age'],

   # Rendering, gets `structured`, puts `rendered`
   sub {
      my $record = shift;
      my $v = $record->{structured};
      $record->{rendered} = 
         "Hey, $v->{name} (alias $v->{nick}), it's $v->{age}!\n";
      return $record;
   },

   # Printing, gets `rendered`, returns input unchanged
   sub { print $_[0]{rendered}; return $_[0]; },

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);

Actually, any sequence of non-word characters (Perl-wise) is considered a separator in the format, and any sequence of word characters is considered the name of a field.
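
To fix ideas, here is a conceptual sketch of what such a factory might do under the hood – not Data::Tubes' actual implementation, just the idea of turning a format into a regular expression:

sub make_by_format_parser {   # hypothetical sketch, for illustration
   my $format = shift;

   # runs of word characters are field names...
   my @fields = split /\W+/, $format;

   # ... anything else is a literal separator, matched verbatim
   (my $regex = $format) =~
      s{(\w+)|(\W+)}{defined($1) ? '(.*?)' : quotemeta($2)}eg;
   $regex = qr{\A$regex\z};

   return sub {   # the actual tube
      my $record = shift;
      my %structured;
      (@structured{@fields} = $record->{raw} =~ m{$regex})
         or die "invalid record '$record->{raw}'\n";
      $record->{structured} = \%structured;
      return $record;
   };
}

# make_by_format_parser('name;nick;age') parses "Foo;foorious;32"
# into {name => 'Foo', nick => 'foorious', age => '32'}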

As anticipated, by_format is one of those tube factories where one of the allowed arguments stands out with respect to the others, and you can guess which one it is: format. For this reason, you can be more concise:

  ...
  # Parsing, gets `raw`, puts `structured`
  ['Parser::by_format', 'name;nick;age'], # no "format =>"!
  ...

Another useful pre-canned parser in the toolkit is hashy, which allows you to handle something more complicated, like sequences of key-value pairs. The assumption here is that there are two one-character-long separators: one between different key-value pairs, one separating the key from the value.

Here’s another example, assuming that the pipe character separates pairs, and the equal sign separates the key from the value:

$ cat mydata-05.txt
name=Foo Foorious|nick=foorious|age=32
name=Bar Barious|age=19|nick=barious
age=44|nick=bazzecolas|name=Baz Bazzecolas

As you see, explicit naming of fields allows you to put them in any order inside the input.

The choice of separators is purely up to you; in this case we have the pipe character between pairs and the equal sign between keys and values.

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',

   # Parsing, gets `raw`, puts `structured`
   [
      'Parser::hashy',
      chunks_separator => '|',
      key_value_separator => '='
   ],

   # Rendering, gets `structured`, puts `rendered`
   sub {
      my $record = shift;
      my $v = $record->{structured};
      $record->{rendered} = 
         "Hey, $v->{name} (alias $v->{nick}), it's $v->{age}!\n";
      return $record;
   },

   # Printing, gets `rendered`, returns input unchanged
   sub { print $_[0]{rendered}; return $_[0]; },

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-05.txt >]);

hashy also allows setting a default key, in case none is found for a pair, so that you can have something like this if your lines are indexed by nick:

$ cat mydata-06.txt
foorious|name=Foo Foorious|age=32
barious|name=Bar Barious|age=19
bazzecolas|age=44|name=Baz Bazzecolas

Ordering in this case is purely incidental; again, the un-keyed element can occur anywhere. The transformation is easy:

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',

   # Parsing, gets `raw`, puts `structured`
   [
      'Parser::hashy',
      default_key => 'nick',
      chunks_separator => '|',
      key_value_separator => '='
   ],

   # Rendering, gets `structured`, puts `rendered`
   sub {
      my $record = shift;
      my $v = $record->{structured};
      $record->{rendered} = 
         "Hey, $v->{name} (alias $v->{nick}), it's $v->{age}!\n";
      return $record;
   },

   # Printing, gets `rendered`, returns input unchanged
   sub { print $_[0]{rendered}; return $_[0]; },

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-06.txt >]);

If you're in a real hurry and you find hashy a bit too strict (single character to separate chunks, single character to separate keys and values, no automatic removal of leading/trailing spaces, defaults that might not be what you have… are we joking?!?) you can increase your chances of getting the job done using ghashy (the g stands for generalized). Be careful what you wish for, though.

There is more to discover; take a look at Data::Tubes::Plugin::Parser.

Rendering

If you have to render a very simple string like the salutation above, the approach we used so far is quite effective. If your output gets any more complicated, chances are you can benefit from using a template. The plugin Data::Tubes::Plugin::Renderer provides you a factory to use templates built for Template::Perlish; let's see how with_template_perlish works.

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   ['Parser::by_format', 'name;nick;age'],

   # Rendering, gets `structured`, puts `rendered`
   ['Renderer::with_template_perlish', template => <<'END' ],
Hey [% name %]!

... or should I call you [% nick %]?

It's your birthday, you're [% age %] now!
END

   # Printing, gets `rendered`, returns input unchanged
   sub { print $_[0]{rendered}; return $_[0]; },

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);

As long as you only have simple variables, Template::Perlish behaves much like the famous Template Toolkit. Anything more complicated leads you to using Perl, anyway.
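
If you want to play with Template::Perlish in isolation, a minimal sketch, assuming the functional render interface documented by the module:

use Template::Perlish qw< render >;

print render(
   "Hey [% name %], you're [% age %] now!\n",
   {name => 'Foo', age => 32},
);   # prints "Hey Foo, you're 32 now!"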

The same sequence can of course be used to render the input data in some other format, e.g. as YAML as in the following example (we’re ignoring the need to do any escaping… just to get the idea):

use Data::Tubes qw< pipeline >;
pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   ['Parser::by_format', 'name;nick;age'],

   # Rendering, gets `structured`, puts `rendered`
   ['Renderer::with_template_perlish', template => <<'END' ],
-
   name: [% name %]
   nick: [% nick %]
   age: [% age %]
END

   # Printing, gets `rendered`, returns input unchanged
   sub { print $_[0]{rendered}; return $_[0]; },

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);

We're putting each object element inside an array (that's the leading dash), so that the whole sequence of records will print out smoothly as an overall YAML file.

As you have probably guessed by now, you can get rid of the template name because this argument can also be passed un-named as the first one. It is also worth noting that this parameter undergoes some DWIMmery before being used. In particular, if you pass an array reference, it will be expanded into a list that is passed to the helper function Data::Tubes::Util::read_file… you can guess what it does, can't you?
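
So, assuming you saved the YAML template above in a file called yaml.tp (the name is made up), you could presumably load it straight from there:

  ...
  # Rendering, gets `structured`, puts `rendered`; the template is
  # loaded from file yaml.tp through Data::Tubes::Util::read_file
  ['Renderer::with_template_perlish', template => ['yaml.tp']],
  ...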

This is only scratching the surface, if you want a longer story take a look at the dedicated article.

Writing

The last step in our typical pipeline is writing stuff out. So far, we just printed things out on STDOUT, but by no means are we limited to this! Let's take a look at Data::Tubes::Plugin::Writer.

The first tool that can help us is write_to_files (or its alias to_files), that allows us to transform our pipeline like this (without changing the behaviour):

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   ['Parser::by_format', 'name;nick;age'],
   ['Renderer::with_template_perlish', <<'END' ],
-
   name: [% name %]
   nick: [% nick %]
   age: [% age %]
END

   # Printing, gets `rendered`, returns input unchanged. Goes
   # to standard output by default
   'Writer::to_files',

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);

From here, it's easy to deduce that you can pass other things through parameter filename, for example… an actual filename!

You have surely noticed that we talked about function write_to_files but then used the alias to_files. Most factory functions in the plugins have this kind of double name, the shorter one being for easier typing.

There is much, much more in to_files than simply writing to standard output, e.g.:

- a filename parameter that can carry escape sequences (like the %03n progressive number and the %t timestamp we will meet shortly);
- header, footer and interlude strings to frame the records in each file;
- a policy to split the output across multiple files (e.g. records_threshold to cap the number of records per file).

You’re encouraged to look at the specific documentation or at the extensive article in the Wiki.

The other noteworthy tool that you can find for writing is dispatch_to_files. This comes in handy when you not only want to shape and partition your outputs like to_files allows you to do, but also want to choose a totally different output channel depending on the nature of the record (e.g. you might want to send errors to a log file, good records down the pipeline, and records that you want to think about into some other sink).

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   ['Parser::by_format', 'name;nick;age'],
   ['Renderer::with_template_perlish',
      '  {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],

   # Printing, gets `rendered`, returns input unchanged
   [
      'Writer::dispatch_to_files',
      header    => "[\n",
      footer    => "\n]\n",
      interlude => ",\n",
      policy    => {records_threshold => 2},

      filename => 'output-[% key %]-%03n.json',
      selector => sub {
         my $record = shift;
         my $first_char = substr $record->{structured}{nick}, 0, 1;
         return 'lower' if $first_char =~ m{[a-m]}mxs;
         return 'other';
      },
   ],

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);

Most parameters are the same as to_files, so we already know about them. The filename seems familiar but somewhat different. The selector is definitely a new entry.

The latter (selector) is a sub reference that receives the record as input, and is supposed to provide a key back. Whenever this key is the same, the output channel chosen by the dispatcher will be the same. In this case, the selector returns one of two strings, namely lower and other.

The filename is an extension of parameter filename in to_files, that allows you to put additional things in the filename that is eventually passed to to_files. As you might have guessed already, it's a Template::Perlish-compatible template string, where you can expand the variable key. So:

- records whose key is lower end up in files named output-lower-000.json, output-lower-001.json, and so on;
- records whose key is other end up in the corresponding output-other-NNN.json files;
- the %03n escape is then handled by to_files itself, expanding to a progressive number (three digits here) each time the policy starts a new file (every two records, in this example).

You can discover more in the specific article in the wiki.

Dispatching, Reloaded

In "Writing" we saw that you can dispatch to the right output channel depending on what's inside each single record. The dispatching technique can be applied to other stages too, though, if you are brave enough to look at dispatch.

General dispatching

For example, suppose that you want to divide your input records stream into two different flows, one for records that are good, one for the bad (e.g. records that do not parse correctly, or do not adhere to some additional validation rules).

In the example below, we will assume that nicknames starting with any letter are good, and bad otherwise. We still want to do some rendering for the bad ones, though, because we want to write out an error file.

use Data::Tubes qw< pipeline summon >;
summon('Renderer::with_template_perlish');
my $render_good = with_template_perlish(
   '  {"name":"[% name %]","nick":"[% nick %]","age":[% age %]}');
# the "bad" renderer just complains about the nickname
my $render_bad = with_template_perlish(
   '  {"nick":"[% nick %]","error":"invalid"}');

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   ['Parser::by_format', 'name;nick;age'],

   # let's put an intermediate step to "classify" the record
   sub {
      my $record = shift;
      my $first_char = substr $record->{structured}{nick}, 0, 1;
      $record->{class} = ($first_char =~ m{[a-m]}mxs)  ? 'lower'
                        :($first_char =~ m{[a-z]}imxs) ? 'other'
                        :                                'error';
      return $record;
   },

   # Rendering is wrapped by dispatch here
   [
      'Plumbing::dispatch',
      key => 'class',
      factory => sub {
         my $key = shift;
         return $render_bad if $key eq 'error';
         return $render_good;
      }
   ],

   # Printing, gets `rendered`, returns input unchanged
   [
      'Writer::dispatch_to_files',
      'output-[% key %]-%03n.json',
      key => 'class',
      header    => "[\n",
      footer    => "\n]\n",
      interlude => ",\n",
      policy    => {records_threshold => 2},
   ],

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);

The dispatcher is based on two steps: one is the selection, the other one is the generation.

As we saw previously, the selection process is about getting a key that allows the dispatcher to figure out which channel to use. In "Writing" we saw that we can put a selector key in the arguments, but if you already have your key in the record you can just set a key argument. In this example, we're doing this classification immediately after the parse phase, so from that point on we have a class key inside the record, that we can use (and we do, both in dispatch and in dispatch_to_files; this is the advantage of having all the details about a record along the pipeline, although you might argue that it introduces some action at a distance).

In case the dispatcher does not (yet) know which tube is associated to a given string returned by the selector, it’s time for some generation. dispatch_to_files already knows how to generate files (although you can override this), and is fine with filename_template; on the other hand, the generic dispatch needs to know something more, which is why we’re using factory here.

The factory in a dispatcher allows you to receive the key returned by the selector (and also the whole record, should you need it) and return a tube back. Guess what? That tube will be associated to that key from now on!
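
Conceptually, the key-to-tube caching behaves like this little sketch (not the actual implementation, of course):

my %tube_for;   # the dispatcher's cache, keyed by selector output
sub dispatch_record {
   my ($record, $selector, $factory) = @_;
   my $key  = $selector->($record);

   # generate (and cache) the tube on first encounter of $key;
   # key first, record second is an assumed order, check the docs
   my $tube = $tube_for{$key} //= $factory->($key, $record);

   return $tube->($record);
}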

Pre-loading the cache

If you already have your downstream tubes available (as in our case), you can pre-load the cache and avoid coding the factory completely:

summon('Renderer::with_template_perlish');
my $render_good = with_template_perlish(
   '  {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],);
# the "bad" renderer just complains about the nickname
my $render_bad = with_template_perlish(
   '  {"nick":"[% nick %]";"error":"invalid"}'],);

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   ['Parser::by_format', 'name;nick;age'],

   # let's put an intermediate step to "classify" the record
   sub {
      my $record = shift;
      my $first_char = substr $record->{structured}{nick}, 0, 1;
      $record->{class} = ($first_char =~ m{[a-m]}mxs)  ? 'lower'
                        :($first_char =~ m{[a-z]}imxs) ? 'other'
                        :                                'error';
      return $record;
   },

   # Rendering is wrapped by dispatch here
   [
      'Plumbing::dispatch',
      key => 'class',
      handlers => {
         lower => $render_good,
         other => $render_good,
         error => $render_bad,
      },
   ],

   # Printing, gets `rendered`, returns input unchanged
   [
      'Writer::dispatch_to_files',
      'output-[% key %]-%03n.json',
      key => 'class',
      header    => "[\n",
      footer    => "\n]\n",
      interlude => ",\n",
      policy    => {records_threshold => 2},
   ],

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);

Dispatching, TIMTOWTDI

One drawback of the technique we saw in the previous sections about dispatching is that, as a matter of fact, we have two dispatchings happening at two different times, i.e. at rendering and at writing. Many times this might be exactly what you need, but in our example it actually limited us a bit, because it somehow assumes that we want to report errors as JSON structures, which is a bit overkill.

One alternative is to realize that the dispatcher's factory also receives the whole record in addition to the key – and the filename_template expansion is no exception. Hence, we might modify the pipeline as follows:

summon('Renderer::with_template_perlish');
my $render_good = with_template_perlish(
   '  {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}');
# the "bad" renderer just complains about the nickname
my $render_bad = with_template_perlish(
   '  {"nick":"[% nick %]";"error":"invalid"}');

pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   ['Parser::by_format', 'name;nick;age'],

   # let's put an intermediate step to "classify" the record
   sub {
      my $record = shift;
      my $first_char = substr $record->{structured}{nick}, 0, 1;
      $record->{class} = ($first_char =~ m{[a-m]}mxs)  ? 'lower'
                        :($first_char =~ m{[a-z]}imxs) ? 'other'
                        :                                'error';
      $record->{format} =
         ($record->{class} eq 'error') ? 'txt' : 'json';
      return $record;
   },

   # Rendering is wrapped by dispatch here
   [
      'Plumbing::dispatch',
      key => 'class',
      handlers => {
         lower => $render_good,
         other => $render_good,
         error => $render_bad,
      },
   ],

   # Printing, gets `rendered`, returns input unchanged
   [
      'Writer::dispatch_to_files',
      'output-[% key %]-%03n.[% record.format %]', # Look!
      header    => "[\n", footer => "\n]\n",
      interlude => ",\n",
      policy    => {records_threshold => 2},
      key       => 'class',
   ],

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);

Sequence

In the previous section we solved our problem, but the solution might still be considered a bit clunky. What if we need different intermediate processing steps, depending on the specific record? For example, we might want to avoid processing wrong records too much, but do some additional mangling on good ones.

Enter sequence from Data::Tubes::Plugin::Plumbing. This function is similar to pipeline – as a matter of fact, pipeline uses it behind the scenes, and returns the sequence tube directly if you don't provide any tap.

Hence, you can use dispatch to divide your flow across different sequences, each with its own processing. Let’s see how.

my $good = pipeline(
   ['Renderer::with_template_perlish',
      '  {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],
   ['Writer::dispatch_to_files',
      'output-[% key %]-%03n.json',
      header => "[\n", footer => "\n]\n", interlude => ",\n",
      policy => {records_threshold => 2},
      key => 'class',
   ],
); # note... no tap here!

my $bad = pipeline(
   sub { $_[0]{error}{message} = $_[0]{raw}; return $_[0] },
   ['Renderer::with_template_perlish', "[% message %]\n",
      input => 'error',
   ],
   ['Writer::to_files', 'output-exception-%t.txt'],
); # note.. no tap here!

# The pipeline that feeds them all
pipeline(
   ['Source::iterate_files', open_file_args => {binmode => ':raw'}],
   'Reader::by_line',
   ['Parser::by_format', 'name;nick;age'],
   sub { # classification of input record
      my $record = shift;
      my $first_char = substr $record->{structured}{nick}, 0, 1;
      $record->{class} = ($first_char =~ m{[a-m]}mxs)  ? 'lower'
                        :($first_char =~ m{[a-z]}imxs) ? 'other'
                        :                                'error';
      return $record;
   },

   # Further processing depends on class
   [
      'Plumbing::dispatch',
      key => 'class',
      handlers => {
         lower => $good,
         other => $good,
         error => $bad,
      },
   ],

   # Options, just flush the output to the sink
   { tap => 'sink' },
)->([qw< mydata-04.txt >]);

There are a few things going on here, let’s take a look.

The first thing that pops out is that the renderer in $bad is using a new parameter input, set to error. Quite suspiciously, the renderer is preceded by a small tube that populates an error field in the record… is this a coincidence?

It turns out that “What Is A Record, Toolkit Style” did not tell us the whole story: all the tools in the toolkit can take a different input and produce a different output, all you have to do is to specify the relevant key in the arguments to the factory function. So, “What Is A Record, Toolkit Style” actually describes the default values for these parameters.

Of course, we might have just added message to the sub-hash structured, but that would have been sort of cheating, wouldn’t it?
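
To fix ideas, here is how a hypothetical parser could be re-routed to non-default fields; input is the same parameter we just saw at work in $bad, while output is an assumption of ours – check each plugin's documentation for the actual parameter names:

  ...
  # read from `raw` as usual, but store the result in `parsed`
  # instead of `structured` (illustration only)
  ['Parser::by_format', 'name;nick;age',
     input  => 'raw',      # the default, shown for clarity
     output => 'parsed',   # assumed parameter name, check the docs
  ],
  ...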

The second thing is that, as anticipated, we managed to create two different tracks for the input records, where dispatch splits the records stream across them. This allows each of the sub-tubes to be independent of the others (there are two here, but there might be many more, of course). Note that $good and $bad are created using pipeline (so that we avoid summoning sequence and shave off a few characters), taking care to avoid setting a tap. This is not strictly necessary, because the sink tap actually generates a valid tube, but we want to be sure that we get a record out of either tube in case we need to add some post-processing later in the future.

Sequence Gates

You might sometimes run into the following situation: you have a sequence of input records, which have to satisfy a whole lot of different conditions to be really applicable for some processing. These conditions might involve loading additional data, etc.

It's easy to see that, in this case, all records that do not satisfy all conditions can be pruned as soon as the first condition fails. So, for example, in a sequence of numbers, we can easily get rid of all multiples of 3 with this tube:

sub {
    my $record = shift; # it is a number in our example
    return $record if $record % 3;
    return;
}

The final return signals that this track is finished, so the sequence handler will just backtrack.

So far so good, but what if you want to provide some kind of final operation on your sequence? For example, you might want to record a reason why you excluded specific numbers (like, in this case, we would say it is a multiple of 3 or so). Let’s consider the following pipeline (remember that it uses sequence behind the scenes, so it’s basically the same):

my $pl = pipeline(
    sub { # turn an array reference in a sequence of numbers
        return records => shift;
    },
    sub { return ($_[0] % 3) ? $_[0] : () },
    sub { return ($_[0] % 5) ? $_[0] : () },
    sub { return ($_[0] % 7) ? $_[0] : () },
    sub { say $_[0]; return },
    {tap => 'sink'}
);

This will get rid of all multiples of 3, 5 and 7, printing what remains. So, with this input:

$pl->([1..15]);

it will print:

1
2
4
8
11
13

What if I'd like to record somewhere else at which step the filter was applied, though? We might e.g. set up a logging pipeline for this:

my $logger = pipeline(
    sub {
        my $number = shift;
        return {rendered => "bailed out at multiple of $number"};
    },
    ['Writer::to_files', 'output-exception-%t.txt'],
    {tap => 'sink'},
);

Now, we should call $logger at each step, like this:

my $pl = pipeline(
    sub { # turn an array reference in a sequence of numbers
        return records => shift;
    },
    sub {
        return $_[0] if $_[0] % 3;
        $logger->(3);
        return;
    },
    sub {
        return $_[0] if $_[0] % 5;
        $logger->(5);
        return;
    },
    sub {
        return $_[0] if $_[0] % 7;
        $logger->(7);
        return;
    },
    sub { say $_[0]; return },
    {tap => 'sink'}
);

Admittedly, this is not very elegant. Enter gate (since version 0.736), a sub reference that performs a test for early interruption of a sequence, while still returning records. This is how we can transform our whole thing:

my $pl = pipeline(
    sub { # turn an array reference in a sequence of numbers
        return records => [map {;{number => $_}} @{$_[0]}];
    },
    
    pipeline(
        sub {
            return $_[0] if $_[0]{number} % 3;
            return {bail => 3};
        },
        sub {
            return $_[0] if $_[0]{number} % 5;
            return {bail => 5};
        },
        sub {
            return $_[0] if $_[0]{number} % 7;
            return {bail => 7};
        },
        {
            gate => sub {
                return 1 unless exists $_[0]{bail};
                $logger->($_[0]{bail});
                return;
            }
        }
    ),
    sub {
        return if exists $_[0]{bail}; # bail out if requested
        return $_[0]{number};
    },

    sub { say $_[0]; return },
    {tap => 'sink'},
);

In this way, we can concentrate our logging stuff in one single point.

Process In Peace

Alas, we have come to the end of our journey through Data::Tubes. There's much more to discover in the manual pages for each individual module:

- Data::Tubes
- Data::Tubes::Util
- Data::Tubes::Plugin::Plumbing
- Data::Tubes::Plugin::Source
- Data::Tubes::Plugin::Reader
- Data::Tubes::Plugin::Parser
- Data::Tubes::Plugin::Renderer
- Data::Tubes::Plugin::Writer

If you want to contribute, Data::Tubes is on GitHub at https://github.com/polettix/Data-Tubes. One way to contribute might be releasing your own plugins… e.g. if you prefer to use Template Toolkit instead of Template::Perlish!