Posted 21 Jan 2019
Licenced CPOL

Apache Flume: Converting CSV to AVRO and Parquet using Morphline and Kite

How to convert CSV Flume events to Avro and Parquet using the Morphline interceptor and the Kite sink

This post explains how to convert CSV Flume events to Avro and Parquet using the Morphline interceptor and the Kite sink.

The code is available on GitHub. Look at these two files:

  1. orders.avsc, which describes the Avro schema for the input events, and
  2. part-m-00000, which holds our CSV data.

Note the field names in the Avro schema file, orders.avsc. The Morphline configuration in Step 2 must use exactly the same names.
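
To make "field names in the schema" concrete, here is a minimal Python sketch that lists the field names of an Avro record schema. The schema text below is a hypothetical stand-in for orders.avsc: the four field names come from the column list used later in this post, but the record name and the field types are assumptions, so check them against the real file.

```python
import json

# Hypothetical orders.avsc content. The field names match the columns used in
# this article; the record name and the field types are assumptions.
ORDERS_AVSC = """
{
  "type": "record",
  "name": "orders",
  "fields": [
    {"name": "order_id", "type": "int"},
    {"name": "order_date", "type": "string"},
    {"name": "order_customer_id", "type": "int"},
    {"name": "order_status", "type": "string"}
  ]
}
"""


def schema_field_names(avsc_text):
    """Return the field names of an Avro record schema, in declaration order."""
    schema = json.loads(avsc_text)
    return [field["name"] for field in schema["fields"]]


field_names = schema_field_names(ORDERS_AVSC)
print(field_names)
```

To try it against the real schema, read the file contents and pass them to schema_field_names instead of the embedded string.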

Pre-requisites

  • This code was tested on my Cloudera 5.15 setup, which is described in a separate blog post.

Step 1 - Create Kite Datasets with Avro and Parquet Format

kite-dataset create orders_parquet --schema orders.avsc --format parquet
kite-dataset create orders_avro --schema orders.avsc --format avro

Step 2 - Create Configuration File for Morphline Interceptor (morphline.conf)

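morphlines : [{
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # Parse each CSV line and name the fields after the Avro schema fields
      { readCSV {
          charset : UTF-8
          columns : [order_id,order_date,order_customer_id,order_status]
        }
      }
      # Convert the record to Avro using a schema file on the local filesystem
      { toAvro {
          schemaFile : /home/skamalj/learn-flume2/orders.avsc
        }
      }
      # Serialize without an Avro container; the Kite sink adds the container
      { writeAvroToByteArray {
          format : containerlessBinary
        }
      }
    ]
  }
]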
  • In a morphline, commands are chained: each record passes from one command to the next. The first command in the list is readCSV. It reads the line and assigns a name to each field; the names must be exactly the same as those defined in our Avro schema file, orders.avsc.
  • The next command, toAvro, converts the record to Avro using the schema file. The column names are mapped one-to-one to the Avro schema fields.
  • Finally, writeAvroToByteArray emits containerless events. Kite will create the container for us (with Snappy compression, which is the default).
  • Make sure you change the schemaFile path to match your setup. This is a local directory path, not HDFS.
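
The name mapping described above can be sketched in plain Python. This is only a rough imitation of the idea behind readCSV and toAvro, not Kite or Morphline code: the helper names are hypothetical, the schema field list is assumed to match orders.avsc, and the input line is made up for illustration.

```python
import csv
import io

# Column list copied from the readCSV command in morphline.conf.
COLUMNS = ["order_id", "order_date", "order_customer_id", "order_status"]

# Assumed field names of orders.avsc (the file itself is not reproduced here).
SCHEMA_FIELDS = ["order_id", "order_date", "order_customer_id", "order_status"]


def read_csv_line(line, columns):
    """Split one CSV line and name each value by its position."""
    values = next(csv.reader(io.StringIO(line)))
    # This strict length check is a choice made for this sketch only;
    # it does not describe how Morphline handles short or long lines.
    if len(values) != len(columns):
        raise ValueError(f"expected {len(columns)} values, got {len(values)}")
    return dict(zip(columns, values))


def unmapped_fields(record, schema_fields):
    """Return the schema fields for which the record carries no value."""
    return [name for name in schema_fields if name not in record]


# Hypothetical input line, made up for illustration.
record = read_csv_line("1,2019-01-21,100,CLOSED", COLUMNS)
print(record)
print(unmapped_fields(record, SCHEMA_FIELDS))
```

If a column name in morphline.conf is misspelled, unmapped_fields returns the schema field that would be left without a value, which is the kind of mismatch to look for when the conversion fails.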

Step 3 - Create Flume Configuration

Only a section of the code is explained below:

a1.sources.k1.interceptors = schemaheader morphlineinterceptor

a1.sources.k1.interceptors.schemaheader.type = static
a1.sources.k1.interceptors.schemaheader.key = flume.avro.schema.url
a1.sources.k1.interceptors.schemaheader.value = hdfs://cloudera-master:8020/user/skamalj/avro-schemas/orders.avsc

a1.sources.k1.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.k1.interceptors.morphlineinterceptor.morphlineFile = /home/skamalj/learn-flume2/morphline.conf
a1.sources.k1.interceptors.morphlineinterceptor.morphlineId = morphline1

a1.sinks.s1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.s1.channel = c1
a1.sinks.s1.kite.dataset.uri = dataset:hive://cloudera-master:9083/default/orders_parquet
  • We need two interceptors: 
    • The first is a static interceptor that injects the "flume.avro.schema.url" header (the schemaheader properties). The Kite sink uses this header to parse the Avro events. To use it, upload the orders.avsc file to HDFS and provide its URL as the value.
    • The second is the Morphline interceptor (the morphlineinterceptor properties), which runs the morphline we created in Step 2. This one converts the events to Avro.
  • The a1.sinks.s1 properties configure the Kite sink; this one writes to the Parquet dataset.
  • If you look at the complete configuration on GitHub, you will see that we have two channels and two sinks: one set is for Parquet and the other is for Avro.
  • Make sure you change the paths to match your setup. (The morphline path is a local directory, not HDFS.)
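
Since each interceptor listed in a1.sources.k1.interceptors needs its own set of properties, a quick sanity check before starting the agent can save a restart. The Python sketch below is an assumption-laden helper, not part of Flume: parse_properties and interceptor_problems are hypothetical names, the parser handles only simple "key = value" lines, and the sample text is deliberately incomplete to show what a problem looks like.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines; skips blanks, # comments and other lines."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def interceptor_problems(props, source="a1.sources.k1"):
    """List interceptors that lack a type, and properties with empty values."""
    problems = []
    names = props.get(f"{source}.interceptors", "").split()
    for name in names:
        if not props.get(f"{source}.interceptors.{name}.type"):
            problems.append(f"{name}: missing type")
    problems += [f"{key}: empty value" for key, value in props.items() if not value]
    return problems


# Deliberately incomplete sample: the morphline interceptor has no type.
SAMPLE = """
a1.sources.k1.interceptors = schemaheader morphlineinterceptor
a1.sources.k1.interceptors.schemaheader.type = static
a1.sources.k1.interceptors.schemaheader.key = flume.avro.schema.url
a1.sources.k1.interceptors.morphlineinterceptor.morphlineId = morphline1
"""

print(interceptor_problems(parse_properties(SAMPLE)))
```

The empty-value check is there because a value that ends up on the line after its key is read as an empty property by this parser, which is an easy mistake to make when copying long paths.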

Step 4 - Execute

To run the agent, you need to set the HIVE_HOME and HCAT_HOME environment variables. In addition, increase the JVM memory (-X...); the default memory setting is insufficient for Morphline and Kite.

export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HCAT_HOME=/opt/cloudera/parcels/CDH/lib/hive-hcatalog
flume-ng agent --name a1 --conf . --conf-file flume_morph_kite.conf -Xms4096m -Xmx8192m

Check the results in both the Avro and the Parquet datasets.

All done!

History

  • 21st January, 2019: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Article Copyright 2019 by Member 13359958