Building a Simple Custom Processor With Apache Ni-Fi

Apache NiFi is a powerful tool for automating data flows with a large number of built-in processors. However, sometimes there is a need for specific processors to handle unique requirements and data stores. In such cases, creating custom processors becomes a necessity.


This content originally appeared on HackerNoon and was authored by Temirlan Amanbayev

\ Apache NiFi is a powerful tool for automating data flows with a large number of built-in processors. However, sometimes there is a need for specific processors to handle unique requirements and data stores. In such cases, creating custom processors becomes a necessity. In this article, we will go through the steps to create a very simple custom processor for Apache NiFi.

\

Setup and sources

gradle setup

dependencies {
    compile "org.apache.nifi:nifi-api:*"
    compile "org.apache.nifi:nifi-utils:1.9.2"
    testCompile "org.apache.nifi:nifi-mock:1.9.2"
}

\ source of the custom processor

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

@Tags({"example"})
@CapabilityDescription("Hello World to output")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor propertyDescriptor = new PropertyDescriptor
            .Builder().name("name")
            .displayName("name")
            .description("Name to print")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final Relationship successRelations = new Relationship.Builder()
            .name("success")
            .description("If everything is ok")
            .build();

    public static final Relationship failureRelations = new Relationship.Builder()
            .name("failure")
            .description("If something is wrong")
            .build();

    private List<PropertyDescriptor> descriptorList;

    private Set<Relationship> relationshipSet;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(propertyDescriptor);
        this.descriptorList = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(successRelations);
        relationships.add(failureRelations);
        this.relationshipSet = Collections.unmodifiableSet(relationships);
    }

    public Set<Relationship> getRelationshipSet() {
        return this.relationshipSet;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptorList;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        try {
            String name = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue();

            String result = "Hello " + name;
            session.putAttribute(flowFile, "result", result);

            try (OutputStream flowFileOutputStream = session.write(flowFile)) {
                flowFileOutputStream.write(result.getBytes(StandardCharsets.UTF_8));
            }

            session.transfer(flowFile, successRelations);
        } catch (IOException e) {
                //IO Error processing error log
                session.transfer(flowFile, failureRelations);
            } catch (ProcessException e) {
                    // Process error log
                getLogger().error("Processing error", e);
                session.transfer(flowFile, failureRelations);
            }
    }
}

\

Build and deploy

  • Build the project with Gradle: gradle build.
  • Find the generated .nar file in the target folder of your module.
  • Copy the .nar file to the lib folder of your NiFi installation.

\ Screenshot of how the custom processor integrates seamlessly

\ I hope this article was useful for you! Creating a custom processor for Apache NiFi allows you to customize it to meet the specific requirements of your project. By following the steps above, you will be able to develop and integrate a custom processor that will enhance the processing capabilities of your system.


This content originally appeared on HackerNoon and was authored by Temirlan Amanbayev


Print Share Comment Cite Upload Translate Updates
APA

Temirlan Amanbayev | Sciencx (2024-07-24T13:33:11+00:00) Building a Simple Custom Processor With Apache Ni-Fi. Retrieved from https://www.scien.cx/2024/07/24/building-a-simple-custom-processor-with-apache-ni-fi/

MLA
" » Building a Simple Custom Processor With Apache Ni-Fi." Temirlan Amanbayev | Sciencx - Wednesday July 24, 2024, https://www.scien.cx/2024/07/24/building-a-simple-custom-processor-with-apache-ni-fi/
HARVARD
Temirlan Amanbayev | Sciencx Wednesday July 24, 2024 » Building a Simple Custom Processor With Apache Ni-Fi., viewed ,<https://www.scien.cx/2024/07/24/building-a-simple-custom-processor-with-apache-ni-fi/>
VANCOUVER
Temirlan Amanbayev | Sciencx - » Building a Simple Custom Processor With Apache Ni-Fi. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2024/07/24/building-a-simple-custom-processor-with-apache-ni-fi/
CHICAGO
" » Building a Simple Custom Processor With Apache Ni-Fi." Temirlan Amanbayev | Sciencx - Accessed . https://www.scien.cx/2024/07/24/building-a-simple-custom-processor-with-apache-ni-fi/
IEEE
" » Building a Simple Custom Processor With Apache Ni-Fi." Temirlan Amanbayev | Sciencx [Online]. Available: https://www.scien.cx/2024/07/24/building-a-simple-custom-processor-with-apache-ni-fi/. [Accessed: ]
rf:citation
» Building a Simple Custom Processor With Apache Ni-Fi | Temirlan Amanbayev | Sciencx | https://www.scien.cx/2024/07/24/building-a-simple-custom-processor-with-apache-ni-fi/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.