bug-hurd
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [RFC] kern: simple futex for gnumach (version 11)


From: Richard Braun
Subject: Re: [RFC] kern: simple futex for gnumach (version 11)
Date: Thu, 9 Jan 2014 15:45:35 +0100
User-agent: Mutt/1.5.21 (2010-09-15)

On Wed, Jan 08, 2014 at 08:43:28PM +0100, Marin Ramesa wrote:
> As far the code is concerned I bypassed the opacity of red-black tree 
> structures in two places, as I always get a segfault somewhere in the
> rbtree module.

If you need to do that, it's very likely you're not using it correctly.

> Also, I had to create a recursion to generate unique event numbers, but it's
> bounded so I don't think there will be a problem. I expect only one or two
> levels of recursion in actual usage, if any.

Should be fine.

> +routine futex_wait(
> +             task            : task_t;
> +             address         : vm_offset_t;
> +             value           : int;
> +             msec            : int;
> +             private_futex   : boolean_t);
> +
> +routine futex_wake(
> +             task    : task_t;
> +             address : vm_offset_t;
> +             wake_all: boolean_t);

Looks good. You'll probably want the requeue operation too some day, but
that can wait.

> +struct futex {
> +
> +     /* Futex node in the futex tree. */
> +     struct rbtree_node *node;
> +
> +     decl_simple_lock_data( , futex_private_lock);

Don't bother prefixing struct members, just call this one "lock".

> +     vm_offset_t address;
> +
> +     /* If a shared futex this is NULL, otherwise it holds the address of 
> the task. */
> +     task_t task;
> +
> +     /* Block and wakeup event. */
> +     event_t event;
> +
> +     /* Save the map for futex_check_consistency() routine. */
> +     vm_map_t map;

I don't understand this map parameter. You already have the task if the
futex is private, and you don't want to keep the map is it's shared.

> +     /* Save the vm_object for futex_check_consistency() routine. */
> +     vm_object_t object;
> +
> +     /* Number of futex waiters. Needed to release the memory after 
> thread_wakeup_prim(). */
> +     unsigned int num_waiters;

You might want to call that "nr_refs", as it better matches its intended
purpose.

> +static struct futex *shared_futexes;

What is that variable for ?

In addition, keep using the module namespace.

> +/* TODO Should be per-task. */
> +static struct futex *pfutexes; 

Most locks are private, so yes, do work on that too.

> +static inline int compare_nodes_insert(struct rbtree_node *node1, struct 
> rbtree_node *node2)
> +{
> +     if (rbtree_entry(node1, struct futex, node)->address - 
> rbtree_entry(node2, struct futex, node)->address != 0) 
> +             return 1;
> +     else 
> +             return 0;
> +}
> +
> +static inline int compare_nodes_lookup(vm_offset_t address, struct 
> rbtree_node *node2)
> +{
> +     if (address - rbtree_entry(node2, struct futex, node)->address != 0) 
> +             return 1;
> +     else 
> +             return 0;
> +}

The interface is the same as common C comparison functions : 0 for
equality, negative if less than, positive if greater than. I thought
it was rather obvious but the rbtree header could indeed explicitely
state it, 

I also insist again on sticking to the futex_ namespace. Call your
functions futex_cmp_lookup and futex_cmp_insert. Make the _insert one
call the _lookup one for code reuse.

Take a look at kern/slab.c for a usage example of the rbtree interface.

> +static unsigned long futex_init(task_t task, vm_offset_t address, boolean_t 
> private_futex, struct futex *futex)
> +{
> +     unsigned long node_slot = 0;
> +
> +     futex = (struct futex *)kalloc(sizeof(*futex));
> +     if (futex == NULL)
> +             return 0;
> +
> +     simple_lock(&futex_shared_lock);
> +
> +     struct rbtree_node node;
> +     rbtree_node_init(&node);
> +
> +     rbtree_insert(&futex_tree, &node, compare_nodes_insert);
> +     futex->node = &node;
> +     futex->address = address;

You can't insert a node before setting what's necessary for comparisons.
You comparison functions use the futex address for that, which is only
set after insertion... In addition, you only need to lock when
inserting, preparing a new entry can be done completely concurrently
since the object can't be shared yet. Finally, an address alone is not
enough, your comparison functions must compare the whole (object,
offset) pair, and they should even take into account the futex type,
i.e. (object, offset) for shared futexes and (map, address) for private
ones. But then you should have per-task trees of (map, address) pairs,
and different comparison functions for the global tree of shared futexes
and task-local trees of private futexes.

> +     futex->event = NULL;
> +     futex->map = current_map();
> +     futex->num_waiters = 0;
> +     futex->object = NULL;
> +
> +     if (private_futex) {
> +             futex->task = task;
> +             simple_lock_init(&futex->futex_private_lock);
> +     } else 
> +             futex->task = NULL;
> +     
> +     rbtree_lookup_slot(&futex_tree, address, compare_nodes_lookup, 
> node_slot);

Using the slot interface is only useful if you intend to reuse the slot
later for insertion. The goal is to avoid a redundant lookup when
inserting. Since that's not what you're doing, just use rbtree_lookup.

> +     simple_unlock(&futex_shared_lock);
> +
> +     return node_slot;               
> +}
> +
> +static unsigned long futex_lookup_address(vm_offset_t address)
> +{
> +     unsigned long node_slot = 0;
> +
> +     simple_lock(&futex_shared_lock);
> +
> +     rbtree_lookup_slot(&futex_tree, address, compare_nodes_lookup, 
> node_slot);
> +
> +     simple_unlock(&futex_shared_lock);
> +
> +     return node_slot;
> +}    
> +
> +static void futex_deallocate_array_slot(struct futex *futex, struct 
> rbtree_node *node)
> +{
> +     if (futex->task == NULL) {
> +             int i;
> +             for (i = 0; i < ARRAY_SIZE(shared_futexes); i++)
> +                     if (shared_futexes[i].node == node) break;
> +             kfree((vm_offset_t)&shared_futexes[i], 
> sizeof(&shared_futexes[i]));
> +     } else {
> +             int i;
> +             for (i = 0; i < ARRAY_SIZE(pfutexes); i++)
> +                     if (pfutexes[i].node == node) break;
> +             kfree((vm_offset_t)&pfutexes[i], sizeof(&pfutexes[i]));
> +     }
> +}

No, you don't need that second data structure, the trees should be
sufficient. I also don't see any locking or even pointer update...
What's this for, really ?

> +static vm_offset_t futex_gen_unique_event_num(vm_offset_t sum, vm_offset_t 
> add1, vm_offset_t add2)
> +{
> +     if (sum == 0) return 0;
> +     
> +     struct rbtree_node *node;
> +
> +     for(node = futex_tree.root; node != NULL; node = 
> node->children[RBTREE_RIGHT]) {
> +
> +             struct futex *futex;
> +             futex = rbtree_entry(node, struct futex, node);
> +
> +             vm_offset_t futex_event_num = (vm_offset_t)futex->event; 
> +             
> +             if (futex_event_num == sum) {
> +                     
> +                     if (futex_event_num - add2 == add1)
> +                             continue;
> +                     else
> +                             return futex_gen_unique_event_num(sum - 1, 
> add1, add2);
> +
> +             }               
> +     }
> +
> +     return sum;
> +}

??

> +void futex_wait(task_t task, vm_offset_t address, int value, int /* TODO Use 
> time_value_t */ msec, boolean_t private_futex)
> +{
> +     unsigned long node_slot = 0;
> +
> +     node_slot = futex_lookup_address(address);
> +     if (node_slot == 0) {

The values of slot are completely opaque, you're absolutely forbidden
to make any assumption about them. In practice, expect them to not even
be regular pointers since the node index is stored in the lowest bit.
Also, when using slots, you must make sure they remain consistent across
the rbtree calls, which means you must not release the tree lock between
looking up and inserting.

> +             if (private_futex) {
> +                     pfutexes = (struct futex *)kalloc(sizeof(pfutexes));
> +                     if (pfutexes == NULL)
> +                             return;
> +                     node_slot = futex_init(task, address, TRUE, 
> &pfutexes[ARRAY_SIZE(pfutexes) - 1]);
> +             } else {
> +                     shared_futexes = (struct futex 
> *)kalloc(sizeof(shared_futexes));
> +                     if (shared_futexes == NULL)
> +                             return;
> +                     node_slot = futex_init(task, address, FALSE, 
> &shared_futexes[ARRAY_SIZE(shared_futexes) - 1]);
> +             }
> +             if (node_slot == 0) return;
> +     }
> +             
> +     if (__sync_bool_compare_and_swap((int *)address, value, value)) {

What are you trying to do here ?? This call has no particular effect,
except being a memory barrier ...

> +             if (msec != 0) {
> +
> +                     simple_lock(&futex_shared_lock);
> +                             
> +                     thread_timeout_setup(current_thread());
> +
> +                     assert_wait(NULL, TRUE);
> +                     thread_will_wait_with_timeout(current_thread(), 
> (unsigned int)msec);
> +
> +                     simple_unlock(&futex_shared_lock);
> +                     
> +                     thread_block(NULL);
> +                     
> +                     return;
> +
> +             }

This can't work... You're completely isolating the bounded wait case
from the unbounded one. This is merely a sleep, not waiting for
anything.

> +             if (private_futex) {
> +
> +                     struct futex *pfutex;
> +
> +                     simple_lock(&futex_shared_lock);
> +
> +                     pfutex = rbtree_entry(rbtree_slot_parent(node_slot), 
> struct futex, node);
> +                     
> +                     simple_unlock(&futex_shared_lock);

The node slot might become stale when not holding the shared lock.

> +                     simple_lock(&pfutex->futex_private_lock);
> +
> +                     pfutex->num_waiters++;
> +
> +                     pfutex->event = (event_t)futex_gen_unique_event_num
> +                             ((vm_offset_t)pfutex->map + address, 
> (vm_offset_t)pfutex->map, address);

Use the address of the futex object in kernel memory for events, this
is the traditional way of using them.

> +                     thread_sleep(pfutex->event, 
> (simple_lock_t)&pfutex->futex_private_lock, FALSE);

This probably needs to be interruptible.

> +                     pfutex->num_waiters--;
> +                     if (pfutex->num_waiters == 0) {
> +                             rbtree_remove(&futex_tree, pfutex->node);
> +                             futex_deallocate_array_slot(pfutex, 
> pfutex->node);
> +                             kfree((vm_offset_t)pfutex, sizeof(*pfutex));
> +                     }       

Grabbing the futex lock but not releasing it.

> +             } else {
> +
> +                     struct futex *futex;
> +                     vm_object_t object;
> +                     vm_offset_t offset;
> +                     vm_map_version_t version;
> +                     vm_prot_t prot;
> +                     boolean_t wired;
> +                     
> +                     simple_lock(&futex_shared_lock);
> +
> +                     futex = rbtree_entry(rbtree_slot_parent(node_slot), 
> struct futex, node);
> +
> +                     vm_map_lookup(&current_map(), address, VM_PROT_READ, 
> &version, &object, &offset, &prot, &wired);
> +
> +                     futex->object = object;
> +
> +                     futex->event = (event_t)futex_gen_unique_event_num
> +                             ((vm_offset_t)object + offset, 
> (vm_offset_t)object, offset);
> +                     
> +                     thread_sleep(futex->event, 
> (simple_lock_t)&futex_shared_lock, FALSE);
> +                     
> +                     futex->num_waiters--;
> +                     if (futex->num_waiters == 0) {
> +                             rbtree_remove(&futex_tree, futex->node);
> +                             futex_deallocate_array_slot(futex, futex->node);
> +                             kfree((vm_offset_t)futex, sizeof(*futex));
> +                     }
> +             }

Likewise with the shared lock.

> +static int futex_check_consistency(struct futex *futex)
> +{
> +     vm_object_t object;
> +     vm_offset_t offset;
> +     vm_map_version_t version;
> +     vm_prot_t prot;
> +     boolean_t wired;
> +
> +     vm_map_lookup(&futex->map, futex->address, VM_PROT_READ, &version, 
> &object, &offset, &prot, &wired);
> +
> +     if (futex->event == (event_t)((vm_offset_t)object + offset))
> +             return 0;
> +     else {
> +             if ((futex->object->shadowed) && (futex->object->shadow == 
> object))
> +                     return 0;
> +             else
> +                     return -1;
> +     }
> +}

I'm not sure about the usefulness of this function.

> +void futex_wake(task_t task, vm_offset_t address, boolean_t wake_all)
> +{
> +     unsigned node_slot = 0; 
> +     node_slot = futex_lookup_address(address);
> +
> +     if (node_slot != 0) {

Consistency issues again. Fix your mind to always consider everything
might have changed when dropping and reacquiring a lock, except if you
explicitely designed your code otherwise.

> +             simple_lock(&futex_shared_lock);
> +
> +             struct rbtree_node *node;
> +
> +             for(node = futex_tree.root; node != NULL; node = 
> node->children[RBTREE_RIGHT]) {

No. The rbtree interface provides you with what you need for that,
use it.

> +                     struct futex *futex;
> +                     futex = rbtree_entry(node, struct futex, node);
> +                     
> +                     if (futex->task == NULL)
> +                             if (futex_check_consistency(futex) != 0)
> +                                     continue;
> +                                             
> +                     if (wake_all) {                                 
> +                             thread_wakeup(futex->event);
> +                             continue;
> +                     }
> +                     else { 
> +                             thread_wakeup_one(futex->event);
> +                             break;
> +                     }
> +             }
> +
> +             simple_unlock(&futex_shared_lock);
> +     }
> +}

Just get the futex trees done right and the implementation of this
function should become obvious.

> +struct sync_circle {
> +
> +     int value;
> +
> +};
> +                             
> +void sync_circle_init(struct sync_circle *scircle)
> +{
> +     scircle->value = 0;
> +}
> +
> +void sync_circle_wait(struct sync_circle *scircle)
> +{
> +       futex_wait(current_thread()->task, (vm_offset_t)&scircle->value, 
> scircle->value, 0, 1);
> +}
> +
> +void sync_circle_signal(struct sync_circle *scircle)
> +{
> +     scircle->value++;
> +       futex_wake(current_thread()->task, (vm_offset_t)&scircle->value, 1);
> +}
> +
> +struct simple_mutex {
> +
> +     #define SMUTEX_UNLOCKED 0
> +     #define SMUTEX_NO_WAITERS 1
> +     #define SMUTEX_WAITERS 2
> +
> +     int value;
> +
> +};
> +
> +void simple_mutex_init(struct simple_mutex *smutex)
> +{
> +     smutex->value = SMUTEX_UNLOCKED;
> +}
> +
> +void simple_mutex_lock(struct simple_mutex *smutex)
> +{
> +     int c;
> +
> +     if ((c = __sync_val_compare_and_swap(&smutex->value, SMUTEX_UNLOCKED, 
> SMUTEX_NO_WAITERS)) != SMUTEX_UNLOCKED) {
> +             if (c != SMUTEX_WAITERS)
> +                     c = __sync_lock_test_and_set(&smutex->value, 
> SMUTEX_WAITERS);
> +             while (c != SMUTEX_UNLOCKED) {
> +                     futex_wait(current_thread()->task, 
> (vm_offset_t)&smutex->value, SMUTEX_WAITERS, 0, 1);
> +                     c = __sync_lock_test_and_set(&smutex->value, 
> SMUTEX_WAITERS);
> +             }
> +     }
> +}
> +
> +void simple_mutex_unlock(struct simple_mutex *smutex)
> +{
> +     if (__sync_lock_test_and_set(&smutex->value, smutex->value - 1) != 
> SMUTEX_NO_WAITERS) {
> +             smutex->value = SMUTEX_UNLOCKED;
> +             futex_wake(current_thread()->task, (vm_offset_t)&smutex->value, 
> 0);
> +     }
> +}

Userspace tests please.

> +struct futex;
> +
> +void futex_setup(void);
> +
> +/*
> + * This is a method for a thread to wait for a change of value at a given 
> address.
> + * If msec is non-zero, thread blocks for msec amount of time. If it's zero, 
> no
> + * timeout is used. If private_futex is one, futex is not shared between 
> tasks.
> + */
> +void futex_wait(task_t task, vm_offset_t address, int value, int msec, 
> boolean_t private_futex);

s/one/true/

> +/*
> + * This is a method for waking up one or all threads waiting for a change of 
> + * value at a given address.
> + * 
> + * If wake_all is one, all threads are awakened. If it's zero, only one 
> thread is 
> + * awakened.
> + */
> +void futex_wake(task_t task, vm_offset_t address, boolean_t wake_all);

This requires more description. For example, state the address
requirements (pointing and aligned to an int).

-- 
Richard Braun



reply via email to

[Prev in Thread] Current Thread [Next in Thread]