快捷搜索:  汽车  科技

利用共享内存进行进程间通信(共享内存和信号量的统一封装机制原理与实现)

利用共享内存进行进程间通信(共享内存和信号量的统一封装机制原理与实现)SYSCALL_DEFINE3(shmget key_t key size_t size int shmflg) { struct ipc_namespace *ns; static const struct ipc_ops shm_ops = { .getnew = newseg .associate = shm_security .more_checks = shm_more_checks }; struct ipc_params shm_params; ns = current->nsproxy->ipc_ns; shm_params.key = key; shm_params.flg = shmflg; shm_params.u.size =

一. 前言

本文为进程间通信的最后一篇,介绍共享内存和信号量。之所以将二者一起叙述,是因为二者有着密不可分的关系。共享内存会利用虚拟内存和物理内存的映射关系,让不同进程开辟一块虚拟空间映射到相同的物理内存上,从而实现了两个进程对相同区域的读写,即进程间通信。而信号量则实现了互斥锁,可以为共享内存提供数据一致性的保证,因此二者常结合使用。

二. 基础知识

共享内存的使用范围包括

  • 调用shmget()创建共享内存
  • 调用shmat()映射共享内存至进程虚拟空间
  • 调用shmdt()接触映射关系

信号量有着类似的操作

  • 调用semget()创建信号量集合。
  • 调用semctl(),信号量往往代表某种资源的数量,如果用信号量做互斥,那往往将信号量设置为 1。
  • 调用semop()修改信号量数目,即加锁和解锁之用

整体通信过程可用如下生产者消费者的模式图来理解。

利用共享内存进行进程间通信(共享内存和信号量的统一封装机制原理与实现)(1)

三. 统一封装的接口

消息队列、共享内存和信号量有着统一的封装和管理机制,为此我们提供了对应的名字空间和ipc_ids结构体。根据代码中的定义,第 0 项用于信号量,第 1 项用于消息队列,第 2 项用于共享内存,分别可以通过 sem_ids、msg_ids、shm_ids 来访问。ipc_ids中in_use 表示当前有多少个 ipc,seq 和 next_id 用于一起生成 ipc 唯一的 id,ipcs_idr 是一棵基数树,一旦涉及从一个整数查找一个对象它都是最好的选择。

struct ipc_namespace { ...... struct ipc_ids ids[3]; ...... } __randomize_layout; #define IPC_SEM_IDS 0 #define IPC_MSG_IDS 1 #define IPC_SHM_IDS 2 #define sem_ids(ns) ((ns)->ids[IPC_SEM_IDS]) #define msg_ids(ns) ((ns)->ids[IPC_MSG_IDS]) #define shm_ids(ns) ((ns)->ids[IPC_SHM_IDS]) struct ipc_ids { int in_use; unsigned short seq; struct rw_semaphore rwsem; struct idr ipcs_idr; int max_idx; struct rhashtable key_ht; }; struct idr { struct radix_tree_root idr_rt; unsigned int idr_base; unsigned int idr_next; };

信号量、消息队列、共享内存的通过基数树来管理各自的对象,三种ipc对应的结构体中第一项均为struct kern_ipc_perm,该结构体对应的id会存储在基数树之中,可以通过ipc_obtain_object_idr()获取。

struct sem_array { struct kern_ipc_perm sem_perm; /* permissions .. see ipc.h */ ...... } __randomize_layout; struct msg_queue { struct kern_ipc_perm q_perm; ...... } __randomize_layout; struct shmid_kernel /* private to the kernel */ { struct kern_ipc_perm shm_perm; ...... } __randomize_layout; struct kern_ipc_perm *ipc_obtain_object_idr(struct ipc_ids *ids int id) { struct kern_ipc_perm *out; int lid = ipcid_to_idx(id); out = idr_find(&ids->ipcs_idr lid); return out; }   对于这三种不同的通信方式,会对ipc_obtain_object_idr()进行封装 static inline struct sem_array *sem_obtain_object(struct ipc_namespace *ns int id) { struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&sem_ids(ns) id); return container_of(ipcp struct sem_array sem_perm); } static inline struct msg_queue *msq_obtain_object(struct ipc_namespace *ns int id) { struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&msg_ids(ns) id); return container_of(ipcp struct msg_queue q_perm); } static inline struct shmid_kernel *shm_obtain_object(struct ipc_namespace *ns int id) { struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&shm_ids(ns) id); return container_of(ipcp struct shmid_kernel shm_perm); }

由此,我们实现了对这三种进程间通信方式统一的封装抽象。首先用名字空间存储三种ipc,然后对应的ipc_ids会描述该通信方式的特点,并包含一个基数树存储id从而找到其实际运行的多个通信的结构体。

四. 共享内存的创建和映射4.1 创建共享内存

共享内存的创建通过shmget()实现。该函数创建对应的ipc_namespaace指针并指向该进程的ipc_ns,初始化共享内存对应的操作shm_ops,并将传参key size shmflg封装为传参shm_params,最终调用ipcget()。

SYSCALL_DEFINE3(shmget key_t key size_t size int shmflg) { struct ipc_namespace *ns; static const struct ipc_ops shm_ops = { .getnew = newseg .associate = shm_security .more_checks = shm_more_checks }; struct ipc_params shm_params; ns = current->nsproxy->ipc_ns; shm_params.key = key; shm_params.flg = shmflg; shm_params.u.size = size; return ipcget(ns &shm_ids(ns) &shm_ops &shm_params); }   ipcget()会根据传参key的类型是否是IPC_PRIVATE选择调用ipcget_new()创建或者调用ipcget_public()打开对应的共享内存。 int ipcget(struct ipc_namespace *ns struct ipc_ids *ids const struct ipc_ops *ops struct ipc_params *params) { if (params->key == IPC_PRIVATE) return ipcget_new(ns ids ops params); else return ipcget_public(ns ids ops params); }

ipcget_new()会根据定义的ops->getnew()创建新的ipc对象,即上面定义的newseg()。ipcget_public()会按照 key查找 struct kern_ipc_perm。如果没有找到,那就看是否设置了 IPC_CREAT,如果设置了,就调用ops->getnew()创建一个新的,否则返回错误ENOENT。如果找到了,就将对应的 id 返回。

static int ipcget_new(struct ipc_namespace *ns struct ipc_ids *ids const struct ipc_ops *ops struct ipc_params *params) { int err; down_write(&ids->rwsem); err = ops->getnew(ns params); up_write(&ids->rwsem); return err; } static int ipcget_public(struct ipc_namespace *ns struct ipc_ids *ids const struct ipc_ops *ops struct ipc_params *params) { struct kern_ipc_perm *ipcp; int flg = params->flg; int err; /* * Take the lock as a writer since we are potentially going to add * a new entry read locks are not "upgradable" */ down_write(&ids->rwsem); ipcp = ipc_findkey(ids params->key); if (ipcp == NULL) { /* key not used */ if (!(flg & IPC_CREAT)) err = -ENOENT; else err = ops->getnew(ns params); } else { ...... if (!err) /* * ipc_check_perms returns the IPC id on * success */ err = ipc_check_perms(ns ipcp ops params); } ipc_unlock(ipcp); } up_write(&ids->rwsem); return err; }

所以新的创建最后都会走到注册的newseg()函数。该函数主要逻辑为

  • 通过 kvmalloc() 在直接映射区分配一个 struct shmid_kernel 结构体,该结构体用于描述共享内存。
  • 调用hugetlb_file_setup()或shmem_kernel_file_setup()关联文件。虚拟地址空间可以和物理内存关联,但是页表的申请条件中会避开已分配的映射,即物理内存是某个进程独享的。所以如何实现物理内存向多个进程的虚拟内存映射呢?这里就要靠文件来实现了:虚拟地址空间也可以映射到一个文件,文件是可以跨进程共享的。这里我们并不是映射到硬盘上存储的文件,而是映射到内存文件系统上的文件。这里定要注意区分 shmem 和 shm ,前者是一个文件系统,后者是进程通信机制。
  • 通过 ipc_addid() 将新创建的 struct shmid_kernel 结构挂到 shm_ids 里面的基数树上,返回相应的 id,并且将 struct shmid_kernel 挂到当前进程的 sysvshm 队列中。

/** * newseg - Create a new shared memory segment * @ns: namespace * @params: ptr to the structure that contains key size and shmflg * * Called with shm_ids.rwsem held as a writer. */ static int newseg(struct ipc_namespace *ns struct ipc_params *params) { key_t key = params->key; int shmflg = params->flg; size_t size = params->u.size; int error; struct shmid_kernel *shp; size_t numpages = (size PAGE_SIZE - 1) >> PAGE_SHIFT; struct file *file; char name[13]; ...... shp = kvmalloc(sizeof(*shp) GFP_KERNEL); ...... shp->shm_perm.key = key; shp->shm_perm.mode = (shmflg & S_IRWXUGO); shp->mlock_user = NULL; shp->shm_perm.security = NULL; ...... if (shmflg & SHM_HUGETLB) { ...... file = hugetlb_file_setup(name hugesize acctflag &shp->mlock_user HUGETLB_SHMFS_INODE (shmflg >> SHM_HUGE_SHIFT) & SHM_HUGE_MASK); } else { ...... file = shmem_kernel_file_setup(name size acctflag); } ...... shp->shm_cprid = get_pid(task_tgid(current)); shp->shm_lprid = NULL; shp->shm_atim = shp->shm_dtim = 0; shp->shm_ctim = ktime_get_real_seconds(); shp->shm_segsz = size; shp->shm_nattch = 0; shp->shm_file = file; shp->shm_creator = current; /* ipc_addid() locks shp upon success. */ error = ipc_addid(&shm_ids(ns) &shp->shm_perm ns->shm_ctlmni); ...... list_add(&shp->shm_clist ¤t->sysvshm.shm_clist); /* * shmid gets reported as "inode#" in /proc/pid/maps. * proc-ps tools use this. Changing this will break them. */ file_inode(file)->i_ino = shp->shm_perm.id; ns->shm_tot = numpages; error = shp->shm_perm.id; ...... }

实际上shmem_kernel_file_setup()会在shmem文件系统里面创建一个文件:__shmem_file_setup() 会创建新的 shmem 文件对应的 dentry 和 inode,并将它们两个关联起来,然后分配一个 struct file 结构来表示新的 shmem 文件,并且指向独特的 shmem_file_operations。

/** * shmem_kernel_file_setup - get an unlinked file living in tmpfs which must be kernel internal. * @name: name for dentry (to be seen in /proc/<pid>/maps * @size: size to be set for the file * @flags: VM_NORESERVE suppresses pre-accounting of the entire object size */ struct file *shmem_kernel_file_setup(const char *name loff_t size unsigned long flags) { return __shmem_file_setup(name size flags S_PRIVATE); } static struct file *__shmem_file_setup(const char *name loff_t size unsigned long flags unsigned int i_flags) { struct file *res; struct inode *inode; struct path path; struct super_block *sb; struct qstr this; ...... this.name = name; this.len = strlen(name); this.hash = 0; /* will go */ sb = shm_mnt->mnt_sb; path.mnt = mntget(shm_mnt); path.dentry = d_alloc_pseudo(sb &this); d_set_d_op(path.dentry &anon_ops); ...... inode = shmem_get_inode(sb NULL S_IFREG | S_IRWXUGO 0 flags); inode->i_flags |= i_flags; d_instantiate(path.dentry inode); inode->i_size = size; ...... res = alloc_file(&path FMODE_WRITE | FMODE_READ &shmem_file_operations); return res; }4.2 共享内存的映射

从上面的代码解析中我们知道,共享内存的数据结构 struct shmid_kernel通过它的成员 struct file *shm_file来管理内存文件系统 shmem 上的内存文件。无论这个共享内存是否被映射,shm_file 都是存在的。

对于用户来说,共享内存的映射通过调用shmat()完成。该函数主要逻辑为:

  • 调用shm_obtain_object_check()通过共享内存的 id,在基数树中找到对应的 struct shmid_kernel 结构,通过它找到 shmem 上的内存文件base。
  • 分配结构体struct shm_file_data sfd表示该内存文件base。
  • 创建base的备份文件file,指向该内存文件base,并将private_data保存为sfd。在源码中注释部分已经叙述了为什么要再创建一个文件而不是直接使用base,简而言之就是base是共享内存文件系统shmem中的shm_file,用于管理内存文件,是一个中立、独立于任何一个进程的文件。新创建的 struct file 则专门用于做内存映射。
  • 调用do_mmap_pgoff(),分配vm_area_struct指向虚拟地址空间中未分配区域,其vm_file指向文件file,接着调用shm_file_operations中的mmap()函数,即shm_mmap()完成映射。

SYSCALL_DEFINE3(shmat int shmid char __user * shmaddr int shmflg) { unsigned long ret; long err; err = do_shmat(shmid shmaddr shmflg &ret SHMLBA); force_successful_syscall_return(); return (long)ret; } long do_shmat(int shmid char __user *shmaddr int shmflg ulong *raddr unsigned long shmlba) { struct shmid_kernel *shp; unsigned long addr = (unsigned long)shmaddr; unsigned long size; struct file *file *base; int err; unsigned long flags = MAP_SHARED; unsigned long prot; int acc_mode; struct ipc_namespace *ns; struct shm_file_data *sfd; int f_flags; unsigned long populate = 0; ...... if (shmflg & SHM_RDONLY) { prot = PROT_READ; acc_mode = S_IRUGO; f_flags = O_RDONLY; } else { prot = PROT_READ | PROT_WRITE; acc_mode = S_IRUGO | S_IWUGO; f_flags = O_RDWR; } if (shmflg & SHM_EXEC) { prot |= PROT_EXEC; acc_mode |= S_IXUGO; } /* * We cannot rely on the fs check since SYSV IPC does have an * additional creator id... */ ns = current->nsproxy->ipc_ns; shp = shm_obtain_object_check(ns shmid); ...... /* * We need to take a reference to the real shm file to prevent the * pointer from becoming stale in cases where the lifetime of the outer * file extends beyond that of the shm segment. It's not usually * possible but it can happen during remap_file_pages() emulation as * that unmaps the memory then does ->mmap() via file reference only. * We'll deny the ->mmap() if the shm segment was since removed but to * detect shm ID reuse we need to compare the file pointers. */ base = get_file(shp->shm_file); shp->shm_nattch ; size = i_size_read(file_inode(base)); ipc_unlock_object(&shp->shm_perm); rcu_read_unlock(); err = -ENOMEM; sfd = kzalloc(sizeof(*sfd) GFP_KERNEL); ...... file = alloc_file_clone(base f_flags is_file_hugepages(base) ? &shm_file_operations_huge : &shm_file_operations); ...... sfd->id = shp->shm_perm.id; sfd->ns = get_ipc_ns(ns); sfd->file = base; sfd->vm_ops = NULL; file->private_data = sfd; ...... addr = do_mmap_pgoff(file addr size prot flags 0 &populate NULL); *raddr = addr; err = 0; ...... }   shm_mmap() 中调用了 shm_file_data 中的 file 的 mmap() 函数,这次调用的是 shmem_file_operations 的 mmap,也即 shmem_mmap()。 static int shm_mmap(struct file *file struct vm_area_struct *vma) { struct shm_file_data *sfd = shm_file_data(file); int ret; /* * In case of remap_file_pages() emulation the file can represent an * IPC ID that was removed and possibly even reused by another shm * segment already. Propagate this case as an error to caller. */ ret = __shm_open(vma); if (ret) return ret; ret = call_mmap(sfd->file vma); if (ret) { shm_close(vma); return ret; } sfd->vm_ops = vma->vm_ops; vma->vm_ops = &shm_vm_ops; return 0; } static int shmem_mmap(struct file *file struct vm_area_struct *vma) { file_accessed(file); vma->vm_ops = &shmem_vm_ops; return 0; }

这里vm_area_struct 的 vm_ops 指向 shmem_vm_ops。等从 call_mmap() 中返回之后,shm_file_data 的 vm_ops 指向了 shmem_vm_ops,而 vm_area_struct 的 vm_ops 改为指向 shm_vm_ops。

static const struct vm_operations_struct shm_vm_ops = { .open = shm_open /* callback for a new vm-area open */ .close = shm_close /* callback for when the vm-area is released */ .fault = shm_fault }; static const struct vm_operations_struct shmem_vm_ops = { .fault = shmem_fault .map_pages = filemap_map_pages };

在前文内存映射中,我们提到了实际物理内存的分配不是在映射关系建立时就分配,而是当实际访问的时候通过缺页异常再进行分配。对于共享内存也是一样。当访问不到的时候,先调用 vm_area_struct 的 vm_ops,也即 shm_vm_ops 的 fault 函数 shm_fault()。然后它会转而调用 shm_file_data 的 vm_ops,也即 shmem_vm_ops 的 fault 函数 shmem_fault()。

shmem_fault() 会调用 shmem_getpage_gfp() 在 page cache 和 swap 中找一个空闲页,如果找不到就通过 shmem_alloc_and_acct_page() 分配一个新的页,他最终会调用内存管理系统的 alloc_page_vma 在物理内存中分配一个页。 static int shm_fault(struct vm_fault *vmf) { struct file *file = vmf->vma->vm_file; struct shm_file_data *sfd = shm_file_data(file); return sfd->vm_ops->fault(vmf); } static int shmem_fault(struct vm_fault *vmf) { struct vm_area_struct *vma = vmf->vma; struct inode *inode = file_inode(vma->vm_file); gfp_t gfp = mapping_gfp_mask(inode->i_mapping); ...... error = shmem_getpage_gfp(inode vmf->pgoff &vmf->page sgp gfp vma vmf &ret); ...... } /* * shmem_getpage_gfp - find page in cache or get from swap or allocate * * If we allocate a new one we do not mark it dirty. That's up to the * vm. If we swap it in we mark it dirty since we also free the swap * entry since a page cannot live in both the swap and page cache. * * fault_mm and fault_type are only supplied by shmem_fault: * otherwise they are NULL. */ static int shmem_getpage_gfp(struct inode *inode pgoff_t index struct page **pagep enum sgp_type sgp gfp_t gfp struct vm_area_struct *vma struct vm_fault *vmf int *fault_type) { ...... page = shmem_alloc_and_acct_page(gfp info sbinfo index false); ...... }

至此,共享内存才真的映射到了虚拟地址空间中,进程可以像访问本地内存一样访问共享内存。

五. 信号量的创建和使用5.1 信号量的创建

信号量的创建和共享内存类似,实际调用semget(),操作也大同小异:创建对应的ipc_namespaace指针并指向该进程的ipc_ns,初始化共享内存对应的操作sem_ops,并将传参key size semflg封装为传参sem_params,最终调用ipcget()。

SYSCALL_DEFINE3(semget key_t key int nsems int semflg) { struct ipc_namespace *ns; static const struct ipc_ops sem_ops = { .getnew = newary .associate = sem_security .more_checks = sem_more_checks }; struct ipc_params sem_params; ns = current->nsproxy->ipc_ns; sem_params.key = key; sem_params.flg = semflg; sem_params.u.nsems = nsems; return ipcget(ns &sem_ids(ns) &sem_ops &sem_params); }

共享内存最终走到newseg()函数,而信号量则调用newary(),该函数也有着类似的逻辑:

  • 通过kvmalloc()在直接映射区分配struct sem_array结构体描述该信号量。在该结构体中会有多个信号量保存在struct sem sems[]中,通过semval表示当前信号量。
  • 初始化sem_array和sems中的各个链表
  • 通过ipc_addid()将创建的sem_array挂载到基数树上,并返回对应id

static int newary(struct ipc_namespace *ns struct ipc_params *params) { int retval; struct sem_array *sma; key_t key = params->key; int nsems = params->u.nsems; int semflg = params->flg; int i; ...... sma = sem_alloc(nsems); ...... sma->sem_perm.mode = (semflg & S_IRWXUGO); sma->sem_perm.key = key; sma->sem_perm.security = NULL; ...... for (i = 0; i < nsems; i ) { INIT_LIST_HEAD(&sma->sems[i].pending_alter); INIT_LIST_HEAD(&sma->sems[i].pending_const); spin_lock_init(&sma->sems[i].lock); } sma->complex_count = 0; sma->use_global_lock = USE_GLOBAL_LOCK_HYSTERESIS; INIT_LIST_HEAD(&sma->pending_alter); INIT_LIST_HEAD(&sma->pending_const); INIT_LIST_HEAD(&sma->list_id); sma->sem_nsems = nsems; sma->sem_ctime = ktime_get_real_seconds(); /* ipc_addid() locks sma upon success. */ retval = ipc_addid(&sem_ids(ns) &sma->sem_perm ns->sc_semmni); ...... ns->used_sems = nsems; sem_unlock(sma -1); rcu_read_unlock(); return sma->sem_perm.id; } struct sem_array { struct kern_ipc_perm sem_perm; /* permissions .. see ipc.h */ time64_t sem_ctime; /* create/last semctl() time */ struct list_head pending_alter; /* pending operations */ /* that alter the array */ struct list_head pending_const; /* pending complex operations */ /* that do not alter semvals */ struct list_head list_id; /* undo requests on this array */ int sem_nsems; /* no. of semaphores in array */ int complex_count; /* pending complex operations */ unsigned int use_global_lock;/* >0: global lock required */ struct sem sems[]; } __randomize_layout; struct sem { int semval; /* current value */ /* * PID of the process that last modified the semaphore. For * Linux specifically these are: * - semop * - semctl via SETVAL and SETALL. * - at task exit when performing undo adjustments (see exit_sem). */ struct pid *sempid; spinlock_t lock; /* spinlock for fine-grained semtimedop */ struct list_head pending_alter; /* pending single-sop operations */ /* that alter the semaphore */ struct list_head pending_const; /* pending single-sop operations */ /* that do not alter the semaphore*/ time64_t sem_otime; /* candidate for sem_otime */ } ____cacheline_aligned_in_smp;5.2 信号量的初始化

信号量通过semctl()实现初始化,主要使用semctl_main()和semctl_setval()函数。

SYSCALL_DEFINE4(semctl int semid int semnum int cmd unsigned long arg) { int version; struct ipc_namespace *ns; void __user *p = (void __user *)arg; ns = current->nsproxy->ipc_ns; switch (cmd) { case IPC_INFO: case SEM_INFO: case IPC_STAT: case SEM_STAT: return semctl_nolock(ns semid cmd version p); case GETALL: case GETVAL: case GETPID: case GETNCNT: case GETZCNT: case SETALL: return semctl_main(ns semid semnum cmd p); case SETVAL: return semctl_setval(ns semid semnum arg); case IPC_RMID: case IPC_SET: return semctl_down(ns semid cmd version p); default: return -EINVAL; } }

SETALL操作调用semctl_main(),传参为 union semun 里面的 unsigned short *array,会设置整个信号量集合。semctl_main() 函数中,先是通过 sem_obtain_object_check()根据信号量集合的 id 在基数树里面找到 struct sem_array 对象,发现如果是 SETALL 操作,就将用户的参数中的 unsigned short *array 通过 copy_from_user() 拷贝到内核里面的 sem_io 数组,然后是一个循环,对于信号量集合里面的每一个信号量,设置 semval,以及修改这个信号量值的 pid。

static int semctl_main(struct ipc_namespace *ns int semid int semnum int cmd void __user *p) { struct sem_array *sma; struct sem *curr; int err nsems; ushort fast_sem_io[SEMMSL_FAST]; ushort *sem_io = fast_sem_io; DEFINE_WAKE_Q(wake_q); sma = sem_obtain_object_check(ns semid); nsems = sma->sem_nsems; ...... switch (cmd) { ...... case SETALL: { int i; struct sem_undo *un; ...... if (copy_from_user(sem_io p nsems*sizeof(ushort))) { ...... } ...... for (i = 0; i < nsems; i ) { sma->sems[i].semval = sem_io[i]; sma->sems[i].sempid = task_tgid_vnr(current); } ...... sma->sem_ctime = get_seconds(); /* maybe some queued-up processes were waiting for this */ do_smart_update(sma NULL 0 0 &wake_q); err = 0; goto out_unlock; } } ...... wake_up_q(&wake_q); ...... }

SETVAL 操作调用semctl_setval()函数,传进来的参数 union semun 里面的 int val仅仅会设置某个信号量。在 semctl_setval() 函数中,我们先是通过 sem_obtain_object_check()根据信号量集合的 id 在基数树里面找到 struct sem_array 对象,对于 SETVAL 操作,直接根据参数中的 val 设置 semval,以及修改这个信号量值的 pid。

static int semctl_setval(struct ipc_namespace *ns int semid int semnum unsigned long arg) { struct sem_undo *un; struct sem_array *sma; struct sem *curr; int err val; DEFINE_WAKE_Q(wake_q); ...... sma = sem_obtain_object_check(ns semid); ...... curr = &sma->sems[semnum]; ...... curr->semval = val; curr->sempid = task_tgid_vnr(current); sma->sem_ctime = get_seconds(); /* maybe some queued-up processes were waiting for this */ do_smart_update(sma NULL 0 0 &wake_q); ...... wake_up_q(&wake_q); return 0; }5.3 信号量的操作

信号量的操作通过semop()实现,实际调用sys_emtimedop(),最终调用为do_semtimedop()

SYSCALL_DEFINE3(semop int semid struct sembuf __user * tsops unsigned nsops) { return sys_semtimedop(semid tsops nsops NULL); }

do_semtimedop()是一个很较长的函数,逻辑比较复杂,主要为:

  • 调用copy_from_user()拷贝用户参数至内核态,如对信号量的操作struct sembuf。
  • 如果需要进入等待状态,,则需要设置超时
  • 调用sem_obtain_object_check()根据id获取对应的信号量集合sma
  • 创建struct sem_queue queue表示当前信号量操作。这里之所以称之为queue是因为操作的执行不可预期,因此排在队列之中等待信号量满足条件时再调用perform_atomic_semop()实施信号量操作。
  • 如果不需要等待,则说明信号量操作已完成,也改变了信号量的值。接下来,就是一个标准流程。首先通过 DEFINE_WAKE_Q(wake_q) 声明一个 wake_q,调用 do_smart_update()看这次对于信号量的值的改变可以影响并可以激活等待队列中的哪些 struct sem_queue,然后把它们都放在 wake_q 里面,调用 wake_up_q() 唤醒这些进程。
  • 如果需要等待,则会根据信号量操作是对单个信号量还是整个信号量集合,将queue挂载至信号量链表pending_alter或者信号量集合的链表pending_alter中
  • 进入do-while循环等待,如果没有时间限制则调用schedule()让出CPU资源,如果有则调用schedule_timeout()让出资源并过一段时间后回来。当回来的时候,判断是否等待超时,如果没有等待超时则进入下一轮循环,再次等待,如果超时则退出循环,返回错误。在让出 CPU 的时候,设置进程的状态为 TASK_INTERRUPTIBLE,并且循环的结束会通过 signal_pending 查看是否收到过信号,这说明这个等待信号量的进程是可以被信号中断的,也即一个等待信号量的进程是可以通过 kill 杀掉的。

static long do_semtimedop(int semid struct sembuf __user *tsops unsigned nsops const struct timespec64 *timeout) { int error = -EINVAL; struct sem_array *sma; struct sembuf fast_sops[SEMOPM_FAST]; struct sembuf *sops = fast_sops *sop; struct sem_undo *un; int max locknum; bool undos = false alter = false dupsop = false; struct sem_queue queue; unsigned long dup = 0 jiffies_left = 0; struct ipc_namespace *ns; ns = current->nsproxy->ipc_ns; ...... if (copy_from_user(sops tsops nsops * sizeof(*tsops))) { error = -EFAULT; goto out_free; } if (timeout) { if (timeout->tv_sec < 0 || timeout->tv_nsec < 0 || timeout->tv_nsec >= 1000000000L) { error = -EINVAL; goto out_free; } jiffies_left = timespec64_to_jiffies(timeout); } ...... un = find_alloc_undo(ns semid); ...... sma = sem_obtain_object_check(ns semid); ...... queue.sops = sops; queue.nsops = nsops; queue.undo = un; queue.pid = task_tgid(current); queue.alter = alter; queue.dupsop = dupsop; error = perform_atomic_semop(sma &queue); if (error == 0) { /* non-blocking succesfull path */ DEFINE_WAKE_Q(wake_q); /* * If the operation was successful then do * the required updates. */ if (alter) do_smart_update(sma sops nsops 1 &wake_q); else set_semotime(sma sops); ...... } ...... /* * We need to sleep on this operation so we put the current * task into the pending queue and go to sleep. */ if (nsops == 1) { struct sem *curr; int idx = array_index_nospec(sops->sem_num sma->sem_nsems); curr = &sma->sems[idx]; if (alter) { if (sma->complex_count) { list_add_tail(&queue.list &sma->pending_alter); } else { list_add_tail(&queue.list &curr->pending_alter); } } else { list_add_tail(&queue.list &curr->pending_const); } } else { if (!sma->complex_count) merge_queues(sma); if (alter) list_add_tail(&queue.list &sma->pending_alter); else list_add_tail(&queue.list &sma->pending_const); sma->complex_count ; } do { WRITE_ONCE(queue.status -EINTR); queue.sleeper = current; __set_current_state(TASK_INTERRUPTIBLE); ...... if (timeout) jiffies_left = schedule_timeout(jiffies_left); else schedule(); ...... /* * If an interrupt occurred we have to clean up the queue. */ if (timeout && jiffies_left == 0) error = -EAGAIN; } while (error == -EINTR && !signal_pending(current)); /* spurious */ ...... }

do_smart_update() 会调用 update_queue(),update_queue() 会依次循环整个信号量集合的等待队列 pending_alter或者某个信号量的等待队列,试图在信号量的值变了的情况下,再次尝试 perform_atomic_semop 进行信号量操作。如果不成功,则尝试队列中的下一个;如果尝试成功,则调用 unlink_queue() 从队列上取下来,然后调用 wake_up_sem_queue_prepare()将 q->sleeper 加到 wake_q 上去。q->sleeper 是一个 task_struct,是等待在这个信号量操作上的进程。

static int update_queue(struct sem_array *sma int semnum struct wake_q_head *wake_q) { struct sem_queue *q *tmp; struct list_head *pending_list; int semop_completed = 0; if (semnum == -1) pending_list = &sma->pending_alter; else pending_list = &sma->sems[semnum].pending_alter; again: list_for_each_entry_safe(q tmp pending_list list) { int error restart; ...... error = perform_atomic_semop(sma q); /* Does q->sleeper still need to sleep? */ if (error > 0) continue; unlink_queue(sma q); ...... wake_up_sem_queue_prepare(q error wake_q); ...... } return semop_completed; } static inline void wake_up_sem_queue_prepare(struct sem_queue *q int error struct wake_q_head *wake_q) { wake_q_add(wake_q q->sleeper); ...... }

接下来wake_up_q 就依次唤醒 wake_q 上的所有 task_struct,调用的是进程调度中分析过的 wake_up_process()方法。

void wake_up_q(struct wake_q_head *head) { struct wake_q_node *node = head->first; while (node != WAKE_Q_TAIL) { struct task_struct *task; task = container_of(node struct task_struct wake_q); node = node->next; task->wake_q.next = NULL; wake_up_process(task); put_task_struct(task); } }

perform_atomic_semop() 函数对于所有信号量操作都进行两次循环。在第一次循环中,如果发现计算出的 result 小于 0,则说明必须等待,于是跳到 would_block 中,设置 q->blocking = sop 表示这个 queue 是 block 在这个操作上,然后如果需要等待,则返回 1。如果第一次循环中发现无需等待,则第二个循环实施所有的信号量操作,将信号量的值设置为新的值,并且返回 0。

static int perform_atomic_semop(struct sem_array *sma struct sem_queue *q) { int result sem_op nsops; struct sembuf *sop; struct sem *curr; struct sembuf *sops; struct sem_undo *un; sops = q->sops; nsops = q->nsops; un = q->undo; for (sop = sops; sop < sops nsops; sop ) { curr = &sma->sems[sop->sem_num]; sem_op = sop->sem_op; result = curr->semval; ...... result = sem_op; if (result < 0) goto would_block; ...... if (sop->sem_flg & SEM_UNDO) { int undo = un->semadj[sop->sem_num] - sem_op; ..... } } for (sop = sops; sop < sops nsops; sop ) { curr = &sma->sems[sop->sem_num]; sem_op = sop->sem_op; result = curr->semval; if (sop->sem_flg & SEM_UNDO) { int undo = un->semadj[sop->sem_num] - sem_op; un->semadj[sop->sem_num] = undo; } curr->semval = sem_op; curr->sempid = q->pid; } return 0; would_block: q->blocking = sop; return sop->sem_flg & IPC_NOWAIT ? -EAGAIN : 1; }5.4 SEM_UNDO机制

信号量是整个 Linux 可见的全局资源,而不是某个进程独占的资源,好处是可以跨进程通信,坏处就是如果一个进程通过操作拿到了一个信号量,但是不幸异常退出了,如果没有来得及归还这个信号量,可能所有其他的进程都阻塞了。为此,Linux设计了SEM_UNDO机制解决该问题。

该机制简而言之就是每一个 semop 操作都会保存一个反向 struct sem_undo 操作,当因为某个进程异常退出的时候,这个进程做的所有的操作都会回退,从而保证其他进程可以正常工作。在sem_flg标记位设置SUM_UNDO即可开启该功能。

struct sem_queue { ...... struct sem_undo *undo; /* undo structure */ ...... };

在进程的 task_struct 里面对于信号量有一个成员 struct sysv_sem,里面是一个 struct sem_undo_list将这个进程所有的 semop 所带来的 undo 操作都串起来。

struct task_struct { ...... struct sysv_sem sysvsem; ...... } struct sysv_sem { struct sem_undo_list *undo_list; }; struct sem_undo { struct list_head list_proc; /* per-process list: * * all undos from one process * rcu protected */ struct rcu_head rcu; /* rcu struct for sem_undo */ struct sem_undo_list *ulp; /* back ptr to sem_undo_list */ struct list_head list_id; /* per semaphore array list: * all undos for one array */ int semid; /* semaphore set identifier */ short *semadj; /* array of adjustments */ /* one per semaphore */ }; struct sem_undo_list { atomic_t refcnt; spinlock_t lock; struct list_head list_proc; };

这种设计思想较为常见,在MySQL的innodb的日志系统中也有着类似的实现。

总结

共享内存和信号量是有着相似性有可以共同使用从而完成进程通信的手段。下面引用极客时间中的两幅图来总结二者的整个过程。

猜您喜欢: