Python Pipeline: Here’s How to Build Your Python Package and Install It in StreamSets


Coding in Python gives developers ultimate control over every aspect of their design, but with a plethora of choices comes the danger of becoming distracted. Low-code, graphical environments make components easy to operate and reuse, but offer less control than hand coding. Custom processors let data engineers operationalize their code and give coders powerful extensibility.

This Python pipeline walkthrough demonstrates how you can run your own Python code in StreamSets Data Collector.

This demo installs Data Collector using Docker. Your complex Python code is packaged and published to PyPI. You then build your own Docker image on top of the StreamSets Data Collector image and install the package into it. Once Data Collector is running from your custom image, you simply import the package in the Jython Evaluator.

Let’s get started!

Prerequisites:

  1. Have a PyPI account.
  2. Install twine to upload packages to PyPI.
  3. Install Docker.
  4. Have a GitHub account to host your code.
  5. Have a StreamSets DataOps Platform account.

High level steps:

  1. Create your Python package.
  2. Build the project.
  3. Upload it to PyPI.
  4. Create a Dockerfile that installs the Python package.
  5. Run the install script generated by your StreamSets deployment, using your custom image.
  6. Create a pipeline with a Jython Evaluator.

The following steps create your own Python package and upload it to PyPI.

PyPI (the Python Package Index) is the official repository for third-party Python packages. You can think of it as GitHub for Python packages.

To make your Python package available to people around the world, you need a PyPI account.

I use twine to upload the files to PyPI. You can install twine with the following command:

pip install twine

Benefits and Limitations of a Python Pipeline in StreamSets

Benefits:

  1. Customers can reuse their existing Python packages in StreamSets.
  2. Users do not have to rewrite complex logic inside StreamSets.

Limitations:

  1. Customers have to rebuild their custom images whenever they upgrade Data Collector.

Python Calculator Project

Folder Structure

WilsonProject/
    setup.py
    WilsonCalculator/
        __init__.py
        Addition.py
        Division.py
        Multiplication.py
        Subtraction.py

Contents of Addition.py:

# Runs once, when the module is first imported
print("Performing Addition:")

def add(x, y):
    return x + y

You can find the complete code on GitHub.
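Only Addition.py is shown in this post. The following is a minimal sketch of the other three modules, assuming they follow the same pattern: the function names sub, Mul and div come from the imports in __init__.py, but their bodies here are assumptions, not the code in the GitHub repository. Because the Jython Evaluator runs Jython 2.7, the sketch sticks to Python 2 compatible code and uses `from __future__ import division` so that div returns a true quotient on both Python 2 and Python 3.

```python
# Hypothetical sketches of the remaining modules. In the package each function
# lives in its own file (Subtraction.py, Multiplication.py, Division.py); they
# are combined here only so the sketch runs as a single file.
from __future__ import division  # true division on Python 2 / Jython as well


# Subtraction.py (assumed body)
def sub(x, y):
    return x - y


# Multiplication.py (assumed body; the capitalized name matches __init__.py)
def Mul(x, y):
    return x * y


# Division.py (assumed body; raises ZeroDivisionError when y == 0)
def div(x, y):
    return x / y
```

With the `__future__` import, `div(7, 2)` returns 3.5 under Jython too, instead of the floor result 3 that plain Python 2 integer division would give.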

Create an __init__.py file inside the WilsonCalculator folder and add the following content:

__author__ = 'wilson shamim'
__version__ = '1.1.0'
from WilsonCalculator.Addition import add
from WilsonCalculator.Subtraction import sub
from WilsonCalculator.Division import div
from WilsonCalculator.Multiplication import Mul

Next, create a setup.py file in the WilsonProject folder.

You can find the full setup.py with the rest of the code on GitHub.
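As a sketch, a minimal setup.py consistent with the build output further down could look like this. The distribution name WilsonCalculators, version 1.1.0 and the single WilsonCalculator package are taken from that output, and the author from __init__.py; any other metadata in the real file is unknown.

```python
# setup.py (minimal sketch; the real file may set more metadata)
from setuptools import setup

setup(
    # Distribution name used with pip: pip install WilsonCalculators
    name="WilsonCalculators",
    # Keep in sync with __version__ in WilsonCalculator/__init__.py
    version="1.1.0",
    author="wilson shamim",
    # Import package name used in code: from WilsonCalculator import add
    packages=["WilsonCalculator"],
)
```

The build output warns that no README and no url were supplied; adding a README.md next to setup.py and a url argument to setup() clears those warnings. Recent setuptools releases deprecate invoking setup.py directly, and `python -m build --sdist` (from the separate build package) is the recommended way to produce the same kind of archive in dist.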

Navigate to WilsonProject and run the following command to build a source distribution:

python setup.py sdist

The output looks like this:

wilsonshamim@Wilsons-MBP PythonTutorials % cd WilsonProject 
wilsonshamim@Wilsons-MBP WilsonProject % python setup.py sdist 
running sdist
running egg_info
creating WilsonCalculators.egg-info
writing WilsonCalculators.egg-info/PKG-INFO
writing top-level names to WilsonCalculators.egg-info/top_level.txt
writing dependency_links to WilsonCalculators.egg-info/dependency_links.txt
writing manifest file 'WilsonCalculators.egg-info/SOURCES.txt'
reading manifest file 'WilsonCalculators.egg-info/SOURCES.txt'
writing manifest file 'WilsonCalculators.egg-info/SOURCES.txt'
warning: sdist: standard file not found: should have one of README, README.rst, README.txt, README.md
running check
warning: check: missing required meta-data: url
creating WilsonCalculators-1.1.0
creating WilsonCalculators-1.1.0/WilsonCalculator
creating WilsonCalculators-1.1.0/WilsonCalculators.egg-info
copying files to WilsonCalculators-1.1.0...
copying setup.py -> WilsonCalculators-1.1.0
copying WilsonCalculator/Addition.py -> WilsonCalculators-1.1.0/WilsonCalculator
copying WilsonCalculator/Division.py -> WilsonCalculators-1.1.0/WilsonCalculator
copying WilsonCalculator/Multiplication.py -> WilsonCalculators-1.1.0/WilsonCalculator
copying WilsonCalculator/Subtraction.py -> WilsonCalculators-1.1.0/WilsonCalculator
copying WilsonCalculator/__init__.py -> WilsonCalculators-1.1.0/WilsonCalculator
copying WilsonCalculators.egg-info/PKG-INFO -> WilsonCalculators-1.1.0/WilsonCalculators.egg-info
copying WilsonCalculators.egg-info/SOURCES.txt -> WilsonCalculators-1.1.0/WilsonCalculators.egg-info
copying WilsonCalculators.egg-info/dependency_links.txt -> WilsonCalculators-1.1.0/WilsonCalculators.egg-info
copying WilsonCalculators.egg-info/top_level.txt -> WilsonCalculators-1.1.0/WilsonCalculators.egg-info
Writing WilsonCalculators-1.1.0/setup.cfg
creating dist
Creating tar archive
removing 'WilsonCalculators-1.1.0' (and everything under it)

When the build finishes, two new folders exist: dist, which holds the WilsonCalculators-1.1.0.tar.gz archive, and WilsonCalculators.egg-info.
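Before uploading, it can be worth checking what actually went into the archive. A small sketch using the standard tarfile module; the helper name list_sdist is made up for this example, and the path assumes the file name from the build above (newer setuptools releases may normalize it, so adjust if yours differs).

```python
import os
import tarfile


def list_sdist(path):
    """Return the sorted member names of a .tar.gz source distribution."""
    with tarfile.open(path, "r:gz") as archive:
        return sorted(archive.getnames())


if __name__ == "__main__":
    # Assumed location, relative to the WilsonProject folder
    sdist = os.path.join("dist", "WilsonCalculators-1.1.0.tar.gz")
    if os.path.exists(sdist):
        for name in list_sdist(sdist):
            print(name)
    else:
        print("not found: " + sdist)
```

Every module under WilsonCalculator/ should appear in the listing; a missing file usually means the packages argument in setup.py does not cover it.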

From the WilsonProject folder, run the following command to upload the distribution to PyPI:

twine upload dist/*

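twine prompts for a username and password. PyPI now requires an API token for uploads: enter __token__ as the username and the token itself (including its pypi- prefix) as the password.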

Once the upload succeeds, you can install the package with pip:

pip install WilsonCalculators

Once it is installed, you can import it in your code.
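Note that pip installs the distribution WilsonCalculators, while code imports the package WilsonCalculator. A quick install check, as a sketch: the helper package_version is made up for this example; it reads the __version__ set in __init__.py and, when the import succeeds, calls add through the package-level import that __init__.py provides.

```python
import importlib


def package_version(module_name="WilsonCalculator"):
    """Return module_name.__version__, or None if the module cannot be imported."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return None
    return getattr(module, "__version__", None)


if __name__ == "__main__":
    version = package_version()
    if version is None:
        print("WilsonCalculator is not installed; run: pip install WilsonCalculators")
    else:
        # Works because __init__.py re-exports add from WilsonCalculator.Addition
        from WilsonCalculator import add
        print("WilsonCalculator " + version)
        print(add(2, 3))
```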


Next, create a Dockerfile with the following content:

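FROM streamsets/datacollector:4.3.0
# Switch to root so packages can be installed system-wide
USER root
# OS packages, including Python 2.7 and pip for installing the package
RUN apk add --update --no-cache bash \
 curl \
 grep \
 krb5-libs \
 krb5 \
 libidn \
 libstdc++ \
 libuuid \
 protobuf \
 sed \
 python2 \
 py-pip \
 sudo && \
 echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf
# Make sure pip is available for python2.7
RUN python2.7 -m ensurepip --default-pip
# Install the package published to PyPI in the previous steps
RUN python2 -m pip install WilsonCalculators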

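Build the image from the folder containing the Dockerfile and tag it wilsoncalculator:

docker build -t wilsoncalculator .

The new image then appears in the image list: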

ubuntu@ip-10-10-52-110:~/customImage$ docker images
REPOSITORY         TAG       IMAGE ID       CREATED              SIZE
wilsoncalculator   latest    5a33c8117856   About a minute ago   952MB

You can now use this image to run the engine and connect it to StreamSets DataOps Platform.

Refer to the StreamSets documentation for creating deployments and retrieving the engine install script. Once you have the install script from your deployment, replace streamsets/datacollector: with wilsoncalculator in the docker run command:

docker run -d -e STREAMSETS_DEPLOYMENT_SCH_URL=https://na01.hub.streamsets.com -e STREAMSETS_DEPLOYMENT_ID=<deployment ID> -e STREAMSETS_DEPLOYMENT_TOKEN=<Token> wilsoncalculator:latest
3d59d6d37d45598a1dbf6401208e4fa1917ddbeba5dd726c4e8c23b99ff1e667
ubuntu@ip-10-10-52-110:~/customImage$ docker ps
CONTAINER ID   IMAGE                     COMMAND                  CREATED          STATUS          PORTS       NAMES
3d59d6d37d45   wilsoncalculator:latest   "/docker-entrypoint.…"   15 seconds ago   Up 13 seconds   18630/tcp

Now you can create your Python pipeline.

I created a WilsonCalculator pipeline that uses the Jython Evaluator.

Jython script:

try:
    # Hold the import lock so concurrent pipeline threads don't import at the same time
    sdc.importLock()
    import sys
    # site-packages of the image's python2, where the Dockerfile installed WilsonCalculators
    sys.path.append('/usr/lib/python2.7/site-packages')
    from WilsonCalculator.Addition import add
finally:
    sdc.importUnlock()

# Sample Jython code
for record in sdc.records:
    try:
        sdc.log.info("Start calculation.....")
        sdc.log.info(str(record.value['input1']))
        sdc.log.info(str(record.value['input2']))
        result = add(int(record.value['input1']), int(record.value['input2']))
        sdc.log.info(str(result))
        sdc.log.info("----------------------")
        sdc.output.write(record)
    except Exception as e:
        # Send record to error
        sdc.error.write(record, str(e))
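Because the script only runs inside the Jython Evaluator, one way to try its per-record logic locally is to stand in for the sdc object. This sketch is hypothetical: FakeRecord, FakeLog, FakeStream and FakeSdc are minimal stand-ins invented here, not StreamSets classes, and they model only the members the script uses (records, log.info, output.write, error.write). add is defined inline instead of imported from WilsonCalculator so the sketch runs on its own.

```python
class FakeRecord(object):
    """Hypothetical stand-in for an SDC record whose root field is a map."""
    def __init__(self, value):
        self.value = value


class FakeLog(object):
    """Collects log messages instead of writing to the SDC log."""
    def __init__(self):
        self.lines = []

    def info(self, message):
        self.lines.append(message)


class FakeStream(object):
    """Collects written records, standing in for sdc.output and sdc.error."""
    def __init__(self):
        self.written = []

    def write(self, record, *args):
        self.written.append((record,) + args)


class FakeSdc(object):
    def __init__(self, records):
        self.records = records
        self.log = FakeLog()
        self.output = FakeStream()
        self.error = FakeStream()


def add(x, y):
    # Inline copy of WilsonCalculator.Addition.add
    return x + y


def process(sdc):
    # Core of the evaluator script's per-record loop (extra log lines omitted)
    for record in sdc.records:
        try:
            result = add(int(record.value['input1']), int(record.value['input2']))
            sdc.log.info(str(result))
            sdc.output.write(record)
        except Exception as e:
            sdc.error.write(record, str(e))
```

For example, feeding one valid record and one record with a non-numeric input through process() leaves the first in sdc.output and routes the second, with int()'s ValueError message, to sdc.error.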

Output:

The origin generated 3 records.

The Jython Evaluator logged the addition result for each record.

This example shows both the benefits and the limitations of embedding Python in your smart data pipelines. StreamSets aims to bridge the gap between the ultimate control of hand coding and the ease and repeatability of a graphical interface.

With StreamSets you can:

  • Quickly build, deploy, and scale streaming, batch, CDC, ETL and ML pipelines
  • Handle data drift automatically, keeping jobs running even when schemas and structures change
  • Deploy, monitor, and manage all your data pipelines – across hybrid and multi-cloud – from a single dashboard

Try smart data pipelines out yourself with StreamSets, a fully cloud-based, all-in-one DataOps platform. Sign up now and start building pipelines for free!