Interested in exploring Labs?
The endpoints we release in Labs are previews of tools that may be released more broadly in the future, but will likely undergo changes before then. We encourage you to take that into consideration as you explore. Before getting started, please read more about Twitter Developer Labs.

Quick start

We have prepared quick start apps for the most popular languages. These apps connect to the stream and save the real-time Tweets to a CSV file (one Tweet per line). The templates also reconnect to the stream in case of a disconnection, using an exponential backoff approach to stay within rate limits. Should a disconnection happen, the app will create a new file, so the output from the stream may be spread across multiple files.
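
The backoff itself is simple: each failed reconnection attempt doubles the wait before the next try. A minimal sketch of the idea in Python, where connect_once() is a hypothetical stand-in for opening the stream:

import time

attempt = 0
while True:
    if connect_once():  # hypothetical: returns True once the stream is re-established
        break
    time.sleep(2 ** attempt)  # wait 1s, 2s, 4s, ... between attempts
    attempt += 1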
 

Authentication

This endpoint is authenticated using an OAuth 2.0 Bearer Token (also known as app-only authentication). Note that the apps will automatically request a Bearer Token, so you don't have to generate one.

Other clients like curl and Insomnia will require you to generate a token manually.
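
For example, with curl you can request a Bearer Token manually (replace the placeholder variables with your own credentials):

curl -u "$CONSUMER_KEY:$CONSUMER_SECRET" \
  --data 'grant_type=client_credentials' \
  https://api.twitter.com/oauth2/token

The access_token field of the JSON response is the Bearer Token to send in the Authorization header.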
 

Prerequisites

To run the quick start apps, you will need to add your consumer key and secret to the code. To do so, follow these steps:

  1. Navigate to your app dashboard.
  2. Select the app you've enabled with the COVID-19 Stream preview, then click Details.
  3. Select the Keys and tokens tab.
  4. In the Consumer API keys section, copy the values for API key into consumer_key and API secret key into consumer_secret.
     

Important: Never check consumer keys and secrets into source control. Learn how to secure your credentials.
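
One common approach is to read the credentials from environment variables instead of hard-coding them. For example, in Python (the CONSUMER_KEY and CONSUMER_SECRET variable names here are just placeholders):

import os

consumer_key = os.environ["CONSUMER_KEY"]        # set in your shell, not in the source
consumer_secret = os.environ["CONSUMER_SECRET"]  # set in your shell, not in the source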

Python 3

import requests
import json
import time
from datetime import datetime
from threading import Thread

consumer_key = ""  # Add your API key here
consumer_secret = ""  # Add your API secret key here
records_per_file = 5000  # Replace this with the number of tweets you want to store per file
file_path = "/file/path/"  # Replace with appropriate file path followed by / where you want to store the file

count = 0
file_object = None
file_name = None


def get_bearer_token(key, secret):
    response = requests.post(
        "https://api.twitter.com/oauth2/token",
        auth=(key, secret),
        data={'grant_type': 'client_credentials'},
        headers={"User-Agent": "TwitterDevCovid19StreamQuickStartPython"})

    if response.status_code != 200:
        raise Exception(f"Cannot get a Bearer token (HTTP {response.status_code}): {response.text}")

    body = response.json()
    return body['access_token']


# Helper method that saves the tweets to a file at the specified path
def save_data(item):
    global file_object, count, file_name
    if file_object is None:
        file_name = int(datetime.utcnow().timestamp() * 1e3)
        count += 1
        file_object = open(f'{file_path}covid19-{file_name}.csv', 'a')
        file_object.write("{}\n".format(item))
        return
    if count == records_per_file:
        file_object.close()
        count = 1
        file_name = int(datetime.utcnow().timestamp() * 1e3)
        file_object = open(f'{file_path}covid19-{file_name}.csv', 'a')
        file_object.write("{}\n".format(item))
    else:
        count += 1
        file_object.write("{}\n".format(item))


# Connect to the stream for the given partition; on disconnection, reconnect
# with exponential backoff to stay within rate limits
def stream_connect(partition):
    timeout = 0
    while True:
        response = requests.get("https://api.twitter.com/labs/1/tweets/stream/covid19?partition={}".format(partition),
                                headers={"User-Agent": "TwitterDevCovid19StreamQuickStartPython",
                                         "Authorization": "Bearer {}".format(
                                             get_bearer_token(consumer_key, consumer_secret))},
                                stream=True)
        for response_line in response.iter_lines():
            if response_line:
                save_data(json.loads(response_line))
        # The stream disconnected: wait 2^timeout seconds before reconnecting
        time.sleep(2 ** timeout)
        timeout += 1


def main():
    for partition in range(1, 5):
        Thread(target=stream_connect, args=(partition,)).start()


if __name__ == "__main__":
    main()

To disconnect from the stream, simply press Ctrl + C to terminate.

By default, the app creates a new file every 5000 Tweets received by the stream. You can change this by specifying a different number in the records_per_file variable.

JavaScript (Node.js)

// When running for the first time, install the request module:
// npm install request

const request = require('request');
const fs = require('fs');
const { EventEmitter } = require('events');
const { URL } = require('url');
const { promisify } = require('util');

const post = promisify(request.post);

const consumer_key = ''; // Add your API key here
const consumer_secret = ''; // Add your API secret key here
const recordsPerFile = 5000; // Replace this with the number of tweets you want to store per file
const bearerTokenURL = new URL('https://api.twitter.com/oauth2/token');

// FIXME Prepend the directory where you want to store the files inside the backticks below
const getFilename = (name) => `${name}-${Math.round(Date.now() / 1000)}.csv`;

const bearerToken = async ({consumer_key, consumer_secret}) => {
  const requestConfig = {
    url: bearerTokenURL,
    auth: {
      user: consumer_key,
      pass: consumer_secret,
    },
    form: {
      grant_type: 'client_credentials',
    },
  };

  const response = await post(requestConfig);
  const body = JSON.parse(response.body);
  if (response.statusCode < 200 || response.statusCode > 299) {
    console.log(response.statusCode);
    throw new Error(body.errors[0].message);
  } else {
    return body.access_token;
  }
}

class TwitterStream extends EventEmitter {
  constructor(url, token) {
    super();
    this.url = url instanceof URL ? url : new URL(url);
    this.stream = null;
    this.requestConfig = {
      url: this.url,
      auth: {
        bearer: token,
      },
      headers: {
        'User-Agent': 'TwitterDevCovid19StreamQuickStartJS',
      },
      timeout: 20000,
    };
  }

  start() {
    try {
      this.stream = request.get(this.requestConfig);
  
      this.stream
        .on('data', data => this.emit('data', data))
        .on('error', error => {
          if (error.code === 'ETIMEDOUT') {
            this.emit('disconnect', error);
          }
        });
    } catch (e) {
      this.emit('disconnect', e);
    }
  }

  stop() {
    this.stream.abort();
  }
}

(async () => {
  let token;
  try {
    token = await bearerToken({consumer_key, consumer_secret});
  } catch (e) {
    console.error(e);
    process.exit(-1);
  }

  const tweetStreams = [...Array(4).keys()].map(i => `https://api.twitter.com/labs/1/tweets/stream/covid19?partition=${i + 1}`);
  const complianceStreams = [...Array(8).keys()].map(i => `https://api.twitter.com/labs/1/tweets/stream/compliance?partition=${i + 1}`);
  const streams = tweetStreams.concat(complianceStreams);
  let timeout = 0;

  const fileStreams = {
    covid19: {stream: fs.createWriteStream(getFilename('covid19'), { flags: 'a+' }), currentRecords: 0},
    compliance: {stream: fs.createWriteStream(getFilename('compliance'), { flags: 'a+' }), currentRecords: 0},
  };
  process.on('SIGINT', () => {
    for (const name in fileStreams) {
      fileStreams[name].stream.end();
    }
    process.exit(-1);
  });

  streams.forEach((url) => {
    const stream = new TwitterStream(url, token);
    stream.on('data', (data) => {
      const name = stream.url.pathname.split('/').pop();
      fileStreams[name].stream.write(data);
      fileStreams[name].currentRecords++;
      if (fileStreams[name].currentRecords === recordsPerFile) {
        fileStreams[name].currentRecords = 0;
        fileStreams[name].stream.end();
        fileStreams[name].stream = fs.createWriteStream(getFilename(name), { flags: 'a+' });
      }
    });

    stream.on('disconnect', () => {
      setTimeout(() => stream.start(), 2 ** ++timeout * 1000);
    });  

    stream.start();
  });

})();

To disconnect from the stream, simply press Ctrl + C to terminate.

By default, the app creates a new file every 5000 Tweets received by the stream. You can change this by specifying a different number in the recordsPerFile variable.

Filtering Tweets by language

You may want to consume only Tweets in a particular language. To do so, filter on the 'lang' field of the returned JSON as you consume the stream data. A Python snippet for doing so is shown below:

  for response_line in response.iter_lines():
      if response_line:
          data = json.loads(response_line)
          if data['lang'] == 'en':
              print(data)

In the example above, we are filtering for the English language. To filter Tweets for a different language, use the corresponding BCP 47 identifier for that language.
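
To accept several languages at once, you can check membership in a set of BCP 47 codes instead; a small variation on the snippet above (the set of codes here is just an example):

  languages = {'en', 'es', 'pt'}  # example: English, Spanish, Portuguese
  for response_line in response.iter_lines():
      if response_line:
          data = json.loads(response_line)
          if data.get('lang') in languages:
              print(data)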

Filtering for geo-tagged Tweets

You may want to filter for Tweets from a certain geo-location. There are multiple ways to do this using the Tweet-specific location data. Some examples are shown below.

a. Using geo coordinates for exact location

When a Tweet has a specific latitude/longitude "Point" coordinate associated with it, the JSON returns that value in the 'geo' field. (This value is null when there is no geo value associated with a Tweet.) The geo field consists of a type "Point" and the corresponding coordinates. The snippet below shows how to filter for only those Tweets that contain geo information:

  for response_line in response.iter_lines():
      if response_line:
          data = json.loads(response_line)
          if data.get('geo'):
              print(data)
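
You can take this further and keep only Tweets whose Point falls inside a geographic area. A minimal sketch, assuming the coordinates follow GeoJSON order (longitude first; verify this against your own payloads) and using a hypothetical bounding box roughly covering San Francisco:

  MIN_LON, MAX_LON = -122.52, -122.35  # hypothetical bounding box (San Francisco area)
  MIN_LAT, MAX_LAT = 37.70, 37.84
  geo = data.get('geo')
  if geo and geo.get('type') == 'Point':
      lon, lat = geo['coordinates']  # assumption: GeoJSON (longitude, latitude) order
      if MIN_LON <= lon <= MAX_LON and MIN_LAT <= lat <= MAX_LAT:
          print(data)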

b. Using place object

Tweets may also contain a Twitter "Place" object, which carries additional information such as country code, country, and name. The snippet below shows how to filter for only those Tweets that contain place information, limited to the United States:

  for response_line in response.iter_lines():
      if response_line:
          data = json.loads(response_line)
          if data.get('place') and data['place']['country_code'] == 'US':
              print(data)
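
The same pattern extends to other place attributes. For example, assuming the place object also carries a full_name field (as in standard Twitter place objects), you could print just the matching locations:

  if data.get('place') and data['place']['country_code'] == 'US':
      print(data['place'].get('full_name'))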


For additional information on filtering for data by geo-location, check out this tutorial.
