Вот способ сделать это с помощью Pandas:
# Create example data frame
df = pd.DataFrame([('../dir_a/1.png', 5.14),
('../dir_a/2.png', 5.15),
('../dir_b/3.png', 4.19),
('../dir_b/4.png', 3.81)], columns = ['path', 'score'])
# Split the file path by '/' and expand into columns with original data frame
df = pd.concat([df.path.str.split('/', expand=True), df], axis=1)
# Group the rows based on the directory name (column 1) and find the max score
df.groupby(1)['score'].max().reset_index()
1 score
0 dir_a 5.15
1 dir_b 4.19
Затем вы можете преобразовать значения обратно в список, если это будет необходимо.
Действительно, Abort
должен избежаться. Было бы лучше дать им некоторое время, чтобы выйти корректно - затем, возможно, после тайм-аута, возможно, рассмотреть прерывание их - но в конечном счете, сервисная остановка может сделать это все равно путем уничтожения процесса вместо этого.
Я попытался бы иметь сигнал в своих очередях, который говорит "сброс и выход" - во многом как Close
метод здесь, но с некоторым сигналом, когда завершенный.
Если Вы обращаетесь к Abort
- считайте процесс фатально раненным. Уничтожьте его как можно скорее.
Создайте тип задачи для "завершения работы" и введите это в свою очередь производителя/потребителя однажды для каждого рабочего потока.
Затем используйте Поток. Соединение (с тайм-аутом) для обеспечения завершения работы завершилось.
Вопрос, как исправлено на самом деле меньше имеет отношение к поточной обработке и больше сделать с тем, как остановить длительные действия. Лично я всегда использую APM для длинного потока и коммуникационных операций, таких как большие передачи файлов. Каждый обратный вызов работает на потоке пула завершения IO и завершается быстро, обрабатывая скромный блок и планируя следующую передачу. Незаконченные операции могут быть отменены просто путем вызова Close()
на объекте сокета. Это намного более дешево и более эффективно, чем управление потоком DIY.
Как уже упомянуто, Abort()
плохая карма и должна избежаться.
Материал ниже был записан до исключения случая цикличного выполнения от вопроса.
Когда длительный цикл процессов, они должны все включать флаг выхода в свое условие цикла так, чтобы можно было предупредить, чтобы они вышли.
bool _run; //member of service class
//in OnStart
_run = true;
//in a method on some other thread
while ((yourLoopCondition) & _run)
{
//do stuff
foreach (thing in things)
{
//do more stuff
if (!_run) break;
}
}
if (!_run) CleanUp();
//in OnStop
_run = false;
Строго необходимо использовать летучие вещества, но так как только управляющая логика когда-либо устанавливает флаг, это не имеет значения. Технически состояние состязания существует, но это просто означает, что Вы могли бы обойти кругом еще раз.
Если Ваш процесс является завершением работы так или иначе, я лично не вижу проблемы с использованием Аварийного прекращения работы (). Я попытался бы найти иначе, но в конце, не имеет значения при выполнении очистки в основном потоке так или иначе.
Другая опция состояла бы в том, чтобы отметить Ваши рабочие потоки как фоновые потоки. Тем путем они закроются автоматически, когда процесс закроется. Вы смогли использовать AppDomain. Событие ProcessExit для чистки перед выходом.
Используйте ManualResetEvent, чтобы проверить, сигнализировано ли событие, посмотрите образец при Синхронизации Потока (Руководство по программированию C#)
Вот код, который я использую для остановки потоков в службе Windows (обратите внимание, я использую Потоки непосредственно и не использующий пулы потоков):
// signal all threads to stop
this.AllThreadsStopSignal.Set();
if (logThreads.IsDebugEnabled)
logThreads.Debug ("Stop workers");
// remember the time of the signal
DateTime signalTime = DateTime.Now;
// create an array of workers to be stopped
List<IServiceWorker> workersToBeStopped = new List<IServiceWorker> (workers);
while (true)
{
// wait for some time
Thread.Sleep (1000);
// go through the list and see if any workers have stopped
int i = 0;
while (i < workersToBeStopped.Count)
{
IServiceWorker workerToBeStopped = workersToBeStopped [i];
if (log.IsDebugEnabled)
log.Debug (String.Format (System.Globalization.CultureInfo.InvariantCulture,
"Stopping worker '{0}'. Worker state={1}",
workerToBeStopped.WorkerDescription,
workerToBeStopped.WorkerState));
bool stopped = workerToBeStopped.JoinThread (TimeSpan.Zero);
// if stopped, remove it from the list
if (stopped)
{
workersToBeStopped.RemoveAt (i);
if (log.IsDebugEnabled)
log.Debug (String.Format (System.Globalization.CultureInfo.InvariantCulture,
"Worker '{0}' was stopped.", workerToBeStopped.WorkerDescription));
}
else
{
i++;
if (log.IsDebugEnabled)
log.Debug (String.Format (System.Globalization.CultureInfo.InvariantCulture,
"Worker '{0}' could not be stopped, will try again later. Worker state={1}",
workerToBeStopped.WorkerDescription,
workerToBeStopped.WorkerState));
}
}
// if all workers were stopped, exit from the loop
if (workersToBeStopped.Count == 0)
break;
// check if the duration of stopping has exceeded maximum time
DateTime nowTime = DateTime.Now;
TimeSpan duration = nowTime - signalTime;
if (duration > serviceCustomization.ThreadTerminationMaxDuration)
{
// execute forced abortion of all workers which have not stopped
foreach (IServiceWorker worker in workersToBeStopped)
{
try
{
log.Warn (String.Format (System.Globalization.CultureInfo.InvariantCulture,
"Aborting worker '{0}.", worker.WorkerDescription));
worker.Abort ();
log.Warn (String.Format (System.Globalization.CultureInfo.InvariantCulture,
"Worker '{0}' aborted.", worker.WorkerDescription));
}
catch (ThreadStateException ex)
{
log.Warn (String.Format (System.Globalization.CultureInfo.InvariantCulture,
"Worker '{0}' could not be aborted.", worker.WorkerDescription), ex);
}
}
break;
}
}
В .NET 4.0 вы можете использовать пространство имен System.Threading.Tasks
, чтобы воспользоваться преимуществами объекта Task
. Вкратце, вы можете назначить CancellationToken
для более изящной обработки отмен / прерываний в задачах, будь то длительные или краткосрочные.
Подробнее см. здесь в MSDN.