bug-hurd
Re: [RFC] kern: simple futex for gnumach (version 10)


From: Richard Braun
Subject: Re: [RFC] kern: simple futex for gnumach (version 10)
Date: Mon, 6 Jan 2014 12:34:32 +0100
User-agent: Mutt/1.5.21 (2010-09-15)

On Sun, Jan 05, 2014 at 11:12:29PM +0100, Marin Ramesa wrote:
> +/* Atomic compare using GCC builtin. */
> +#define atomic_compare(value1, value2) __sync_bool_compare_and_swap(&value1, value2, value1)

The kernel needs its own atomic interface, and it will probably use the
"atomic_" namespace. In the meantime, simply use the GCC builtin
directly, as that actually makes the conversion job easier for whoever
adds this future atomic interface.
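
As a rough illustration of calling the builtin directly, here is a minimal
userspace sketch. The names try_take, WORD_FREE and WORD_TAKEN are
hypothetical and not part of the patch; only
__sync_bool_compare_and_swap is the real GCC builtin.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical states of a lock word, for illustration only. */
#define WORD_FREE  0
#define WORD_TAKEN 1

/*
 * Try to move *word from WORD_FREE to WORD_TAKEN in one atomic step.
 * The GCC builtin is called directly, without a wrapper macro in the
 * "atomic_" namespace. Returns nonzero if the swap took place.
 */
static int try_take(int *word)
{
    assert(word != NULL);
    return __sync_bool_compare_and_swap(word, WORD_FREE, WORD_TAKEN);
}
```

Note that the builtin operates on the memory location itself, so it has
to be given the address of the shared word, not of a local copy.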

> +#define is_black(x) ((x & RBTREE_COLOR_MASK) == RBTREE_COLOR_BLACK)
> +#define set_black(x) (x & RBTREE_PARENT_MASK) | RBTREE_COLOR_BLACK;

LAST WARNING: LEARN ABOUT OPACITY.
http://en.wikipedia.org/wiki/Opaque_data_type

> +#define thread_addr(node) (node)->children[RBTREE_LEFT]

This line violates the opacity of not one but two modules. Think about it:
a reader who skips this part and comes across thread_addr later may go
looking for the definition of this macro in the thread module...
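
For readers unfamiliar with the idea, a minimal sketch of an opaque type
in C follows. The "waiter" module and all of its names are hypothetical;
the point is only that users see an incomplete type plus functions, and
never the layout.

```c
#include <assert.h>
#include <stdlib.h>

/* What a public header would expose (hypothetical "waiter" module). */
struct waiter;                                  /* layout hidden from users */
struct waiter *waiter_create(unsigned long key);
unsigned long waiter_key(const struct waiter *w);
void waiter_destroy(struct waiter *w);

/* What only the implementation file would contain. */
struct waiter {
    unsigned long key;
};

struct waiter *waiter_create(unsigned long key)
{
    struct waiter *w = malloc(sizeof(*w));

    if (w != NULL)
        w->key = key;
    return w;
}

unsigned long waiter_key(const struct waiter *w)
{
    assert(w != NULL);
    return w->key;
}

void waiter_destroy(struct waiter *w)
{
    free(w);
}
```

External code then goes through waiter_key() and friends, so the module
stays free to change its internal representation.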

> +/*
> + * When color of the node is red, parent of the node is the futex address, left child or
> + * the right child is the futex waiter offset and right child is the another futex address.
> + * When color of the node is black, parent of the node is the futex waiter's offset, left
> + * child is the thread address and right child is another offset.
> + */

This is ugly.

> +static struct rbtree futex_tree;

http://lists.gnu.org/archive/html/bug-hurd/2013-12/msg00545.html :
"Personally, I'd use a per-task red-black tree".
                       ^^^^^^^^

http://lists.gnu.org/archive/html/bug-hurd/2013-12/msg00546.html :
"To finish with, the more I think about it, the less I understand why
there is a need for futex objects in the first place. Instead of
dynamically allocating such structures, it probably makes more sense
to use the addresses of the physical page descriptors instead."
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

> +/* For the lookup to succeed the return value must be zero. Which means the key node's parent must not be the offset. */
> +static inline int compare_nodes_lookup(struct rbtree_node *node1, struct rbtree_node *node2)
> +{
> +     if (node1 == futex_tree.root)
> +             return node1->parent - node2->parent;
> +     else    
> +             return node1->parent - node2->parent + is_black(node1->parent);
> +}

Really ugly. Also, please note that the rbtree interface expects an int,
whereas your code here returns the difference of longs. In addition, the
parent node stores the color in the least significant bit, but again,
since you're supposed to use the public interface, you shouldn't need to
know that, and you must NOT write external code that relies on it.
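
To illustrate the int-versus-long point: a comparison can return a
well-defined int without subtracting the keys at all. This is only a
sketch; struct demo_node and demo_compare are hypothetical and do not use
the rbtree interface.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical node carrying its own key, for illustration only. */
struct demo_node {
    unsigned long key;
};

/*
 * Three-way comparison that always yields -1, 0 or 1. Unlike
 * "(int)(a - b)", the result does not depend on how a difference of
 * unsigned longs is truncated when converted to int.
 */
static int demo_compare(const struct demo_node *a, const struct demo_node *b)
{
    assert(a != NULL && b != NULL);
    return (a->key > b->key) - (a->key < b->key);
}
```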

> +/* Insert a new address as the node in the futex tree. */
> +static int futex_init(int *address)
> +{
> +     struct rbtree_node *futex;
> +
> +     simple_lock(&futex_lock);
> +
> +     futex = (struct rbtree_node *)kalloc(sizeof(*futex));

Allocating memory may sleep, something that is not permitted when
holding a kernel spin lock in a multiprocessor system. Try to find out
why.

> +     if (futex == NULL)
> +             return FUTEX_RESOURCE_SHORTAGE;

Returning here with the futex lock still held.
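
One common shape that avoids both problems is to allocate before taking
the lock, so that the failure path has nothing to release. The sketch
below uses userspace stand-ins (demo_lock, demo_unlock, malloc) that are
assumptions for illustration, not the gnumach simple_lock or kalloc
interfaces.

```c
#include <assert.h>
#include <stdlib.h>

/* Stand-ins for illustration only: a counter instead of a real lock. */
static int demo_lock_depth;
static void demo_lock(void)   { demo_lock_depth++; }
static void demo_unlock(void) { demo_lock_depth--; }

struct demo_entry {
    int *address;
    struct demo_entry *next;
};

static struct demo_entry *demo_list;

/* Register address; returns 0 on success, -1 on allocation failure. */
static int demo_register(int *address)
{
    struct demo_entry *entry;

    assert(address != NULL);

    /* Allocate first: this step may block, so the lock is not held yet. */
    entry = malloc(sizeof(*entry));
    if (entry == NULL)
        return -1;              /* nothing to unlock on this path */

    entry->address = address;

    /* Only the short list update runs with the lock held. */
    demo_lock();
    entry->next = demo_list;
    demo_list = entry;
    demo_unlock();

    return 0;
}
```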

> +/* Find the node slot of the futex address in the tree. */
> +static unsigned long futex_lookup_address(int *address)
> +{
> +     unsigned long node_slot = 0;
> +     struct rbtree_node node;
> +
> +     simple_lock(&futex_lock);
> +
> +     node.parent = (unsigned long)address;
> +     rbtree_lookup_slot(&futex_tree, &node, compare_nodes_lookup, node_slot);

The red-black tree interface explicitly supports two kinds of
comparison functions, one for lookups and one for inserts. The
difference between them is that the lookup comparison can just compare
the key against a node, and, as it's written in the header itself,
"users can pass only the value they need for comparison instead of
e.g. allocating a full structure on the stack".
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^

Stop being lazy and RTFM.
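
The same key-versus-node idea exists in standard C: bsearch() hands the
comparison function the key first and the element second, so the key
does not have to be a full element. The sketch below uses bsearch() as an
analogy only; struct demo_node, cmp_key_node and demo_lookup are
hypothetical, and this is not the rbtree interface.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical element type, for illustration only. */
struct demo_node {
    unsigned long address;
    int waiters;
};

/* Lookup comparison: a bare key on the left, a full node on the right. */
static int cmp_key_node(const void *key, const void *elem)
{
    unsigned long k = *(const unsigned long *)key;
    const struct demo_node *node = elem;

    return (k > node->address) - (k < node->address);
}

/* nodes must be sorted by address; returns NULL if address is absent. */
static struct demo_node *demo_lookup(struct demo_node *nodes, size_t count,
                                     unsigned long address)
{
    assert(nodes != NULL);
    /* Only the key is passed; no dummy node is built on the stack. */
    return bsearch(&address, nodes, count, sizeof(*nodes), cmp_key_node);
}
```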

> +int futex_wait(int *address, int value)
> +{
> +     unsigned long node_slot = 0;
> +
> +     lookup:

What is this label still doing here ? Rewrite that into a proper loop.
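
A lookup-or-create retry reads naturally as a while loop. The following
is a minimal sketch with a hypothetical fixed-size registry (demo_slots,
demo_find, demo_add); it only shows the control flow, not the futex
logic.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_SLOTS 4

static int *demo_slots[DEMO_SLOTS];   /* hypothetical registry of addresses */

/* Return the slot index holding address, or -1 if it is not registered. */
static int demo_find(int *address)
{
    int i;

    for (i = 0; i < DEMO_SLOTS; i++)
        if (demo_slots[i] == address)
            return i;
    return -1;
}

/* Register address in the first free slot; return 0, or -1 if full. */
static int demo_add(int *address)
{
    int i;

    for (i = 0; i < DEMO_SLOTS; i++)
        if (demo_slots[i] == NULL) {
            demo_slots[i] = address;
            return 0;
        }
    return -1;
}

/* Look up address, registering it first if needed: no label, no goto. */
static int demo_find_or_add(int *address)
{
    int slot;

    assert(address != NULL);

    while ((slot = demo_find(address)) < 0)
        if (demo_add(address) != 0)
            return -1;
    return slot;
}
```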

> +     node_slot = futex_lookup_address(address);
> +
> +     if (node_slot != 0) {
> +             
> +             simple_lock(&futex_lock);
> +             
> +             int addr_value;
> +             copyin(address, &addr_value, sizeof(int));

Since you didn't write the RPCs, we can't know how this data is provided
to the kernel. I know I told you copyin might be OK, but actually it's
not. You need to be *very* careful about this part, as it really is at
the core of the futex idea. The RPCs should provide the memory itself,
so that Mach IPC maps it in the kernel space, and the kernel can merely
do an atomic operation directly, without the copyin. This is normally
accomplished through MiG witchcraft, so start learning about MiG. Read
at least the Mach 3 server writer's guide.

> +             if (atomic_compare(addr_value, value)) {
> +
> +                     vm_offset_t offset;
> +                     struct rbtree_node node;
> +                     
> +                     /* Fetch the offset from the (task map, futex address) value pair. */
> +                     kern_return_t kr;
> +                     kr = vm_map_lookup(&current_map(), (vm_offset_t)address, VM_PROT_READ, NULL, NULL, &offset, NULL, NULL);

The &current_map() expression results in a struct vm_map **, which is
not the type vm_map_lookup expects.

> +                     if (kr != KERN_SUCCESS) 
> +                             return FUTEX_MEMORY_ERROR;

I still don't understand what this lookup is here for. It would only be
useful if you'd actually use the physical page descriptors instead of
futex objects as I described earlier, but in this current version, it
really looks useless.

> +                     /* Mark the node black and write the offset. */
> +                     node.parent = set_black(offset);
> +
> +                     /* Save the current thread address. */
> +                     thread_addr(&node) = (struct rbtree_node *)current_thread();
> +
> +                     rbtree_insert(&futex_tree, &node, compare_nodes_insert_waiter);
> +
> +                     /* Block with the offset as the event. */
> +                     thread_sleep((event_t)node.parent, (simple_lock_t)&futex_lock, FALSE);

What makes you think offsets are unique system-wide ? They are
per-object. Again, use physical page descriptors. Those are.

You may want to enclose this sleep in a loop that rechecks the actual
condition, in order to guard against spurious wakeups, although the
Mach wait queues normally prevent those already.

> +                     /* Thread was awakened in thread_wakeup_prim(). Remove the offset. */
> +                     rbtree_remove(&futex_tree, &node);
> +
> +                     return FUTEX_RESUMED_BY_WAKE;
> +
> +             } else {
> +                     simple_unlock(&futex_lock);
> +                     return FUTEX_NO_THREAD_SUSPENDED;
> +             }
> +
> +     } else {
> +             
> +             if (futex_init(address) != 0)
> +                     return FUTEX_RESOURCE_SHORTAGE; 
> +             
> +             goto lookup;
> +     }
> +}
> +
> +int futex_wake(int *address, _Bool wake_all)

Why _Bool and not boolean_t, or even int ?

> +     unsigned node_slot = 0; 
> +     node_slot = futex_lookup_address(address);
> +
> +     if (node_slot != 0) {
> +
> +             simple_lock(&futex_lock);
> +
> +             struct rbtree_node *node;               
> +
> +             for (node = rbtree_first(&futex_tree); node != NULL; node = next_node(node)) {
> +
> +                     /* Clean up the futex with zero waiters. */
> +                     if (next_node(node) == NULL && node != futex_tree.root && !is_black(node->parent)) {
> +                             kfree((vm_offset_t)node, sizeof(*node));
> +                             rbtree_remove(&futex_tree, node);

Still releasing memory from the thread doing the wakeup instead of the
one being awakened. The awakened one does call rbtree_remove though...
And you're releasing memory *before* removing the node from the tree.
How many more mistakes can one make in two small lines ??
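
The ordering issue can be shown with any linked container: unlink the
element first, release it afterwards. The sketch below uses a
hypothetical singly linked list (struct demo_item) and plain free(), as
a stand-in for the tree and kfree.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical list element, for illustration only. */
struct demo_item {
    int key;
    struct demo_item *next;
};

/* Detach item from the list starting at *head, if it is on it. */
static void demo_unlink(struct demo_item **head, struct demo_item *item)
{
    struct demo_item **link;

    for (link = head; *link != NULL; link = &(*link)->next)
        if (*link == item) {
            *link = item->next;
            item->next = NULL;
            return;
        }
}

static void demo_remove_and_free(struct demo_item **head,
                                 struct demo_item *item)
{
    assert(head != NULL && item != NULL);

    demo_unlink(head, item);    /* first make the item unreachable */
    free(item);                 /* only then release its memory */
}
```

Doing it the other way around leaves a window where the container still
points at memory that has already been released.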

> +int futex_wait_timeout(int *address, int value, unsigned int msec)

Don't write two versions of the same function. If you really want
several functions, they should be simple wrappers. Make futex_wait
take a timeout argument, and define a "no timeout" value such as -1.
I'm not sure unsigned int is the best type for that here (instead of,
say, time_value_t), but we can decide that later.
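
As a shape-only sketch of that suggestion: one core function takes the
timeout, and any extra entry point is a one-line wrapper. All names here
(demo_wait, demo_wait_forever, DEMO_NO_TIMEOUT) are hypothetical, the
int timeout type is just a placeholder, and the core function is a stub
that merely records its argument instead of blocking.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_NO_TIMEOUT (-1)    /* hypothetical "wait forever" value */

static int demo_last_timeout;   /* what the core function last received */

/* Single core entry point; a real one would block here. */
static int demo_wait(int *address, int value, int timeout_ms)
{
    assert(address != NULL);
    (void)value;
    demo_last_timeout = timeout_ms;
    return 0;
}

/* Simple wrapper: no duplicated logic, only a fixed argument. */
static int demo_wait_forever(int *address, int value)
{
    return demo_wait(address, value, DEMO_NO_TIMEOUT);
}
```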

> +void simple_mutex_lock(struct simple_mutex *smutex)
> +{
> +     int c;
> +
> +     if ((c = cmpxchg(smutex->value, SMUTEX_UNLOCKED, SMUTEX_NO_WAITERS)) != SMUTEX_UNLOCKED) {
> +             if (c != SMUTEX_WAITERS)
> +                     c = xchg(smutex->value, SMUTEX_WAITERS);
> +             while (c != 0) {
> +                     futex_wait(&smutex->value, SMUTEX_WAITERS);
> +                     c = xchg(smutex->value, SMUTEX_WAITERS);
> +             }
> +     }
> +}
> +
> +void simple_mutex_unlock(struct simple_mutex *smutex)
> +{
> +     if (atomic_dec(smutex->value) != SMUTEX_NO_WAITERS) {
> +             smutex->value = SMUTEX_UNLOCKED;
> +             futex_wake(&smutex->value, 0);
> +     }
> +}

Until those are called from userspace, they really don't matter much.

> +/* Definitions for return values. */
> +#define FUTEX_RESUMED_BY_WAKE 0
> +#define FUTEX_NO_THREAD_SUSPENDED 1
> +#define FUTEX_SOME_NUMBER_RESUMED 2
> +#define FUTEX_NO_THREAD 3
> +#define FUTEX_MEMORY_ERROR 4
> +#define FUTEX_RESOURCE_SHORTAGE 6
> +#define FUTEX_TIMED_OUT 7

I'm not sure we need those error codes. RPCs are expected to return the
common Mach error codes anyway. Use a dedicated OUT parameter if you
really need one, but unless you can thoroughly explain why it is needed,
try without.

> +/* Initialization of the futex tree and the locks. */
> +void futex_setup(void);

External users don't care whether there is a tree and a lock.

> +/*
> + * This is a method for a thread to wait for a change of value at a given address.
> + *
> + * Function performs a lookup of the address in the futex tree. If address is
> + * found, value at the address is atomically compared with the argument value.
> + * If the values are same, offset from the task's map and futex address is
> + * calculated and thread blocks.
> + */
> +int futex_wait(int *address, int value);

Again, external users don't care about the implementation, only the
behaviour they can expect.

> +/*
> + * This is a method for waking up one or all threads waiting for a change of 
> + * value at a given address.
> + * 
> + * Function performs a lookup of the address in the futex tree. If address is
> + * found and wake_all argument is TRUE, all threads with the same offsets are
> + * awakened. If wake_all is FALSE, only one thread from the futex is awakened.
> + */
> +int futex_wake(int *address, _Bool wake_all);

Likewise.

> +/* RPCs to call from userspace. */
> +int r_futex_wait(task_t task, pointer_t address, int value);
> +int r_futex_wake(task_t task, pointer_t address, boolean_t wake_all);

?

> +/*
> + * The idea is that each thread calls sync_circle_wait() with the same object as
> + * the argument. Then a thread outside of the sync circle calls sync_circle_signal()
> + * to send a wakeup signal. Function sync_circle_signal() first modifies the sync_circle's
> + * value, which closes the sync circle and then calls futex_wake().
> + */
> +struct sync_circle {
> +
> +     int value;
> +
> +};
> +
> +#define decl_sync_circle(class, name) \
> +class struct sync_circle name;
> +
> +void sync_circle_init(struct sync_circle *scircle);
> +int sync_circle_wait(struct sync_circle *scircle);
> +int sync_circle_signal(struct sync_circle *scircle);
> +
> +/* Simple mutex implementation based on futex calls and GCC builtins. */
> +struct simple_mutex {
> +
> +     #define SMUTEX_UNLOCKED 0
> +     #define SMUTEX_NO_WAITERS 1
> +     #define SMUTEX_WAITERS 2
> +
> +     int value;
> +
> +};
> +
> +#define decl_simple_mutex(class, name) \
> +class struct simple_mutex name;
> +
> +void simple_mutex_init(struct simple_mutex *smutex);
> +void simple_mutex_lock(struct simple_mutex *smutex);
> +void simple_mutex_unlock(struct simple_mutex *smutex);

What is all this doing in the public header ?

> @@ -158,6 +159,8 @@ void setup_main(void)
>       recompute_priorities(NULL);
>       compute_mach_factor();
>  
> +     futex_setup();
> +

Well, you got that one right, but I suppose it won't be needed once all
the other changes are made, since there should be no globally shared
structure after that point.

-- 
Richard Braun


