I want to write a Kiba ETL script that reads from a source CSV and writes to a destination CSV, applying a list of transformation rules along the way. The 2nd transformer is an aggregation, equivalent to: select name, sum(euro) group by name
Kiba ETL script file
source CsvSource, 'users.csv', col_sep: ';', headers: true, header_converters: :symbol
transform VerifyFieldsPresence, [:name, :euro]
transform AggregateFields, { sum: :euro, group_by: :name}
transform RenameField, from: :euro, to: :total_amount
destination CsvDestination, 'result.csv', [:name, :total_amount]
users.csv
date;euro;name
7/3/2015;10;Jack
7/3/2015;85;Jill
8/3/2015;6;Jack
8/3/2015;12;Jill
9/3/2015;99;Mack
result.csv (expected result)
total_amount;name
16;Jack
97;Jill
99;Mack
ETL transformers execute one after the other, on a single row at a time. But the behavior of my 2nd transformer depends on the entire collection of rows, which I can't access from the class passed to the transform method.
transform AggregateFields, { sum: :euro, group_by: :name }
Is there any way this behavior can be achieved using the kiba gem?
Thank you in advance
EDIT: it's 2020, and Kiba ETL v3 includes a much better way to do this. Check out this article https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3 for all the relevant information.
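For the v3 route, here is a rough sketch of what an aggregating transform might look like. It assumes, as the linked article describes, that a class transform may define a close method and yield rows from it once the source is exhausted. The class name and keyword arguments mirror the AggregateFields line from the question; they are illustrative and not part of Kiba itself.

```ruby
# Hypothetical aggregating transform (names are illustrative, not a Kiba API).
class AggregateFields
  def initialize(sum:, group_by:)
    @sum = sum
    @group_by = group_by
    @totals = Hash.new(0)
  end

  # Called once per row: accumulate the value, then return nil
  # so that the individual row is dropped from the pipeline.
  def process(row)
    @totals[row[@group_by]] += row[@sum]
    nil
  end

  # Assumption: Kiba v3 calls this at the end of the run and passes
  # along every row yielded here to the rest of the pipeline.
  def close
    @totals.each do |key, total|
      yield(@group_by => key, @sum => total)
    end
  end
end

# Driving the class by hand (without Kiba) to show the behavior:
aggregator = AggregateFields.new(sum: :euro, group_by: :name)
[
  { name: 'Jack', euro: 10 },
  { name: 'Jill', euro: 85 },
  { name: 'Jack', euro: 6 },
  { name: 'Jill', euro: 12 },
  { name: 'Mack', euro: 99 }
].each { |row| aggregator.process(row) }

aggregated_rows = []
aggregator.close { |row| aggregated_rows << row }
p aggregated_rows
```

In an actual script this would be declared as transform AggregateFields, sum: :euro, group_by: :name, with the euro values converted to numbers by an earlier transform.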
Kiba author here! You can achieve that in many different ways, depending mainly on the data size and your actual needs. Here are a couple of possibilities.
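require 'awesome_print'
require 'bigdecimal'
# convert the string into an actual number
# (BigDecimal.new has been removed from recent Ruby versions, use BigDecimal() instead)
transform do |r|
  r[:amount] = BigDecimal(r[:amount])
  r
end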
total_amounts = Hash.new(0)
transform do |r|
  total_amounts[r[:name]] += r[:amount]
  r
end
post_process do
  # pretty print here, but you could save to a CSV too
  ap total_amounts
end
This is the simplest way, yet it is quite flexible.
It will keep your aggregates in memory though, so this may or may not be good enough, depending on your scenario. Note that currently Kiba is single-threaded (but "Kiba Pro" will be multi-threaded), so there is no need to add a lock or use a thread-safe structure for the aggregate, for now.
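To make the "save to a CSV" option concrete, here is a minimal plain-Ruby sketch of the same accumulation, runnable without Kiba. The sample rows mirror the question's data, and the :amount key and the total_amount column name are assumptions following the naming used in this answer; inside a real script the last part would live in the post_process block.

```ruby
require 'bigdecimal'
require 'csv'

# Sample rows mirroring users.csv from the question
rows = [
  { name: 'Jack', amount: '10' },
  { name: 'Jill', amount: '85' },
  { name: 'Jack', amount: '6' },
  { name: 'Jill', amount: '12' },
  { name: 'Mack', amount: '99' }
]

# Same idea as the transform blocks above: a Hash defaulting to 0,
# keyed by name, updated one row at a time.
total_amounts = Hash.new(0)
rows.each do |r|
  total_amounts[r[:name]] += BigDecimal(r[:amount])
end

# What post_process could do instead of pretty-printing:
# serialize the aggregates as CSV, formatting each total with 2 decimals.
aggregated_csv = CSV.generate(col_sep: ';') do |csv|
  csv << %w[name total_amount]
  total_amounts.each do |name, total|
    csv << [name, '%0.2f' % total]
  end
end

puts aggregated_csv
```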
Another quick and easy way to aggregate is to generate a non-aggregated CSV file first, then leverage TextQl to actually do the aggregation, like this:
destination CsvDestination, 'non-aggregated-output.csv', [:name, :amount]
post_process do
  query = <<SQL
    select
      name,
      /* apparently sqlite has reduced precision, round to 2 for now */
      round(sum(amount), 2) as total_amount
    from tbl group by name
SQL
  textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
end
With the following helpers defined:
# run a shell command, raising if it exits with a non-zero status
def system!(cmd)
  raise "Failed to run command #{cmd}" unless system(cmd)
end
def textql(source_file, query, output_file)
  system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
  # this one uses csvfix to pretty print the table
  system! "cat #{output_file} | csvfix ascii_table"
end
Be careful with the precision though when doing computations.
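As a small illustration of the kind of precision issue meant here (plain Ruby, nothing specific to Kiba or TextQL): summing amounts as binary floats can drift, while BigDecimal keeps decimal values exact.

```ruby
require 'bigdecimal'

# Floats cannot represent 0.1 or 0.2 exactly, so the sum is slightly off
float_sum = 0.1 + 0.2
floats_match = (float_sum == 0.3)

# BigDecimal built from strings keeps the exact decimal values
decimal_sum = BigDecimal('0.1') + BigDecimal('0.2')
decimals_match = (decimal_sum == BigDecimal('0.3'))

puts "Float sum equals 0.3?      #{floats_match}"
puts "BigDecimal sum equals 0.3? #{decimals_match}"
```

This is why the examples here convert amounts to BigDecimal before summing, and why the SQL query rounds its result.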
A useful trick that can work here is to wrap a given destination with a class that does the aggregation. Here is what it could look like:
class InMemoryAggregate
  def initialize(sum:, group_by:, destination:)
    @aggregate = Hash.new(0)
    @sum = sum
    @group_by = group_by
    # this relies a bit on the internals of Kiba, but not too much
    @destination = destination.shift.new(*destination)
  end
  def write(row)
    # do not write, but count here instead
    @aggregate[row[@group_by]] += row[@sum]
  end
  def close
    # use close to actually do the writing
    @aggregate.each do |k,v|
      # reformat BigDecimal additions here
      value = '%0.2f' % v
      @destination.write(@group_by => k, @sum => value)
    end
    @destination.close
  end
end
which you can use this way:
# convert your string into an actual number
# (requires 'bigdecimal'; BigDecimal.new has been removed from recent Ruby versions)
transform do |r|
  r[:amount] = BigDecimal(r[:amount])
  r
end
destination CsvDestination, 'non-aggregated.csv', [:name, :amount]
destination InMemoryAggregate,
  sum: :amount, group_by: :name,
  destination: [
    CsvDestination, 'aggregated.csv', [:name, :amount]
  ]
post_process do
  system!("cat aggregated.csv | csvfix ascii_table")
end
The nice thing about this version is that you can reuse your aggregator with different destinations (like a database one, or anything else).
Note though that this will keep all the aggregates in memory, like the first version.
Another way (especially useful if you have very large volumes) is to send the resulting data into something that will be able to aggregate the data for you. It could be a regular SQL database, Redis, or something fancier, which you would then be able to query as needed.
So as I said, the implementation will largely depend on your actual needs. Hope you will find something that works for you here!