Оптимизация: дамп JSON из Streaming API в Mongo

Предыстория: У меня есть модуль python, настроенный для захвата объектов JSON из потокового API и их сохранения (массовая вставка по 25 за раз) в MongoDB с использованием pymongo. Для сравнения, у меня также есть команда bash для curlиз того же потокового API и pipeдля mongoimport. Оба этих подхода хранят данные в отдельных коллекциях.

Периодически я отслеживаю count()коллекций, чтобы проверить, как они работают.

На данный момент я вижу, что модуль pythonотстает примерно на 1000 объектов JSON от модуля curl | подход mongoimport.

Проблема: Как я могу оптимизировать мой модуль python , чтобы он был синхронизирован с curl | монгоимпорт?

Я не могу использовать tweetstream, так как я использую не Twitter API, а стороннюю службу потоковой передачи.

Кто-нибудь может мне помочь?

Модуль Python:


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error ocurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()

Спасибо за внимание.

8
задан Sagar Hatekar 3 June 2012 в 01:57
поделиться