チュートリアル

ストリーミングデータの使用

 
エンタープライズ

概要

PowerTrack、Volumeストリーム(例:Decahose、Firehose)、Replayストリームでは、HTTPストリーミングプロトコルを利用してオープンなストリーミングAPI接続を介してデータを配信します。REST APIのように、クライアントアプリから繰り返されるリクエストを通じてデータが一括配信されるのではなく、アプリとAPIの間で接続を1つ開いて、新たな一致が発生するたびにその接続を通じて新しい結果が送信されます。これにより、非常に高いスループットに対応できるレイテンシの低い配信メカニズムが実現します。

以下の図は、こうしたストリームにおけるアプリとTwitter APIの間のやりとり全体を示したものです。

手順1:ストリームを構成する
DecahoseまたはFirehoseを使用している場合は、ストリームを構成する必要はありません。

PowerTrackまたはReplayストリームを使用する場合は、ストリームを通じて配信するコンテンツを定義するルールをいくつか設定する必要があります。PowerTrackルールの構文の詳細については、プレミアム版の演算子に関するTwitterのドキュメントを参照してください。

ストリームを構成したら接続の準備は完了です。

手順2:APIに接続する
データストリームを開いてツイートを配信するには、APIに接続リクエストを送信する必要があります。ストリーミングモデルでは、リアルタイムで配信されるデータ用のパイプラインがこの接続によって開かれ、永続的に存在します。接続の確立に関する詳細は、個別のAPIのドキュメントを参照してください。

手順3:配信されたデータをリアルタイムで利用する
接続が確立されると、開いている接続を通じてストリームから新しいツイートがリアルタイムで送信され、クライアントアプリでは受信したデータをリアルタイムで読み取る必要があります。クライアントアプリでは、さまざまな種類のメッセージを認識して処理する必要があります。詳細についてはこのドキュメント内で後ほど説明します。

手順4:接続が切断された際にAPIに再接続する
ある時点でストリームへの接続が閉じてしまう、つまり切断されることがありますが、これを回避することはできません。リアルタイムストリームでは、ストリームから切断されたときにデータが失われる可能性があると理解することが重要です。接続が切断されたら、新しい接続を確立して、クライアントアプリでプロセスを再開する必要があります。
さらに、データが失われないようにするために、冗長接続、バックフィル、またはReplayストリームを利用して、ストリームからの切断によるデータ損失の軽減、またはデータの復元を行う必要があります。

 

ストリームの使用

ストリームへの接続を確立したら、データのストリームの受信を開始します。応答の本文は、複数のキャリッジリターン( )で区切られた、JSONでエンコーディングされたアクティビティ、システムメッセージ、空白行で構成されます。

クライアントでは、ストリームから読み込まれるアクティビティを、文字を使って切り離す必要があります。データ処理を開始するために、「ドキュメント」の末尾までアプリが待機する必要はありません。実際、ドキュメントに終わりはなく、クライアントでは届いたデータをリアルタイムで読み取る必要があります。

JSONデータ
このAPIによってストリームされる各データはJSONでエンコーディングされ、以下のいずれかのタイプになります。

  • ツイート:個別のツイートJSONオブジェクト
  • キープアライブシグナル:接続がタイムアウトするのを防ぐキャリッジリターン
  • システムメッセージ:強制切断の通知など。実際の切断はメッセージ自体ではなく、通常のHTTPプロトコルによって実行されます。一部のケースでは切断システムメッセージが届かない場合があるため、キープアライブシグナルをモニタリングすることが重要になります(詳細については以降を参照)。

JSONオブジェクトの各フィールドの順序は決まっておらず、一部のフィールドが表示されない場合があります。また、各アクティビティは順不同で配信され、メッセージが重複する場合もあります。時間が経つにつれ、新しいメッセージタイプが追加されて、ストリームを通じて送信される場合があります。

そのため、クライアントでは以下に対応できるようにしておく必要があります。

  • 表示する順序の決まっていないフィールド
  • 予期しないフィールドや不足しているフィールド
  • 順不同のツイート
  • 重複メッセージ
  • タイミングを問わずストリームに送られてくる、新しい任意のメッセージタイプ

ツイート
指定されたデータソースへのツイートを表すJSONオブジェクトは、Twitterで情報を付加されたネイティブ形式で提供されます。このデータ形式の詳細は、ツイートJSONの概要に関するドキュメントを参照してください。

ツイートは1行に収められます。ラインフィード文字が含まれる場合がありますが、キャリッジリターンは含まれません。

システムメッセージ
ツイートストリームにはシステムメッセージが含まれている場合もあります。以下はこれらのメッセージの基本形式といくつかの例を示したものです。新しいメッセージが導入されたことで、配信済みのメッセージが変更される場合があるためご注意ください。クライアントアプリケーションでは、変化するシステムメッセージペイロードに対応できるようにしておく必要があります。

「送信」タイムスタンプの形式は「YYYY-MM-DDTHH:mm:ssZZ」でタイムゾーンはUTCです。

メッセージ形式:

{"<message type>":{"message":"<the message>","sent":"<date time sent>"}}

システムメッセージのタイプ:現在、3つのメッセージタイプがあり、メッセージ数はそれぞれ異なります。ただし、これは予告なしに変更されることがあります。

  • 情報 - Replayストリームリクエストの完了時に「Replay Request Complete(Replayリクエストの完了)」が送信されます。以下を参照してください。
  • 警告 - ゼロルールがあるPowerTrackストリームに接続している場合などが当てはまります。
  • エラー - 特定の問題が発生した時に送信されることがあります(例:「Error while streaming data(データストリーミング中のエラー)」や強制切断が発生した場合)。

強制切断の原因となったバックアップによって送信が妨げられた場合、フルバッファーによる強制切断を示すエラーメッセージがクライアントに送信されない場合があります。そのため、再接続の開始について、アプリがこれらのメッセージに依存しないようにしてください。

例:

{"error":{"message":"Forced Disconnect:Too many connections.(Allowed Connections = 2)","sent":"2017-01-11T18:12:52+00:00"}}

{"error":{"message":"Invalid date format for query parameter 'fromDate'.Expected format is 'yyyyMMddHHmm'.For example, '201701012315' for January 1st, 11:15 pm 2017 UTC. ","sent":"2017-01-11T17:04:13+00:00"}}

{"error":{"message":"Force closing connection to because it reached the maximum allowed backup (buffer size is ).","sent":"2017-01-11T17:04:13+00:00"}}

キープアライブシグナル
少なくとも10秒ごとに、ストリームによって開いている接続を通じて、キャリッジリターン形式のキープアライブシグナル(ハートビート)が送信されることで、クライアントがタイムアウトしないようにします。クライアントアプリケーションでは、ストリームの文字に対応できるようにしておく必要があります。

こちらで説明されているように、クライアントのHTTPライブラリで読み取りタイムアウトが適切に実装されていれば、この期間内に読み取られたデータがない場合でも、アプリでHTTPプロトコルとHTTPライブラリを使用してイベントをスローできるため、文字を明示的にモニタリングする必要がありません。

このイベントは通常、例外としてスローされます。あるいは、使用するHTTPライブラリによっては別のイベントになります。こうしたタイムアウトを検出するために、エラー/イベントハンドラーでHTTPメソッドをラップすることが強く推奨されています。タイムアウトが発生したら、アプリケーションで再接続を行う必要があります。

Gzip圧縮
ストリームはGzip形式で圧縮されて配信されます。データを読み取る際には、クライアントで圧縮を解除する必要があります。ボリュームの少ないストリームを利用する場合、一部のライブラリ(JavaのGZIPInputStreamや多くのRubyストリームコンシューマー)では受信データの圧縮解除を適切に処理できないため、設定したしきい値のデータを受信する前に、受信したツイートをリアルタイムで圧縮解除できるようにライブラリをオーバーライドする必要があります。この例については、こちらのJavaコードを参照してください。

チャンク形式エンコーディング
応答にTransfer-Encoding:チャンクHTTPヘッダーがあることからもわかるとおり、ストリーミングAPI接続はチャンク形式転送エンコーディングを使ってエンコードされます。大半のHTTPライブラリではチャンク形式転送エンコーディングが透過的に処理されるため、このドキュメントでは再構成されたHTTPストリームにコードがアクセスでき、このエンコードを処理する必要がないことを前提にしています。

この前提と異なる場合は、ツイートやその他のストリーム済みメッセージがHTTPチャンク境界内に収まらない場合があるため、さきほど定義した区切り記号を使用してストリーム再構成時のアクティビティ境界を決定してください。

 

バックアップのバッファリング

ソースのFirehoseからデータを受信すると直ちにストリームを通じてデータが送信されるため、多くの場合ボリュームの増加につながります。Twitterのシステムがストリームに新しいデータを即座に書き込めない場合(クライアントの読み取り速度が遅かったり、ネットワークのボトルネックがあるなど)、システム側でコンテンツをバッファーしてクライアントの処理が追いつけるようにします。ただし、このバッファーがいっぱいになった場合、接続を解除するために強制切断が実行され、バッファー済みのツイートが失われて再送信されません。詳細については以下を参照してください。

アプリの処理が遅れている時間を特定する方法の1つとして、現在受信しているツイートのタイムスタンプを比較し、時間を追って追跡する方法があります。

潜在的なレイテンシとパブリックインターネット上の問題により、ストリームのバックアップを完全になくすことはできませんが、アプリを適切に構成することでその大半をなくせます。バックアップの発生を最小限に抑えるには以下を行います。

  • クライアントがストリームを十分な速さで読み取れるようにする。通常、ストリームを読み取る際に実際の処理を行う必要はありません。ストリームを読み取ったら、アクティビティを別のスレッド/プロセス/データストアに渡して非同期的に処理が行われます。
  • 大量のデータボリュームを保持しつつ、ボリュームの大幅な急増(通常時の3~4倍など)に対応できるように、十分な着信帯域幅をデータセンターに用意しておいてください。PowerTrackなど、フィルタリングされたストリームの場合、ボリュームとそれに対応するためにお客様側で必要な帯域幅は、トラッキング対象とフィルターに一致するツイートの数によって異なります。

アプリを使用してストリームに接続し、同時にcURLを使用して冗長接続経由でも接続することで、簡単にアプリのスピードをテストできます。cURL接続を利用することで、チェーンから取得したアプリコードで発生するパフォーマンスの種類を把握し、デバッグ時に変数をなくすことができます。

ボリュームのモニタリング
予期しない事態に備えて、ストリームデータのボリュームをモニタリングすることを検討してください。データボリュームの減少は、ストリームの切断以外の問題が発生している兆候を示している場合があります。このような状況でも、ストリームは引き続きキープアライブシグナルを受信し、場合によっては一部の新しいアクティビティデータも受信します。ただし、ツイートの数が激減した場合は、アプリケーションやネットワークへの受信データボリュームの減少につながる原因がないかを調査し、関連するアナウンスがないかapi.twitterstat.usを確認する必要があります。

このようなモニタリングを行うには、一定期間内に予測される新しいツイートの数を追跡します。ストリームのデータボリュームが指定したしきい値を大きく下回り、指定した時間内に戻らない場合、アラートと通知が送られるようにします。特に、PowerTrackストリームのルールの修正中や、ツイートアクティビティの急増を引き起こすイベントの発生時には、データボリュームの大幅な増加をモニタリングすることも検討してください。

ストリームの「通常の」データボリュームに関する期待値はお客様ごとに大きく異なるため、減少率/増加率や期間に関する一般的な推奨事項は設けていません。このようなタイプのモニタリングにはお客様ごとに独自の指標を設けることをおすすめします。

さらに、PowerTrackストリームの場合は、アプリが取り込むデータボリュームを注意深く監視して細かく管理できるように、個別のルールを使用してリアルタイム、またはほぼリアルタイムでボリュームの使用をモニタリングする必要があります。また、事前に設定したしきい値をボリュームが超えた場合にチームにアラート送信する対策をアプリに実装することもおすすめしています。大量のデータを受信するルールの自動削除や、異常時にストリームから完全に切断するなどの対策の導入も検討してください。こうした対策を行うことで、多額のデータ使用料金の発生につながりかねない、予想外のシナリオを防ぐことができます。

 

接続の切断

Twitterでは、ストリームの切断をクライアント側の切断と強制切断の2つのカテゴリに分類しています。

クライアント側の切断
クライアント側の切断とは、コードによって意図的に接続が閉じられたか、ネットワークの設定または条件によって接続が終了したことにより、アプリケーション自身がデータストリームへの接続を切断した場合を指します。

この動作を引き起こす可能性のある、クライアントコードのよくある問題は以下のとおりです。

  • 永続的に開いたままにすることを許可せずに、接続を閉じる
  • 接続を通じてデータ(アクティビティ、システムメッセージ、またはキープアライブシグナル)が送信され続けているにもかかわらず、接続をタイムアウトする
  • チャンクサイズの予測を間違ったことによる、圧縮ストリームの圧縮解除
  • お使いのインフラストラクチャのネットワーク制限やファイアウォール制限(ファイアウォールセッションの制限など)によって、データストリームへの接続が閉じられる場合があります。断続的に既存の接続を切断する可能性があるコードがないか確認するようにしてください。
  • 繰り返しになりますが、cURLを使用して切断動作を比較するのが効果的です。アプリでクライアント側の切断が発生したにもかかわらず、cURL接続が維持されている場合は、コードを確認する必要があります。cURLとアプリの切断がほぼ同時に発生した場合、ネットワークによって接続がタイムアウトされた可能性があります。 

強制切断
強制切断とは、Twitterのシステムによって意図的にストリームからアプリを切断することを指します。強制切断には次の3つの種類があります。

  • フルバッファー – アプリによるデータの読み取り速度が十分でないか、ネットワークのボトルネックによってデータフローが遅くなっている。
  • 接続過多 – データストリームに対してアプリが大量の同時接続を確立した。これが発生すると、Twitterでは1分待機し、それでも制限を超過している場合は、最後に確立された接続を切断します。
  • サーバーメンテナンス – Twitterチームがシステムサーバーに変更や更新を行った。この場合、「運用上の理由によりTwitterは停止しています」という旨のメッセージが表示されます。こうした作業は約2週間ごとに実施され、こちらのステータスページでアナウンスされます。 

強制切断では、HTTPチャンク形式エンコーディングの標準的なプラクティスに従ってゼロバイトチャンクを送信することで接続を閉じます。詳細については「http://www.httpwatch.com/httpgallery/chunked」のセクション9を参照してください。フルバッファーの場合は、Twitterアプリからゼロバイトチャンクが送信されますが、インターネットやお使いのアプリ内でデータバックアップが発生していると、アプリでゼロバイトチャンクを受信できない場合があります。

この状況に対応するために、30秒を超えてデータ(ツイートアクティビティ、またはTwitterのキープアライブシグナル)をまったく受信しない場合にタイムアウト/再接続を行うように、アプリを設定してください。Twitterのサーバー側で実行されない「クライアント側の切断」とは異なり、「強制切断」はコンソール/ダッシュボードに登録されます。

フルバッファーを原因とする強制切断を最小限に抑える方法については、さきほどの「バックアップのバッファリング」を参照してください。

 

再接続

確立した接続が解除されたら、すぐに再接続するようにしてください。最初の再接続の試行に失敗した場合、再接続に成功するまで、エクスポネンシャルバックオフパターンを使用してクライアントで再接続の試行を続ける必要があります。

クライアントがどのように切断されても、即座に再接続するようにアプリを構成する必要があります。最初の再接続に失敗した場合、適切な上限を設定したうえで、以降の再接続試行にはエクスポネンシャルバックオフパターン(例:1秒待機した後は2秒待機、その後は、4秒、8秒、16秒待機)をアプリに実装することをおすすめします。この上限に達した場合に、クライアントからチームに通知するよう構成することで、詳しい調査を進めることができます。

アプリで次のすべてのシナリオに対処できるようにしてください。

  • お客様側で何らかの理由により接続が閉じられる(クライアント側の切断)
  • ゼロバイトチャンクを使って、Twitterから接続が閉じられる(強制切断)
  • 接続を通じてアプリが30秒以上データを受信しなかった(新しいツイートがなく、かつキープアライブのキャリッジリターンシグナルもなかった)。この状況では、接続がタイムアウトするようにアプリが設定されている必要があります。デフォルトでは、前述のシグナルがない場合に接続を閉じるようコードで指定しない限り、一部のクライアントの永続接続は新しいデータがなくても開いたままにできます。前述の3番目のシナリオとは異なり、新しいデータはないが、キープアライブシグナルを引き続き受信している場合には、アプリは接続されたままになります。Twitter側でデータの送信を妨げる問題が発生しているか、または単にフィルターに一致する新しいアクティビティがない場合が考えられます。少なくとも、キープアライブシグナルを受信できている限りは接続を維持することで、Twitterが受信した、フィルターに一致する新たなツイートを受信できます。通常は、クライアントで読み取りタイムアウトを30秒以上に設定することでこれを実現します。

切断によるデータ損失
ストリームから切断されると、そのタイミングで送信されるはずだったデータが失われる可能性があります。ただし、Twitterではこうした切断の影響を軽減し、切断時のデータを復元するための手段を複数用意しています。詳細については以下を参照してください。

  • 冗長接続 - 複数のサーバーからのストリームを使用して、いずれかが切断された場合のデータ損失を防ぎます。
  • バックフィル - 5分以内に再接続して失われたデータのバックフィルをリクエストします。
  • Replay - 別のストリームを使用して、過去5日間のデータを復元します。
 

Replayストリーム固有の考慮事項

Replayを使用することで、企業のお客様は技術的な問題によって失われたアクティビティを履歴データのローリングウィンドウから復元できます。また、データ配信にはストリーミングAPIを使用します。Replay APIの詳細なドキュメントについてはこちらを参照してください。

ただし、Replayストリームからのデータを使用する場合は、クライアントで次の要件と独自の考慮事項を確認してください。

再接続
Replayリクエスト中にストリームから切断された場合、リクエスト開始時に使用したのと同じURLにアプリで再接続する際は、fromDateを変更しないまま再接続しないようにしてください。これを行ってしまうと、リクエストが最初から再開され、切断前の処理が繰り返されることになります。リクエスト完了前に切断された場合は、クライアントでURLのfromDateとtoDateを調整し、切断前に収集した最後のアクティビティの時点から開始して、切断されたのと同じところから処理を行うことで、重複の量を最小限に抑える必要があります。

重複
Replayリクエストの結果配信時に、以前のリクエストや通常のPowerTrackストリームから取得済みのアクティビティをTwitter側で除外することはないため、アプリで適切に重複を除去する必要があります。ただし、請求の都合上、同日(UTC時間)に発生したPowerTrack Replayリクエスト全体でアクティビティ数の重複を除去することで、Replay経由で配信されるアクティビティ1つにつき1回だけ請求されるようにしています。リアルタイムのPowerTrackストリーム経由で配信されたアクティビティは、Replay経由で配信されたアクティビティとは別にカウントされます。

 

復元

これまでに説明したとおり、PowerTrackストリームとVolumeストリームにはリアルタイムストリームをサポートする重要な機能が用意されています。その他のストリーム、冗長接続、Replayストリーム、バックフィルに関するドキュメントはこちらから参照してください。

ソリューション作成の準備が整った方は

ドキュメントを確認して利用を開始しましょう