tweepyに関わる記事3個目の内容です.この記事では,streaming APIやAPI.searchメソッドを使って手に入れたtwitterのアカウント情報を使って,アカウント間の関係性を図示することを行う.
前の記事の内容を使っているので,必要であれば,そちらの記事も確認すること.
・twitterのデータ探索に関わるtweepyの基本機能の解説
・tweepyのstreaming機能を使った基本的な実装例の紹介
下準備
以下のmoduleをimportしておく.
import tweepy import json from graphviz import Digraph import time
また,twitter APIの準備も行う.
# 認証キーの設定 consumer_key = "API key" consumer_secret = "API secret key" access_token = "Access token" access_token_secret = "Access token secret" # OAuth認証 auth = tweepy.OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) # APIのインスタンスを生成 api = tweepy.API(auth)
データの収集
前回の記事で紹介したstreaming APIの記事を使って,北海道大学とゆかりのあるtwitter accountを収集しよう.
class MyStreamListener(tweepy.StreamListener): def __init__(self,path_): self.path = path_ super(MyStreamListener, self).__init__() def on_status(self, status): self.file = open(self.path,"a") self.checkTweetType(status) self.collectAccount(status) self.file.close() print(status.text) print("-"*50) def checkTweetType(self,status): if "retweeted_status" in status._json.keys(): type_ = "retweet" elif status.in_reply_to_user_id != None: type_ = "reply" else: type_ = "normal" self.type = type_ def collectAccount(self,status): if self.type == "retweet": user = status.retweeted_status._json["user"] user["text"] = status.retweeted_status.text elif self.type in ["normal","reply"]: user = status._json["user"] user["text"] = status.text json.dump(user,self.file) self.file.write("n") def getJson(self): with open(self.path,"r") as f: data_ = [] for buff in f: try: data_.append(json.loads(buff)) except Exception as e: print(e) print(buff) return(data_)
filterは以下の条件で行った.
myStreamListener = MyStreamListener(path_="hokudai_account.json") myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener) myStream.filter(track=["北大","北海道大学","hokudai","hokkaido university"])
これで,localに”hokudai_account.json”が出来るので,このデータに入っているアカウント間のfollowされている,している関係性を図示していく.
関係性のグラフ化
このプログラムのポイントは以下の通り.
・重複するアカウントは消去する.
・followの関係のために,API.friends_idsを用いた.(1)
・API.friends_idsの速度制限に掛かったら5分間待機する.
・グラフ化には,graphvizを用いた.
・取得出来なかったアカウントについては,青字で表示した.
(1)については,ネットワークが大きくなっていったときでも,API.friends_idsならば1 requestで5000件取得出来るため,大体 nのオーダーで処理が完了できるからだ.
また,アカウントによっては,こういった情報の取得を許可していないものもあるので注意すること.
コードは以下の通り.
class twitterNetwork(): def __init__(self,path4Account): self.path4Account = path4Account self.path4Save = self.path4Account[:-5] + "_network.gv" data = self.getJson(path4Account) self.data = self.delDuplicates(data) self.sl = 60*5 # how many data are actually used print("# of data : %d " % len(data)) print("# of unique data : %d " % len(self.data)) def getJson(self,path_): with open(path_,"r") as f: data_ = [] for buff in f: try: data_.append(json.loads(buff)) except Exception as e: print(e) print(buff) return(data_) def delDuplicates(self,data_): # delete duplicated data dataCle = data_.copy() n = len(data_) delInd = [False for i in range(n)] for i in range(n): for j in range(i+1,n): if dataCle[i]["id"] == dataCle[j]["id"] : delInd[j] = True for j in range(n-1,-1,-1): if delInd[j]: dataCle.pop(j) return(dataCle) def networkOneArrow(self,ids_,source_id): # create network rest_ids = ids_.copy() rest_ids.remove(source_id) # keep rate limits, if remaining is few, sleep for a moment rem = api.rate_limit_status()["resources"]["friends"]["/friends/ids"]["remaining"] while rem < 1: print("-"*50) print("friedns/ids/remaining : %dnsleep for %d sec" %(rem,self.sl)) print("-"*50) time.sleep(self.sl) rem = api.rate_limit_status()["resources"]["friends"]["/friends/ids"]["remaining"] try: self.network[source_id] = [] for page in tweepy.Cursor(api.friends_ids,user_id=source_id).pages(): for i,id_ in enumerate(page): # check id_ is in nodes (focusing twitter ids) if id_ in rest_ids: self.network[source_id].append(id_) return(True) except Exception as e: print(e) print(source_id) del self.network[source_id] return(False) def createNetwork(self): self.network = {} n = len(self.data) ids = [] for u in self.data: ids.append(u["id"]) u["friendsSearch"] = False for i, u in enumerate(self.data): source_id = u["id"] flag = self.networkOneArrow(ids,source_id) print(" %d / %d "%(i + 1 ,n)) u["friendsSearch"] = flag print("finish! ") def checkRateLimit(self): print(json.dumps(api.rate_limit_status()["resources"]["followers"],indent=4)) print(json.dumps(api.rate_limit_status()["resources"]["friends"],indent=4)) def createGraph(self): dot = Digraph( comment = "Hokudai_Account",name="hello world") for u in self.data: id_ = u["id"] if u["friendsSearch"]: dot.node(str(id_),label=u["screen_name"] ) else: dot.node(str(id_),label=u["screen_name"] ,fontcolor="blue") # make edge if id_ not in self.network.keys(): continue if len(self.network[id_]) > 0: for v in self.network[id_]: dot.edge(str(id_), str(v)) dot.render(self.path4Save,view=True) def saveNetwork(self): path4Network = self.path4Account[:-5] + "_network.json" with open(path4Network,"w") as f: json.dump(self.network,f)
以下のコードを回すと,グラフが生成される.
twNet = twitterNetwork(path4Account="hokudai_account.json") twNet.createNetwork() twNet.checkRateLimit() twNet.createGraph() twNet.saveNetwork()
今回,得られたグラフはこちら!
(アカウント名勝手に載せてごめんなさい.)
*グラフを面白くするために,mkuriki_さんがfollowしている人を引っ張り出しています.普通は,みんな関係性ありません.
———-雑感(`・ω・´)———-
あとは,このプログラムをサーバー上で回してデータ収集を継続的に行えるようにすれば,まともなtwitterのデータ解析が可能になりそう.
コメント
[…] tweepyはStreaming機能を提供している.検索ワードに対して,リアルタイムの投稿を取得する方法だ.Streaming APIを使った実装例に興味のある人は以下の記事で書いているので,読んでみてください.・tweepyのstreaming機能を使った基本的な実装例の紹介 ・tweepyでtwitterのアカウント間の関係をグラフ化する […]