ターミナルにTwitterのUser Streamを流す方法

Brogramming with Tom
ターミナル上には流れる情報があってもいいんじゃないかということで、Twitterのタイムラインをリアルタイムで流してみました。

photo credit: ryanoshea via photopin cc

tweetpy



まず、 TwitterのConsumer keyConsumer secretが必要になります。この値は、後にプログラムに書き込みます。これは、 こちらのページよりアプリを登録し取得してください。





次に、 tweetpyをインストールします。


sudo easy_install tweepy



次に、Twitterの Access token及び Access token secretを取得します。この値は、後にプログラムに書き込みます。なお、公式で取得した場合は、この手順は不要です。


cd

mkdir tweetpy

cd tweetpy

git clone git://gitorious.org/tweepy/examples.git

python examples/oauth/getaccesstoken.py



python examples/oauth/getaccesstoken.py
Consumer key: <<ここにconsumer_keyを入力>>
Consumer secret: <<ここにconsumer_secretを入力>>
Verification pin number from twitter.com: <<ここにPINコードを入力>>
Access token:
Key: <<ここにaccess_key出る>>
Secret: <<ここにaccess_secretが出る>>


tweet


vim tweet


#!/usr/bin/python
# -*- coding: utf-8 -*-
# http://www.nari64.com/?p=200
  
import sys
import tweepy
from tweepy import  Stream, TweepError
import logging
import urllib
 
CONSUMER_KEY=''
CONSUMER_SECRET=''
ACCESS_TOKEN=''
ACCESS_TOKEN_SECRET=''
 
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
 
class CustomStreamListener(tweepy.StreamListener):
 
    def on_status(self, status):
 
        try:
            print "---%s---\n  %s\n" % (
                                      status.author.screen_name,
                                      status.text)
        except Exception, e:
            print >> sys.stderr, 'Encountered Exception:', e
            pass
 
    def on_error(self, status_code):
        print >> sys.stderr, 'Encountered error with status code:', status_code
        return True # Don't kill the stream
 
    def on_timeout(self):
        print >> sys.stderr, 'Timeout...'
        return True # Don't kill the stream
 
 
class UserStream(Stream):
  
    def user_stream(self, follow=None, track=None, async=False, locations=None):
        self.parameters = {"delimited": "length", }
        self.headers['Content-type'] = "application/x-www-form-urlencoded"
 
        if self.running:
            raise TweepError('Stream object already connected!')
 
        self.scheme = "https"
        self.host = 'userstream.twitter.com'
        self.url = '/2/user.json'
 
        if follow:
           self.parameters['follow'] = ','.join(map(str, follow))
        if track:
            self.parameters['track'] = ','.join(map(str, track))
        if locations and len(locations) > 0:
            assert len(locations) % 4 == 0
            self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
 
        self.body = urllib.urlencode(self.parameters)
        logging.debug("[ User Stream URL ]: %s://%s%s" % (self.scheme, self.host, self.url))
        logging.debug("[ Request Body ] :" + self.body)
        self._start(async)
  
def main():
    stream = UserStream(auth, CustomStreamListener())
    stream.timeout = None
    stream.user_stream()
 
if __name__ == "__main__":
  main()


そして、先ほど取得した値を、上のプログラムの特定の場所に書き込みます。


CONSUMER_KEY='idijieji9739fdaaifds'
CONSUMER_SECRET='IIfjdsi98839499'
ACCESS_TOKEN='IDKJJKBJJDE99484-839'
ACCESS_TOKEN_SECRET='Aifsikkekmvidjaeie849dfsua'



chmod +x tweet

./tweet






参考記事



twitterAPI用pythonライブラリtweepyを使えるようになるまで。 - 六番
twitterをターミナル上で楽しむ(python)
Twitter用Pythonライブラリのtweepyを試してみました @ ともの技術メモ