У меня создалось впечатление, что flock (2) является потокобезопасным, я недавно наткнулся на случай в коде, когда несколько потоков могут получить блокировку одного и того же файла, что все синхронизируется с использованием получения монопольной блокировки с помощью c api flock. Процесс 25554 - это многопоточное приложение, которое имеет 20 потоков, количество потоков, имеющих блокировку одного и того же файла, меняется, когда происходит взаимоблокировка. Многопоточное приложение testEvent записывает в файл, где было нажатие - это средство чтения из файла. К сожалению, lsof
не выводит значение LWP, поэтому я не могу найти, какие потоки удерживают блокировку. Когда происходит упомянутое ниже условие, и процесс, и потоки застревают в вызове flock, как показано вызовом pstack
или strace
на pid 25569 и 25554. Любые предложения по устранению это в RHEL 4.x.
Одна вещь, которую я хотел обновить, - это то, что flock не ведет себя постоянно неправильно, когда скорость передачи сообщений превышает 2 Мбит / с, только тогда я сталкиваюсь с этой проблемой тупика с flock, ниже этой скорости передачи все файлы. Я сохранил константу num_threads
= 20, size_of_msg
= 1000 байт и просто изменил количество tx сообщений в секунду, начиная с 10 до 100 сообщений, что составляет 20 * 1000 * 100 = 2. мбит / с, когда я увеличиваю количество сообщений до 150, возникает проблема стая.
Я просто хотел спросить, что вы думаете о flockfile c api.
sudo lsof filename.txt
COMMAND PID USER FD TYPE DEVICE SIZE NODE NAME
push 25569 root 11u REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 27uW REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 28uW REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 29uW REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 30uW REG 253.4 1079 49266853 filename.txt
Многопоточная тестовая программа, которая вызовет функцию write_data_lib_func
lib.
void* sendMessage(void *arg) {
int* numOfMessagesPerSecond = (int*) arg;
std::cout <<" Executing p thread id " << pthread_self() << std::endl;
while(!terminateTest) {
Record *er1 = Record::create();
er1.setDate("some data");
for(int i = 0 ; i <=*numOfMessagesPerSecond ; i++){
ec = _write_data_lib_func(*er1);
if( ec != SUCCESS) {
std::cout << "write was not successful" << std::endl;
}
}
delete er1;
sleep(1);
}
return NULL;
Вышеупомянутый метод будет вызываться в потоках pthread в основной функции теста.
for (i=0; i<_numThreads ; ++i) {
rc = pthread_create(&threads[i], NULL, sendMessage, (void *)&_num_msgs);
assert(0 == rc);
}
Вот источник писателя / читателя, по причинам, связанным с собственностью, я не хотел просто вырезать и вставить, источник писателя будет обращаться к нескольким потокам в процессе.
int write_data_lib_func(Record * rec) {
if(fd == -1 ) {
fd = open(fn,O_RDWR| O_CREAT | O_APPEND, 0666);
}
if ( fd >= 0 ) {
/* some code */
if( flock(fd, LOCK_EX) < 0 ) {
print "some error message";
}
else {
if( maxfilesize) {
off_t len = lseek ( fd,0,SEEK_END);
...
...
ftruncate( fd,0);
...
lseek(fd,0,SEEK_SET);
} /* end of max spool size */
if( writev(fd,rec) < 0 ) {
print "some error message" ;
}
if(flock(fd,LOCK_UN) < 0 ) {
print some error message;
}
Со стороны читателя есть процесс-демон без потоков.
int readData() {
while(true) {
if( fd == -1 ) {
fd= open (filename,O_RDWR);
}
if( flock (fd, LOCK_EX) < 0 ) {
print "some error message";
break;
}
if( n = read(fd,readBuf,readBufSize)) < 0 ) {
print "some error message" ;
break;
}
if( off < n ) {
if ( off <= 0 && n > 0 ) {
corrupt_file = true;
}
if ( lseek(fd, off-n, SEEK_CUR) < 0 ) {
print "some error message";
}
if( corrupt_spool ) {
if (ftruncate(fd,0) < 0 ) {
print "some error message";
break;
}
}
}
if( flock(fd, LOCK_UN) < 0 )
print some error message ;
}
}
}