Getting started with the filtered stream endpoint

The endpoints we release in Labs will be previews and are likely to change before they are released broadly, so we encourage you to take that into consideration as you build. Before getting started, we encourage you to read more about Twitter Developer Labs.

The filtered stream endpoint allows developers to filter the real-time stream of public Tweets. To use this preview, you will need to set up rules in order to receive Tweets from the streaming endpoint.

Before you can start, you will need the following:

Authentication

All the endpoints in this preview are authenticated using OAuth 2.0 Bearer token (also known as application-only authentication). This means you will need to generate a Bearer token, and pass this token in all of your requests. Once configured, our Postman collection will handle the generation of a Bearer token for you. Other clients like curl and Insomnia will require you to generate a token manually.

REST client

REST applications such as Postman can be used for organizing, testing, and debugging HTTP requests.

  • Python 3
  • Ruby
  • JavaScript (Node.js)

To run this example, you will need to add your consumer key and secret to this example. You will also have to enter one or several comma-seperated Tweet ID(s) from owned/authorized account(s) as indicated towards the bottom of this example. 

To add your consumer key and secret:

  1. Navigate to your app dashboard.
  2. Select the app you've enabled with the Filtered stream preview, then click Details.
  3. Select the Keys and tokens tab.
  4. In the Consumer API keys section, copy the values for API key into consumer_key and API secret key into consumer_secret.
     

Important: Never check consumer keys and secrets into source control. Learn how to secure your credentials.

import os
import requests
import json
import time
from pprint import pprint
from requests.auth import AuthBase
from requests.auth import HTTPBasicAuth

consumer_key = ""  # Add your API key here
consumer_secret = ""  # Add your API secret key here

stream_url = "https://api.twitter.com/labs/1/tweets/stream/filter"
rules_url = "https://api.twitter.com/labs/1/tweets/stream/filter/rules"

sample_rules = [
  { 'value': 'dog has:images', 'tag': 'dog pictures' },
  { 'value': 'cat has:images -grumpy', 'tag': 'cat pictures' },
]

# Gets a bearer token
class BearerTokenAuth(AuthBase):
  def __init__(self, consumer_key, consumer_secret):
    self.bearer_token_url = "https://api.twitter.com/oauth2/token"
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret
    self.bearer_token = self.get_bearer_token()

  def get_bearer_token(self):
    response = requests.post(
      self.bearer_token_url, 
      auth=(self.consumer_key, self.consumer_secret),
      data={'grant_type': 'client_credentials'},
      headers={'User-Agent': 'TwitterDevFilteredStreamQuickStartPython'})

    if response.status_code is not 200:
      raise Exception(f"Cannot get a Bearer token (HTTP %d): %s" % (response.status_code, response.text))

    body = response.json()
    return body['access_token']

  def __call__(self, r):
    r.headers['Authorization'] = f"Bearer %s" % self.bearer_token
    r.headers['User-Agent'] = 'TwitterDevFilteredStreamQuickStartPython'
    return r


def get_all_rules(auth):
  response = requests.get(rules_url, auth=auth)

  if response.status_code is not 200:
    raise Exception(f"Cannot get rules (HTTP %d): %s" % (response.status_code, response.text))

  return response.json()


def delete_all_rules(rules, auth):
  if rules is None or 'data' not in rules:
    return None

  ids = list(map(lambda rule: rule['id'], rules['data']))

  payload = {
    'delete': {
      'ids': ids
    }
  }

  response = requests.post(rules_url, auth=auth, json=payload)

  if response.status_code is not 200:
    raise Exception(f"Cannot delete rules (HTTP %d): %s" % (response.status_code, response.text))

def set_rules(rules, auth):
  if rules is None:
    return

  payload = {
    'add': rules
  }

  response = requests.post(rules_url, auth=auth, json=payload)

  if response.status_code is not 201:
    raise Exception(f"Cannot create rules (HTTP %d): %s" % (response.status_code, response.text))

def stream_connect(auth):
  response = requests.get(stream_url, auth=auth, stream=True)
  for response_line in response.iter_lines():
    if response_line:
      pprint(json.loads(response_line))

bearer_token = BearerTokenAuth(consumer_key, consumer_secret)

def setup_rules(auth):
  current_rules = get_all_rules(auth)
  delete_all_rules(current_rules, auth)
  set_rules(sample_rules, auth)


# Comment this line if you already setup rules and want to keep them
setup_rules(bearer_token)

# Listen to the stream.
# This reconnection logic will attempt to reconnect when a disconnection is detected.
# To avoid rate limites, this logic implements exponential backoff, so the wait time
# will increase if the client cannot reconnect to the stream.
timeout = 0
while True:
  stream_connect(bearer_token)
  sleep(2 ** timeout)
  timeout += 1

To run this example, you will need to add your consumer key and secret to this example. You will also have to enter one or several comma-seperated Tweet ID(s) from owned/authorized account(s) as indicated towards the bottom of this example. 

To add your consumer key and secret:

  1. Navigate to your app dashboard.
  2. Select the app you've enabled with the Filtered stream preview, then click Details.
  3. Select the Keys and tokens tab.
  4. In the Consumer API keys section, copy the values for API key into @consumer_key and API secret key into @consumer_secret.
     

Important: Never check consumer keys and secrets into source control. Learn how to secure your credentials.

require 'typhoeus'
require 'base64'
require 'json'

@consumer_key = "" # Add your API key here
@consumer_secret = "" # Add your API secret key here

@bearer_token_url = "https://api.twitter.com/oauth2/token"
@stream_url = "https://api.twitter.com/labs/1/tweets/stream/filter"
@rules_url = "https://api.twitter.com/labs/1/tweets/stream/filter/rules"

@sample_rules = [
  { 'value': 'dog has:images', 'tag': 'dog pictures' },
  { 'value': 'cat has:images -grumpy', 'tag': 'cat pictures' },
]

def bearer_token
  return @bearer_token unless @bearer_token.nil?

  @credentials = Base64.encode64("#{@consumer_key}:#{@consumer_secret}").gsub("\n", "")
  
  @options = {
    body: {
      grant_type: "client_credentials"
    },
    headers: {
      "Authorization": "Basic #{@credentials}",
      "User-Agent": "TwitterDevFilteredStreamQuickStartRuby"
    }
  }

  @response = Typhoeus.post(@bearer_token_url, @options)
  @body = JSON.parse(@response.body)
  @bearer_token = @body["access_token"] ||= nil
end

def get_all_rules
  @options = {
    headers: {
      "User-Agent": "TwitterDevFilteredStreamQuickStartRuby"
      "Authorization": "Bearer #{bearer_token}"
    }
  }

  @response = Typhoeus.get(@rules_url, @options)

  raise "An error occurred while getting a list of rules: #{@response.body}" unless @response.success?

  @body = JSON.parse(@response.body)
end

def delete_all_rules(rules)
  return if rules.nil?

  @ids = rules['data'].map { |rule| rule["id"] }
  @payload = {
    delete: {
      ids: @ids
    }
  }

  @options = {
    headers: {
      "User-Agent": "TwitterDevFilteredStreamQuickStartRuby"
      "Authorization": "Bearer #{bearer_token}",
      "Content-type": "application/json"
    },
    body: JSON.dump(@payload)
  }

  @response = Typhoeus.post(@rules_url, @options)

  raise "An error occurred while deleting your rules: #{@response.status_message}" unless @response.success?
end

def set_rules(rules)
  return if rules.nil?

  @payload = {
    add: rules
  }

  @options = {
    headers: {
      "User-Agent": "TwitterDevFilteredStreamQuickStartRuby"
      "Authorization": "Bearer #{bearer_token}",
      "Content-type": "application/json"
    },
    body: JSON.dump(@payload)
  }

  @response = Typhoeus.post(@rules_url, @options)
  raise "An error occurred while adding rules: #{@response.status_message}" unless @response.success?
end

def stream_connect
  @options = {
    timeout: 20,
    method: 'get',
    headers: {
      "User-Agent": "TwitterDevFilteredStreamQuickStartRuby"
      "Authorization": "Bearer #{bearer_token}",
    },
    params: {
      format: 'compact'
    }
  }

  @request = Typhoeus::Request.new(@stream_url, @options)
  @request.on_body do |chunk|
    puts chunk
  end
  @request.run
end

def setup_rules
  # Gets the complete list of rules currently applied to the stream
  @rules = get_all_rules

  # Delete all rules
  delete_all_rules(@rules)
  
  # Add rules to the stream
  set_rules(@sample_rules)
end


# Comment this line if you already setup rules and want to keep them
setup_rules

# Listen to the stream.
# This reconnection logic will attempt to reconnect when a disconnection is detected.
# To avoid rate limites, this logic implements exponential backoff, so the wait time
# will increase if the client cannot reconnect to the stream.
timeout = 0
while true
  stream_connect
  sleep 2 ** timeout
  timeout += 1
end

To run this example, you will need to add your consumer key and secret to this example. You will also have to enter one or several comma-seperated Tweet ID(s) from owned/authorized account(s) as indicated towards the bottom of this example. 

To add your consumer key and secret:

  1. Navigate to your app dashboard.
  2. Select the app you've enabled with the Filtered stream preview, then click Details.
  3. Select the Keys and tokens tab.
  4. In the Consumer API keys section, copy the values for API key into consumer_key and API secret key into consumer_secret.
     

Important: Never check consumer keys and secrets into source control. Learn how to secure your credentials.

const https = require('https');
const request = require('request');
const util = require('util');

const get = util.promisify(request.get);
const post = util.promisify(request.post);

const consumer_key = ''; // Add your API key here
const consumer_secret = ''; // Add your API secret key here

const bearerTokenURL = new URL('https://api.twitter.com/oauth2/token');
const streamURL = new URL('https://api.twitter.com/labs/1/tweets/stream/filter');
const rulesURL = new URL('https://api.twitter.com/labs/1/tweets/stream/filter/rules');

async function bearerToken (auth) {
  const requestConfig = {
    url: bearerTokenURL,
    auth: {
      user: consumer_key,
      pass: consumer_secret,
    },
    form: {
      grant_type: 'client_credentials',
    },
  };

  const response = await post(requestConfig);
  return JSON.parse(response.body).access_token;
}

async function getAllRules(token) {
  const requestConfig = {
    url: rulesURL,
    auth: {
      bearer: token
    }
  };

  const response = await get(requestConfig);
  if (response.statusCode !== 200) {
    throw new Error(response.body);
    return null;
  }

  return JSON.parse(response.body);
}

async function deleteAllRules(rules, token) {
  if (!Array.isArray(rules.data)) {
    return null;
  }

  const ids = rules.data.map(rule => rule.id);

  const requestConfig = {
    url: rulesURL,
    auth: {
      bearer: token
    },
    json: {
      delete: {
        ids: ids
      }
    }
  };

  const response = await post(requestConfig);
  if (response.statusCode !== 200) {
    throw new Error(JSON.stringify(response.body));
    return null;
  }

  return response.body;
}

async function setRules(rules, token) {
  const requestConfig = {
    url: rulesURL,
    auth: {
      bearer: token
    },
    json: {
      add: rules  
    }
  };

  const response = await post(requestConfig);
  if (response.statusCode !== 201) {
    throw new Error(JSON.stringify(response.body));
    return null;
  }

  return response.body;
}

function streamConnect(token) {
  // Listen to the stream
  const config = {
    url: 'https://api.twitter.com/labs/1/tweets/stream/filter?format=compact',
    auth: {
      bearer: token,
    },
    timeout: 20000,
  };

  const stream = request.get(config);

  stream.on('data', data => {
      try {
        const json = JSON.parse(data);
        console.log(json);
      } catch (e) {
        // Heartbeat received. Do nothing.
      }
      
  }).on('error', error => {
    if (error.code === 'ETIMEDOUT') {
      stream.emit('timeout');
    }
  });

  return stream;
}

(async () => {
  let token, currentRules;
  const rules = [
    { 'value': 'dog has:images', 'tag': 'dog pictures' },
    { 'value': 'cat has:images -grumpy', 'tag': 'cat pictures' },
  ];

  try {
    // Exchange your credentials for a Bearer token
    token = await bearerToken({consumer_key, consumer_secret});
  } catch (e) {
    console.error(`Could not generate a Bearer token. Please check that your credentials are correct and that the Filtered Stream preview is enabled in your Labs dashboard. (${e})`);
    process.exit(-1);
  }

  try {
    // Gets the complete list of rules currently applied to the stream
    currentRules = await getAllRules(token);
    
    // Delete all rules. Comment this line if you want to keep your existing rules.
    await deleteAllRules(currentRules, token);

    // Add rules to the stream. Comment this line if you want to keep your existing rules.
    await setRules(rules, token);
  } catch (e) {
    console.error(e);
    process.exit(-1);
  }

  // Listen to the stream.
  // This reconnection logic will attempt to reconnect when a disconnection is detected.
  // To avoid rate limites, this logic implements exponential backoff, so the wait time
  // will increase if the client cannot reconnect to the stream.

  const stream = streamConnect(token);
  let timeout = 0;
  stream.on('timeout', () => {
    // Reconnect on error
    console.warn('A connection error occurred. Reconnecting…');
    setTimeout(() => {
      timeout++;
      streamConnect(token);
    }, 2 ** timeout);
    streamConnect(token);
  });
})();