I created a dataset in TFRecord format for testing. Every entry contains 200 columns, named C1 - C199, each holding a list of strings, plus a label column. The code that generates the data can be found here: https://github.com/codescv/tf-dist/blob/8bb3c44f55939fc66b3727a730c57887113e899c/src/gen_data.py#L25
Then I trained a linear model on the data. The first approach looks like this:
dataset = tf.data.TFRecordDataset(data_file)
dataset = dataset.prefetch(buffer_size=batch_size*10)
dataset = dataset.map(parse_tfrecord, num_parallel_calls=5)
dataset = dataset.repeat(num_epochs)
dataset = dataset.batch(batch_size)
features, labels = dataset.make_one_shot_iterator().get_next()    
logits = tf.feature_column.linear_model(features=features, feature_columns=columns, cols_to_vars=cols_to_vars)
train_op = ...
with tf.Session() as sess:
    sess.run(train_op)
The full code can be found here: https://github.com/codescv/tf-dist/blob/master/src/lr_single.py
When I run the code above, I get 0.85 steps/sec (with a batch size of 1024).
In the second approach, I manually pull batches from the Dataset into Python, then feed them to a placeholder, like this:
example = tf.placeholder(dtype=tf.string, shape=[None])
features = tf.parse_example(example, features=tf.feature_column.make_parse_example_spec(columns+[tf.feature_column.numeric_column('label', dtype=tf.float32, default_value=0)]))
labels = features.pop('label')
train_op = ...
dataset = tf.data.TFRecordDataset(data_file).repeat().batch(batch_size)
next_batch = dataset.make_one_shot_iterator().get_next()
with tf.Session() as sess:
    data_batch = sess.run(next_batch)
    sess.run(train_op, feed_dict={example: data_batch})
The full code can be found here: https://github.com/codescv/tf-dist/blob/master/src/lr_single_feed.py
When I run the code above, I get 5 steps/sec. That is more than 5x faster than the first approach. This is what I do not understand, because in theory the second approach should be slower due to the extra serialization/deserialization of data batches.
Thanks!
There is currently (as of TensorFlow 1.9) a performance issue when using tf.data to map and batch tensors that have a large number of features with a small amount of data in each. The issue has two causes:
- The dataset.map(parse_tfrecord, ...) transformation will execute O(batch_size * num_columns) small operations to create a batch. By contrast, feeding a tf.placeholder() to tf.parse_example() will execute O(1) operations to create the same batch.
- Batching many tf.SparseTensor objects using dataset.batch() is much slower than directly creating the same tf.SparseTensor as the output of tf.parse_example().
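To make the first cause concrete, here is a rough back-of-the-envelope count in plain Python using the numbers from the question (batch size 1024, 200 feature columns). It is only a sketch: the function names are made up for illustration, and ops_per_output=1 is an assumed constant, not a measured value from TensorFlow.

```python
# Toy operation count, not a measurement. ops_per_output is an assumed
# constant standing in for the small per-output ops described above.

def per_record_map_ops(batch_size, num_columns, ops_per_output=1):
    """Small ops executed when map() parses one record at a time."""
    return batch_size * num_columns * ops_per_output


def single_parse_ops():
    """Ops executed when one tf.parse_example() call parses a whole batch."""
    return 1


BATCH_SIZE = 1024   # from the question
NUM_COLUMNS = 200   # feature columns in the question (label ignored)

ops_map_then_batch = per_record_map_ops(BATCH_SIZE, NUM_COLUMNS)
ops_feed = single_parse_ops()
print(ops_map_then_batch, "small ops per batch vs", ops_feed)
```

Even if each of those ops is tiny, running a couple of hundred thousand of them per training step plausibly dominates the step time, which is consistent with the slowdown reported in the question.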
Improvements to both of these issues are underway, and should be available in a future version of TensorFlow. In the meantime, you can improve the performance of the tf.data-based pipeline by switching the order of dataset.map() and dataset.batch(), and rewriting the dataset.map() function to work on a vector of strings, like the feeding-based version:
dataset = tf.data.TFRecordDataset(data_file)
dataset = dataset.prefetch(buffer_size=batch_size*10)
dataset = dataset.repeat(num_epochs)
# Batch first to create a vector of strings as input to the map(). 
dataset = dataset.batch(batch_size)
def parse_tfrecord_batch(record_batch):
  features = tf.parse_example(
      record_batch,
      features=tf.feature_column.make_parse_example_spec(
          columns + [
              tf.feature_column.numeric_column(
                  'label', dtype=tf.float32, default_value=0)]))
  labels = features.pop('label')
  return features, labels
# NOTE: Parallelism might not be as useful, because the individual map function now does
# more work per invocation, but you might want to experiment with this.
dataset = dataset.map(parse_tfrecord_batch)
# Add a prefetch at the end to pipeline execution.
dataset = dataset.prefetch(1)
features, labels = dataset.make_one_shot_iterator().get_next()    
# ...
EDIT (2018/6/18): To answer your questions from the comments:
- Why is dataset.map(parse_tfrecord, ...) O(batch_size * num_columns), not O(batch_size)? If parsing requires enumeration of the columns, why doesn't parse_example take O(num_columns)?
When you wrap TensorFlow code in a Dataset.map() (or other functional transformation), a constant number of extra operations per output is added to "return" values from the function and (in the case of tf.SparseTensor values) "convert" them to a standard format. When you directly pass the outputs of tf.parse_example() to the input of your model, these operations aren't added. While they are very small operations, executing so many of them can become a bottleneck. (Technically, the parsing does take O(batch_size * num_columns) time, but the constants involved in parsing are much smaller than the cost of executing an operation.)
- Why do you add a prefetch at the end of the pipeline?
When you're interested in performance, this is almost always the best thing to do, and it should improve the overall performance of your pipeline. For more information about best practices, see the performance guide for tf.data. 
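As a rough illustration of why a trailing prefetch helps, here is a small timeline simulation in plain Python. It is a simplified model, not TensorFlow code: simulate_pipeline and its parameters are hypothetical names, the per-batch times are invented, and it assumes a buffer of exactly one batch that starts refilling as soon as the consumer takes a batch.

```python
def simulate_pipeline(num_steps, produce_s, consume_s, prefetch):
    """Return the total wall time for num_steps training steps.

    produce_s: time to build one batch (input pipeline).
    consume_s: time for one training step on a ready batch.
    prefetch:  if True, the next batch is built while the current
               step runs (a buffer of one batch is assumed).
    """
    consumer_free = 0.0        # when the training loop can take a batch
    batch_ready = produce_s    # when the first batch is available
    for _ in range(num_steps):
        start = max(consumer_free, batch_ready)
        consumer_free = start + consume_s
        if prefetch:
            # The producer starts the next batch as soon as this one is taken.
            batch_ready = start + produce_s
        else:
            # The next batch is built only when the consumer asks for it.
            batch_ready = consumer_free + produce_s
    return consumer_free


# Invented timings: 2 time units to build a batch, 3 to train on it.
without_prefetch = simulate_pipeline(10, 2.0, 3.0, prefetch=False)
with_prefetch = simulate_pipeline(10, 2.0, 3.0, prefetch=True)
print(without_prefetch, with_prefetch)
```

In this model the per-step cost drops from produce_s + consume_s to roughly max(produce_s, consume_s) once the pipeline is warmed up, which is the overlap that dataset.prefetch(1) is meant to provide.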