From 522a5cc9d6a5bf27fb500427c7df06fa6e116db4 Mon Sep 17 00:00:00 2001 From: Mark Date: Tue, 31 Mar 2020 11:30:26 +0300 Subject: [PATCH] Add select() on sockets --- arch/amd64/hw/timer.c | 1 + include/net/class.h | 2 + include/net/socket.h | 4 ++ net/raw.c | 93 +++++++++++++++++++++++++------------------ net/socket.c | 7 ++++ sys/sys_file.c | 16 +++----- sys/thread.c | 12 ++++++ sys/wait.c | 1 + 8 files changed, 87 insertions(+), 49 deletions(-) diff --git a/arch/amd64/hw/timer.c b/arch/amd64/hw/timer.c index fb7383d..1df29ae 100644 --- a/arch/amd64/hw/timer.c +++ b/arch/amd64/hw/timer.c @@ -90,6 +90,7 @@ static uint32_t timer_tick(void *arg) { list_for_each_safe(iter, b_iter, &g_sleep_head) { struct io_notify *n = list_entry(iter, struct io_notify, link); struct thread *t = n->owner; + if (!t) { list_del_init(iter); continue; diff --git a/include/net/class.h b/include/net/class.h index f251e8b..d143a36 100644 --- a/include/net/class.h +++ b/include/net/class.h @@ -14,6 +14,8 @@ struct sockops { int (*bind) (struct socket *, struct sockaddr *, size_t); int (*setsockopt) (struct socket *, int, void *, size_t); + + int (*count_pending) (struct socket *); }; struct socket_class { diff --git a/include/net/socket.h b/include/net/socket.h index 98e5d2b..7ab72c3 100644 --- a/include/net/socket.h +++ b/include/net/socket.h @@ -1,5 +1,6 @@ #pragma once #include "sys/types.h" +#include "sys/wait.h" struct sockaddr; struct vfs_ioctx; @@ -9,6 +10,7 @@ struct netdev; struct socket { struct sockops *op; struct vfs_ioctx *ioctx; + struct io_notify rx_notify; void *data; }; @@ -28,3 +30,5 @@ ssize_t net_recvfrom(struct vfs_ioctx *ioctx, int net_bind(struct vfs_ioctx *ioctx, struct ofile *fd, struct sockaddr *sa, size_t len); int net_setsockopt(struct vfs_ioctx *ioctx, struct ofile *fd, int optname, void *optval, size_t optlen); void net_close(struct vfs_ioctx *ioctx, struct ofile *fd); + +int socket_has_data(struct socket *sock); diff --git a/net/raw.c b/net/raw.c index 8eae579..452322b 100644 --- a/net/raw.c +++ b/net/raw.c @@ -25,6 +25,7 @@ static ssize_t raw_socket_recvfrom(struct socket *s, struct sockaddr *sa, size_t *salen); static void raw_socket_close(struct socket *s); +static int raw_socket_count_pending(struct socket *s); static struct sockops raw_socket_ops = { .open = raw_socket_open, .close = raw_socket_close, @@ -34,39 +35,43 @@ static struct sockops raw_socket_ops = { .bind = NULL, .setsockopt = NULL, + + .count_pending = raw_socket_count_pending }; static struct socket_class raw_socket_class = { .name = "raw-packet", .ops = &raw_socket_ops, .domain = AF_PACKET, - .type = SOCK_DGRAM, + .type = SOCK_RAW, .supports = raw_class_supports, }; //// struct raw_socket { - struct thread *wait; + spin_t lock; + struct socket *gen_sock; struct packet_queue queue; - struct raw_socket *prev, *next; + struct list_head link; }; static spin_t g_raw_lock = 0; -static struct raw_socket *g_raw_sockets = NULL; +static LIST_HEAD(g_raw_sockets); void raw_packet_handle(struct packet *p) { // Queue the packet for all the sockets uintptr_t irq; // TODO: too much time spent in spinlocked region? spin_lock_irqsave(&g_raw_lock, &irq); - for (struct raw_socket *r = g_raw_sockets; r; r = r->next) { + + struct raw_socket *r; + list_for_each_entry(r, &g_raw_sockets, link) { packet_ref(p); + spin_lock(&r->lock); packet_queue_push(&r->queue, p); - struct thread *w = r->wait; - if (w) { - r->wait = NULL; - sched_queue(w); - } + spin_release(&r->lock); + _assert(r->gen_sock); + thread_notify_io(&r->gen_sock->rx_notify); } spin_release_irqrestore(&g_raw_lock, &irq); } @@ -80,54 +85,66 @@ static __init void raw_class_register(void) { static int raw_socket_open(struct socket *s) { // TODO: check your privilege + uintptr_t irq; struct raw_socket *r_sock = kmalloc(sizeof(struct raw_socket)); _assert(r_sock); packet_queue_init(&r_sock->queue); - r_sock->prev = NULL; - r_sock->next = g_raw_sockets; - r_sock->wait = NULL; - g_raw_sockets = r_sock; + list_head_init(&r_sock->link); + r_sock->gen_sock = s; + r_sock->lock = 0; + + spin_lock_irqsave(&g_raw_lock, &irq); + list_add(&r_sock->link, &g_raw_sockets); + spin_release_irqrestore(&g_raw_lock, &irq); s->data = r_sock; return 0; } +static int raw_socket_count_pending(struct socket *s) { + struct raw_socket *sock = s->data; + _assert(sock); + return !!sock->queue.head; +} + static ssize_t raw_socket_recvfrom(struct socket *s, void *buf, size_t lim, struct sockaddr *sa, size_t *salen) { - return -EINVAL; - //struct raw_socket *sock = s->data; - //_assert(sock); - //struct thread *t = thread_self; - //_assert(t); - //struct packet *p; + uintptr_t irq; + int res; + struct raw_socket *sock = s->data; + _assert(sock); + struct thread *t = thread_self; + _assert(t); + struct packet *p; - //if (!sock->queue.head) { - // sock->wait = t; + if (!sock->queue.head) { + res = thread_wait_io(t, &sock->gen_sock->rx_notify); - // while (!sock->queue.head) { - // sched_unqueue(t, THREAD_WAITING_NET); - // thread_check_signal(t, 0); - // } - //} + if (res < 0) { + return res; + } + } - //// Read a single packet - //p = packet_queue_pop(&sock->queue); - //_assert(p); + spin_lock_irqsave(&sock->lock, &irq); + p = packet_queue_pop(&sock->queue); + spin_release_irqrestore(&sock->lock, &irq); - //size_t p_size = p->size; - //if (p_size > lim) { - // kerror("Packet buffer overflow\n"); - // return -1; - //} - //memcpy(buf, p->data, p_size); - //packet_unref(p); + _assert(p); - //return p_size; + size_t p_size = p->size; + if (p_size > lim) { + kerror("Packet buffer overflow\n"); + return -1; + } + memcpy(buf, p->data, p_size); + packet_unref(p); + + return p_size; } static void raw_socket_close(struct socket *s) { diff --git a/net/socket.c b/net/socket.c index 7545857..8572126 100644 --- a/net/socket.c +++ b/net/socket.c @@ -13,6 +13,11 @@ void socket_class_register(struct socket_class *cls) { list_add(&cls->link, &g_socket_class_head); } +int socket_has_data(struct socket *sock) { + _assert(sock->op && sock->op->count_pending); + return !!sock->op->count_pending(sock); +} + int net_open(struct vfs_ioctx *ioctx, struct ofile *fd, int dom, int type, int proto) { struct socket_class *cls, *iter; @@ -29,6 +34,7 @@ int net_open(struct vfs_ioctx *ioctx, struct ofile *fd, int dom, int type, int p } if (!cls) { + kinfo("Not supported: %d:%d:%d\n", dom, type, proto); // No support for (dom:type:proto) tuple return -EINVAL; } @@ -38,6 +44,7 @@ int net_open(struct vfs_ioctx *ioctx, struct ofile *fd, int dom, int type, int p fd->flags = OF_SOCKET; fd->socket.ioctx = ioctx; fd->socket.op = cls->ops; + thread_wait_io_init(&fd->socket.rx_notify); return cls->ops->open(&fd->socket); } diff --git a/sys/sys_file.c b/sys/sys_file.c index 1fb58b2..5228d35 100644 --- a/sys/sys_file.c +++ b/sys/sys_file.c @@ -269,7 +269,7 @@ ssize_t sys_readdir(int fd, struct dirent *ent) { static int sys_select_get_ready(struct ofile *fd) { if (fd->flags & OF_SOCKET) { - panic("TODO\n"); + return socket_has_data(&fd->socket); } else { struct vnode *vn = fd->file.vnode; _assert(vn); @@ -292,7 +292,7 @@ static int sys_select_get_ready(struct ofile *fd) { static struct io_notify *sys_select_get_wait(struct ofile *fd) { if (fd->flags & OF_SOCKET) { - panic("TODO\n"); + return &fd->socket.rx_notify; } else { struct vnode *vn = fd->file.vnode; _assert(vn); @@ -330,15 +330,10 @@ int sys_select(int n, fd_set *inp, fd_set *outp, fd_set *excp, struct timeval *t if (FD_ISSET(i, &_inp)) { struct ofile *fd = thr->fds[i]; - if (!fd || (fd->flags & OF_SOCKET)) { + if (!sys_select_get_wait(fd)) { + // Can't wait on that fd return -EBADF; } - - _assert(fd->file.vnode); - if (fd->file.vnode->type != VN_CHR) { - kerror("Tried to select() on non-char device/file: %s\n", fd->file.vnode->name); - return -ENOSYS; - } } } @@ -396,13 +391,12 @@ int sys_select(int n, fd_set *inp, fd_set *outp, fd_set *excp, struct timeval *t if (res < 0) { // Likely interrupted - timer_remove_sleep(thr); + //timer_remove_sleep(thr); break; } // Check if request timed out if (result == &thr->sleep_notify) { - kdebug("Timed out\n"); res = 0; break; } diff --git a/sys/thread.c b/sys/thread.c index eb22eec..1d1686a 100644 --- a/sys/thread.c +++ b/sys/thread.c @@ -230,6 +230,7 @@ int thread_init(struct thread *thr, uintptr_t entry, void *arg, int user) { list_head_init(&thr->g_link); list_head_init(&thr->shm_list); + list_head_init(&thr->wait_head); thread_wait_io_init(&thr->sleep_notify); thread_wait_io_init(&thr->pid_notify); @@ -364,6 +365,7 @@ int sys_fork(struct sys_fork_frame *frame) { list_head_init(&dst->g_link); list_head_init(&dst->shm_list); + list_head_init(&dst->wait_head); thread_wait_io_init(&dst->sleep_notify); thread_wait_io_init(&dst->pid_notify); @@ -627,6 +629,11 @@ __attribute__((noreturn)) void sys_exit(int status) { struct thread *thr = thread_self; kdebug("Thread %d exited with status %d\n", thr->pid, status); + // Clear pending I/O (if exiting from signal interrupting select()) + if (!list_empty(&thr->wait_head)) { + thread_wait_io_clear(thr); + } + thr->exit_status = status; if (thr->parent) { @@ -683,6 +690,11 @@ void sys_sigreturn(void) { } void thread_signal(struct thread *thr, int signum) { + if (thr->sleep_notify.owner) { + thr->sleep_notify.owner = NULL; + timer_remove_sleep(thr); + } + if (thr->cpu == (int) get_cpu()->processor_id) { if (thr == thread_self) { kdebug("Signal will be handled now\n"); diff --git a/sys/wait.c b/sys/wait.c index a439e0a..3e7c5b5 100644 --- a/sys/wait.c +++ b/sys/wait.c @@ -106,6 +106,7 @@ void thread_wait_io_clear(struct thread *t) { while (!list_empty(&t->wait_head)) { struct list_head *h = t->wait_head.next; struct io_notify *n = list_entry(h, struct io_notify, own_link); + // TODO: maybe check here for sleep descriptors and cancel sleeps if needed n->owner = NULL; list_del_init(h); }