Позвольте мне начать с того, что я новичок в Scala; тем не менее, я нахожу модель параллелизма на основе Актера интересной, и я попытался попробовать ее для относительно простого приложения. Проблема, с которой я сталкиваюсь, заключается в том, что, хотя я могу заставить приложение работать, результат гораздо менее эффективен (с точки зрения реального времени, процессорного времени и использования памяти ), чем эквивалентный Решение на основе Java, использующее потоки, которые извлекают сообщения из очереди ArrayBlockingQueue. Я хотел бы понять, почему. Я подозреваю, что это, вероятно, связано с моим недостатком знаний о Scala и что я являюсь причиной всей неэффективности, но после нескольких безуспешных попыток переработать приложение я решил обратиться за помощью к сообществу.
Моя проблема в следующем :У меня есть сжатый файл с большим количеством строк в формате:
SomeID разделенный запятой __список _из _значений
Например:
1234 12,45,82
Я хотел бы проанализировать каждую строку и получить общее количество вхождений каждого значения в списке, разделенном запятыми.
Этот файл может быть довольно большим (сжатым в несколько ГБ ), но количество уникальных значений в файле довольно мало (не более 500 ). Я подумал, что это будет неплохой возможностью попробовать написать параллельное приложение Scala на базе Актера -. В моем решении используется основной драйвер, который создает пул актеров парсера. Затем основной драйвер считывает строки со стандартного ввода, передает строку Актеру, который анализирует строку и ведет локальный подсчет значений. Когда главный водитель прочитал последнюю строку,он передает сообщение каждому актеру, указывающее, что все строки были прочитаны. Когда актер получает сообщение «Готово», он передает свои счетчики агрегатору, который суммирует счетчики всех актеров. После того, как счетчики всех парсеров будут объединены, главный драйвер распечатывает статистику.
Проблема :Основная проблема, с которой я сталкиваюсь, — невероятная неэффективность этого приложения. Он использует гораздо больше ЦП и гораздо больше памяти, чем «эквивалентное» приложение Java, использующее потоки и ArrayBlockingQueue. Чтобы представить это в перспективе, вот некоторые статистические данные, которые я собрал для тестового входного файла из 10 миллионов строк:
Scala 1 Актер (парсер):
real 9m22.297s
user 235m31.070s
sys 21m51.420s
Парсер потока Java 1 ():
real 1m48.275s
user 1m58.630s
sys 0m33.540s
Актеры Скала 5:
real 2m25.267s
user 63m0.730s
sys 3m17.950s
Потоки Java 5:
real 0m24.961s
user 1m52.650s
sys 0m20.920s
Кроме того, top сообщает, что приложение Scala имеет размер резидентной памяти примерно в 10 раз больше. Таким образом, мы говорим о том, что здесь на несколько порядков больше ЦП и памяти, а производительность на несколько порядков ниже, и я просто не могу понять, что является причиной этого. Это проблема GC, или я каким-то образом создаю гораздо больше копий объектов, чем осознаю?
Дополнительные детали, которые могут иметь или не иметь значения:
Вот код:
Главный драйвер:
import scala.actors.Actor._
import scala.collection.{ immutable, mutable }
import scala.io.Source
class StatCollector (numParsers : Int ) {
private val parsers = new mutable.ArrayBuffer[StatParser]()
private val aggregator = new StatAggregator()
def generateParsers {
for ( i <- 1 to numParsers ) {
val parser = new StatParser( i, aggregator )
parser.start
parsers += parser
}
}
def readStdin {
var nextParserIdx = 0
var lineNo = 1
for ( line <- Source.stdin.getLines() ) {
parsers( nextParserIdx ) ! line
nextParserIdx += 1
if ( nextParserIdx >= numParsers ) {
nextParserIdx = 0
}
lineNo += 1
}
}
def informParsers {
for ( parser <- parsers ) {
parser ! true
}
}
def printCounts {
val countMap = aggregator.getCounts()
println( "ID,Count" )
/*
for ( key <- countMap.keySet ) {
println( key + "," + countMap.getOrElse( key, 0 ) )
//println( "Campaign '" + key + "': " + countMap.getOrElse( key, 0 ) )
}
*/
countMap.toList.sorted foreach {
case (key, value) =>
println( key + "," + value )
}
}
def processFromStdIn {
aggregator.start
generateParsers
readStdin
process
}
def process {
informParsers
var completedParserCount = aggregator.getNumParsersAggregated
while ( completedParserCount < numParsers ) {
Thread.sleep( 250 )
completedParserCount = aggregator.getNumParsersAggregated
}
printCounts
}
}
Парсер-актер:
import scala.actors.Actor
import collection.mutable.HashMap
import scala.util.matching
class StatParser( val id: Int, val aggregator: StatAggregator ) extends Actor {
private var countMap = new HashMap[String, Int]()
private val sep1 = "\t"
private val sep2 = ","
def getCounts(): HashMap[String, Int] = {
return countMap
}
def act() {
loop {
react {
case line: String =>
{
val idx = line.indexOf( sep1 )
var currentCount = 0
if ( idx > 0 ) {
val tokens = line.substring( idx + 1 ).split( sep2 )
for ( token <- tokens ) {
if ( !token.equals( "" ) ) {
currentCount = countMap.getOrElse( token, 0 )
countMap( token ) = ( 1 + currentCount )
}
}
}
}
case doneProcessing: Boolean =>
{
if ( doneProcessing ) {
// Send my stats to Aggregator
aggregator ! this
}
}
}
}
}
}
Актер-агрегатор:
import scala.actors.Actor
import collection.mutable.HashMap
class StatAggregator extends Actor {
private var countMap = new HashMap[String, Int]()
private var parsersAggregated = 0
def act() {
loop {
react {
case parser: StatParser =>
{
val cm = parser.getCounts()
for ( key <- cm.keySet ) {
val currentCount = countMap.getOrElse( key, 0 )
val incAmt = cm.getOrElse( key, 0 )
countMap( key ) = ( currentCount + incAmt )
}
parsersAggregated += 1
}
}
}
}
def getNumParsersAggregated: Int = {
return parsersAggregated
}
def getCounts(): HashMap[String, Int] = {
return countMap
}
}
Любая помощь, которая может быть предложена в понимании того, что здесь происходит, будет принята с благодарностью.
Заранее спасибо!
----Изменить---
Поскольку многие люди откликнулись и попросили код Java, вот простое приложение Java, которое я создал для целей сравнения. Я понимаю, что это не лучший код Java,но когда я увидел производительность приложения Scala, я быстро набросал что-нибудь, чтобы увидеть, как реализация на основе Java Thread -будет работать в качестве базовой -строки:
Разбор темы:
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class JStatParser extends Thread
{
private ArrayBlockingQueue<String> queue;
private Map<String, Integer> countMap;
private boolean done;
public JStatParser( ArrayBlockingQueue<String> q )
{
super( );
queue = q;
countMap = new Hashtable<String, Integer>( );
done = false;
}
public Map<String, Integer> getCountMap( )
{
return countMap;
}
public void alldone( )
{
done = true;
}
@Override
public void run( )
{
String line = null;
while( !done || queue.size( ) > 0 )
{
try
{
// line = queue.take( );
line = queue.poll( 100, TimeUnit.MILLISECONDS );
if( line != null )
{
int idx = line.indexOf( "\t" ) + 1;
for( String token : line.substring( idx ).split( "," ) )
{
if( !token.equals( "" ) )
{
if( countMap.containsKey( token ) )
{
Integer currentCount = countMap.get( token );
currentCount++;
countMap.put( token, currentCount );
}
else
{
countMap.put( token, new Integer( 1 ) );
}
}
}
}
}
catch( InterruptedException e )
{
// TODO Auto-generated catch block
System.err.println( "Failed to get something off the queue: "
+ e.getMessage( ) );
e.printStackTrace( );
}
}
}
}
Драйвер:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
public class JPS
{
public static void main( String[] args )
{
if( args.length <= 0 || args.length > 2 || args[0].equals( "-?" ) )
{
System.err.println( "Usage: JPS [filename]" );
System.exit( -1 );
}
int numParsers = Integer.parseInt( args[0] );
ArrayBlockingQueue<String> q = new ArrayBlockingQueue<String>( 1000 );
List<JStatParser> parsers = new ArrayList<JStatParser>( );
BufferedReader reader = null;
try
{
if( args.length == 2 )
{
reader = new BufferedReader( new FileReader( args[1] ) );
}
else
{
reader = new BufferedReader( new InputStreamReader( System.in ) );
}
for( int i = 0; i < numParsers; i++ )
{
JStatParser parser = new JStatParser( q );
parser.start( );
parsers.add( parser );
}
String line = null;
while( (line = reader.readLine( )) != null )
{
try
{
q.put( line );
}
catch( InterruptedException e )
{
// TODO Auto-generated catch block
System.err.println( "Failed to add line to q: "
+ e.getMessage( ) );
e.printStackTrace( );
}
}
// At this point, we've put everything on the queue, now we just
// need to wait for it to be processed.
while( q.size( ) > 0 )
{
try
{
Thread.sleep( 250 );
}
catch( InterruptedException e )
{
}
}
Map<String,Integer> countMap = new Hashtable<String,Integer>( );
for( JStatParser jsp : parsers )
{
jsp.alldone( );
Map<String,Integer> cm = jsp.getCountMap( );
for( String key : cm.keySet( ) )
{
if( countMap.containsKey( key ))
{
Integer currentCount = countMap.get( key );
currentCount += cm.get( key );
countMap.put( key, currentCount );
}
else
{
countMap.put( key, cm.get( key ) );
}
}
}
System.out.println( "ID,Count" );
for( String key : new TreeSet<String>(countMap.keySet( )) )
{
System.out.println( key + "," + countMap.get( key ) );
}
for( JStatParser parser : parsers )
{
try
{
parser.join( 100 );
}
catch( InterruptedException e )
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.exit( 0 );
}
catch( IOException e )
{
System.err.println( "Caught exception: " + e.getMessage( ) );
e.printStackTrace( );
}
}
}