Dataflow Python SDK Avro Source/Sink
I am looking to ingest and write Avro files in GCS with the Python SDK. Is this currently possible with the Python SDK, and if so, how would I do it? I see TODO comments in the source code regarding an Avro source/sink, which makes me think it isn't supported yet.
Solution 1:
As of version 2.6.0 of the Apache Beam/Dataflow Python SDK, it is indeed possible to read from (and write to) Avro files in GCS.
Even better, the Python SDK for Beam now supports fastavro reads and writes, which can be up to 10x faster than regular Avro IO.
Sample code:
import apache_beam as beam
from apache_beam.io import ReadFromAvro
from apache_beam.io import WriteToAvro
import avro.schema
RUNNER = 'DataflowRunner'
GCP_PROJECT_ID = 'YOUR_PROJECT_ID'
BUCKET_NAME = 'YOUR_BUCKET_HERE'
STAGING_LOCATION = 'gs://{}/staging'.format(BUCKET_NAME)
TEMP_LOCATION = 'gs://{}/temp'.format(BUCKET_NAME)
GCS_INPUT = "gs://{}/input-*.avro".format(BUCKET_NAME)
GCS_OUTPUT = "gs://{}/".format(BUCKET_NAME)
JOB_NAME = 'conversion-test'
SCHEMA_PATH="YOUR_AVRO_SCHEMA.avsc"
AVRO_SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())
OPTIONS = {
    'runner': RUNNER,
    'job_name': JOB_NAME,
    'staging_location': STAGING_LOCATION,
    'temp_location': TEMP_LOCATION,
    'project': GCP_PROJECT_ID,
    'max_num_workers': 2,
    'save_main_session': True,
}
PIPELINE = beam.Pipeline(options=beam.pipeline.PipelineOptions(flags=[], **OPTIONS))
def main():
    # note: have to force `use_fastavro` to enable `fastavro`:
    results = PIPELINE | ReadFromAvro(file_pattern=GCS_INPUT, use_fastavro=True)
    results | WriteToAvro(file_path_prefix=GCS_OUTPUT, schema=AVRO_SCHEMA, use_fastavro=True)
    # without this the pipeline is only constructed, never executed:
    PIPELINE.run().wait_until_finish()

if __name__ == '__main__':
    import os
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'PATH_TO_YOUR_SERVICE_ACCOUNT_KEY'
    main()
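For reference, SCHEMA_PATH above points at a plain Avro schema (.avsc) file. The record and field names below are hypothetical, just a minimal sketch of the kind of schema the pipeline expects; avro.schema.parse accepts the same JSON passed inline as a string:
import avro.schema

# Minimal sketch of a schema like YOUR_AVRO_SCHEMA.avsc (record/field names are made up);
# avro.schema.parse works the same whether the JSON comes from a file or a string.
EXAMPLE_SCHEMA = avro.schema.parse('''
{
  "namespace": "example.avro",
  "type": "record",
  "name": "ExampleRecord",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "payload", "type": ["null", "string"], "default": null}
  ]
}
''')
For local testing you can swap RUNNER to 'DirectRunner'; the rest of the pipeline stays the same.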
Solution 2:
You are correct: the Python SDK does not yet support this, but it will soon.