Следуя фрагменту кода spektom для scala, я создал аналогичный код на Java. Поскольку java 8 не имеет foldLeft, я использовал forEachOrdered. Этот код подходит для искры 2.x (я использую 2.1). Также я заметил, что удаление столбца и добавление его с помощью withColumn с тем же именем не работает, поэтому я просто заменяю столбец, и, похоже, работа.
Код не полностью протестирован, надеюсь, что он работает: -)
public class DataFrameUtils {
public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) {
final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame);
Arrays.stream(dataFrame.schema().fields())
.flatMap( f -> {
if (columnName.startsWith(f.name() + ".")) {
final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(), f.name(), columnName);
if (column.isPresent()) {
return Stream.of(new Tuple2<>(f.name(), column));
} else {
return Stream.empty();
}
} else {
return Stream.empty();
}
}).forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple));
return dataFrameFolder.getDF();
}
private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
Optional<Column> column = Optional.empty();
if (!fullColumnName.equals(dropColumnName)) {
if (colType instanceof StructType) {
if (dropColumnName.startsWith(fullColumnName + ".")) {
column = Optional.of(struct(getColumns(col, (StructType)colType, fullColumnName, dropColumnName)));
}
} else {
column = Optional.of(col);
}
}
return column;
}
private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
return Arrays.stream(colType.fields())
.flatMap(f -> {
final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
fullColumnName + "." + f.name(), dropColumnName);
if (column.isPresent()) {
return Stream.of(column.get().alias(f.name()));
} else {
return Stream.empty();
}
}
).toArray(Column[]::new);
}
private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> {
private Dataset<Row> df;
public DataFrameFolder(Dataset<Row> df) {
this.df = df;
}
public Dataset<Row> getDF() {
return df;
}
@Override
public void accept(Tuple2<String, Optional<Column>> colTuple) {
if (!colTuple._2().isPresent()) {
df = df.drop(colTuple._1());
} else {
df = df.withColumn(colTuple._1(), colTuple._2().get());
}
}
}
Пример использования:
private class Pojo {
private String str;
private Integer number;
private List<String> strList;
private Pojo2 pojo2;
public String getStr() {
return str;
}
public Integer getNumber() {
return number;
}
public List<String> getStrList() {
return strList;
}
public Pojo2 getPojo2() {
return pojo2;
}
}
private class Pojo2 {
private String str;
private Integer number;
private List<String> strList;
public String getStr() {
return str;
}
public Integer getNumber() {
return number;
}
public List<String> getStrList() {
return strList;
}
}
SQLContext context = new SQLContext(new SparkContext("local[1]", "test"));
Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class);
Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str");
Оригинальная структура:
root
|-- number: integer (nullable = true)
|-- pojo2: struct (nullable = true)
| |-- number: integer (nullable = true)
| |-- str: string (nullable = true)
| |-- strList: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- str: string (nullable = true)
|-- strList: array (nullable = true)
| |-- element: string (containsNull = true)
После капли:
root
|-- number: integer (nullable = true)
|-- pojo2: struct (nullable = false)
| |-- number: integer (nullable = true)
| |-- strList: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- str: string (nullable = true)
|-- strList: array (nullable = true)
| |-- element: string (containsNull = true)
Я вслепую не пропустил бы первые три байта; что, если производитель остановки добавление BOM снова? То, что необходимо сделать, , исследуют первые несколько байтов, и если они - 0xEF 0xBB 0xBF, игнорируют их. Это - форма, которую символ BOM (U+FEFF) принимает в UTF-8; я предпочитаю иметь дело с ним прежде, чем попытаться декодировать поток, потому что обработка BOM так непоследовательна от одного языка/инструмента/платформы до следующего.
На самом деле, это - то, как Вы , предположил иметь дело с BOM. Если файлом служили UTF-16, необходимо исследовать первые два байта, прежде чем Вы начнете декодировать так, Вы знаете, считать ли его как или прямой порядок байтов с обратным порядком байтов. Конечно, BOM UTF-8 не имеет никакого отношения к порядку байтов, он должен просто там сообщить, что кодирование является UTF-8, в случае, если Вы уже не знали это.