菜单 学习猿地 - LMONKEY

VIP

开通学习猿地VIP

尊享10项VIP特权 持续新增

知识通关挑战

打卡带练!告别无效练习

接私单赚外快

VIP优先接,累计金额超百万

学习猿地私房课免费学

大厂实战课仅对VIP开放

你的一对一导师

每月可免费咨询大牛30次

领取更多软件工程师实用特权

入驻
53
0

linux ipc/its

原创
05/13 14:22
阅读数 38201

linux线程间消息队列

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
 
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
 
/* Message passed through the System V queue between the two threads. */
typedef struct {
    long type;      /* message type; the reader asks msgrcv() for type 1 */
    char buf[100];  /* payload text */
}msg_t;
 
/*
 * Writer thread: once per second posts a type-1 "login" message on the
 * queue whose id is passed through arg (a pointer to int).
 * The loop never terminates, so the function does not return in practice.
 */
void *write_routine(void *arg)
{
    int msgid = *(int *)arg;

    while(1)
    {
        msg_t msg={1,"login"};

        /* The size argument counts payload bytes only, not the type field.
         * IPC_NOWAIT makes a full queue fail immediately instead of blocking. */
        if(msgsnd(msgid,&msg,sizeof(msg.buf),IPC_NOWAIT)==-1)
        {
            perror("msgsnd");
        }

        sleep(1);
    }

    return NULL; /* not reached */
}
 
void *read_routine(void *arg)
{
    int msgid = *(int *)arg;

     msg_t msg;
     //while(1)
     //{
        int res = msgrcv(msgid,&msg,sizeof(msg_t),1,0);
        if(res==-1){
            //perror("msgrcv");
        }

        switch (msg.type) {
            case 1: {
                printf("++++%s\n",msg.buf);
                break;
            }
            case 2: {
                printf("++++++++++++%s\n",msg.buf);
                break;
            }
        }
     //   sleep(1);   
     //}
}
 
/*
 * Creates (or opens) the queue with key 1234, starts the writer and the
 * reader thread on it and waits for both.
 * Returns 1 if the queue or either thread could not be created.
 */
int main()
{
    pthread_t wid;
    pthread_t rid;

    int msgid = msgget(1234,IPC_CREAT|0666);
    if(msgid==-1)
    {
        perror("msgget");
        return 1;   /* the threads would only operate on an invalid id */
    }

    int res = pthread_create(&wid,0,write_routine,&msgid);
    if(res){
        printf("%s\n",strerror(res));
        return 1;   /* wid is indeterminate; joining it would be undefined */
    }

    int res2 = pthread_create(&rid,0,read_routine,&msgid);
    if(res2){
        printf("%s\n",strerror(res2));
        return 1;
    }

    pthread_join(wid,0);
    pthread_join(rid,0);

    return 0;
}

gcc -o msg msg.c -pthread

 

linux进程间双向消息队列

server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
 
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
 
/* Message exchanged between server and client. Everything after `type`
 * (that is, id followed by buf) forms the payload handed to msgsnd/msgrcv. */
typedef struct {
    long type;      /* message type used for selection by msgrcv() */
    int  id;        /* sender tag; the server writes 2 here */
    char buf[100];  /* one line of text read with fgets() */
}msg_t;
 
/*
 * Server writer thread: forwards every line read from stdin as a type-2
 * message (id 2) on the queue derived from ftok("/", 'a').
 * Returns NULL when the queue cannot be opened or stdin reaches EOF.
 * arg is unused.
 */
void *write_routine(void *arg)
{
    (void)arg;

    key_t key = ftok("/",'a');
    if(key==-1)
    {
        perror("ftok");
        return NULL;
    }

    int msqid = msgget(key,IPC_CREAT|0666);
    if(msqid==-1)
    {
        perror("msgget");
        return NULL;
    }

    msg_t msg;
    memset(&msg,0,sizeof(msg));   /* no indeterminate bytes reach the queue */
    msg.type = 2;
    msg.id = 2;

    /* Stop at EOF/error; otherwise the stale buffer would be resent forever. */
    while(fgets(msg.buf,sizeof(msg.buf),stdin)!=NULL)
    {
        /* The payload is everything after the type field: id plus buf.
         * sizeof(msg.buf) alone would cut off the last bytes of buf. */
        if(msgsnd(msqid,&msg,sizeof(msg)-sizeof(msg.type),IPC_NOWAIT)==-1)
        {
            perror("msgsnd");
        }
    }

    return NULL;
}
 
void *read_routine(void *arg)
{
    key_t key = ftok("/",'b');
    if(key==-1)
    {
        perror("ftok");
    }
 
 
    int msqid = msgget(key,IPC_CREAT|0666);
    if(msqid==-1)
    {
        perror("msgget");
    }
 
     msg_t msg;
     while(1)
     {
        int res = msgrcv(msqid,&msg,sizeof(msg_t),1,IPC_NOWAIT);
        if(res==-1){
            //perror("msgrcv");
        }
        switch (msg.id) {
            case 1: {
                printf("%s\n",msg.buf);
                break;
            }
            case 2: {
                printf("%s\n",msg.buf);
                break;
            }
        }

        msg.id = 0;
        
     }
 
}
  
/*
 * Starts the writer and reader threads and waits for both.
 * Returns 1 if either thread could not be created.
 */
int main()
{
    pthread_t wid;
    pthread_t rid;

    int res = pthread_create(&wid,0,write_routine,0);
    if(res){
        printf("%s\n",strerror(res));
        return 1;   /* wid is indeterminate; joining it would be undefined */
    }

    int res2 = pthread_create(&rid,0,read_routine,0);
    if(res2){
        printf("%s\n",strerror(res2));
        return 1;
    }

    pthread_join(wid,0);
    pthread_join(rid,0);

    return 0;
}

 

client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
 
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
 
/* Message exchanged between client and server. Everything after `type`
 * (that is, id followed by buf) forms the payload handed to msgsnd/msgrcv. */
typedef struct {
    long type;      /* message type used for selection by msgrcv() */
    int  id;        /* sender tag; the client writes 1 here */
    char buf[100];  /* one line of text read with fgets() */
}msg_t;

void *read_routine(void *arg)
{
    key_t key = ftok("/",'a');
    if(key==-1)
    {
        perror("ftok");
    }
  
    int msqid = msgget(key,IPC_CREAT|0666);
    if(msqid==-1)
    {
        perror("msgget");
    }
 
     msg_t msg;
     while(1)
     {
        int res = msgrcv(msqid,&msg,sizeof(msg_t),2,IPC_NOWAIT);
        if(res==-1){
            //perror("msgrcv");
        }
        switch (msg.id) {
            case 1: {
                printf("%s\n",msg.buf);
                break;
            }
            case 2: {
                printf("%s\n",msg.buf);
                break;
            }
        }

        msg.id = 0;
     }
  
}
 
/*
 * Client writer thread: forwards every line read from stdin as a type-1
 * message (id 1) on the queue derived from ftok("/", 'b').
 * Returns NULL when the queue cannot be opened or stdin reaches EOF.
 * arg is unused.
 */
void *write_routine(void *arg)
{
    (void)arg;

    key_t key = ftok("/",'b');
    if(key==-1)
    {
        perror("ftok");
        return NULL;
    }

    int msqid = msgget(key,IPC_CREAT|0666);
    if(msqid==-1)
    {
        perror("msgget");
        return NULL;
    }

    msg_t msg;
    memset(&msg,0,sizeof(msg));   /* no indeterminate bytes reach the queue */
    msg.type = 1;
    msg.id = 1;

    /* Stop at EOF/error; otherwise the stale buffer would be resent forever. */
    while(fgets(msg.buf,sizeof(msg.buf),stdin)!=NULL)
    {
        /* The size excludes the type field. sizeof(msg_t) would make the
         * kernel read sizeof(long) bytes beyond the end of msg. */
        if(msgsnd(msqid,&msg,sizeof(msg)-sizeof(msg.type),IPC_NOWAIT)==-1)
        {
            perror("msgsnd");
        }
    }

    return NULL;
}
 
/*
 * Starts the writer and reader threads and waits for both.
 * Returns 1 if either thread could not be created.
 */
int main()
{
    pthread_t wid;
    pthread_t rid;

    int res = pthread_create(&wid,0,write_routine,0);
    if(res){
        printf("%s\n",strerror(res));
        return 1;   /* wid is indeterminate; joining it would be undefined */
    }

    int res2 = pthread_create(&rid,0,read_routine,0);
    if(res2){
        printf("%s\n",strerror(res2));
        return 1;
    }

    pthread_join(wid,0);
    pthread_join(rid,0);

    return 0;
}

 

Makefile

# Builds the two-way message-queue demo. Recipe lines must start with a TAB;
# make rejects space-indented recipes with "missing separator".
.PHONY:all
all:server client
server:server.c
	gcc -o server server.c -pthread
client:client.c
	gcc -o client client.c -pthread
.PHONY:clean
clean:
	rm -f server client

 

 

https://blog.csdn.net/weixin_41215479/article/details/81511188

 

线程间消息

https://blog.csdn.net/qq_26391203/article/details/75220449

 

将共享内存改造成一个共享fifo,酷 !

https://github.com/zhoudd1/linux_ipc/tree/master/shm_fifo

 

message queue

https://github.com/mdminhazulhaque/mqueue

 

1、nanomsg

nanomsg(ZeroMQ with C)

https://www.cnblogs.com/dong1/p/9213214.html

2、libmsgque

linux进程通信消息队列

https://gitee.com/fulinux/libmsgque

3、dbus

https://blog.csdn.net/quinta2008/article/details/78472170

 

 

linux ipc

1)signal 

recv.c

#include <stdio.h>
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>
void new_op(int,siginfo_t*,void*);
/*
 * Installs new_op as the SA_SIGINFO handler for the signal number given in
 * argv[1], then reports every two seconds that it is still waiting.
 * Returns 1 when the argument is missing or not a number, or when the
 * handler cannot be installed.
 */
int main(int argc,char**argv)
{
    struct sigaction act;
    int sig;
    pid_t pid;

    /* sscanf comes from <stdio.h>, which this program already includes;
     * it also reports non-numeric input, which atoi cannot. */
    if(argc<2 || sscanf(argv[1],"%d",&sig)!=1)
    {
        fprintf(stderr,"usage: recv <signal-number>\n");
        return 1;
    }

    pid=getpid();

    sigemptyset(&act.sa_mask);
    act.sa_sigaction=new_op;
    act.sa_flags=SA_SIGINFO;
    if(sigaction(sig,&act,NULL)<0)
    {
        printf("install sigal error\n");
        return 1;   /* nothing to wait for without a handler */
    }
    while(1)
    {
        sleep(2);
        printf("pid %d wait for the signal\n",pid);
    }
}
/*
 * SA_SIGINFO handler: writes "the int value is <n> \n" to standard output,
 * where <n> is the integer delivered with the signal (info->si_int).
 *
 * The line is formatted by hand and emitted with write() because printf()
 * is not async-signal-safe. The bytes bypass stdio's buffer, so when stdout
 * is not a terminal they may appear ahead of text printed by main().
 * signum and myact are unused.
 */
void new_op(int signum,siginfo_t *info,void *myact)
{
    static const char head[] = "the int value is ";
    static const char tail[] = " \n";
    char digits[3 * sizeof(unsigned int)];            /* enough decimal digits */
    char line[sizeof(head) + sizeof(tail) + sizeof(digits) + 1];
    size_t used = 0;
    size_t ndigits = 0;
    size_t i;
    int value = info->si_int;
    unsigned int magnitude = (unsigned int)value;

    (void)signum;
    (void)myact;

    for(i = 0; i < sizeof(head) - 1; i++)
        line[used++] = head[i];

    if(value < 0)
    {
        line[used++] = '-';
        magnitude = 0u - magnitude;   /* well defined even for INT_MIN */
    }

    /* Digits come out least-significant first; reverse while copying. */
    do {
        digits[ndigits++] = (char)('0' + magnitude % 10u);
        magnitude /= 10u;
    } while(magnitude != 0u);
    while(ndigits > 0)
        line[used++] = digits[--ndigits];

    for(i = 0; i < sizeof(tail) - 1; i++)
        line[used++] = tail[i];

    if(write(STDOUT_FILENO, line, used) < 0)
    {
        /* Nothing async-signal-safe can be done about a failed write. */
    }
}

send.c

#include <stdio.h>
#include <signal.h>
#include <sys/time.h>
#include <unistd.h>
#include <sys/types.h>
/*
 * Queues signal argv[1] to process argv[2] with the integer value 8 attached.
 * Returns 1 on bad arguments or when sigqueue() fails, 0 otherwise.
 */
int main(int argc,char**argv)
{
    int pid_value;
    int signum;
    union sigval mysigval;

    /* sscanf comes from <stdio.h>, which this program already includes;
     * it also reports non-numeric input, which atoi cannot. */
    if(argc<3
       || sscanf(argv[1],"%d",&signum)!=1
       || sscanf(argv[2],"%d",&pid_value)!=1
       || pid_value<=0)
    {
        fprintf(stderr,"usage: send <signal-number> <pid>\n");
        return 1;
    }

    mysigval.sival_int=8;
    if(sigqueue((pid_t)pid_value,signum,mysigval)==-1)
    {
        printf("send error\n");
        return 1;
    }
    sleep(2);
    return 0;
}

gcc -o recv recv.c

gcc -o send send.c

./recv 1

./send 1 39823

参考  https://www.ibm.com/developerworks/cn/linux/l-ipc/part2/index2.html

 

2)message queue

recv.c

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>

#define MSG_LEN     100
#define MSG_KEY    1234

/* Message stored in the queue: a type tag followed by MSG_LEN payload bytes. */
typedef struct 
{
    long int type;       /* message type, printed by the receiver with %ld */
    char data[MSG_LEN];  /* payload text */
}msg_t;

/* Identifier of the queue opened by msg_queue_creat(); -1 if that failed. */
static int msgid;

/*
 * Opens the System V message queue identified by key, creating it with mode
 * 0666 when it does not exist, and stores its identifier in msgid.
 * Returns 0 on success and -1 when msgget() fails, so that callers can tell
 * the two apart.
 */
int msg_queue_creat(key_t key)
{
    msgid=msgget(key,0666 | IPC_CREAT);
    if(msgid==-1)
    {
        fprintf(stderr,"msgget failed with error: %d\n",errno);
        return -1;
    }
    return 0;
}

int main()
{    
    msg_queue_creat(MSG_KEY);

    while(1){

        msg_t msg; 
        if(msgrcv(msgid, &msg, sizeof(msg_t), 0, IPC_NOWAIT) == -1)
        {
            //fprintf(stderr,"msgrcv failed with error: %d\n",errno);
        }
        else printf("recv a message: %ld,%s\n",msg.type,msg.data);

        sleep(1);
    }
}

send.c

#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MSG_LEN     100
#define MSG_KEY    1234

/* Message stored in the queue: a type tag followed by MSG_LEN payload bytes. */
typedef struct 
{
    long int type;       /* message type; the sender uses 1 */
    char data[MSG_LEN];  /* payload text */
}msg_t;

/* Identifier of the queue opened by msg_queue_creat(); -1 if that failed. */
static int msgid;

/*
 * Opens the System V message queue identified by key, creating it with mode
 * 0666 when it does not exist, and stores its identifier in msgid.
 * Returns 0 on success and -1 when msgget() fails, so that callers can tell
 * the two apart.
 */
int msg_queue_creat(key_t key)
{
    msgid=msgget(key,0666 | IPC_CREAT);
    if(msgid==-1)
    {
        fprintf(stderr,"msgget failed with error: %d\n",errno);
        return -1;
    }
    return 0;
}

int main()
{
    msg_queue_creat(MSG_KEY);
 
    while(1) {  
        msg_t msg={1,"hello,world"};
        if(msgsnd(msgid, &msg, sizeof(msg_t), IPC_NOWAIT) == -1)
        {
            //fprintf(stderr,"msgsnd failed with error: %d\n",errno);
        }
        else printf("send a message\n");        
        sleep(2);
    }
}

参考 https://www.ibm.com/developerworks/cn/aix/library/au-ipc/

 

3) semaphore (命名信号量)

有名信号量实现进程间同步功能(print2 先打印,再到 print1 打印)

print1.c

    #include <fcntl.h>           /* For O_* constants */  
    #include <sys/stat.h>        /* For mode constants */  
    #include <semaphore.h>  
    #include <stdio.h>  
      
    /*
     * print1's half of the ping-pong: waits on print1, prints a running
     * counter, then posts print2 so the peer may take its turn. Never returns.
     */
    void print(sem_t *print1, sem_t *print2)
    {
        int count = 0;

        for(;;)
        {
            sem_wait(print1);
            ++count;
            printf("int print1 i = %d\n", count);
            sem_post(print2);
        }
    }
      
    /*
     * Opens (creating if needed) the two named semaphores and enters the
     * print loop. Returns 1 when either semaphore cannot be opened.
     * argc and argv are unused.
     */
    int main(int argc, char **argv)
    {
        sem_t *print1, *print2;

        (void)argc;
        (void)argv;

        /* The initial values (0 and 1) only apply if this call creates the
         * semaphore; an existing one keeps its current value. */
        print1 = sem_open("sem_print1", O_CREAT, 0777, 0);
        if(SEM_FAILED == print1)
        {
            perror("sem_open");
            return 1;   /* print() would otherwise wait on SEM_FAILED */
        }

        print2 = sem_open("sem_print2", O_CREAT, 0777, 1);
        if(SEM_FAILED == print2)
        {
            perror("sem_open");
            return 1;
        }

        print(print1, print2);

        return 0;
    }

print2.c

#include <fcntl.h>           /* For O_* constants */  
#include <sys/stat.h>        /* For mode constants */  
#include <semaphore.h>  
#include <stdio.h>  
  
/*
 * print2's half of the ping-pong: waits on print2, prints a running counter,
 * pauses one second, then posts print1 so the peer may take its turn.
 * Never returns.
 *
 * NOTE(review): sleep() is declared in <unistd.h>, which is not among the
 * headers this program includes (fcntl, sys/stat, semaphore, stdio) -
 * confirm and add it.
 */
void print(sem_t *print1, sem_t *print2)
{
    int count = 0;

    for(;;)
    {
        sem_wait(print2);
        ++count;
        printf("in print2 i = %d\n", count);
        sleep(1);
        sem_post(print1);
    }
}
  
/*
 * Opens (creating if needed) the two named semaphores and enters the print
 * loop. Returns 1 when either semaphore cannot be opened.
 * argc and argv are unused.
 */
int main(int argc, char **argv)
{
    sem_t *print1, *print2;

    (void)argc;
    (void)argv;

    /* The initial values (0 and 1) only apply if this call creates the
     * semaphore; an existing one keeps its current value. */
    print1 = sem_open("sem_print1", O_CREAT, 0777, 0);
    if(SEM_FAILED == print1)
    {
        perror("sem_open");
        return 1;   /* print() would otherwise post to SEM_FAILED */
    }

    print2 = sem_open("sem_print2", O_CREAT, 0777, 1);
    if(SEM_FAILED == print2)
    {
        perror("sem_open");
        return 1;
    }

    print(print1, print2);

    return 0;
}

删除有名信号量示例代码如下:

    #include <semaphore.h>  
    #include <stdio.h>  
      
    /*
     * Removes the named semaphore `name`; a failure is reported with
     * perror() and otherwise ignored.
     */
    void sem_del(char *name)
    {
        if(sem_unlink(name) < 0)
        {
            perror("sem_unlink");
        }
    }
      
    /* Removes both semaphores used by print1 and print2. Always returns 0. */
    int main(int argc, char **argv)
    {
        char *names[] = { "sem_print1", "sem_print2" };
        unsigned int idx;

        /* Delete sem_print1 first, then sem_print2. */
        for(idx = 0; idx < sizeof(names) / sizeof(names[0]); idx++)
        {
            sem_del(names[idx]);
        }

        return 0;
    }

makefile 代码如下:

# Recipe lines must start with a TAB; make rejects space-indented recipes.
# all and clean name no files, so they are declared phony.
.PHONY: all clean
all:
	gcc sem_del.c -o sem_del -lpthread
	gcc print1.c -o print1 -lpthread
	gcc print2.c -o print2 -lpthread
clean:
	rm sem_del print1 print2

运行程序时,先把有名信号量删除(sem_del),再分别运行 print1 和 print2

参考 https://www.cnblogs.com/jfyl1573/p/6820372.html

 

4)Shared Memory

write_shm.c

#include <sys/ipc.h> 
#include <sys/shm.h> 
#include <sys/types.h> 
#include <string.h> 
#include <stdio.h> 
#include <stdlib.h>
/*
 * Creates (or opens) the 2048-byte shared-memory segment with key 13 and
 * stores the string "I'm share memory" in it, terminator included.
 * Always returns 0; failures are reported with perror().
 */
int main() {
    // 1. Create (or open) the segment
    int shm_id = shmget(13, 2048, IPC_CREAT | 0666);
    if (shm_id != -1) {
        // 2. Attach the segment
        void* shm = shmat(shm_id, NULL, 0);
        if (shm != (void*)-1) {
            // 3. Write into the segment
            char str[] = "I'm share memory";
            memcpy(shm, str, strlen(str) + 1);
            // 4. Detach the segment
            shmdt(shm);
        } else {
            perror("shmat:");
        }
    } else {
        perror("shmget:");
    }
    return 0;
}

read_shm.c

#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

/*
 * Opens the 2048-byte shared-memory segment with key 13, prints the string
 * stored in it and then removes the segment.
 * Returns 1 when the segment cannot be obtained, 0 otherwise.
 */
int main() { 
    enum { SHM_SIZE = 2048 };

    // 1. Obtain the segment
    int shm_id = shmget(13, SHM_SIZE, IPC_CREAT | 0666);
    if (shm_id == -1) {
        perror("shmget:");
        return 1;   /* nothing to read and nothing to remove */
    }

    // 2. Attach the segment
    void* shm = shmat(shm_id, NULL, 0);
    if (shm != (void*)-1) {
        // 3. Read the segment. The precision bounds the read to the segment
        //    even if no terminator was ever written into it.
        printf("shm = %.*s\n", SHM_SIZE - 1, (const char *)shm);
        // 4. Detach the segment
        shmdt(shm);
    } else {
        perror("shmat:");
    }

    /* shmctl() takes three arguments; IPC_RMID ignores the buffer. */
    if (0 == shmctl(shm_id, IPC_RMID, NULL))
        printf("delete shm success.\n");
    return 0;
}

编译:

gcc write_shm.c -o write_shm
gcc read_shm.c -o read_shm

先运行写入 SHM:

./write_shm

再运行读取 SHM:

./read_shm
I'm share memory

成功读取了写进程写入的数据。虽然两个进程之间没有做同步,但至少能够获取到数据。最后再来分析一下内核中的 SHM 调用过程。


参考:https://www.jianshu.com/p/494c2d32e3bb

5)Shared Memory  & sem
这个也是网上找的,忘了在哪里找的。
sem.h
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <unistd.h>

void init_sem(int , int );
void delete_sem(int );
void sem_p(int );
void sem_v(int );

/* Fourth argument of semctl(); the member that is read depends on the command. */
union semun
{
    int val;                /* value for SETVAL (set by init_sem) */
    struct semid_ds *buf;   /* not used by the helpers in this header */
    unsigned short *array;  /* not used by the helpers in this header */
};

void init_sem(int sem_id, int init_value)
{
    union semun sem_union;

    sem_union.val = init_value;

    if (semctl(sem_id, 0, SETVAL, sem_union) < 0)
    {
        perror("failed to init_sem");
        exit(-1);
    }

    return ;
}

/*
 * Removes the semaphore set sem_id.
 * Prints a diagnostic and terminates the process if semctl() fails.
 */
void delete_sem(int sem_id)
{
    /* IPC_RMID ignores the optional fourth argument, so none is passed;
     * this avoids handing an uninitialised union to semctl(). */
    if (semctl(sem_id, 0, IPC_RMID) < 0)
    {
        perror("failed to delete_sem");
        exit(-1);
    }

    return ;
}

/*
 * P operation: decrements semaphore 0 of the set sem_id by one, with
 * SEM_UNDO set. Prints a diagnostic and terminates the process if semop()
 * fails.
 */
void sem_p(int sem_id)
{
    struct sembuf op = { .sem_num = 0, .sem_op = -1, .sem_flg = SEM_UNDO };

    if (semop(sem_id, &op, 1) >= 0)
        return;

    perror("failed to sem_p");
    exit(-1);
}

/*
 * V operation: increments semaphore 0 of the set sem_id by one, with
 * SEM_UNDO set. Prints a diagnostic and terminates the process if semop()
 * fails.
 */
void sem_v(int sem_id)
{
    struct sembuf op = { .sem_num = 0, .sem_op = 1, .sem_flg = SEM_UNDO };

    if (semop(sem_id, &op, 1) >= 0)
        return;

    perror("failed to sem_v");
    exit(-1);
}
read.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <errno.h>
#include "sem.h"

typedef struct
{
    double lon;
    double lat;
    double bd_lon;
    double bd_lat;
}gps_info_t;

/*
 * Reader: attaches to the semaphore and the shared-memory segment derived
 * from ftok(".", 'a'), then prints lon/lat each time the semaphore can be
 * taken. Exits with status -1 on any setup failure.
 */
int main(int argc, const char *argv[])
{
    key_t key;
    int shmid;
    gps_info_t *gps = NULL;
    int create_flag = 0;
    int sem_id;

    if ((key = ftok(".", 'a')) < 0)
    {
        perror("failed to get key");
        exit(-1);
    }

    sem_id = semget(key, 1, 0666 | IPC_CREAT | IPC_EXCL);
    if (sem_id >= 0)
    {
        /* Only the process that created the set initialises it. Resetting
         * an existing semaphore would discard posts already made by a peer. */
        init_sem(sem_id, 0);
    }
    else if (errno == EEXIST)
    {
        if ((sem_id = semget(key, 1, 0666)) < 0)
        {
            perror("failed to semget");
            exit(-1);
        }
    }
    else
    {
        perror("failed to semget");
        exit(-1);
    }

    if ((shmid = shmget(key, sizeof(gps_info_t), 0666 | IPC_CREAT | IPC_EXCL)) < 0)
    {
        if (errno == EEXIST)
        {
            if ((shmid = shmget(key, sizeof(gps_info_t), 0666)) < 0)
            {
                perror("failed to create share memory");
                exit(-1);
            }
        }
        else
        {
            perror("failed to shmget");
            exit(-1);
        }
    }
    else
        create_flag = 1;

    if ((gps = shmat(shmid, NULL, 0)) == (void *)(-1))
    {
        perror("failed to shmat");
        exit(-1);
    }
    
    while(1)
    {
        sem_p(sem_id);

        printf("recv lon: %f\n", gps->lon);
        printf("recv lat: %f\n", gps->lat);
        sleep(1);

    }

    /* NOTE(review): the loop above has no exit, so this cleanup is never
     * reached as written. */
    if (create_flag == 1)
    {
        if (shmdt(gps) < 0)
        {
            perror("failed to shmdt");
            exit(-1);
        }

        if (shmctl(shmid, IPC_RMID, NULL) == -1)
        {
            perror("failed to delete share memory");
            exit(-1);
        }

        delete_sem(sem_id);
    }

    return 0;
}

write.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <errno.h>
#include "sem.h"

/* Record stored in the shared-memory segment. The layout matches the
 * four-field gps_info_t declared in read.c: shmget() is sized from this
 * struct, and a request larger than the existing segment is rejected. */
typedef struct
{
    double lon;     /* advanced by 1.1 on every write */
    double lat;     /* advanced by 2.2 on every write */
    double bd_lon;  /* not written by this program */
    double bd_lat;  /* not written by this program */
}gps_info_t;


/*
 * Writer: attaches to the semaphore and the shared-memory segment derived
 * from ftok(".", 'a'), then every two seconds advances lon/lat and posts the
 * semaphore. Exits with status -1 on any setup failure.
 */
int main(int argc, const char *argv[])
{
    key_t key;
    gps_info_t *gps = NULL;
    int shmid;
    int create_flag = 0;
    int sem_id;

    if ((key = ftok(".", 'a')) < 0)
    {
        perror("failed to get key");
        exit(-1);
    }

    sem_id = semget(key, 1, 0666 | IPC_CREAT | IPC_EXCL);
    if (sem_id >= 0)
    {
        /* Only the process that created the set initialises it. Resetting
         * an existing semaphore would disturb a peer that is already running. */
        init_sem(sem_id, 0);
    }
    else if (errno == EEXIST)
    {
        if ((sem_id = semget(key, 1, 0666)) < 0)
        {
            perror("failed to semget");
            exit(-1);
        }
    }
    else
    {
        perror("failed to semget");
        exit(-1);
    }

    if ((shmid = shmget(key, sizeof(gps_info_t), 0666 | IPC_CREAT | IPC_EXCL)) < 0)
    {
        if (errno == EEXIST)
        {       
            if ((shmid = shmget(key, sizeof(gps_info_t), 0666)) < 0)
            {
                perror("failed to shmget memory");
                exit(-1);
            }
        }
        else
        {
            perror("failed to shmget");
            exit(-1);
        }
    }
    else 
        create_flag = 1;

    if ((gps = shmat(shmid, NULL, 0)) == (void *)(-1))
    {
        perror("failed to shmat memory");
        exit(-1);
    }

    while(1)
    {
        gps->lon += 1.1;
        gps->lat += 2.2;
        printf("send lon: %f\n", gps->lon);
        printf("send lat: %f\n", gps->lat);
        sem_v(sem_id);

        sleep(2);
    }

    /* NOTE(review): the loop above has no exit, so this cleanup is never
     * reached as written. */
    if (create_flag == 1)
    {
        if (shmdt(gps) < 0)
        {
            perror("failed to shmdt memory");
            exit(-1);
        }

        if (shmctl(shmid, IPC_RMID, NULL) == -1)
        {   
            perror("failed to delete share memory");
            exit(-1);
        }

        delete_sem(sem_id);
    }

    return 0;
}

Makefile

# read and write are real output files, so they list their sources as
# prerequisites and are rebuilt only when those change; only the targets
# that name no file are phony.
OBJ= write read 
all: ${OBJ}
read: read.c sem.h
	gcc -g -o read read.c
write: write.c sem.h
	gcc -g -o write write.c
clean:
	rm -f ${OBJ}
.PHONY: all clean

 

 

linux its

1) semaphore

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>

/* err_sys: report msg together with the current errno text, then exit(-1). */
#define err_sys(msg) \
    do { perror(msg); exit(-1); } while(0)
/* err_exit: print msg verbatim on stderr, then exit(-1). The message is
 * passed as an argument to "%s", never as the format itself, so a '%' in it
 * cannot be interpreted as a conversion. */
#define err_exit(msg) \
    do { fprintf(stderr, "%s", msg); exit(-1); } while(0)

/*
 * Consumer thread: takes the semaphore passed through arg (a sem_t *) ten
 * times, printing a line after each successful wait. Returns NULL.
 */
void *r1(void *arg)
{
    sem_t* sems = (sem_t *)arg;
    int cnt = 10;   /* per-call counter; a static one would be used up after the first call */

    while(cnt--)
    {
        sem_wait(sems);
        printf("I am in r1. I get the sems.\n");
    }

    return NULL;
}

/*
 * Producer thread: posts the semaphore passed through arg (a sem_t *) ten
 * times, one second apart, printing a line before each post. Returns NULL.
 *
 * NOTE(review): sleep() is declared in <unistd.h>, which is not among the
 * headers this program includes (stdio, stdlib, semaphore, pthread) -
 * confirm and add it.
 */
void *r2(void *arg)
{
    sem_t* sems = (sem_t *)arg;
    int cnt = 10;   /* per-call counter; a static one would be used up after the first call */

    while(cnt--)
    {
        printf("I am in r2. I send the sems\n");
        sem_post(sems);
        sleep(1);
    }

    return NULL;
}

int main(void)
{
    sem_t sems;
    pthread_t t1, t2;
    
    printf("sems size: %d\n", sizeof(sems));
    /* sem_init()第二个参数为0表示这个信号量是当前进程的局部信号量,否则该信号
     * 就可以在多个进程之间共享 */
    if(sem_init(&sems, 0, 0) < 0)
        err_sys("sem_init error");
    pthread_create(&t1, NULL, r1, &sems);
    pthread_create(&t2, NULL, r2, &sems);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    sem_destroy(&sems);

    return 0;
}

gcc -o pthread_sem_demo pthread_sem_demo.c -pthread

./pthread_sem_demo

参考 http://blog.csdn.net/u012796139/article/details/46743677

 

 3) Linux 线程挂起与唤醒功能 实例

https://blog.csdn.net/waldmer/article/details/23422943

 

 linux进程的休眠(等待队列)

https://www.cnblogs.com/noaming1900/archive/2011/01/14/1935526.html

1)Linux下进程通信方式(共享内存,管道,消息队列,Socket)

https://www.cnblogs.com/lou424/p/5018966.html

2)Compare performance of IPC(Inter-Process Communication), include file, zeromq, socket, unixsocket, share-memory, msq-queue and so on

https://github.com/zhiyuan2007/IPC-performance-compare

 

 3)linux线程间同步(Inter-Thread Synchronization)方式汇总

https://github.com/clpsz/linux-itss

 

linux ipc 扫盲

https://www.ibm.com/developerworks/cn/linux/l-ipc/

 

发表评论

0/200
53 点赞
0 评论
收藏