Проблема неэффективности акторов Scala

Позвольте мне начать с того, что я новичок в Scala; тем не менее, я нахожу модель параллелизма на основе Актера интересной, и я попытался попробовать ее для относительно простого приложения. Проблема, с которой я сталкиваюсь, заключается в том, что, хотя я могу заставить приложение работать, результат гораздо менее эффективен (с точки зрения реального времени, процессорного времени и использования памяти ), чем эквивалентный Решение на основе Java, использующее потоки, которые извлекают сообщения из очереди ArrayBlockingQueue. Я хотел бы понять, почему. Я подозреваю, что это, вероятно, связано с моим недостатком знаний о Scala и что я являюсь причиной всей неэффективности, но после нескольких безуспешных попыток переработать приложение я решил обратиться за помощью к сообществу.

Моя проблема в следующем :У меня есть сжатый файл с большим количеством строк в формате:

SomeID разделенный запятой __список _из _значений

Например:

1234 12,45,82

Я хотел бы проанализировать каждую строку и получить общее количество вхождений каждого значения в списке, разделенном запятыми.

Этот файл может быть довольно большим (сжатым в несколько ГБ ), но количество уникальных значений в файле довольно мало (не более 500 ). Я подумал, что это будет неплохой возможностью попробовать написать параллельное приложение Scala на базе Актера -. В моем решении используется основной драйвер, который создает пул актеров парсера. Затем основной драйвер считывает строки со стандартного ввода, передает строку Актеру, который анализирует строку и ведет локальный подсчет значений. Когда главный водитель прочитал последнюю строку,он передает сообщение каждому актеру, указывающее, что все строки были прочитаны. Когда актер получает сообщение «Готово», он передает свои счетчики агрегатору, который суммирует счетчики всех актеров. После того, как счетчики всех парсеров будут объединены, главный драйвер распечатывает статистику.

Проблема :Основная проблема, с которой я сталкиваюсь, — невероятная неэффективность этого приложения. Он использует гораздо больше ЦП и гораздо больше памяти, чем «эквивалентное» приложение Java, использующее потоки и ArrayBlockingQueue. Чтобы представить это в перспективе, вот некоторые статистические данные, которые я собрал для тестового входного файла из 10 миллионов строк:

Scala 1 Актер (парсер):

    real    9m22.297s
    user    235m31.070s
    sys     21m51.420s

Парсер потока Java 1 ():

    real    1m48.275s
    user    1m58.630s
    sys     0m33.540s

Актеры Скала 5:

    real    2m25.267s
    user    63m0.730s
    sys     3m17.950s

Потоки Java 5:

    real    0m24.961s
    user    1m52.650s
    sys     0m20.920s

Кроме того, top сообщает, что приложение Scala имеет размер резидентной памяти примерно в 10 раз больше. Таким образом, мы говорим о том, что здесь на несколько порядков больше ЦП и памяти, а производительность на несколько порядков ниже, и я просто не могу понять, что является причиной этого. Это проблема GC, или я каким-то образом создаю гораздо больше копий объектов, чем осознаю?

Дополнительные детали, которые могут иметь или не иметь значения:

  • Приложение scala обернуто классом Java, чтобы я мог доставить самостоятельный -исполняемый файл JAR (У меня нет Scala jar на каждой машине, на которой мне может понадобиться запустить это приложение ).
  • Приложение вызывается следующим образом :gunzip -c gzFilename | java -jar StatParser.jar

Вот код:

Главный драйвер:

import scala.actors.Actor._
import scala.collection.{ immutable, mutable }
import scala.io.Source

class StatCollector (numParsers : Int ) {
    private val parsers = new mutable.ArrayBuffer[StatParser]()
    private val aggregator = new StatAggregator()

    def generateParsers {
        for ( i <- 1 to numParsers ) {
            val parser = new StatParser( i, aggregator )
            parser.start
            parsers += parser
        }
    }


    def readStdin {
        var nextParserIdx = 0
        var lineNo = 1
        for ( line <- Source.stdin.getLines() ) {
            parsers( nextParserIdx ) ! line
            nextParserIdx += 1
            if ( nextParserIdx >= numParsers ) {
                nextParserIdx = 0
            }
            lineNo += 1
        }
    }

    def informParsers {
        for ( parser <- parsers ) {
            parser ! true
        }
    }

    def printCounts {
        val countMap = aggregator.getCounts()
        println( "ID,Count" )
        /*
        for ( key <- countMap.keySet ) {
            println( key + "," + countMap.getOrElse( key, 0 ) )
            //println( "Campaign '" + key + "': " + countMap.getOrElse( key, 0 ) )
        }
        */
        countMap.toList.sorted foreach {
            case (key, value) =>
                println( key + "," + value )
        }
    }

    def processFromStdIn {
        aggregator.start

        generateParsers

        readStdin
        process
    }

    def process {

        informParsers

        var completedParserCount = aggregator.getNumParsersAggregated
        while ( completedParserCount < numParsers ) {
            Thread.sleep( 250 )
            completedParserCount = aggregator.getNumParsersAggregated
        }

        printCounts
    }
}

Парсер-актер:

import scala.actors.Actor
import collection.mutable.HashMap
import scala.util.matching

class StatParser( val id: Int, val aggregator: StatAggregator ) extends Actor {

    private var countMap = new HashMap[String, Int]()
    private val sep1 = "\t"
    private val sep2 = ","


    def getCounts(): HashMap[String, Int] = {
        return countMap
    }

    def act() {
        loop {
            react {
                case line: String =>
                    {
                        val idx = line.indexOf( sep1 )
                        var currentCount = 0
                        if ( idx > 0 ) {
                            val tokens = line.substring( idx + 1 ).split( sep2 )
                            for ( token <- tokens ) {
                                if ( !token.equals( "" ) ) {
                                    currentCount = countMap.getOrElse( token, 0 )
                                    countMap( token ) = ( 1 + currentCount )
                                }
                            }

                        }
                    }
                case doneProcessing: Boolean =>
                    {
                        if ( doneProcessing ) {
                            // Send my stats to Aggregator
                            aggregator ! this
                        }
                    }
            }
        }
    }
}

Актер-агрегатор:

import scala.actors.Actor
import collection.mutable.HashMap

class StatAggregator extends Actor {
    private var countMap = new HashMap[String, Int]()
    private var parsersAggregated = 0

    def act() {
        loop {
            react {
                case parser: StatParser =>
                    {
                        val cm = parser.getCounts()
                        for ( key <- cm.keySet ) {
                            val currentCount = countMap.getOrElse( key, 0 )
                            val incAmt = cm.getOrElse( key, 0 )
                            countMap( key ) = ( currentCount + incAmt )
                        }
                        parsersAggregated += 1
                    }
            }
        }
    }

    def getNumParsersAggregated: Int = {
        return parsersAggregated
    }

    def getCounts(): HashMap[String, Int] = {
        return countMap
    }
}

Любая помощь, которая может быть предложена в понимании того, что здесь происходит, будет принята с благодарностью.

Заранее спасибо!

----Изменить---

Поскольку многие люди откликнулись и попросили код Java, вот простое приложение Java, которое я создал для целей сравнения. Я понимаю, что это не лучший код Java,но когда я увидел производительность приложения Scala, я быстро набросал что-нибудь, чтобы увидеть, как реализация на основе Java Thread -будет работать в качестве базовой -строки:

Разбор темы:

import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class JStatParser extends Thread
{
    private ArrayBlockingQueue<String> queue;
    private Map<String, Integer> countMap;
    private boolean done;

    public JStatParser( ArrayBlockingQueue<String> q )
    {
        super( );
        queue = q;
        countMap = new Hashtable<String, Integer>( );
        done = false;
    }

    public Map<String, Integer> getCountMap( )
    {
        return countMap;
    }

    public void alldone( )
    {
        done = true;
    }

    @Override
    public void run( )
    {
        String line = null;
        while( !done || queue.size( ) > 0 )
        {
            try
            {
                // line = queue.take( );
                line = queue.poll( 100, TimeUnit.MILLISECONDS );
                if( line != null )
                {
                    int idx = line.indexOf( "\t" ) + 1;
                    for( String token : line.substring( idx ).split( "," ) )
                    {
                        if( !token.equals( "" ) )
                        {
                            if( countMap.containsKey( token ) )
                            {
                                Integer currentCount = countMap.get( token );
                                currentCount++;
                                countMap.put( token, currentCount );
                            }
                            else
                            {
                                countMap.put( token, new Integer( 1 ) );
                            }
                        }
                    }
                }
            }
            catch( InterruptedException e )
            {
                // TODO Auto-generated catch block
                System.err.println( "Failed to get something off the queue: "
                        + e.getMessage( ) );
                e.printStackTrace( );
            }
        }
    }
}

Драйвер:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;

public class JPS
{
    public static void main( String[] args )
    {
        if( args.length <= 0 || args.length > 2 || args[0].equals( "-?" ) )
        {
            System.err.println( "Usage: JPS [filename]" );
            System.exit( -1 );
        }

        int numParsers = Integer.parseInt( args[0] );
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<String>( 1000 );
        List<JStatParser> parsers = new ArrayList<JStatParser>( );

        BufferedReader reader = null;

        try
        {
            if( args.length == 2 )
            {
                reader = new BufferedReader( new FileReader( args[1] ) );
            }
            else
            {
                reader = new BufferedReader( new InputStreamReader( System.in ) );
            }

            for( int i = 0; i < numParsers; i++ )
            {
                JStatParser parser = new JStatParser( q );
                parser.start( );
                parsers.add( parser );
            }

            String line = null;
            while( (line = reader.readLine( )) != null )
            {
                try
                {
                    q.put( line );
                }
                catch( InterruptedException e )
                {
                    // TODO Auto-generated catch block
                    System.err.println( "Failed to add line to q: "
                            + e.getMessage( ) );
                    e.printStackTrace( );
                }
            }

            // At this point, we've put everything on the queue, now we just
            // need to wait for it to be processed.
            while( q.size( ) > 0 )
            {
                try
                {
                    Thread.sleep( 250 );
                }
                catch( InterruptedException e )
                {
                }
            }

            Map<String,Integer> countMap = new Hashtable<String,Integer>( );
            for( JStatParser jsp : parsers )
            {
                jsp.alldone( );
                Map<String,Integer> cm = jsp.getCountMap( );
                for( String key : cm.keySet( ) )
                {
                    if( countMap.containsKey( key ))
                    {
                        Integer currentCount = countMap.get(  key );
                        currentCount += cm.get( key );
                        countMap.put( key, currentCount );
                    }
                    else
                    {
                        countMap.put(  key, cm.get( key ) );
                    }
                }
            }

            System.out.println( "ID,Count" );
            for( String key : new TreeSet<String>(countMap.keySet( ))  )
            {
                System.out.println( key + "," + countMap.get( key ) );
            }

            for( JStatParser parser : parsers )
            {
                try
                {
                    parser.join( 100 );
                }
                catch( InterruptedException e )
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            System.exit(  0  );
        }
        catch( IOException e )
        {
            System.err.println( "Caught exception: " + e.getMessage( ) );
            e.printStackTrace( );
        }
    }
}
6
задан FuriousGeorge 30 July 2012 в 19:01
поделиться