Reworked how waiting/select()ing FDs works

This commit is contained in:
Mark
2020-03-30 18:08:23 +03:00
parent 37ed963272
commit 55d2efda8e
13 changed files with 371 additions and 162 deletions
+35 -40
View File
@@ -23,36 +23,35 @@
#define PIT_CMD 0x43
uint64_t int_timer_ticks = 0;
static struct thread *sleep_head = NULL;
static spin_t g_sleep_lock = 0;
static LIST_HEAD(g_sleep_head);
void timer_add_sleep(struct thread *thr) {
thr->wait_prev = NULL;
thr->wait_next = sleep_head;
sleep_head = thr;
if (thr->pid > 0)
kdebug("Adding %d to sleepers\n", thr->pid);
uintptr_t irq;
struct io_notify *n = &thr->sleep_notify;
spin_lock_irqsave(&g_sleep_lock, &irq);
list_add(&n->link, &g_sleep_head);
spin_release_irqrestore(&g_sleep_lock, &irq);
}
void timer_remove_sleep(struct thread *target) {
struct thread *thr = sleep_head, *prev = NULL;
while (thr) {
struct thread *next = thr->wait_next;
if (thr == target) {
thr->wait_next = NULL;
if (prev) {
prev->wait_next = next;
} else {
sleep_head = next;
}
sched_queue(thr);
thr = next;
void timer_remove_sleep(struct thread *thr) {
if (thr->pid > 0)
kdebug("Removing sleep %d\n", thr->pid);
uintptr_t irq;
struct io_notify *n = &thr->sleep_notify;
struct io_notify *it;
spin_lock_irqsave(&g_sleep_lock, &irq);
list_for_each_entry(it, &g_sleep_head, link) {
if (it == n) {
list_del_init(&it->link);
spin_release_irqrestore(&g_sleep_lock, &irq);
return;
}
prev = thr;
thr = next;
}
spin_release_irqrestore(&g_sleep_lock, &irq);
panic("No such thread\n");
}
@@ -85,27 +84,23 @@ static uint32_t timer_tick(void *arg) {
}
#endif
// Wake up sleepers
struct thread *thr = sleep_head, *prev = NULL;
while (thr) {
struct thread *next = thr->wait_next;
if (thr->sleep_deadline <= system_time) {
thr->wait_next = NULL;
if (prev) {
prev->wait_next = next;
} else {
sleep_head = next;
}
sched_queue(thr);
thr = next;
struct list_head *iter, *b_iter;
uintptr_t irq;
spin_lock_irqsave(&g_sleep_lock, &irq);
list_for_each_safe(iter, b_iter, &g_sleep_head) {
struct io_notify *n = list_entry(iter, struct io_notify, link);
struct thread *t = n->owner;
if (!t) {
list_del_init(iter);
continue;
}
prev = thr;
thr = next;
if (t->sleep_deadline <= system_time) {
list_del_init(iter);
thread_notify_io(n);
}
}
spin_release_irqrestore(&g_sleep_lock, &irq);
return IRQ_UNHANDLED;
}
+2 -1
View File
@@ -60,7 +60,8 @@ OBJS+=$(O)/sys/debug.o \
$(O)/sys/font/logo.o \
$(O)/sys/font/psf.o \
$(O)/sys/font/default8x16.o \
$(O)/sys/shmem.o
$(O)/sys/shmem.o \
$(O)/sys/wait.o
DIRS+=$(O)/sys \
$(O)/sys/char \
$(O)/sys/block \
+3 -1
View File
@@ -1,5 +1,6 @@
#pragma once
#include "sys/types.h"
#include "sys/wait.h"
#include "sys/spin.h"
struct thread;
@@ -15,7 +16,8 @@ struct ring {
char *base;
int flags;
struct thread *reader_head;
// Reader notification
struct io_notify wait;
};
int ring_readable(struct ring *b);
+9
View File
@@ -23,6 +23,10 @@ struct list_head {
#define list_next_entry(pos, member) \
list_entry((pos)->member.next, typeof(*(pos)), member)
#define list_for_each_safe(pos, n, head) \
for (pos = (head)->next, n = pos->next; pos != (head); \
pos = n, n = pos->next)
static inline void list_head_init(struct list_head *list) {
list->next = list;
list->prev = list;
@@ -57,3 +61,8 @@ static inline void list_del(struct list_head *entry) {
static inline int list_empty(const struct list_head *head) {
return head->next == head;
}
static inline void list_del_init(struct list_head *entry) {
__list_del(entry->prev, entry->next);
list_head_init(entry);
}
+7 -3
View File
@@ -2,6 +2,7 @@
#if defined(ARCH_AMD64)
#include "arch/amd64/asm/asm_thread.h"
#endif
#include "sys/wait.h"
#include "sys/list.h"
#include "fs/vfs.h"
#include "sys/mm.h"
@@ -14,8 +15,9 @@ enum thread_state {
THREAD_READY = 1,
THREAD_RUNNING,
THREAD_WAITING,
THREAD_WAITING_IO,
THREAD_WAITING_NET,
// THREAD_WAITING_IO,
// THREAD_WAITING_NET,
THREAD_WAITING_IO2,
THREAD_WAITING_PID,
THREAD_STOPPED
};
@@ -45,10 +47,12 @@ struct thread {
// I/O
struct vfs_ioctx ioctx;
struct ofile *fds[THREAD_MAX_FDS];
struct list_head wait_head;
// Wait
uint64_t sleep_deadline;
struct thread *wait_prev, *wait_next;
struct io_notify sleep_notify;
//struct thread *wait_prev, *wait_next;
// Signal
uintptr_t signal_entry;
+22
View File
@@ -0,0 +1,22 @@
#pragma once
#include "sys/types.h"
#include "sys/spin.h"
#include "sys/list.h"
struct io_notify {
spin_t lock;
struct thread *owner;
size_t value;
struct list_head link, own_link;
};
// Multiple-notifier wait
void thread_wait_io_add(struct thread *t, struct io_notify *n);
int thread_wait_io_any(struct thread *t, struct io_notify **r_n);
void thread_wait_io_clear(struct thread *t);
// Wait for single specific I/O notification
int thread_wait_io(struct thread *t, struct io_notify *n);
void thread_notify_io(struct io_notify *n);
void thread_wait_io_init(struct io_notify *n);
+46 -44
View File
@@ -5,6 +5,7 @@
#include "sys/thread.h"
#include "sys/string.h"
#include "net/socket.h"
#include "user/errno.h"
#include "net/class.h"
#include "sys/sched.h"
#include "sys/debug.h"
@@ -98,60 +99,61 @@ static ssize_t raw_socket_recvfrom(struct socket *s,
size_t lim,
struct sockaddr *sa,
size_t *salen) {
struct raw_socket *sock = s->data;
_assert(sock);
struct thread *t = thread_self;
_assert(t);
struct packet *p;
return -EINVAL;
//struct raw_socket *sock = s->data;
//_assert(sock);
//struct thread *t = thread_self;
//_assert(t);
//struct packet *p;
if (!sock->queue.head) {
sock->wait = t;
//if (!sock->queue.head) {
// sock->wait = t;
while (!sock->queue.head) {
sched_unqueue(t, THREAD_WAITING_NET);
thread_check_signal(t, 0);
}
}
// while (!sock->queue.head) {
// sched_unqueue(t, THREAD_WAITING_NET);
// thread_check_signal(t, 0);
// }
//}
// Read a single packet
p = packet_queue_pop(&sock->queue);
_assert(p);
//// Read a single packet
//p = packet_queue_pop(&sock->queue);
//_assert(p);
size_t p_size = p->size;
if (p_size > lim) {
kerror("Packet buffer overflow\n");
return -1;
}
memcpy(buf, p->data, p_size);
packet_unref(p);
//size_t p_size = p->size;
//if (p_size > lim) {
// kerror("Packet buffer overflow\n");
// return -1;
//}
//memcpy(buf, p->data, p_size);
//packet_unref(p);
return p_size;
//return p_size;
}
static void raw_socket_close(struct socket *s) {
uintptr_t irq;
struct raw_socket *r_sock = s->data;
_assert(r_sock);
//uintptr_t irq;
//struct raw_socket *r_sock = s->data;
//_assert(r_sock);
spin_lock_irqsave(&g_raw_lock, &irq);
struct raw_socket *prev = r_sock->prev;
struct raw_socket *next = r_sock->next;
if (prev) {
prev->next = next;
} else {
g_raw_sockets = next;
}
if (next) {
next->prev = prev;
}
spin_release_irqrestore(&g_raw_lock, &irq);
//spin_lock_irqsave(&g_raw_lock, &irq);
//struct raw_socket *prev = r_sock->prev;
//struct raw_socket *next = r_sock->next;
//if (prev) {
// prev->next = next;
//} else {
// g_raw_sockets = next;
//}
//if (next) {
// next->prev = prev;
//}
//spin_release_irqrestore(&g_raw_lock, &irq);
// Flush packet queue
while (r_sock->queue.head) {
struct packet *p = packet_queue_pop(&r_sock->queue);
packet_unref(p);
}
//// Flush packet queue
//while (r_sock->queue.head) {
// struct packet *p = packet_queue_pop(&r_sock->queue);
// packet_unref(p);
//}
kfree(r_sock);
//kfree(r_sock);
s->data = NULL;
}
+9 -17
View File
@@ -9,6 +9,7 @@
#include "sys/sched.h"
#include "user/inet.h"
#include "sys/debug.h"
#include "sys/wait.h"
#include "net/util.h"
#include "sys/heap.h"
#include "sys/attr.h"
@@ -69,6 +70,7 @@ struct udp_socket {
uint32_t recv_inaddr;
struct netdev *bound; // Bound interface (all packets go to this interface if set)
struct thread *owner; // Thread to suspend on blocking operation
struct io_notify wait;
struct packet_queue pending;
};
@@ -82,12 +84,15 @@ static struct udp_socket *udp_ports[UDP_BIND_COUNT] = {0};
struct udp_socket *udp_socket_create(void) {
struct udp_socket *sock = kmalloc(sizeof(struct udp_socket));
_assert(sock);
struct thread *t = thread_self;
_assert(t);
sock->flags = 0;
sock->port = 0;
sock->recv_port = 0;
sock->owner = NULL;
sock->owner = t;
sock->bound = NULL;
thread_wait_io_init(&sock->wait);
packet_queue_init(&sock->pending);
return sock;
@@ -112,11 +117,7 @@ void udp_handle_frame(struct packet *p, struct eth_frame *eth, struct inet_frame
// Add "pending" packet for this socket
packet_ref(p);
packet_queue_push(&sock->pending, p);
if (sock->flags & UDP_SOCKET_PENDING) {
sock->flags &= ~UDP_SOCKET_PENDING;
sched_queue(sock->owner);
}
thread_notify_io(&sock->wait);
} else {
kdebug("Packet is destined to unbound port: %u\n", dpt);
}
@@ -148,17 +149,8 @@ static ssize_t udp_socket_recvfrom(struct socket *s,
struct thread *t = thread_self;
struct packet *p;
_assert(t);
sock->owner = t;
if (!sock->pending.head) {
sock->flags |= UDP_SOCKET_PENDING;
while ((sock->flags & UDP_SOCKET_PENDING)) {
sched_unqueue(t, THREAD_WAITING_NET);
thread_check_signal(t, 0);
}
}
_assert(thread_wait_io(t, &sock->wait) == 0);
_assert(sock->pending.head);
// Read a single packet
p = packet_queue_pop(&sock->pending);
+17 -23
View File
@@ -1,5 +1,6 @@
#include "sys/char/ring.h"
#include "sys/thread.h"
#include "user/errno.h"
#include "sys/assert.h"
#include "sys/sched.h"
#include "sys/debug.h"
@@ -48,16 +49,17 @@ static void ring_advance_write(struct ring *ring) {
}
}
static void ring_notify_reader(struct ring *ring) {
if (ring->reader_head) {
struct thread *thr = ring->reader_head;
ring->reader_head = NULL; //thr->wait_next;
sched_queue(thr);
}
}
//static void ring_notify_reader(struct ring *ring) {
// if (ring->reader_head) {
// struct thread *thr = ring->reader_head;
// ring->reader_head = NULL; //thr->wait_next;
//
// sched_queue(thr);
// }
//}
int ring_getc(struct thread *ctx, struct ring *ring, char *c, int err) {
int res;
if (err) {
if (!ring_readable(ring)) {
return -1;
@@ -71,18 +73,10 @@ int ring_getc(struct thread *ctx, struct ring *ring, char *c, int err) {
}
if (!ring_readable(ring)) {
asm volatile ("cli");
_assert(ctx);
ctx->wait_prev = NULL;
ctx->wait_next = ctx;
ring->reader_head = ctx;
_assert(ctx->cpu >= 0);
sched_unqueue(ctx, THREAD_WAITING_IO);
thread_check_signal(ctx, 0);
_assert(ctx->cpu >= 0);
if ((res = thread_wait_io(ctx, &ring->wait)) != 0) {
_assert(res == -EINTR);
return res;
}
} else {
break;
}
@@ -103,7 +97,7 @@ int ring_putc(struct thread *ctx, struct ring *ring, char c, int wait) {
ring->base[ring->wr] = c;
ring_advance_write(ring);
ring_notify_reader(ring);
thread_notify_io(&ring->wait);
return 0;
}
@@ -119,7 +113,7 @@ int ring_write(struct thread *ctx, struct ring *ring, const void *buf, size_t le
void ring_signal(struct ring *r, int s) {
r->flags |= s;
ring_notify_reader(r);
thread_notify_io(&r->wait);
}
int ring_init(struct ring *r, size_t cap) {
@@ -127,7 +121,7 @@ int ring_init(struct ring *r, size_t cap) {
r->rd = 0;
r->wr = 0;
r->flags = 0;
r->reader_head = NULL;
thread_wait_io_init(&r->wait);
if (!(r->base = kmalloc(cap))) {
return -1;
+2 -3
View File
@@ -105,9 +105,8 @@ void sched_unqueue(struct thread *thr, enum thread_state new_state) {
_assert((new_state == THREAD_WAITING) ||
(new_state == THREAD_STOPPED) ||
(new_state == THREAD_WAITING_IO) ||
(new_state == THREAD_WAITING_PID) ||
(new_state == THREAD_WAITING_NET));
(new_state == THREAD_WAITING_IO2) ||
(new_state == THREAD_WAITING_PID));
_assert(queue_sizes[cpu_no]);
--queue_sizes[cpu_no];
thr->state = new_state;
+94 -19
View File
@@ -1,4 +1,5 @@
#include "user/errno.h"
#include "arch/amd64/hw/timer.h"
#include "arch/amd64/cpu.h"
#include "fs/ofile.h"
#include "sys/char/ring.h"
@@ -266,7 +267,47 @@ ssize_t sys_readdir(int fd, struct dirent *ent) {
return vfs_readdir(&thr->ioctx, thr->fds[fd], ent);
}
static int sys_select_get_ready(struct ofile *fd) {
if (fd->flags & OF_SOCKET) {
panic("TODO\n");
} else {
struct vnode *vn = fd->file.vnode;
_assert(vn);
switch (vn->type) {
case VN_CHR: {
struct chrdev *chr = vn->dev;
_assert(chr);
if (chr->type == CHRDEV_TTY && (chr->tc.c_iflag & ICANON)) {
return (chr->buffer.flags & (RING_SIGNAL_RET | RING_SIGNAL_EOF | RING_SIGNAL_BRK));
} else {
return !!ring_readable(&chr->buffer);
}
}
default:
panic("Not implemented\n");
}
}
}
static struct io_notify *sys_select_get_wait(struct ofile *fd) {
if (fd->flags & OF_SOCKET) {
panic("TODO\n");
} else {
struct vnode *vn = fd->file.vnode;
_assert(vn);
switch (vn->type) {
case VN_CHR: {
struct chrdev *chr = vn->dev;
_assert(chr);
return &chr->buffer.wait;
}
default:
panic("Not implemented\n");
}
}
}
int sys_select(int n, fd_set *inp, fd_set *outp, fd_set *excp, struct timeval *tv) {
struct thread *thr = get_cpu()->thread;
@@ -307,34 +348,68 @@ int sys_select(int n, fd_set *inp, fd_set *outp, fd_set *excp, struct timeval *t
}
int res;
while (system_time < deadline) {
list_head_init(&thr->wait_head);
for (int i = 0; i < n; ++i) {
if (FD_ISSET(i, &_inp)) {
struct ofile *fd = thr->fds[i];
_assert(fd);
struct io_notify *w = sys_select_get_wait(fd);
_assert(w);
thread_wait_io_add(thr, w);
}
}
thr->sleep_deadline = deadline;
thread_wait_io_add(thr, &thr->sleep_notify);
timer_add_sleep(thr);
struct io_notify *result;
int ready = 0;
while (1) {
res = 0;
// Check if data is available in any of the FDs
for (int i = 0; i < n; ++i) {
if (FD_ISSET(i, &_inp)) {
struct ofile *fd = thr->fds[i];
_assert(fd);
struct vnode *vn = fd->file.vnode;
_assert(vn && vn->type == VN_CHR);
struct chrdev *chr = vn->dev;
_assert(chr);
if (chr->type == CHRDEV_TTY && (chr->tc.c_iflag & ICANON)) {
if (chr->buffer.flags & (RING_SIGNAL_RET | RING_SIGNAL_EOF | RING_SIGNAL_BRK)) {
FD_SET(i, inp);
return 1;
}
} else {
if (ring_readable(&chr->buffer)) {
FD_SET(i, inp);
return 1;
}
if (sys_select_get_ready(fd)) {
// Data available, don't wait
FD_SET(i, inp);
res = 1;
timer_remove_sleep(thr);
break;
}
}
}
// TODO: add something like "listen to all fds events"
asm volatile ("sti; hlt");
thread_check_signal(thr, 0);
if (res) {
break;
}
// Perform a wait for any single event
res = thread_wait_io_any(thr, &result);
ready = 1;
if (res < 0) {
// Likely interrupted
timer_remove_sleep(thr);
break;
}
// Check if request timed out
if (result == &thr->sleep_notify) {
kdebug("Timed out\n");
res = 0;
break;
}
}
return 0;
// Remove select()ed io_notify structures from wait list
thread_wait_io_clear(thr);
return res;
}
+2 -11
View File
@@ -257,6 +257,7 @@ int thread_init(struct thread *thr, uintptr_t entry, void *arg, int user) {
list_head_init(&thr->g_link);
list_head_init(&thr->shm_list);
thread_wait_io_init(&thr->sleep_notify);
uint64_t *stack = (uint64_t *) (thr->data.rsp0_base + thr->data.rsp0_size);
@@ -389,6 +390,7 @@ int sys_fork(struct sys_fork_frame *frame) {
list_head_init(&dst->g_link);
list_head_init(&dst->shm_list);
thread_wait_io_init(&dst->sleep_notify);
dst->data.rsp0_base = MM_VIRTUALIZE(stack_pages);
dst->data.rsp0_size = MM_PAGE_SIZE * 2;
@@ -691,17 +693,6 @@ int sys_waitpid(pid_t pid, int *status) {
return 0;
}
int thread_sleep(struct thread *thr, uint64_t deadline, uint64_t *int_time) {
thr->sleep_deadline = deadline;
timer_add_sleep(thr);
sched_unqueue(thr, THREAD_WAITING);
// Store time when interrupt occured
if (int_time) {
*int_time = system_time;
}
return thread_check_signal(thr, 0);
}
void sys_sigreturn(void) {
context_sigreturn();
}
+123
View File
@@ -0,0 +1,123 @@
#include "arch/amd64/hw/timer.h"
#include "sys/thread.h"
#include "sys/assert.h"
#include "sys/sched.h"
#include "sys/debug.h"
#include "sys/wait.h"
void thread_wait_io_init(struct io_notify *n) {
n->owner = NULL;
n->value = 0;
n->lock = 0;
list_head_init(&n->link);
// For wait_io_any
list_head_init(&n->own_link);
}
int thread_wait_io(struct thread *t, struct io_notify *n) {
uintptr_t irq;
while (1) {
spin_lock_irqsave(&n->lock, &irq);
// Check value
if (n->value) {
// Consume value
n->value = 0;
spin_release_irqrestore(&n->lock, &irq);
return 0;
}
// Wait for the value to change
// TODO: multiple threads waiting on same io_notify
_assert(!n->owner);
n->owner = t;
spin_release_irqrestore(&n->lock, &irq);
sched_unqueue(t, THREAD_WAITING_IO2);
// Check if we were interrupted during io wait
int r = thread_check_signal(t, 0);
if (r != 0) {
n->owner = NULL;
return r;
}
}
}
void thread_notify_io(struct io_notify *n) {
uintptr_t irq;
struct thread *t = NULL;
spin_lock_irqsave(&n->lock, &irq);
++n->value;
t = n->owner;
n->owner = NULL;
spin_release_irqrestore(&n->lock, &irq);
if (t) {
sched_queue(t);
}
}
void thread_wait_io_add(struct thread *thr, struct io_notify *n) {
uintptr_t irq;
spin_lock_irqsave(&n->lock, &irq);
_assert(!n->owner);
n->owner = thr;
list_add(&n->own_link, &thr->wait_head);
spin_release_irqrestore(&n->lock, &irq);
}
int thread_wait_io_any(struct thread *thr, struct io_notify **r_n) {
uintptr_t irq;
struct io_notify *n, *it;
while (1) {
// Check if any of values are non-zero
n = NULL;
list_for_each_entry(it, &thr->wait_head, own_link) {
spin_lock_irqsave(&it->lock, &irq);
if (it->value) {
n = it;
break;
}
spin_release_irqrestore(&it->lock, &irq);
}
if (n) {
// Found ready descriptor
n->value = 0;
spin_release_irqrestore(&it->lock, &irq);
*r_n = n;
return 0;
} else {
// Wait
// TODO: reset owners
sched_unqueue(thr, THREAD_WAITING_IO2);
int r = thread_check_signal(thr, 0);
if (r != 0) {
return r;
}
}
}
}
void thread_wait_io_clear(struct thread *t) {
while (!list_empty(&t->wait_head)) {
struct list_head *h = t->wait_head.next;
struct io_notify *n = list_entry(h, struct io_notify, own_link);
n->owner = NULL;
list_del_init(h);
}
}
int thread_sleep(struct thread *thr, uint64_t deadline, uint64_t *int_time) {
thr->sleep_deadline = deadline;
timer_add_sleep(thr);
return thread_wait_io(thr, &thr->sleep_notify);
//// Store time when interrupt occured
//if (int_time) {
// *int_time = system_time;
//}
}