Как вызвать несколько вызовов БД из разных потоков в рамках одной транзакции?

Встроенный метод объектов .size() объектов DataFrameGroupBy фактически возвращает объект Series с размерами группы, а не DataFrame. Если вы хотите, чтобы DataFrame, чей столбец был размером группы, индексированный группами, с настраиваемым именем, вы можете использовать метод .to_frame() и использовать в качестве аргумента имя требуемого столбца.

grpd = df.groupby(['A','B']).size().to_frame('size')

Если вы хотите, чтобы группы были столбцами снова, вы могли бы добавить .reset_index() в конец.

0
задан DBreaker 17 January 2019 в 03:25
поделиться

1 ответ

Не уверенный, если это будет полезно Вам - это работает хорошо в тестовом приложении. Кроме того, не знайте, будет ли это в "благосклонности" высшего персонала Spring, но моя надежда состоит в том, чтобы учиться, таким образом, я отправляю это предложение.

В тестовом приложении Начальной загрузки Spring, следующее вводит репозиторий JPA в ApplicationRunner, который затем вводит то же в Runnables, управляемый ExecutorService. Каждый Выполнимый получает BlockingQueue, который постоянно переполнен отдельным KafkaConsumer (который действует как производитель очереди). Runnables используют queue.takes () для сования от очереди, и это сопровождается repo.save (). (Может с готовностью добавить пакетная вставка для поточной обработки, но не сделала поэтому, так как приложение еще не потребовало его...)

, тестовое приложение в настоящее время реализует JPA для Пост-ГРЭС (или Масштаб времени) DB и выполняет 10 потоков с 10 очередями, питаемыми 10 Потребителями.

репозиторий JPA, обеспечивают

public interface DataRepository extends JpaRepository<DataRecord, Long> {
}

Начальная загрузка Spring, которая Основная программа

@SpringBootApplication
@EntityScan(basePackages = "com.xyz.model")
public class DataApplication {

    private final String[] topics = { "x0", "x1", "x2", "x3", "x4", "x5","x6", "x7", "x8","x9" };
    ExecutorService executor = Executors.newFixedThreadPool(topics.length);


    public static void main(String[] args) {
        SpringApplication.run(DataApplication.class, args);
    }

    @Bean
    ApplicationRunner init(DataRepository dataRepository) {
        return args -> {

            for (String topic : topics) {

                BlockingQueue<DataRecord> queue = new ArrayBlockingQueue<>(1024);
                JKafkaConsumer consumer = new JKafkaConsumer(topic, queue);
                consumer.start();

                JMessageConsumer messageConsumer = new JMessageConsumer(dataRepository, queue);
                executor.submit(messageConsumer);
            }
            executor.shutdown();
        };
    }
}

, И у Выполнимого Потребителя есть конструктор и выполненный () метод следующим образом:

public JMessageConsumer(DataRepository dataRepository, BlockingQueue<DataRecord> queue) {
    this.queue = queue;
    this.dataRepository = dataRepository;
}

@Override
public void run() {
    running.set(true);
    while (running.get()) {
        // remove record from FIFO blocking queue
        DataRecord dataRecord;
        try {
            dataRecord = queue.take();
        } catch (InterruptedException e) {
            logger.error("queue exception: " + e.getMessage());
            continue;
        }
        // write to database 
        dataRepository.save(dataRecord);
    }
}

В изучение так любых мыслей/проблем/обратной связи ценится...

0
ответ дан barnwaldo 7 April 2019 в 21:55
поделиться
Другие вопросы по тегам:

Похожие вопросы: