Встроенный метод объектов .size()
объектов DataFrameGroupBy фактически возвращает объект Series с размерами группы, а не DataFrame. Если вы хотите, чтобы DataFrame, чей столбец был размером группы, индексированный группами, с настраиваемым именем, вы можете использовать метод .to_frame()
и использовать в качестве аргумента имя требуемого столбца.
grpd = df.groupby(['A','B']).size().to_frame('size')
Если вы хотите, чтобы группы были столбцами снова, вы могли бы добавить .reset_index()
в конец.
Не уверенный, если это будет полезно Вам - это работает хорошо в тестовом приложении. Кроме того, не знайте, будет ли это в "благосклонности" высшего персонала Spring, но моя надежда состоит в том, чтобы учиться, таким образом, я отправляю это предложение.
В тестовом приложении Начальной загрузки Spring, следующее вводит репозиторий JPA в ApplicationRunner, который затем вводит то же в Runnables, управляемый ExecutorService. Каждый Выполнимый получает BlockingQueue, который постоянно переполнен отдельным KafkaConsumer (который действует как производитель очереди). Runnables используют queue.takes () для сования от очереди, и это сопровождается repo.save (). (Может с готовностью добавить пакетная вставка для поточной обработки, но не сделала поэтому, так как приложение еще не потребовало его...)
, тестовое приложение в настоящее время реализует JPA для Пост-ГРЭС (или Масштаб времени) DB и выполняет 10 потоков с 10 очередями, питаемыми 10 Потребителями.
репозиторий JPA, обеспечивают
public interface DataRepository extends JpaRepository<DataRecord, Long> {
}
Начальная загрузка Spring, которая Основная программа
@SpringBootApplication
@EntityScan(basePackages = "com.xyz.model")
public class DataApplication {
private final String[] topics = { "x0", "x1", "x2", "x3", "x4", "x5","x6", "x7", "x8","x9" };
ExecutorService executor = Executors.newFixedThreadPool(topics.length);
public static void main(String[] args) {
SpringApplication.run(DataApplication.class, args);
}
@Bean
ApplicationRunner init(DataRepository dataRepository) {
return args -> {
for (String topic : topics) {
BlockingQueue<DataRecord> queue = new ArrayBlockingQueue<>(1024);
JKafkaConsumer consumer = new JKafkaConsumer(topic, queue);
consumer.start();
JMessageConsumer messageConsumer = new JMessageConsumer(dataRepository, queue);
executor.submit(messageConsumer);
}
executor.shutdown();
};
}
}
, И у Выполнимого Потребителя есть конструктор и выполненный () метод следующим образом:
public JMessageConsumer(DataRepository dataRepository, BlockingQueue<DataRecord> queue) {
this.queue = queue;
this.dataRepository = dataRepository;
}
@Override
public void run() {
running.set(true);
while (running.get()) {
// remove record from FIFO blocking queue
DataRecord dataRecord;
try {
dataRecord = queue.take();
} catch (InterruptedException e) {
logger.error("queue exception: " + e.getMessage());
continue;
}
// write to database
dataRepository.save(dataRecord);
}
}
В изучение так любых мыслей/проблем/обратной связи ценится...