2015-06-21 34 views
0

我设置了一个系统来过滤twitter实时流样本。显然,数据库写入速度太慢,无法跟上比一些低容量关键字更复杂的任何事情。我实现了django-rq作为一个简单的排队系统,以在推送过程中将推文推送到基于redis的队列中,这很有效。我的问题在另一方面。这个问题的背景是我现在有一个系统正在运行,有150万推文用于分析,另有375,000个通过redis排队。按照目前的表现速度,如果我关掉那些我不想要的流,我需要花费3天才能赶上。如果我维持这些流,那么根据我最近的估计,这需要大约一个月的时间。对大型(ish)数据集进行缓慢的django数据库操作。

数据库现在跨两个主表有数百万行,写入速度非常慢。 rq工作者的最佳数量似乎是4,并且平均每秒排列1.6个任务。 (下面列入的是什么代码)。我认为可能问题在于为每个新的队列任务打开数据库连接,因此将CONN_MAX_AGE设置为60,但这并没有改进任何东西。

刚刚在localhost上测试了这个,我在Macbook 2011上以超过13次写入/秒的速度运行了Chrome等,但是该数据库中只有几千行,这让我相信它与尺寸有关。有几个我正在使用的get_or_create命令(见下文),这可能会减慢速度,但不能通过使用它们看到任何其他方式 - 我需要检查用户是否存在,并且我需要检查是否推文已经存在(我可能,我怀疑,移动后者尝试/除非,基于从直播流进来的推文不应该已经存在,原因很明显)。我会获得很多性能增益那个?由于这仍在运行,我很想优化代码,并在那里获得一些更快/更高效的工作,这样我就可以赶上!将运行一个预审员工批量工作? (即我可以批量创建不存在的用户,或类似的东西?)

我在数字海洋上运行4核/ 8Gb Ram液滴,所以感觉这是一些相当糟糕的性能,并且大概是代码我在这里错了什么地方?
(我在这里发布了这个,而不是代码审查,因为我认为这与Q &格式相关,因为我试图解决特定的代码问题,而不是'我怎么能这样做通常更好?')

注:我正在django 1.6工作,因为这是我已经浮动了一段时间的代码,并没有信心升级当时 - 这不是公众所面临的,所以除非现在有一个令人信服的理由(比如这个性能问题),否则我不会升级(对于这个项目)。

流监听器:

class StdOutListener(tweepy.StreamListener): 
      def on_data(self, data): 
       # Twitter returns data in JSON format - we need to decode it first 
       decoded = json.loads(data) 
       #print type(decoded), decoded 
       # Also, we convert UTF-8 to ASCII ignoring all bad characters sent by users 
       try: 
        if decoded['lang'] == 'en': 
         django_rq.enqueue(read_both, decoded) 
        else: 
         pass 
       except KeyError,e: 
        print "Error on Key", e 
       except DataError, e: 
        print "DataError", e 
       return True 

      def on_error(self, status): 
       print status 

读取用户/ Twitter微博/两者

def read_user(tweet): 
    from harvester.models import User 
    from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned 
    #We might get weird results where user has changed their details"], so first we check the UID. 
    #print "MULTIPLE USER DEBUG", tweet["user"]["id_str"] 
    try: 
     current_user = User.objects.get(id_str=tweet["user"]["id_str"]) 
     created=False 
     return current_user, created 
    except ObjectDoesNotExist: 
     pass 
    except MultipleObjectsReturned: 
     current_user = User.objects.filter(id_str=tweet["user"]["id_str"])[0] 
     return current_user, False 
    if not tweet["user"]["follow_request_sent"]: 
     tweet["user"]["follow_request_sent"] = False 
    if not tweet["user"]["following"]: 
     tweet["user"]["following"] = False 
    if not tweet["user"]["description"]: 
     tweet["user"]["description"] = " " 
    if not tweet["user"]["notifications"]: 
     tweet["user"]["notifications"] = False 

    #If that doesn't work"], then we'll use get_or_create (as a failback rather than save()) 
    from dateutil.parser import parse 
    if not tweet["user"]["contributors_enabled"]: 
     current_user, created = User.objects.get_or_create(
      follow_request_sent=tweet["user"]["follow_request_sent"], 
      _json = {}, 
      verified = tweet["user"]["verified"], 
      followers_count = tweet["user"]["followers_count"], 
      profile_image_url_https = tweet["user"]["profile_image_url_https"], 
      id_str = tweet["user"]["id_str"], 
      listed_count = tweet["user"]["listed_count"], 
      utc_offset = tweet["user"]["utc_offset"], 
      statuses_count = tweet["user"]["statuses_count"], 
      description = tweet["user"]["description"], 
      friends_count = tweet["user"]["friends_count"], 
      location = tweet["user"]["location"], 
      profile_image_url= tweet["user"]["profile_image_url"], 
      following = tweet["user"]["following"], 
      geo_enabled = tweet["user"]["geo_enabled"], 
      profile_background_image_url =tweet["user"]["profile_background_image_url"], 
      screen_name = tweet["user"]["screen_name"], 
      lang = tweet["user"]["lang"], 
      profile_background_tile = tweet["user"]["profile_background_tile"], 
      favourites_count = tweet["user"]["favourites_count"], 
      name = tweet["user"]["name"], 
      notifications = tweet["user"]["notifications"], 
      url = tweet["user"]["url"], 
      created_at = parse(tweet["user"]["created_at"]), 
      contributors_enabled = False, 
      time_zone = tweet["user"]["time_zone"], 
      protected = tweet["user"]["protected"], 
      default_profile = tweet["user"]["default_profile"], 
      is_translator = tweet["user"]["is_translator"] 
     ) 
    else: 
     current_user, created = User.objects.get_or_create(
      follow_request_sent=tweet["user"]["follow_request_sent"], 
      _json = {}, 
      verified = tweet["user"]["verified"], 
      followers_count = tweet["user"]["followers_count"], 
      profile_image_url_https = tweet["user"]["profile_image_url_https"], 
      id_str = tweet["user"]["id_str"], 
      listed_count = tweet["user"]["listed_count"], 
      utc_offset = tweet["user"]["utc_offset"], 
      statuses_count = tweet["user"]["statuses_count"], 
      description = tweet["user"]["description"], 
      friends_count = tweet["user"]["friends_count"], 
      location = tweet["user"]["location"], 
      profile_image_url= tweet["user"]["profile_image_url"], 
      following = tweet["user"]["following"], 
      geo_enabled = tweet["user"]["geo_enabled"], 
      profile_background_image_url =tweet["user"]["profile_background_image_url"], 
      screen_name = tweet["user"]["screen_name"], 
      lang = tweet["user"]["lang"], 
      profile_background_tile = tweet["user"]["profile_background_tile"], 
      favourites_count = tweet["user"]["favourites_count"], 
      name = tweet["user"]["name"], 
      notifications = tweet["user"]["notifications"], 
      url = tweet["user"]["url"], 
      created_at = parse(tweet["user"]["created_at"]), 
      contributors_enabled = tweet["user"]["contributers_enabled"], 
      time_zone = tweet["user"]["time_zone"], 
      protected = tweet["user"]["protected"], 
      default_profile = tweet["user"]["default_profile"], 
      is_translator = tweet["user"]["is_translator"] 
     ) 
    #print "CURRENT USER:""], type(current_user)"], current_user 
    #current_user"], created = User.objects.get_or_create(current_user) 
    return current_user, created 

def read_tweet(tweet, current_user): 
    import logging 
    logger = logging.getLogger('django') 
    from datetime import date, datetime 
    #print "Inside read_Tweet" 
    from harvester.models import Tweet 
    from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned 
    from django.db import DataError 
    #We might get weird results where user has changed their details"], so first we check the UID. 
    #print tweet_data["created_at"] 
    from dateutil.parser import parse 
    tweet["created_at"] = parse(tweet["created_at"]) 
    try: 
     #print "trying tweet_data["id" 
     current_tweet =Tweet.objects.get(id_str=tweet["id_str"]) 
     created=False 
     return current_user, created 
    except ObjectDoesNotExist: 
     pass 
    except MultipleObjectsReturned: 
     current_tweet =Tweet.objects.filter(id_str=tweet["id_str"])[0] 
    try: 
     current_tweet, created = Tweet.objects.get_or_create(
     truncated=tweet["truncated"], 
     text=tweet["text"], 
     favorite_count=tweet["favorite_count"], 
     author = current_user, 
     _json = {}, 
     source=tweet["source"], 
     retweeted=tweet["retweeted"], 
     coordinates = tweet["coordinates"], 
     entities = tweet["entities"], 
     in_reply_to_screen_name = tweet["in_reply_to_screen_name"], 
     id_str = tweet["id_str"], 
     retweet_count = tweet["retweet_count"], 
     favorited = tweet["favorited"], 
     user = tweet["user"], 
     geo = tweet["geo"], 
     in_reply_to_user_id_str = tweet["in_reply_to_user_id_str"], 
     lang = tweet["lang"], 
     created_at = tweet["created_at"], 
     place = tweet["place"]) 
     print "DEBUG", current_user, current_tweet 
     return current_tweet, created 
    except DataError, e: 
     #Catchall to pick up non-parsed tweets 
     print "DEBUG ERROR", e, tweet 
     return None, False 

def read_both(tweet): 
    current_user, created = read_user(tweet) 
    current_tweet, created = read_tweet(tweet, current_user) 
+0

你确定错误来自django吗?我的问题的理由是因为twitter限制了您从Streaming API获取数据的速度。 13写/秒是巨大的,你得到什么django?另外,我建议在您将流监听器调用到所需语言时更改过滤器,而不是在获取数据后对其进行过滤(单靠这应该会加快速度)。 – Leb

+0

这个问题不在流监听器中 - 它将设置为获取广泛的地理位置和关键字列表,并且可以在周末的某个点上跟上超过100条推/每秒的内存队列 - 队列在Redis中有超过350,000个排队等待写入数据库的推文,所以这是写入方面 - 我可以看到在过去的500秒内完成了多少个推文。 13 /秒是在本地主机上的django上基本为空的数据库。 Twitter过滤器没有(afaik)让你直接过滤语言,你必须在收到流时做,但很乐意被纠正。 – Withnail

回答

0

我最终设法凑齐一些redditors答案和一对夫妇的其他东西。

基本上,虽然我在id_str字段上进行了双向查找,但未对其进行索引。我在read_tweetread_user上增加了索引db_index=True到该字段,并将读取的推文移动到尝试/除了Tweet.objects.create方法,如果出现问题则回退到get_or_create,并且看到速度提高了50-60倍,工作人员现在可扩展 - 如果我添加10名工人,我会获得10倍的速度。

我目前有一名工作人员正在愉快地处理6秒左右的推文。接下来,我将添加一个监控守护程序来检查队列大小,如果它仍在增加,请添加额外的工作人员。

tl; dr - 记住索引!