tweepyのstreaming機能を使った基本的な実装例の紹介

この記事では, tweepyを用いてtwitter streaming APIを利用したプログラムの実装例をいくつか紹介する. 
最終的な目標としては,特定の条件でfilterを掛けて取得したアカウント間のfollowする,されている関係を図示化することである.
一応,一個一個の実装例だけをみても理解できるように書いてある.

下準備

twitterのAPI用意する.
keyとtokenを持っていない人は,他のサイトを参照してtwitterのAPIを利用できるようにすること.

以下のmoduleをimportする.

import tweepy
import json
import inspect
import subprocess
import time

twitterのAPIの準備をする.

# 認証キーの設定
consumer_key =  "API key"
consumer_secret = "API secret key"
access_token =  "Access token"
access_token_secret = "Access token secret"

# OAuth認証
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

# APIのインスタンスを生成
api = tweepy.API(auth)

もし,tweepyでtwitterのstreaming API以外のAPI機能の使い方について怪しかったならば,拙著のtwitterのデータ探索に関わるtweepyの基本機能の解説を読んでおくと良い.
また,jsonの基本的な操作については,拙著のpandasのデータフレームをjson形式で保存,読み取る方法を読んでおくといい.

tweepyでの基本的なstreaming APIの使い方

まずは,一番基本的なコードを確認する.
このコードは,リアルタイムでtweetされたテキストの中にpythonが入っていたらそのtweetをstatus object形式で取得,name, screen_name, textを表示するプログラムだ.コードを止めるにはjupyter notebook上ではinterruptを押せば良い.

#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        print("name : %s, screen_name : %s" % (status.user.name,status.user.screen_name))
        print("text : %s " % status.text)
        print("-"*50)

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
# filter word 
myStream.filter(track=['python'])

tweepyのstreaming APIを使用する際には,classの書き換えが行われる.
正直,このmoduleを使ってstreaming APIを使うならば,元のソースコードを確認することをお勧めする.トラブルシューティングや,実装したい内容を書いていく上で大きな助けとなる.
ソースコードは以下のコードで確認可能だ.

# see the source code of tweepy.StreamListener
path = inspect.getfile(tweepy.StreamListener)
subprocess.check_call(["open",path ])
print(path)

classのStreamListenerのコードを見ると,様々なメソットが提供されているが,適宜自分で書き換えることを想定されていることがよくわかる.

実装例

ここでは,具体的なプログラムをいくつかみていくことでtweepyによるtwitterのデータ探索の方法を示していくこととする.

tweet・RT・replyを識別,tweet内容全表示

基本的な使い方の部分で,filterをかけたtweetの収拾が可能なことがわかった.ここでは,tweet・RT・replyを識別,tweet内容全表示するプログラムを紹介する.

twitterでは,ユーザーのタイムラインに上がるtweetには通常のもの,Retweet, replyの3種類が存在する.そこで,この3つを識別してjupyter notebook上にtweetを表示していく.ここで注意したいのは,filterによって得られるstatus objectの”.text” 内には全文が入っていない.そこで,識別した3つごとに対応を変えて全てのツイートで全文を表示することとする.

以下のコードは,以上述べたことを実現する.
コードのポイントとしては以下が挙げられる.
・retweetはstatusのメソッドに”retweeted_status”が登場する.”._json.keys()”を用いて存在を確認.
・replyの場合は,”in_reply_to~~~” にNone以外の値が入る.
・textで表示出来る以上の長さの場合,extended_tweetというフィールドが用意される.その中の”full_text”の中に全文が入っている.
・retweetの場合は,retweetした内容を全文表示するようにした.

class MyStreamListener(tweepy.StreamListener):
    def on_status(self, status):
        # check tweet is retweet or not.
        self.checkTweetType(status)
        self.printFullText(status)
        print("-"*50)

    def checkTweetType(self,status):
        if  "retweeted_status" in status._json.keys():
            type_ = "retweet"
        elif status.in_reply_to_user_id != None:
            type_ = "reply"
        else:
            type_ = "normal"
        self.type = type_
        
    def printFullText(self,status):
        
        if self.type == "retweet":
            s_ = status.retweeted_status
            if "extended_tweet" in s_._json.keys():
                text = s_.extended_tweet["full_text"]
            else:
                text = s_.text
        elif "extended_tweet" in status._json.keys():
            text = status.extended_tweet["full_text"]
        else:
            text = status.text 
        print("name : %s, screen_name : %s" %(status.user.name,status.user.screen_name))
        print("tweet type : %s" % self.type)
        print("text : n%s"  % text )

        

pythonでfilterを掛けて表示させるならば以下のコードを追加で回せばよし.

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
# filter word 
myStream.filter(track=['python'])

地理情報でfilterをかける

以下のように,”.filter”の”locations”に[経度1, 緯度1, 経度2, 緯度2] の形式のリストを渡すことで実装可能.注意事項として,”locations”と共に”filter”も設定すると,これはor選択となってしまう.つまり,一定の範囲内の地域でのtweet “or” 特定のワード,という形の抽出となってしまう.andで検索したい場合は関数の”on_status”の部分で該当のtweetだけを引き出すようにすればよい.

# Sapporo City's geocode.
loc = [141.239098,42.954423, 141.474093,43.16585]

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
myStream.filter(locations = loc)

myStreamListenerを以下のように書き換え,ユーザーの”location”登録情報まで出力させると,札幌市と登録されている人が多く,地理情報のfilteringは札幌市を狙って行ったので良さそうである.

class MyStreamListener(tweepy.StreamListener):
    def on_status(self, status):
        # check tweet is retweet or not.
        self.checkTweetType(status)
        self.printFullText(status)
        self.printLocation(status)
        print("-"*50)
        
    def printLocation(self,status):
        if "location" in status.user._json.keys():
            print("location : %s" % status.user.location)
        else:
            print("location : None")
        
    def checkTweetType(self,status):
        if  "retweeted_status" in status._json.keys():
            type_ = "retweet"
        elif status.in_reply_to_user_id != None:
            type_ = "reply"
        else:
            type_ = "normal"
        self.type = type_
        
    def printFullText(self,status):
        
        if self.type == "retweet":
            s_ = status.retweeted_status
            if "extended_tweet" in s_._json.keys():
                text = s_.extended_tweet["full_text"]
            else:
                text = s_.text
        elif "extended_tweet" in status._json.keys():
            text = status.extended_tweet["full_text"]
        else:
            text = status.text 
        print("name : %s, screen_name : %s" %(status.user.name,status.user.screen_name))
        print("tweet type : %s" % self.type)
        print("text : n%s"  % text )
        

一定の時間が経過したらstreamingを終了する

tweepyの処理では, whileを用いているため,on_statusでflaseを返り値として与えると処理が終了する.これを用いて一定の時間が経過したら処理を終了するプログラムが書ける.

以下のコードは,15秒経過したら”finished”をプリントして終了するプログラム.

class MyStreamListener(tweepy.StreamListener):
    # json file is saved as line by line
    def __init__(self):
        # limit running 
        self.stTime = time.time()
        self.limit = 15
        super(MyStreamListener, self).__init__()
        
        
        
    def on_status(self, status):
        # stop program after a certain time passed 
        if(time.time() - self.stTime) > self.limit:
            print("finished")
            return(False)
        # check tweet is retweet or not.
        print(status.text)
        print("-"*50)
  

また,先ほどの条件でfilterして試験的にコードを動かす.

loc = [141.239098,42.954423, 141.474093,43.16585]

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
myStream.filter(locations = loc) # is_async = True

もし,”.filter”に”is_async=True”を渡すとメインのthreadとは別のthreadで処理が行われる.大事.

取得したstatus objectからuser情報だけ抜き出して,jsonファイルとして保存

ここでは,filterを掛けて得たtweetからuserの情報だけを抜き出して,jsonファイルとして保存するプログラムを紹介する.
また,そのjsonファイルを開いてデータとして用いるコードも入れた.

ポイントは以下の通りである.
・__init__をいじっているので,引き継ぎがれるようにコードを追加した.
・retweetに関しては,retweet元のユーザーの情報を取得するようにした.
・user情報に取得した際のtweetの一部を加えるようにした.
・jsonには一行一データとして保存した.
・”.getJson”は保存したデータをjsonとして読み込む関数である.

コードは以下の通り.

class MyStreamListener(tweepy.StreamListener):
    def __init__(self):
        self.path = "getUserAccount.json"

        super(MyStreamListener, self).__init__()

    def on_status(self, status):
        self.file = open(self.path,"a")
        
        self.checkTweetType(status)
        self.collectAccount(status)
        
        self.file.close()
        print(status.text)
        print("-"*50)

        
    def checkTweetType(self,status):
        if  "retweeted_status" in status._json.keys():
            type_ = "retweet"
        elif status.in_reply_to_user_id != None:
            type_ = "reply"
        else:
            type_ = "normal"
        self.type = type_
        
    def collectAccount(self,status):
        if self.type == "retweet":
            user = status.retweeted_status._json["user"]
            user["text"] = status.retweeted_status.text

        elif self.type in ["normal","reply"]:
            user = status._json["user"]
            user["text"] = status.text
            
        json.dump(user,self.file)
        self.file.write("n")
        
    def getJson(self):
        with open(self.path,"r") as f:
            data_ = []
            for buff in f: 
                try:
                    data_.append(json.loads(buff))
                except Exception as e:
                    print(e)
                    print(buff)
        return(data_)

実行は,例として以下のコードとした.

loc = [141.239098,42.954423, 141.474093,43.16585]

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
myStream.filter(locations = loc)

保存したデータを用いるには以下のようにすれば良い.リストの各要素に一人一人のuser情報が含まれている.

data = myStream.listener.getJson()

参考文献

・Python tweepy.Stream() Examples, https://www.programcreek.com/python/example/86213/tweepy.Stream
・tweepy Streaming API : full text, https://stackoverflow.com/questions/48319243/tweepy-streaming-api-full-text
・How to add a location filter to tweepy module?, https://stackoverflow.com/questions/22889122/how-to-add-a-location-filter-to-tweepy-module
・Unable to stop Streaming in tweepy after one minute, https://stackoverflow.com/questions/33498975/unable-to-stop-streaming-in-tweepy-after-one-minute
・Python3 で JSON ファイル全体を読み込まずに JSON 要素を追加する,https://qiita.com/KEINOS/items/ea4bda15506bbd3e6913

コメント

  1. […] た実装例に興味のある人は以下の記事で書いているので,読んでみてください.・tweepyのstreaming機能を使った基本的な実装例の紹介 ・tweepyでtwitterのアカウント間の関係をグラフ化する  […]

  2. […] 前の記事の内容を使っているので,必要であれば,そちらの記事も確認すること.・twitterのデータ探索に関わるtweepyの基本機能の解説・tweepyのstreaming機能を使った基本的な実装例の紹介 […]

  3. tkmsikd より:

    tweepy.StreamListenerですが、以下のコミットでtweepy.Streamにマージされたようです。
    https://github.com/tweepy/tweepy/commit/39abff4520e291180425ac2219d1d8597ac5da96

タイトルとURLをコピーしました