Обновление
Самым простым может быть выбор каждого столбца, union
их и вызов distinct
:
from pyspark.sql.functions import col
df2 = df.select(col("src").alias("id")).union(df.select(col("dst").alias("id"))).distinct()
df2.show()
#+---+
#| id|
#+---+
#| 1|
#| 3|
#| 2|
#+---+
<час> можно также выполнить это с помощью внешнего соединения:
df2 = df.select(col("src").alias("id"))\
.join(
df.select(col("dst").alias("id")),
on="id",
how="outer"
)\
.distinct()
Чтобы это работало, Вам нужен 1) способ передать число в новый поток, 2) запустить поток, 3) ожидать потока для окончания и 4) способ вернуть результат от потока.
Можно передать в числе через конструктора. У Вас может быть общедоступный элемент данных, названный "ответом" для содержания результата вычисления. Запуск потока может быть, покончили start()
метод, и join()
метод ожидает потока для завершения.
Следующий пример демонстрирует это. Это должно быть хорошей начальной точкой; отсюда можно абстрагировать далеко часть беспорядка для получения лучшего API, как желаемый.
public class Fib extends Thread
{
private int x;
public int answer;
public Fib(int x) {
this.x = x;
}
public void run() {
if( x <= 2 )
answer = 1;
else {
try {
Fib f1 = new Fib(x-1);
Fib f2 = new Fib(x-2);
f1.start();
f2.start();
f1.join();
f2.join();
answer = f1.answer + f2.answer;
}
catch(InterruptedException ex) { }
}
}
public static void main(String[] args)
throws Exception
{
try {
Fib f = new Fib( Integer.parseInt(args[0]) );
f.start();
f.join();
System.out.println(f.answer);
}
catch(Exception e) {
System.out.println("usage: java Fib NUMBER");
}
}
}
Используя потоки обычно предназначается для улучшения производительности. Однако каждый поток добавляет издержки и если выполненная задача является небольшой, там может намного больше находиться вне понимания, чем фактическая сделанная работа. Дополнительно большинство ПК может только обработать приблизительно 1 000 потоков и зависнет, если Вы имеете намного больше, чем потоки 10K.
В Вашем случае выдумка (20) генерирует 6 765 потоков, выдумка (30) создает 832K, выдумка (40) создает 102M потоки, выдумка (50) создает более чем 12 триллионов. Я надеюсь, что Вы видите, что это не масштабируемо.
Однако с помощью другого подхода можно вычислить выдумку (1000000) через менее чем одну минуту.
import java.math.BigInteger;
/*
250000th fib # is: 36356117010939561826426 .... 10243516470957309231046875
Time to compute: 3.466557 seconds.
1000000th fib # is: 1953282128707757731632 .... 93411568996526838242546875
Time to compute: 58.1 seconds.
*/
public class Main {
public static void main(String... args) {
int place = args.length > 0 ? Integer.parseInt(args[0]) : 250 * 1000;
long start = System.nanoTime();
BigInteger fibNumber = fib(place);
long time = System.nanoTime() - start;
System.out.println(place + "th fib # is: " + fibNumber);
System.out.printf("Time to compute: %5.1f seconds.%n", time / 1.0e9);
}
private static BigInteger fib(int place) {
BigInteger a = new BigInteger("0");
BigInteger b = new BigInteger("1");
while (place-- > 1) {
BigInteger t = b;
b = a.add(b);
a = t;
}
return b;
}
}
Вы имеете верное представление о запуске потоков в fib
функция, и о передаче x
к объекту через конструктора; у Вас должен будет также быть способ вытащить результат вычисления из объекта в конце - я уверен, что можно понять это ;-) Запускающая поток процедура Вы используете в fib
просто тот же способ, как который Вы всегда запускаете поток, (new Fib(x-1)).start()
хотя Вы могли бы хотеть сохранить поток в переменной, потому что Вам будет нужен он для получения результата вычисления позже.
Таким образом с помощью Вас всех мне удалось сделать то же самое с реализацией выполнимого вместо того, чтобы использовать Класс Потока.
Можно ли все взглянуть и сказать ли мне, если это - путь, как сделать это, если задача состоит в том, чтобы реализовать выполнимый. Сам Код работает.
public class Fib implements Runnable
{
private int x;
public int answer;
public Fib(int x) {
this.x = x;
}
public void run() {
if( x < 2 )
answer = 1;
else {
try {
Fib f1= new Fib(x-1);
Fib f2= new Fib(x-2);
Thread threadf1=new Thread(f1);
Thread threadf2=new Thread(f2);
threadf1.start();
threadf2.start();
threadf1.join();
threadf2.join();
answer = f1.answer + f2.answer;
}
catch(InterruptedException ex) { }
}
}
public static void main(String[] args)
{
try {
for (int i=0;i<19;i++){
Fib f= new Fib(i);
Thread threadf= new Thread(f);
threadf.start();
threadf.join();
System.out.println("Ergebnis:"+f.answer);
}
}
catch(Exception e) {
System.out.println("usage: java Fib NUMBER");
}
}
}