The system comprises three modules: a Kafka Twitter streaming producer, a sentiment analysis consumer, and a Scala Play server consumer. The sentiment analysis consumer is built on Apache Spark Streaming and a Naive Bayes classifier model trained with Apache Spark MLlib. Apache Kafka serves as the central data backbone, connecting the three decoupled parts in a publish-subscribe messaging style.
The Kafka Twitter streaming producer publishes streaming tweets to the central Apache Kafka cluster on the ‘tweets’ topic, and the sentiment analysis consumer subscribes to that topic. The consumer leverages Spark Streaming to process the incoming tweets in micro-batches and loads the trained Naive Bayes model to perform sentiment analysis. The accumulated counts of positive and negative sentiment, reduced by location, are then published back to Kafka on the ‘sentiment’ topic, which the Scala Play server subscribes to. Finally, the sentiment analysis results are sent to web clients over WebSocket connections.
All of the modules, the Kafka Twitter streaming producer, Apache Kafka, Apache Spark Streaming, and the Scala Play server, run locally in Docker containers.
It is a Kafka producer that publishes streaming tweets, collected in real time from all over the world in English, to the central Apache Kafka cluster on the ‘tweets’ topic, using the twitter4j library for the Twitter API.
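A minimal sketch of such a producer is shown below. It uses twitter4j's sample stream and the old Kafka 0.8 Scala producer; the broker address, the English-language filter, and the use of raw JSON are assumptions for illustration, not the project's exact code.

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import twitter4j._

object TweetProducerSketch {
  def main(args: Array[String]): Unit = {
    // Kafka 0.8-style producer configuration (assumed local broker)
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))

    // OAuth credentials come from twitter4j.properties; jsonStoreEnabled must be true
    // so the raw JSON of each status is available for downstream parsing.
    val twitterStream = new TwitterStreamFactory().getInstance()
    twitterStream.addListener(new StatusListener {
      override def onStatus(status: Status): Unit =
        if (status.getLang == "en") // keep English tweets only
          producer.send(new KeyedMessage[String, String]("tweets", TwitterObjectFactory.getRawJSON(status)))
      override def onDeletionNotice(notice: StatusDeletionNotice): Unit = ()
      override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = ()
      override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = ()
      override def onStallWarning(warning: StallWarning): Unit = ()
      override def onException(ex: Exception): Unit = ex.printStackTrace()
    })
    twitterStream.sample() // start the worldwide sample stream
  }
}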
Apache Kafka is a high-performance distributed, partitioned, replicated publish-subscribe messaging system. A Kafka cluster can serve as the central data backbone for a large organization and is highly valuable in enterprise infrastructures for processing streaming data.
A stream of messages belonging to a particular category is called a topic. Data is stored in topics. Topics are split into partitions. Each partition is an ordered, immutable sequence of messages that is continually appended to, like a commit log. Each partition is replicated across a configurable number of servers for fault tolerance. Each partition has one server that acts as the “leader” and zero or more servers that act as “followers”. The leader handles all read and write requests for the partition while the followers passively replicate the leader.
ZooKeeper is used for managing and coordinating the Kafka brokers in a Kafka cluster. If a partition’s leader fails, ZooKeeper helps elect one of the followers as the new leader.
This project is configured with one Kafka broker and one ZooKeeper instance. Kafka is used mainly as a pub/sub message queue for the raw streaming tweets and for the sentiment analysis results. It connects the decoupled modules of the system, so each module works independently without being aware of the others. Two topics are maintained in this project, ‘tweets’ and ‘sentiment’: one for the raw streaming tweets and the other for the per-location sentiment analysis results.
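For reference, the Kafka configuration shared by the producer and consumers might look like the object below. The object and field names mirror the KafkaConfig referenced later in the streaming code, but the exact values (broker address, serializer) are assumptions.

import java.util.Properties

object KafkaConfig {
  val KAFKA_TWEETS_TOPIC = "tweets"
  val KAFKA_SENTIMENT_TOPIC = "sentiment"

  // parameters for Spark Streaming's direct Kafka stream (Kafka 0.8 API)
  val kafkaParams: Map[String, String] = Map("metadata.broker.list" -> "localhost:9092")

  // properties for the old (0.8) Scala producer used to publish the sentiment results
  val kafkaProps: Properties = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props
  }
}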
Machine learning is the study and construction of algorithms that can learn from data and make data-driven predictions. Depending on the learning ‘feedback’ available to the learning system, machine learning tasks are classified into three categories: supervised learning, unsupervised learning, and reinforcement learning.
Machine learning algorithms can also be grouped into several categories based on their similarity.
Bayes’ theorem describes the probability of an event, based on conditions that might be related to the event:
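P(A | B) = P(B | A) * P(A) / P(B)

where P(A) is the prior probability of the event A, P(B | A) is the likelihood of observing the condition B when A holds, P(B) is the probability of B, and P(A | B) is the posterior probability of A given B.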
Naive Bayes is not a single algorithm but a family of probabilistic supervised learning classifiers based on applying Bayes’ theorem with the “naive” assumption of independence between every pair of features to solve classification problems. Naive Bayes has many everyday use cases, such as spam email filtering, zombie account detection, and document classification.
The choice of Naive Bayes classifier depends on the assumed distribution of the features in the training set. For discrete features like the ones encountered in document classification (including spam filtering), multinomial and Bernoulli distributions are popular. Apache Spark MLlib supports Multinomial Naive Bayes and Bernoulli Naive Bayes; these models are typically used for document classification. A third variant, Gaussian Naive Bayes, deals with continuous data that follows a Gaussian distribution. Multinomial Naive Bayes is the default in Spark MLlib and is the one used in this project.
The training and test data sets come from Sentiment140, which contains 1.6 million tweets, each labeled with positive or negative sentiment. The Spark Naive Bayes model was trained on this data with stop words removed to improve accuracy. The trained model reaches roughly 78% accuracy on the test data set.
(Stop words are the most common words in a language, like ‘is’, ‘a’, ‘the’, ‘an’, ‘which’, ‘such’, and so on; they don’t contribute much information to natural language processing.)
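The training step could look roughly like the sketch below (Spark MLlib 1.6 RDD API). The file path, CSV parsing, abbreviated stop-word list, and the featurize helper are illustrative assumptions, not the project's exact code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.regression.LabeledPoint

object TrainSentimentModelSketch {
  val stopWords = Set("is", "a", "the", "an", "which", "such") // abbreviated stop-word list
  val hashingTF = new HashingTF()

  // bag-of-words features via the hashing trick, with stop words removed
  def featurize(text: String) =
    hashingTF.transform(text.toLowerCase.split("\\s+").filterNot(stopWords.contains).toSeq)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("train-naive-bayes").setMaster("local[*]"))
    // Sentiment140 CSV: label in the first column (0 = negative, 4 = positive), tweet text in the last
    val data = sc.textFile("src/main/resources/sentiment140.csv").map { line =>
      val cols = line.split(",")
      LabeledPoint(cols(0).replace("\"", "").toDouble, featurize(cols.last))
    }
    val Array(train, test) = data.randomSplit(Array(0.9, 0.1), seed = 42L)
    val model = NaiveBayes.train(train, lambda = 1.0, modelType = "multinomial")
    val accuracy = test.map(p => if (model.predict(p.features) == p.label) 1.0 else 0.0).mean()
    println(s"Test accuracy: $accuracy")
    model.save(sc, "src/main/model/")
  }
}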
Apache Spark v1.6 was used to build this system.
Apache Spark is a fast and general-purpose distributed cluster computing framework. Spark Core is the foundation of the overall platform. It supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
Spark’s primary data abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). An RDD represents an immutable, partitioned collection of elements that can be operated on in parallel with fault tolerance.
An RDD has several traits:
The motivation for creating RDDs was to handle two types of applications that existing computing frameworks handle inefficiently: iterative algorithms and interactive data mining tools.
The goal is to reuse intermediate in-memory results across multiple data-intensive workloads with no need for copying large amounts of data over the network.
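As a small illustration (assuming an existing SparkContext sc and a hypothetical input file), an intermediate result can be cached in memory and reused by later actions instead of being recomputed or shuffled over the network again:

val words = sc.textFile("tweets.txt")        // hypothetical input path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .cache()                                   // keep the intermediate RDD in memory

println(words.count())                       // first action materialises and caches the RDD
println(words.filter(_._2 > 10).count())     // second action reuses the cached partitions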
Spark Streaming leverages Spark Core to perform streaming analysis. Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs. Each RDD in a DStream contains data from a certain interval, as shown in the following figure.
Any operation applied on a DStream translates to operations on the underlying RDDs.
The major part of the project is to perform a series of transformations on the incoming stream of raw tweets, applying the trained Naive Bayes model to each of them, and then publish the results to Kafka. Transformations on DStreams can be grouped into stateless and stateful.
Stateless transformations are simple RDD transformations applied on every batch, that is, on every RDD in a DStream, such as map(), flatMap(), filter(), reduceByKey(), and so on. Stateless transformations are used here to filter out emoticons, links, and non-alphanumeric characters in each tweet and to map each tweet to a tuple of the form (city, positive/negative sentiment).
Stateful transformations are operations on a DStream that track data across time; that is, some data from previous batches is used to generate the results for a new batch. updateStateByKey() is used to accumulate the total count of positive and negative sentiment results for each location from the moment the application starts.
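The update function passed to updateStateByKey() receives, for every key (here, every city), the new counts from the current batch together with the previously accumulated state, and returns the new state. A minimal sketch of such a function, in the spirit of the TweetUtils.accumulateSentimentCount helper called in the listing below (the project's actual implementation may differ):

def accumulateSentimentCount(newCounts: Seq[Int], runningTotal: Option[Int]): Option[Int] =
  Some(runningTotal.getOrElse(0) + newCounts.sum)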
def start = {
  val tweetsStreaming = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](sc, kafkaParams, topic)
  val rawTweets = tweetsStreaming.map(_._2)
  rawTweets.window(Seconds(5), Seconds(5)).saveAsTextFiles(s"/rawResults/tweet")

  val rawLocationText = rawTweets.map(TwitterObjectFactory.createStatus).map(CustomizeTweet(_))
  rawLocationText.window(Seconds(5), Seconds(5)).saveAsTextFiles(s"/rawLocationText/ct")

  // filter tuples, sanitise location
  val cityText = rawLocationText.filter(x => TweetUtils.sanitiseLocation(x._1)).map { case (k, v) => (k.split(",")(0), v) } // location from tweet, "city, country"
  cityText.window(Seconds(5), Seconds(5)).saveAsTextFiles(s"/cityText/ct")

  // filter tuples, sanitise text
  val cleanCityText = cityText.map { case (k, v) => (k, TweetUtils.filterOnlyWords(v)) }.filter(x => TweetUtils.filterEmptyString(x._2))
  cleanCityText.window(Seconds(5), Seconds(5)).saveAsTextFiles(s"/cleanCityText/ct")

  // load the trained Naive Bayes model
  println("Loading the Naive Bayes model...")
  val model = NaiveBayesModel.load(sc.sparkContext, "src/main/model/")
  println("Start predicting...")

  /**
   * analyze the sentiment of each tweet
   * data format: (city, (msg, predicted value))
   */
  val cityTextPredictedValue = cleanCityText.map(x => (x._1, (x._2, model.predict(TrainingUtils.featureVectorization(x._2)))))
  cityTextPredictedValue.window(Seconds(5), Seconds(5)).saveAsTextFiles(s"/cityTextPredictedValue/ct") // save this data structure for reference and debugging

  /**
   * predicted sentiment value for each city
   * data format: (city, 4.0) or (city, 0.0)
   */
  val cityPredictedValue = cleanCityText.map(x => (x._1, model.predict(TrainingUtils.featureVectorization(x._2)).toString))

  /**
   * group predicted sentiment values for each city
   * data format: (city, 4.0 0.0 4.0)
   */
  val reducedCityPredictedValue = cityPredictedValue.reduceByKey((x, y) => x + " " + y)
  reducedCityPredictedValue.window(Seconds(5), Seconds(5)).saveAsTextFiles(s"/reducedCityPredictedValue/ct")

  /**
   * keep positive sentiment values and count them
   * data format: (city, num of positive values) e.g. (city, 2) ...
   */
  val numPositivePredictedValue = reducedCityPredictedValue.map(x => (x._1, x._2.split(" ").filter(x => x.toDouble == 4.0).mkString(" "))) // drop negative values
    .map { case (k: String, v: String) => if (!v.isEmpty) (k, v.split(" ").length) else (k, 0) } // count the positive predicted values
  numPositivePredictedValue.window(Seconds(5), Seconds(5)).saveAsTextFiles(s"/numPositivePredictedValue/ct")

  // accumulate the count of positive-sentiment tweets per city from the start of the app
  val totalPositiveTweetPerCity = numPositivePredictedValue.updateStateByKey(TweetUtils.accumulateSentimentCount _)
  totalPositiveTweetPerCity.window(Seconds(5), Seconds(5)).saveAsTextFiles(s"/totalPositiveTweetPerCity/ct")

  /**
   * keep negative sentiment values and count them
   * data format: (city, num of negative values) e.g. (city, 2) ...
   */
  val numNegativePredictedValue = reducedCityPredictedValue.map(x => (x._1, x._2.split(" ").filter(x => x.toDouble == 0.0).mkString(" "))) // drop positive values
    .map { case (k: String, v: String) => if (!v.isEmpty) (k, v.split(" ").length) else (k, 0) } // count the negative predicted values
  numNegativePredictedValue.window(Seconds(5), Seconds(5)).saveAsTextFiles(s"/numNegativePredictedValue/ct")

  // accumulate the count of negative-sentiment tweets per city from the start of the app
  val totalNegativeTweetPerCity = numNegativePredictedValue.updateStateByKey(TweetUtils.accumulateSentimentCount _)
  totalNegativeTweetPerCity.window(Seconds(5), Seconds(5)).saveAsTextFiles(s"/totalNegativeTweetPerCity/ct")

  /**
   * join positive and negative sentiment counts for each city
   * data format: (city, (count of positive, count of negative))
   */
  val joinPositiveNegativeCountPerCity = totalPositiveTweetPerCity.join(totalNegativeTweetPerCity)
  joinPositiveNegativeCountPerCity.print()
  joinPositiveNegativeCountPerCity.window(Seconds(5), Seconds(5)).saveAsTextFiles(s"/joinPositiveNegativeCountPerCity/ct")

  /**
   * publish the sentiment counts of each city back to Kafka on the 'sentiment' topic
   */
  joinPositiveNegativeCountPerCity.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      val producer = new Producer[String, String](new ProducerConfig(KafkaConfig.kafkaProps))
      partitionOfRecords.foreach { message =>
        producer.send(new KeyedMessage[String, String](KafkaConfig.KAFKA_SENTIMENT_TOPIC, message.toString()))
      }
      producer.close()
    }
  }

  // Start the streaming computation
  println("Spark Streaming begin...")
  sc.start()
  sc.awaitTermination()
}
}
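The listing calls several helpers (TweetUtils.sanitiseLocation, TweetUtils.filterOnlyWords, TweetUtils.filterEmptyString, TrainingUtils.featureVectorization) that are not shown above. A rough sketch of what they could look like follows; the regular expressions and featurization are assumptions, except that featureVectorization must match the feature extraction used when the Naive Bayes model was trained.

object TweetUtils {
  // keep only tweets whose user location looks like "city, country"
  def sanitiseLocation(location: String): Boolean =
    location != null && location.contains(",")

  // strip links, @mentions and non-alphanumeric characters (emoticons included)
  def filterOnlyWords(text: String): String =
    text.replaceAll("(?i)https?://\\S+", " ")
      .replaceAll("@\\S+", " ")
      .replaceAll("[^a-zA-Z0-9\\s]", " ")
      .replaceAll("\\s+", " ")
      .trim

  def filterEmptyString(text: String): Boolean = text.nonEmpty
}

object TrainingUtils {
  private val hashingTF = new org.apache.spark.mllib.feature.HashingTF()

  // must mirror the feature extraction used at training time
  def featureVectorization(text: String) =
    hashingTF.transform(text.toLowerCase.split("\\s+").toSeq)
}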
The Play framework is built on Akka, a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM. Akka emphasizes actor-based concurrency, with inspiration drawn from Erlang. The Play server subscribes to the ‘sentiment’ topic, continuously consumes the streaming sentiment results from Kafka, and sends them to web clients through WebSocket connections.
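A rough sketch of the WebSocket side, assuming Play 2.4’s actor-based WebSocket API: every connected browser gets its own actor, and a simple in-process SentimentBus (a hypothetical stand-in for the Kafka consumer feeding results) fans messages out to those actors. The controller and bus names are illustrative, not the project’s exact code.

import akka.actor.{Actor, ActorRef, Props}
import play.api.Play.current
import play.api.mvc.{Controller, WebSocket}

object SentimentBus {
  private var subscribers = Set.empty[ActorRef]
  def subscribe(ref: ActorRef): Unit = synchronized { subscribers += ref }
  def unsubscribe(ref: ActorRef): Unit = synchronized { subscribers -= ref }
  // the Kafka 'sentiment' consumer would call publish() for each record it reads
  def publish(message: String): Unit = synchronized { subscribers.foreach(_ ! message) }
}

class SentimentController extends Controller {
  // one actor per WebSocket connection; `out` is the actor that writes to the browser
  def socket = WebSocket.acceptWithActor[String, String] { request => out =>
    Props(new ClientActor(out))
  }
}

class ClientActor(out: ActorRef) extends Actor {
  override def preStart(): Unit = SentimentBus.subscribe(self)
  override def postStop(): Unit = SentimentBus.unsubscribe(self)

  def receive = {
    case sentiment: String => out ! sentiment // push each per-city count update to the client
  }
}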
This was a really fun and valuable project through which I learned about distributed systems such as Apache Kafka and Apache Spark (both partition data to achieve distribution and parallelism), a machine learning algorithm, the Naive Bayes classifier, and functional programming in Scala. Reviewing the project, concurrent computing on the JVM caught my attention because of the Akka toolkit used by both Apache Spark and the Play framework, which emphasizes the actor model to achieve concurrency. Concurrent programming on the JVM will be my next milestone.