Apache NiFi — Data Ingestion Tool
Apache NiFi is a data flow management system that ships with an easy-to-use UI. It is a data logistics platform that automates the transfer of data between different systems, and because it provides real-time control, it makes the movement of data between any source and destination easy to manage. Think of it as configurable plumbing for moving data around, much like FedEx, UPS, or other courier services move parcels around; like those services, NiFi can track its data in real time.
How NiFi works:
- It consists of atomic elements that can be combined into groups to build a data flow
- Processors
- NiFi has more than 280 processors
- Every processor has its own responsibility
For example:
- The GetFile processor reads a file from a source
- The PutFile processor writes a file to a destination
- NiFi can fetch data from and write data to a wide variety of systems
- The data source can be almost anything: SQL databases such as PostgreSQL, Oracle, or MySQL; NoSQL databases like MongoDB or Couchbase; search engines such as Solr or Elasticsearch; caches and stores such as HBase or Redis; messaging systems like Kafka; or AWS services such as S3 buckets and DynamoDB
- NiFi has a processor for most requirements, and we can also write our own custom processors
- Flow File
- NiFi propagates data in the form of flow files
- A flow file can contain any form of data: CSV, JSON, XML, text, or even binary data
- Because of this flow file abstraction, NiFi can move any form of data from any source to any destination
- A processor processes a flow file and generates a new flow file as its result
- Connection
- Processors are connected together to create a data flow; the connections between them act as queues for flow files
- Process Group
- One or more connected processors can be grouped into a process group, which helps keep complex data flows maintainable
- Controller Service
- It is a shared service that processors can use
- For example, processors that read data from or write data to a SQL database can share a controller service that holds the required DB connection details (a REST API sketch follows below)
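As a rough illustration, the sketch below creates a shared DBCPConnectionPool controller service through the NiFi REST API using Python. The base URL, the use of the `root` process-group alias, and the property names are assumptions and should be checked against your NiFi version's REST API documentation.

```python
# Hypothetical sketch: creating a shared DB connection-pool controller service
# via the NiFi REST API (endpoint path and property names are assumptions).
import requests

NIFI_API = "http://localhost:8080/nifi-api"   # assumed unsecured local instance

payload = {
    "revision": {"version": 0},
    "component": {
        "type": "org.apache.nifi.dbcp.DBCPConnectionPool",
        "name": "postgres-pool",
        "properties": {
            "Database Connection URL": "jdbc:postgresql://db-host:5432/mydb",
            "Database Driver Class Name": "org.postgresql.Driver",
            "Database User": "nifi",
            "Password": "secret",
        },
    },
}

# "root" is the alias for the root process group
resp = requests.post(f"{NIFI_API}/process-groups/root/controller-services", json=payload)
resp.raise_for_status()
print("Created controller service:", resp.json()["id"])
```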
Flow File:
- Holds the data
- Composed of two components:
- Content — the data itself
- Attributes — metadata about the flow file, in the form of key-value pairs
- A processor can manipulate a flow file's attributes (update, add, or remove them), change the content of the flow file, or do both
- Lifecycle
- Persisted on disk
- Passed by reference
- Whenever a processor generates a new flow file, it is immediately persisted to disk, and NiFi passes only a reference to it on to the next processor
- Ingesting new data into an existing flow file, or changing the content of an existing flow file, generates a new flow file
- A new flow file is not created when only the attributes are manipulated
Types of Processors:
- ( https://www.nifi.rocks/apache-nifi-processors/ )
- NiFi has 280+ processors (the number grows with each release)
- It can distribute data to many systems
- Some of the processor categories are:
Data Ingestion Processors — ingest data from various data sources
Data Transformation Processors — transform data into various formats according to the requirements
Data Egress/Sending Processors — send the processed data to various types of destination systems
Routing and Mediation Processors — conditionally change how a flow file is routed and processed
Database Access Processors — commonly used processors for accessing databases
Attribute Extraction Processors — extract and manipulate flow file attributes from the content, from other existing attributes, or both; the resulting attributes are typically used with the routing and mediation processors
System Interaction Processors — run OS-specific commands specified by the user and write the command output to the flow file
Splitting and Aggregation Processors — split and aggregate data according to our requirements
HTTP and UDP Processors — ingest and send data using the HTTP and UDP protocols
Amazon Web Services Processors — communicate with AWS services
Processor configuration and connection relationships:
- Standard configuration
- Common across all processors
- Unique configuration
- Unique to specific processors
- Relationships
- Each processor has zero or more relationships defined for it
- Once a processor has finished processing, it routes the resulting flow files to its relationships
- The flow designer is responsible for handling these relationships by creating a connection from each of them to another processor; a relationship can also be auto-terminated when it is not used (see the sketch after this list)
- NiFi will complain about any unhandled relationship on a processor, and the processor cannot be started until every relationship is handled
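For illustration, here is a rough Python sketch of auto-terminating an unused relationship through the NiFi REST API; the processor id is a placeholder and the exact field names should be verified against your NiFi version.

```python
# Hypothetical sketch: auto-terminating the "failure" relationship of a processor
# via the NiFi REST API (processor id is a placeholder; field names assumed).
import requests

NIFI_API = "http://localhost:8080/nifi-api"   # assumed unsecured local instance
PROCESSOR_ID = "replace-with-processor-id"

# Fetch the current entity to obtain its revision (required for updates)
entity = requests.get(f"{NIFI_API}/processors/{PROCESSOR_ID}").json()

update = {
    "revision": entity["revision"],
    "component": {
        "id": PROCESSOR_ID,
        "config": {
            # Relationships listed here no longer need an outgoing connection
            "autoTerminatedRelationships": ["failure"],
        },
    },
}
requests.put(f"{NIFI_API}/processors/{PROCESSOR_ID}", json=update).raise_for_status()
```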
Apart from this, NiFi also has the concepts of backpressure and data provenance.
Backpressure:
- In NiFi, each processor takes its own time to process data, depending on the complexity involved
- To handle this, NiFi provides a backpressure configuration; each connection can have its own backpressure thresholds defined:
- Object threshold
- Size threshold
- NiFi stops the upstream processor if either of the thresholds is reached (a REST API sketch follows below)
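Both thresholds are normally set in the UI, but they can also be updated programmatically. The sketch below changes a connection's thresholds through the NiFi REST API; the connection id is a placeholder and the field names are assumptions to verify against your version.

```python
# Hypothetical sketch: setting backpressure thresholds on a connection via the
# NiFi REST API (connection id is a placeholder; field names assumed).
import requests

NIFI_API = "http://localhost:8080/nifi-api"   # assumed unsecured local instance
CONNECTION_ID = "replace-with-connection-id"

entity = requests.get(f"{NIFI_API}/connections/{CONNECTION_ID}").json()

update = {
    "revision": entity["revision"],
    "component": {
        "id": CONNECTION_ID,
        "backPressureObjectThreshold": 10000,     # object (flow file count) threshold
        "backPressureDataSizeThreshold": "1 GB",  # size threshold
    },
}
requests.put(f"{NIFI_API}/connections/{CONNECTION_ID}", json=update).raise_for_status()
```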
Data provenance:
NiFi logs every event that occurs on the ingested data as it moves through the flow, so the event information can be searched from the UI. This information is accessible both at the overall NiFi level and at the individual processor level.
The right places to use NiFi:
Data ingestion into Hadoop
Data egress from Hadoop to other destinations
Data movement outside Hadoop within the data center ("non-ETL flows")
What I Have Done
As part of my POC, I used NiFi to ingest streaming data from Apache Kafka into Hive, instead of using Apache Storm:
Need for NiFi:
Apache NiFi is a fantastic tool for transforming data into the format, structure, and content you need for further analysis and processing. In many cases the existing processors are enough, you can write custom processors to achieve a particular flow, and you can even use the ExecuteScript processor to write scripts on the fly that change the data (an example follows below).
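As a small example of the ExecuteScript approach, the snippet below is a script body for an ExecuteScript processor configured with the python (Jython) engine; `session`, `REL_SUCCESS`, and `REL_FAILURE` are variables bound by the processor, and the `kafka.topic` attribute is assumed to come from an upstream ConsumeKafka processor.

```python
# Hypothetical ExecuteScript body (Script Engine: python). The variables
# session, REL_SUCCESS and REL_FAILURE are provided by the processor.
flowFile = session.get()
if flowFile is not None:
    # Copy the Kafka topic (written by ConsumeKafka) into a custom attribute
    topic = flowFile.getAttribute('kafka.topic')
    flowFile = session.putAttribute(flowFile, 'source.topic', topic or 'unknown')
    session.transfer(flowFile, REL_SUCCESS)
```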
My Work:
Processors and configuration:
Flow 1:
ConsumeKafka -> ExtractText -> AttributesToCSV -> ConvertCSVToAvro -> ConvertAvroToORC -> PutHDFS
Challenges faced:
The Hive table has to be created so that it stores its data in ORC format; once it does, the flow works.
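For reference, here is a minimal sketch of such a table definition, issued through PyHive (an assumption on my part; the DDL can equally be run from Beeline). The table name, columns, and HDFS location are illustrative, not the exact ones from the POC.

```python
# Hypothetical sketch: an ORC-backed external Hive table over the directory
# that PutHDFS writes to (names, columns and path are illustrative).
from pyhive import hive  # assumes the PyHive client is installed

ddl = """
CREATE EXTERNAL TABLE IF NOT EXISTS employees (
    id        INT,
    name      STRING,
    joined_dt STRING
)
STORED AS ORC
LOCATION '/data/nifi/employees'
"""

conn = hive.connect(host="hive-server", port=10000)
cursor = conn.cursor()
cursor.execute(ddl)
cursor.close()
conn.close()
```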
Flow 2:
ConsumeKafka -> UpdateAttribute -> ExtractText -> AttributesToCSV -> UpdateRecord -> PartitionRecord -> ConvertAvroToORC -> PutHDFS
Processor configuration:
1. ConsumeKafka configuration:
2. UpdateAttribute configuration:
Configure the processor as shown below.
Next, I added the schema.name attribute to the flow file so that I can use this name in my AvroSchemaRegistry and share the same AvroSchemaRegistry with the Record Reader/Writer controller services.
3. UpdateRecord configuration:
Record Reader:
Our input data is in CSV format, so configure the CSVReader controller service as shown below.
If the incoming data has a header, I can define that with the header property and set it to true; otherwise we have to define the schema in the schema registry. The "Schema Access Strategy" is set to use the string fields from the header. Using these two settings, the schema can be generated dynamically (a plain-Python illustration follows below).
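The idea of deriving the schema from the header can be pictured in plain Python (not NiFi code; the sample data is made up):

```python
# Pure-Python illustration: with a header row, the field names (and hence a
# simple string-typed schema) can be derived dynamically from the data itself.
import csv
import io

sample = "id,name,joined_dt\n1,alice,2021-03-15 10:22:31\n"
reader = csv.DictReader(io.StringIO(sample))
print(reader.fieldnames)   # ['id', 'name', 'joined_dt'] -> string fields from the header
```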
Record Writer:
As the next step, the CSV data is converted into Avro by the Avro record writer; the Avro output is then converted to ORC with the ConvertAvroToORC processor, and the ORC files are stored in HDFS using PutHDFS.
The schema should look like the one below:
- Here I'm allowing all values, including null; in your case you can exclude null by keeping the type as plain string/integer.
- The processor will then route flow file content that does not match the schema to the failure relationship.
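Since the original screenshot of the schema is not reproduced here, the sketch below shows an illustrative Avro schema of that shape, with nullable union types; the record and field names are assumptions, and fastavro is used only to check that the schema parses.

```python
# Illustrative Avro schema with nullable fields (names are assumptions, not the
# exact schema from the POC). parse_schema raises if the schema is invalid.
from fastavro import parse_schema

employee_schema = {
    "type": "record",
    "name": "employee",
    "fields": [
        {"name": "id",        "type": ["null", "int"],    "default": None},
        {"name": "name",      "type": ["null", "string"], "default": None},
        {"name": "joined_dt", "type": ["null", "string"], "default": None},
    ],
}

parsed = parse_schema(employee_schema)
```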
Add a new property in the UpdateRecord processor as shown in the image below.
We add a partition_dt column to the flow file content, derived from the joined date field (the sketch below shows the equivalent computation).
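The UpdateRecord property itself is configured in the NiFi UI (the screenshot is not reproduced here); the plain-Python sketch below only illustrates the equivalent computation, with an assumed field name and timestamp format.

```python
# Plain-Python equivalent of the derivation: partition_dt is the date portion
# of the joined date (field name and timestamp format are assumptions).
from datetime import datetime

def partition_dt(joined_dt: str) -> str:
    return datetime.strptime(joined_dt, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d")

record = {"id": 1, "name": "alice", "joined_dt": "2021-03-15 10:22:31"}
record["partition_dt"] = partition_dt(record["joined_dt"])   # -> "2021-03-15"
```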
PartitionRecord configuration:
Record Reader:
Configure the AvroReader controller service as shown in the image below.
Record Writer:
Configure the AvroRecordSetWriter controller service as shown in the image below.
Add a new dynamic property to the PartitionRecord processor.
1. When the processor reads the incoming data, the records are partitioned based on the partition_dt value.
2. One output flow file is generated per unique partition value.
3. Every output flow file gets a partition_dt attribute whose value indicates which partition the flow file belongs to.
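The behaviour described above can be pictured with a small plain-Python simulation (not NiFi code; the records are made up):

```python
# Simulation of the PartitionRecord behaviour: group records by partition_dt,
# emit one "flow file" per unique value, and carry that value as an attribute.
from collections import defaultdict

records = [
    {"id": 1, "name": "alice", "partition_dt": "2021-03-15"},
    {"id": 2, "name": "bob",   "partition_dt": "2021-03-15"},
    {"id": 3, "name": "carol", "partition_dt": "2021-03-16"},
]

groups = defaultdict(list)
for rec in records:
    groups[rec["partition_dt"]].append(rec)

output_flowfiles = [
    {"attributes": {"partition_dt": value}, "content": recs}
    for value, recs in groups.items()
]
# -> two output flow files, one per unique partition_dt value
```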
The resulting flow file attributes:
Flow 3:
ConsumeKafka -> ExtractText -> ReplaceText -> PutHiveQL
Challenges faced:
- Data ingestion into the Hive table is very slow
Flow 4:
ConsumeKafka -> ExtractText -> ReplaceText -> InferAvroSchema -> ConvertRecord -> PutHiveStreaming
Limitations:
- PutHiveStreaming does not support non-transactional tables
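Hive streaming targets transactional (ACID) tables, which are bucketed and stored as ORC. Below is a minimal sketch of such a table, again issued through PyHive; the table name, columns, and bucket count are illustrative, not the exact ones from the POC.

```python
# Hypothetical sketch: a transactional, bucketed, ORC-backed Hive table that
# PutHiveStreaming can write to (names and bucket count are illustrative).
from pyhive import hive  # assumes the PyHive client is installed

ddl = """
CREATE TABLE IF NOT EXISTS employees_stream (
    id        INT,
    name      STRING,
    joined_dt STRING
)
PARTITIONED BY (partition_dt STRING)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true')
"""

conn = hive.connect(host="hive-server", port=10000)
cursor = conn.cursor()
cursor.execute(ddl)
cursor.close()
conn.close()
```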
Author: Nivedha B