Live Twitter Reach

March 14, 2020

What is the trend of a specific topic, such as a new brand or a current issue unfolding somewhere in the world? This post shows how to answer this question, and several similar ones, using a streaming pipeline and an analytics dashboard powered by the Twitter Streaming API, Solr, Logstash and Banana.

Coronavirus (COVID-19) Live Demo

Prerequisites

In order to obtain a stream from Twitter, you need a developer account and a Twitter application. Each Twitter application is assigned consumer keys that are used as credentials for authenticating and authorizing API calls.

Twitter offers several endpoints that allow interaction with the platform. The one used here is statuses/filter, which streams tweets matching the filters supplied in its parameters, for example track=coronavirus to receive tweets mentioning that keyword.

Docker Compose

docker-compose allows defining and running multi-container applications, which is a common pattern in modern systems. This post illustrates how Docker Compose can be used to achieve that and answer the preceding question.

The docker-compose file that is needed is listed here:

# Copyright 2020
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

version: '3.7'

services:
  solr:
    image: solr:8.2.0
    ports:
      - 8983:8983
    volumes:
      - ./volumes/solr/configsets:/opt/configsets
    stdin_open: true
    tty: true
    command: ["solr", "-c", "-f"]

  logstash:
    build: ./logstash-patch
    ports:
      - 5044:5044
    volumes:
      - ./volumes/logstash/config:/usr/share/logstash/config
      - ./volumes/logstash/xbin:/usr/share/logstash/xbin
      - ./volumes/logstash/xplugins:/usr/share/logstash/xplugins
    links:
      - solr
    depends_on:
      - solr
    stdin_open: true
    tty: true
    command: ["xbin/wait-for-it.sh", "solr:8983", "-s", "-t", "60", "--", "/usr/share/logstash/xbin/setup.sh"]

  banana:
    image: aaadel/banana
    ports:
      - 9901:9901
    environment:
      BANANA_SOLR_HOST: solr
    links:
      - solr
    depends_on:
      - solr
    stdin_open: true
    tty: true
Here, we are using a simple orchestration mechanism, wait-for-it, to synchronize the startup sequence of the containers. We rely on the same directory structure introduced in the previous post, and also use the same Solr ingestion and indexing techniques, such as XSLT transformations and schema-less mode. The Twitter credentials, along with the endpoint parameters, are supplied to the Logstash Twitter input plugin.
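For illustration, a minimal Logstash pipeline for this setup could look like the sketch below. This is not the exact configuration shipped with the repository: the collection name tweets, the environment variable names, and the JSON update handler in the output are assumptions made for the example (the actual setup ingests through XSLT transformations on the Solr side), but the twitter input options shown are the ones the plugin provides.

# Sketch only -- not the exact pipeline configuration of this setup.
input {
  twitter {
    # Credentials of the Twitter application (assumed to come from environment variables)
    consumer_key       => "${TWITTER_CONSUMER_KEY}"
    consumer_secret    => "${TWITTER_CONSUMER_SECRET}"
    oauth_token        => "${TWITTER_OAUTH_TOKEN}"
    oauth_token_secret => "${TWITTER_OAUTH_TOKEN_SECRET}"
    # statuses/filter parameters: stream tweets matching these keywords
    keywords           => ["coronavirus", "covid-19"]
    full_tweet         => true
  }
}

# Filters, including the quoted-status decomposition shown in the next section, go here.

output {
  # Hypothetical collection name; posts documents to Solr's JSON update handler
  http {
    url          => "http://solr:8983/solr/tweets/update/json/docs?commitWithin=10000"
    http_method  => "post"
    format       => "json"
    content_type => "application/json"
  }
}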

Decomposing Quoted Statuses

The tweet objects in the stream often come with embedded objects, which Twitter calls quoted statuses. These quoted statuses are usually as significant as the original tweet, or more so. Since we are interested in these quoted statuses, and since we are using the default schema, which does not support nested documents, a filter is required to decompose them and index them as tweets in their own right. A Ruby filter fits exactly here. The following snippet decomposes the quoted status, if one exists, copies it into a new event using field reference syntax, and passes it along in the returned events array.

# the value of `params` is the value of the hash passed to `script_params`
# in the logstash configuration
def register(params)
end

# Recursively walks a nested hash, yielding the key path, the key and the
# value of every leaf entry.
def nested_each_pair(hash, path, &block)
  hash.each_pair do |k, v|
    path << k
    if v.is_a?(Hash)
      nested_each_pair(v, path, &block)
    else
      block.call(path, k, v)
    end
    path.pop
  end
end

def filter(event)
  if event.get("quoted_status")
    quoted_status_event = LogStash::Event.new
    quoted_status = event.get("quoted_status")

    # Copy every leaf field of the quoted status into the new event using
    # Logstash field reference syntax, e.g. [user][screen_name]
    nested_each_pair(quoted_status, []) do |path, k, v|
      field_ref = path.reduce("") { |memo, e| memo + "[" + e + "]" }
      quoted_status_event.set(field_ref, v)
    end

    return [event, quoted_status_event]
  else
    return [event]
  end
end
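The script is then wired into the pipeline with the Ruby filter plugin's path and script_params options. The file name below is hypothetical; the custom scripts in this setup are mounted under /usr/share/logstash/xplugins.

filter {
  ruby {
    # Path to the decomposition script above (hypothetical file name)
    path => "/usr/share/logstash/xplugins/decompose_quoted_status.rb"
    # Becomes the `params` hash passed to register(params); nothing needed here
    script_params => {}
  }
}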

Note: the id_str field should be defined as the unique key in the Solr schema in order to avoid indexing duplicates.

Analytics

Trend might be the most important metric here. In a Banana dashboard, adding a ticker panel calculates the trend estimate as the relative change in volume between two windows:

trend = (Vc − Vp) / Vp × 100%

where Vc is the current window volume and Vp is the past window volume, as defined in the panel settings. For example, if the past window contained 80,000 matching tweets and the current window contains 100,000, the trend is +25%. Below is the trend of “Coronavirus” and “COVID-19” on March 14, 2020 at 3:00 PM, measured from the Twitter stream with a 12-hour sliding window and a 3-hour stride.

A map can also be plotted for geotagged tweets by supplying the place object's country_code_s field in the map panel settings. Below is a map for the same time window as above.

Tweets about COVID-19 Map

The graph below is a realtime timeseries plot of trend values for the past week:

Trend timeseries