Artificial Intelligence (AI), together with its subset Machine Learning (ML), is probably one of the hottest topics in the IT industry today.

Many companies are struggling to implement AI algorithms in their data pipelines to make smarter decisions, with varying degrees of success.

First of all, AI is a wide topic. To be successful, it requires knowledge of math, statistics, software architecture, programming and performance tuning, plus a deep understanding of business processes and data, which is a combination you can rarely find.

Even if you find such experts, there are still a lot of issues you need to resolve, starting with real-time data ingestion and processing, monitoring, alerting, debugging, etc.

In this article I’ll share an idea of how to successfully implement real-time machine learning in your company.

In the previous blog I described how to combine StreamSets Data Collector and Transformer to replicate data from Oracle to Kafka in real time. Data Collector uses CDC (Change Data Capture) technology to ingest changes into a Kafka topic, and that topic serves as the starting point of a second pipeline based on Apache Spark and Transformer.

In addition to data transformation, you can also implement AI/ML in the second pipeline by using a PySpark or Scala stage.

The first pipeline, which leverages CDC to capture all DML changes (INSERT, UPDATE, DELETE) on the source table and propagate them to a Kafka topic, is the same as in the previous blog.
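To make the kind of change events flowing through the topic more concrete, here is a small, library-free Python sketch of how a consumer could replay INSERT, UPDATE and DELETE events to rebuild the current state of a table. The message layout (`op`, `key`, `row`) and the function name `apply_change` are assumptions made for illustration only; the records that Data Collector actually writes to Kafka have their own format.

```python
import json


def apply_change(table, event):
    """Apply one change event to an in-memory copy of the table.

    `table` maps primary key -> row dict. The event layout used here
    (op / key / row) is hypothetical, not the real CDC record format.
    """
    op = event["op"]
    key = event["key"]
    if op in ("INSERT", "UPDATE"):
        # Merge the changed columns into the existing row (or create it).
        table[key] = {**table.get(key, {}), **event["row"]}
    elif op == "DELETE":
        # Remove the row; ignore deletes for keys we have never seen.
        table.pop(key, None)
    else:
        raise ValueError(f"unknown operation: {op}")
    return table


# Messages as they might be read from the topic, in arrival order.
messages = [
    '{"op": "INSERT", "key": 1, "row": {"amount": 100}}',
    '{"op": "UPDATE", "key": 1, "row": {"amount": 150}}',
    '{"op": "INSERT", "key": 2, "row": {"amount": 70}}',
    '{"op": "DELETE", "key": 2}',
]

table = {}
for message in messages:
    apply_change(table, json.loads(message))

print(table)  # {1: {'amount': 150}}
```

The order of the events matters: replaying the same messages in a different order could leave the table in a different state, which is why the changes need to be consumed in the order they were produced.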

As I mentioned before, the end of the first pipeline (the Kafka topic) serves as the starting point of the second pipeline.

The following picture shows what it looks like.

Real-time AI/ML pipeline

The most important part is hidden inside the PySpark stage.

Below is a code excerpt from the PySpark stage.

PySpark code excerpt to apply ML to new data in real time


The basic idea is to apply an already trained model to new data, which arrives as a stream from the Kafka topic.
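As a rough illustration of that idea (not the code from the pipeline itself), the sketch below uses plain Python instead of Spark: a previously trained model is loaded once, and every incoming record is scored as it arrives. The model format, the feature names `amount` and `items`, the weights, and the helper names are all hypothetical. In a real PySpark stage you would more likely load a saved Spark ML model and call its `transform` method on the incoming DataFrame.

```python
import json
import math

# Hypothetical model artifact, as it might be exported after offline training.
MODEL_JSON = '{"features": ["amount", "items"], "weights": [0.02, -0.5], "bias": -1.0}'


def load_model(serialized):
    """Deserialize the trained model (here just logistic-regression coefficients)."""
    return json.loads(serialized)


def score(model, record):
    """Return the predicted probability for one record."""
    z = model["bias"] + sum(
        weight * record[feature]
        for feature, weight in zip(model["features"], model["weights"])
    )
    return 1.0 / (1.0 + math.exp(-z))


def score_stream(model, messages):
    """Score records one by one as they arrive; the model itself is never retrained here."""
    for raw in messages:
        record = json.loads(raw)
        record["score"] = score(model, record)
        yield record


# Stand-in for messages read from the Kafka topic.
incoming = ['{"amount": 100, "items": 2}', '{"amount": 200, "items": 0}']

model = load_model(MODEL_JSON)  # load once, reuse for every record
scored = list(score_stream(model, incoming))
for record in scored:
    print(record)
```

The point of the design is the split of responsibilities: the expensive training step happens elsewhere, while the streaming side only loads the finished model and applies it, which keeps per-record latency low.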

You can develop the AI/ML model separately by using all the standard AI/ML tools and programming languages.

Below is a code excerpt that creates a model based on historical data.

Excerpt of code for creating an ML model
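For readers who want something to experiment with, here is a minimal, library-free sketch of the offline side: fit a model on historical data, save it, and load it back the way a scoring job would. It uses a one-feature least-squares regression purely as a stand-in for whatever algorithm you actually choose; the data, the file name `example_model.json` and the helper names are assumptions, not the model from this article.

```python
import json
import os
import tempfile


def fit_linear(xs, ys):
    """Fit y = slope * x + intercept by ordinary least squares."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return {"slope": slope, "intercept": mean_y - slope * mean_x}


def save_model(model, path):
    """Persist the trained coefficients so a separate job can load them."""
    with open(path, "w") as f:
        json.dump(model, f)


def load_model(path):
    """Load previously saved coefficients."""
    with open(path) as f:
        return json.load(f)


# Made-up historical data that follows y = 2x + 1 exactly.
history_x = [1.0, 2.0, 3.0, 4.0]
history_y = [3.0, 5.0, 7.0, 9.0]

model = fit_linear(history_x, history_y)

# Hypothetical location; a real setup would use shared storage.
model_path = os.path.join(tempfile.gettempdir(), "example_model.json")
save_model(model, model_path)

reloaded = load_model(model_path)
print(reloaded)
```

Saving the model to a location that the streaming pipeline can read is what connects the two halves: training can be rerun on fresh historical data whenever needed, and the pipeline picks up the new artifact without changing its own code.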

You can do a similar thing in Scala.


I hope this article gives you an idea of how to implement real-time AI/ML within your company, with monitoring, alerting and debugging all included, along with a visual pipeline designer.

You would still need someone to create the model itself, but the rest of the process would be easier and faster than with other tools.
