I tag input elements based on one field of the input data (date):
import apache_beam as beam
from apache_beam import pvalue

class TagElementsWithDate(beam.DoFn):
    def process(self, element):
        # Tag each element with the year and month (YYYYMM) of its date field.
        dt = element['date'].replace('-', '')[:6]
        yield pvalue.TaggedOutput(dt, element)

input_data = p | 'Read Input' >> beam.io.Read(beam.io.BigQuerySource(query='select id, date from `project.dataset.tablename`', use_standard_sql=True))
tagged_data = input_data | 'tag data' >> beam.ParDo(TagElementsWithDate()).with_outputs()
tagged_data is a DoOutputsTuple. I want to iterate over it and write each tagged output to a separate file.
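For reference, the tag that `TagElementsWithDate` derives is the date with the dashes removed, cut to its first six characters (`YYYYMM`). Here is a minimal plain-Python sketch of that rule, with no Beam involved. `month_tag` and the sample rows are hypothetical, and it assumes `date` arrives as a `YYYY-MM-DD` string:

```python
from collections import defaultdict

def month_tag(date_str):
    """Return the YYYYMM tag for a 'YYYY-MM-DD' date string."""
    return date_str.replace('-', '')[:6]

# Hypothetical rows shaped like the result of the query (id, date).
rows = [
    {'id': 1, 'date': '2019-01-15'},
    {'id': 2, 'date': '2019-01-30'},
    {'id': 3, 'date': '2019-02-01'},
]

# Group rows by tag, mirroring what each tagged output would contain.
by_tag = defaultdict(list)
for row in rows:
    by_tag[month_tag(row['date'])].append(row)

print(sorted(by_tag))
```

Each distinct month in the data becomes its own tag, so the set of tags is only known once the data has been read.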
You need to write your own DoFn, something like this:
from apache_beam.io.textio import _TextSink  # private Beam class; may change between releases

class WriteEachKeyToText(beam.DoFn):
    def __init__(self, file_path_prefix: str):
        super().__init__()
        self.file_path_prefix = file_path_prefix

    def process(self, kv):
        # Each input is expected to be a (key, values) pair.
        key, elements = kv
        sink = _TextSink(self.file_path_prefix, file_name_suffix=f"{key}.json")
        writer = sink.open_writer("prefix", self.file_path_prefix)
        for e in elements:  # values
            writer.write(e)
        writer.close()  # flush and close the file once all values are written
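Then you can use it like this (note that process expects (key, values) pairs, so the input should be keyed and grouped first, for example with beam.GroupByKey()):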
output_path = "/some/path/"
tagged_data | beam.ParDo(WriteEachKeyToText(output_path))
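To sanity-check the expected result without running a pipeline, the per-key write can be sketched in plain Python. This is not Beam code: `write_group` is a hypothetical helper, the `<key>.json` naming only mirrors the suffix used above (the actual file names produced by the sink will differ), and it assumes each element is a JSON-serializable dict.

```python
import json
import os
import tempfile

def write_group(output_dir, key, elements):
    """Write one group to <output_dir>/<key>.json, one JSON object per line."""
    path = os.path.join(output_dir, f"{key}.json")
    with open(path, "w", encoding="utf-8") as f:
        for e in elements:
            f.write(json.dumps(e) + "\n")
    return path

# Hypothetical grouped data: one entry per YYYYMM key.
grouped = {
    "201901": [{"id": 1, "date": "2019-01-15"}],
    "201902": [{"id": 3, "date": "2019-02-01"}],
}

out_dir = tempfile.mkdtemp()
paths = [write_group(out_dir, key, values) for key, values in grouped.items()]
```

Using a `with` block here plays the same role as closing the writer in the DoFn: nothing is guaranteed to be on disk until the file is closed.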