Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm
authorLinus Torvalds <torvalds@linux-foundation.org>
Fri, 22 Jul 2011 20:16:07 +0000 (13:16 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Fri, 22 Jul 2011 20:16:07 +0000 (13:16 -0700)
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm:
  dlm: don't limit active work items
  dlm: use workqueue for callbacks
  dlm: remove deadlock debug print
  dlm: improve rsb searches
  dlm: keep lkbs in idr
  dlm: fix kmalloc args
  dlm: don't do pointless NULL check, use kzalloc and fix order of arguments
  dlm: dump address of unknown node
  dlm: use vmalloc for hash tables
  dlm: show addresses in configfs

12 files changed:
fs/dlm/ast.c
fs/dlm/ast.h
fs/dlm/config.c
fs/dlm/config.h
fs/dlm/dlm_internal.h
fs/dlm/lock.c
fs/dlm/lockspace.c
fs/dlm/lowcomms.c
fs/dlm/memory.c
fs/dlm/memory.h
fs/dlm/recoverd.c
fs/dlm/user.c

index abc49f2..90e5997 100644 (file)
 #include "dlm_internal.h"
 #include "lock.h"
 #include "user.h"
-#include "ast.h"
-
-#define WAKE_ASTS  0
-
-static uint64_t                        ast_seq_count;
-static struct list_head                ast_queue;
-static spinlock_t              ast_queue_lock;
-static struct task_struct *    astd_task;
-static unsigned long           astd_wakeflags;
-static struct mutex            astd_running;
 
+static uint64_t                        dlm_cb_seq;
+static spinlock_t              dlm_cb_seq_spin;
 
 static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
 {
@@ -57,21 +49,13 @@ static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
        }
 }
 
-void dlm_del_ast(struct dlm_lkb *lkb)
-{
-       spin_lock(&ast_queue_lock);
-       if (!list_empty(&lkb->lkb_astqueue))
-               list_del_init(&lkb->lkb_astqueue);
-       spin_unlock(&ast_queue_lock);
-}
-
 int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
                         int status, uint32_t sbflags, uint64_t seq)
 {
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        uint64_t prev_seq;
        int prev_mode;
-       int i;
+       int i, rv;
 
        for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
                if (lkb->lkb_callbacks[i].seq)
@@ -100,7 +84,8 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
                                          mode,
                                          (unsigned long long)prev_seq,
                                          prev_mode);
-                               return 0;
+                               rv = 0;
+                               goto out;
                        }
                }
 
@@ -109,6 +94,7 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
                lkb->lkb_callbacks[i].mode = mode;
                lkb->lkb_callbacks[i].sb_status = status;
                lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
+               rv = 0;
                break;
        }
 
@@ -117,21 +103,24 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
                          lkb->lkb_id, (unsigned long long)seq,
                          flags, mode, status, sbflags);
                dlm_dump_lkb_callbacks(lkb);
-               return -1;
+               rv = -1;
+               goto out;
        }
-
-       return 0;
+ out:
+       return rv;
 }
 
 int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
                         struct dlm_callback *cb, int *resid)
 {
-       int i;
+       int i, rv;
 
        *resid = 0;
 
-       if (!lkb->lkb_callbacks[0].seq)
-               return -ENOENT;
+       if (!lkb->lkb_callbacks[0].seq) {
+               rv = -ENOENT;
+               goto out;
+       }
 
        /* oldest undelivered cb is callbacks[0] */
 
@@ -163,7 +152,8 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                  cb->mode,
                                  (unsigned long long)lkb->lkb_last_cast.seq,
                                  lkb->lkb_last_cast.mode);
-                       return 0;
+                       rv = 0;
+                       goto out;
                }
        }
 
@@ -176,171 +166,150 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
                memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
                lkb->lkb_last_bast_time = ktime_get();
        }
-
-       return 0;
+       rv = 0;
+ out:
+       return rv;
 }
 
-void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
-                uint32_t sbflags)
+void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
+               uint32_t sbflags)
 {
-       uint64_t seq;
+       struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+       uint64_t new_seq, prev_seq;
        int rv;
 
-       spin_lock(&ast_queue_lock);
-
-       seq = ++ast_seq_count;
+       spin_lock(&dlm_cb_seq_spin);
+       new_seq = ++dlm_cb_seq;
+       spin_unlock(&dlm_cb_seq_spin);
 
        if (lkb->lkb_flags & DLM_IFL_USER) {
-               spin_unlock(&ast_queue_lock);
-               dlm_user_add_ast(lkb, flags, mode, status, sbflags, seq);
+               dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
                return;
        }
 
-       rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq);
-       if (rv < 0) {
-               spin_unlock(&ast_queue_lock);
-               return;
-       }
+       mutex_lock(&lkb->lkb_cb_mutex);
+       prev_seq = lkb->lkb_callbacks[0].seq;
 
-       if (list_empty(&lkb->lkb_astqueue)) {
+       rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
+       if (rv < 0)
+               goto out;
+
+       if (!prev_seq) {
                kref_get(&lkb->lkb_ref);
-               list_add_tail(&lkb->lkb_astqueue, &ast_queue);
-       }
-       spin_unlock(&ast_queue_lock);
 
-       set_bit(WAKE_ASTS, &astd_wakeflags);
-       wake_up_process(astd_task);
+               if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
+                       mutex_lock(&ls->ls_cb_mutex);
+                       list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
+                       mutex_unlock(&ls->ls_cb_mutex);
+               } else {
+                       queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
+               }
+       }
+ out:
+       mutex_unlock(&lkb->lkb_cb_mutex);
 }
 
-static void process_asts(void)
+void dlm_callback_work(struct work_struct *work)
 {
-       struct dlm_ls *ls = NULL;
-       struct dlm_rsb *r = NULL;
-       struct dlm_lkb *lkb;
+       struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
+       struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        void (*castfn) (void *astparam);
        void (*bastfn) (void *astparam, int mode);
        struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
        int i, rv, resid;
 
-repeat:
-       spin_lock(&ast_queue_lock);
-       list_for_each_entry(lkb, &ast_queue, lkb_astqueue) {
-               r = lkb->lkb_resource;
-               ls = r->res_ls;
+       memset(&callbacks, 0, sizeof(callbacks));
 
-               if (dlm_locking_stopped(ls))
-                       continue;
-
-               /* we remove from astqueue list and remove everything in
-                  lkb_callbacks before releasing the spinlock so empty
-                  lkb_astqueue is always consistent with empty lkb_callbacks */
-
-               list_del_init(&lkb->lkb_astqueue);
-
-               castfn = lkb->lkb_astfn;
-               bastfn = lkb->lkb_bastfn;
+       mutex_lock(&lkb->lkb_cb_mutex);
+       if (!lkb->lkb_callbacks[0].seq) {
+               /* no callback work exists, shouldn't happen */
+               log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
+               dlm_print_lkb(lkb);
+               dlm_dump_lkb_callbacks(lkb);
+       }
 
-               memset(&callbacks, 0, sizeof(callbacks));
+       for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
+               rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
+               if (rv < 0)
+                       break;
+       }
 
-               for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
-                       rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
-                       if (rv < 0)
-                               break;
-               }
-               spin_unlock(&ast_queue_lock);
+       if (resid) {
+               /* cbs remain, loop should have removed all, shouldn't happen */
+               log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
+                         resid);
+               dlm_print_lkb(lkb);
+               dlm_dump_lkb_callbacks(lkb);
+       }
+       mutex_unlock(&lkb->lkb_cb_mutex);
 
-               if (resid) {
-                       /* shouldn't happen, for loop should have removed all */
-                       log_error(ls, "callback resid %d lkb %x",
-                                 resid, lkb->lkb_id);
-               }
+       castfn = lkb->lkb_astfn;
+       bastfn = lkb->lkb_bastfn;
 
-               for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
-                       if (!callbacks[i].seq)
-                               break;
-                       if (callbacks[i].flags & DLM_CB_SKIP) {
-                               continue;
-                       } else if (callbacks[i].flags & DLM_CB_BAST) {
-                               bastfn(lkb->lkb_astparam, callbacks[i].mode);
-                       } else if (callbacks[i].flags & DLM_CB_CAST) {
-                               lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
-                               lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
-                               castfn(lkb->lkb_astparam);
-                       }
+       for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
+               if (!callbacks[i].seq)
+                       break;
+               if (callbacks[i].flags & DLM_CB_SKIP) {
+                       continue;
+               } else if (callbacks[i].flags & DLM_CB_BAST) {
+                       bastfn(lkb->lkb_astparam, callbacks[i].mode);
+               } else if (callbacks[i].flags & DLM_CB_CAST) {
+                       lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
+                       lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
+                       castfn(lkb->lkb_astparam);
                }
-
-               /* removes ref for ast_queue, may cause lkb to be freed */
-               dlm_put_lkb(lkb);
-
-               cond_resched();
-               goto repeat;
        }
-       spin_unlock(&ast_queue_lock);
-}
-
-static inline int no_asts(void)
-{
-       int ret;
 
-       spin_lock(&ast_queue_lock);
-       ret = list_empty(&ast_queue);
-       spin_unlock(&ast_queue_lock);
-       return ret;
+       /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
+       dlm_put_lkb(lkb);
 }
 
-static int dlm_astd(void *data)
+int dlm_callback_start(struct dlm_ls *ls)
 {
-       while (!kthread_should_stop()) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               if (!test_bit(WAKE_ASTS, &astd_wakeflags))
-                       schedule();
-               set_current_state(TASK_RUNNING);
-
-               mutex_lock(&astd_running);
-               if (test_and_clear_bit(WAKE_ASTS, &astd_wakeflags))
-                       process_asts();
-               mutex_unlock(&astd_running);
+       ls->ls_callback_wq = alloc_workqueue("dlm_callback",
+                                            WQ_UNBOUND |
+                                            WQ_MEM_RECLAIM |
+                                            WQ_NON_REENTRANT,
+                                            0);
+       if (!ls->ls_callback_wq) {
+               log_print("can't start dlm_callback workqueue");
+               return -ENOMEM;
        }
        return 0;
 }
 
-void dlm_astd_wake(void)
+void dlm_callback_stop(struct dlm_ls *ls)
 {
-       if (!no_asts()) {
-               set_bit(WAKE_ASTS, &astd_wakeflags);
-               wake_up_process(astd_task);
-       }
+       if (ls->ls_callback_wq)
+               destroy_workqueue(ls->ls_callback_wq);
 }
 
-int dlm_astd_start(void)
+void dlm_callback_suspend(struct dlm_ls *ls)
 {
-       struct task_struct *p;
-       int error = 0;
-
-       INIT_LIST_HEAD(&ast_queue);
-       spin_lock_init(&ast_queue_lock);
-       mutex_init(&astd_running);
-
-       p = kthread_run(dlm_astd, NULL, "dlm_astd");
-       if (IS_ERR(p))
-               error = PTR_ERR(p);
-       else
-               astd_task = p;
-       return error;
-}
+       set_bit(LSFL_CB_DELAY, &ls->ls_flags);
 
-void dlm_astd_stop(void)
-{
-       kthread_stop(astd_task);
+       if (ls->ls_callback_wq)
+               flush_workqueue(ls->ls_callback_wq);
 }
 
-void dlm_astd_suspend(void)
+void dlm_callback_resume(struct dlm_ls *ls)
 {
-       mutex_lock(&astd_running);
-}
+       struct dlm_lkb *lkb, *safe;
+       int count = 0;
 
-void dlm_astd_resume(void)
-{
-       mutex_unlock(&astd_running);
+       clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
+
+       if (!ls->ls_callback_wq)
+               return;
+
+       mutex_lock(&ls->ls_cb_mutex);
+       list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
+               list_del_init(&lkb->lkb_cb_list);
+               queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
+               count++;
+       }
+       mutex_unlock(&ls->ls_cb_mutex);
+
+       log_debug(ls, "dlm_callback_resume %d", count);
 }
 
index 8aa89c9..757b551 100644 (file)
@@ -18,14 +18,15 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
                          int status, uint32_t sbflags, uint64_t seq);
 int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
                          struct dlm_callback *cb, int *resid);
-void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
-                uint32_t sbflags);
+void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
+                uint32_t sbflags);
 
-void dlm_astd_wake(void);
-int dlm_astd_start(void);
-void dlm_astd_stop(void);
-void dlm_astd_suspend(void);
-void dlm_astd_resume(void);
+void dlm_callback_work(struct work_struct *work);
+int dlm_callback_start(struct dlm_ls *ls);
+void dlm_callback_stop(struct dlm_ls *ls);
+void dlm_callback_suspend(struct dlm_ls *ls);
+void dlm_callback_resume(struct dlm_ls *ls);
 
 #endif
 
+
index 9b026ea..6cf72fc 100644 (file)
@@ -28,7 +28,8 @@
  * /config/dlm/<cluster>/spaces/<space>/nodes/<node>/weight
  * /config/dlm/<cluster>/comms/<comm>/nodeid
  * /config/dlm/<cluster>/comms/<comm>/local
- * /config/dlm/<cluster>/comms/<comm>/addr
+ * /config/dlm/<cluster>/comms/<comm>/addr      (write only)
+ * /config/dlm/<cluster>/comms/<comm>/addr_list (read only)
  * The <cluster> level is useless, but I haven't figured out how to avoid it.
  */
 
@@ -80,6 +81,7 @@ static ssize_t comm_local_write(struct dlm_comm *cm, const char *buf,
                                size_t len);
 static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf,
                                size_t len);
+static ssize_t comm_addr_list_read(struct dlm_comm *cm, char *buf);
 static ssize_t node_nodeid_read(struct dlm_node *nd, char *buf);
 static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf,
                                size_t len);
@@ -92,7 +94,6 @@ struct dlm_cluster {
        unsigned int cl_tcp_port;
        unsigned int cl_buffer_size;
        unsigned int cl_rsbtbl_size;
-       unsigned int cl_lkbtbl_size;
        unsigned int cl_dirtbl_size;
        unsigned int cl_recover_timer;
        unsigned int cl_toss_secs;
@@ -101,13 +102,13 @@ struct dlm_cluster {
        unsigned int cl_protocol;
        unsigned int cl_timewarn_cs;
        unsigned int cl_waitwarn_us;
+       unsigned int cl_new_rsb_count;
 };
 
 enum {
        CLUSTER_ATTR_TCP_PORT = 0,
        CLUSTER_ATTR_BUFFER_SIZE,
        CLUSTER_ATTR_RSBTBL_SIZE,
-       CLUSTER_ATTR_LKBTBL_SIZE,
        CLUSTER_ATTR_DIRTBL_SIZE,
        CLUSTER_ATTR_RECOVER_TIMER,
        CLUSTER_ATTR_TOSS_SECS,
@@ -116,6 +117,7 @@ enum {
        CLUSTER_ATTR_PROTOCOL,
        CLUSTER_ATTR_TIMEWARN_CS,
        CLUSTER_ATTR_WAITWARN_US,
+       CLUSTER_ATTR_NEW_RSB_COUNT,
 };
 
 struct cluster_attribute {
@@ -160,7 +162,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write)
 CLUSTER_ATTR(tcp_port, 1);
 CLUSTER_ATTR(buffer_size, 1);
 CLUSTER_ATTR(rsbtbl_size, 1);
-CLUSTER_ATTR(lkbtbl_size, 1);
 CLUSTER_ATTR(dirtbl_size, 1);
 CLUSTER_ATTR(recover_timer, 1);
 CLUSTER_ATTR(toss_secs, 1);
@@ -169,12 +170,12 @@ CLUSTER_ATTR(log_debug, 0);
 CLUSTER_ATTR(protocol, 0);
 CLUSTER_ATTR(timewarn_cs, 1);
 CLUSTER_ATTR(waitwarn_us, 0);
+CLUSTER_ATTR(new_rsb_count, 0);
 
 static struct configfs_attribute *cluster_attrs[] = {
        [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
        [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr,
        [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr,
-       [CLUSTER_ATTR_LKBTBL_SIZE] = &cluster_attr_lkbtbl_size.attr,
        [CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr,
        [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr,
        [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
@@ -183,6 +184,7 @@ static struct configfs_attribute *cluster_attrs[] = {
        [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr,
        [CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr,
        [CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr,
+       [CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count.attr,
        NULL,
 };
 
@@ -190,6 +192,7 @@ enum {
        COMM_ATTR_NODEID = 0,
        COMM_ATTR_LOCAL,
        COMM_ATTR_ADDR,
+       COMM_ATTR_ADDR_LIST,
 };
 
 struct comm_attribute {
@@ -217,14 +220,22 @@ static struct comm_attribute comm_attr_local = {
 static struct comm_attribute comm_attr_addr = {
        .attr   = { .ca_owner = THIS_MODULE,
                     .ca_name = "addr",
-                    .ca_mode = S_IRUGO | S_IWUSR },
+                    .ca_mode = S_IWUSR },
        .store  = comm_addr_write,
 };
 
+static struct comm_attribute comm_attr_addr_list = {
+       .attr   = { .ca_owner = THIS_MODULE,
+                    .ca_name = "addr_list",
+                    .ca_mode = S_IRUGO },
+       .show   = comm_addr_list_read,
+};
+
 static struct configfs_attribute *comm_attrs[] = {
        [COMM_ATTR_NODEID] = &comm_attr_nodeid.attr,
        [COMM_ATTR_LOCAL] = &comm_attr_local.attr,
        [COMM_ATTR_ADDR] = &comm_attr_addr.attr,
+       [COMM_ATTR_ADDR_LIST] = &comm_attr_addr_list.attr,
        NULL,
 };
 
@@ -435,7 +446,6 @@ static struct config_group *make_cluster(struct config_group *g,
        cl->cl_tcp_port = dlm_config.ci_tcp_port;
        cl->cl_buffer_size = dlm_config.ci_buffer_size;
        cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size;
-       cl->cl_lkbtbl_size = dlm_config.ci_lkbtbl_size;
        cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size;
        cl->cl_recover_timer = dlm_config.ci_recover_timer;
        cl->cl_toss_secs = dlm_config.ci_toss_secs;
@@ -444,6 +454,7 @@ static struct config_group *make_cluster(struct config_group *g,
        cl->cl_protocol = dlm_config.ci_protocol;
        cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs;
        cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us;
+       cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count;
 
        space_list = &sps->ss_group;
        comm_list = &cms->cs_group;
@@ -720,6 +731,50 @@ static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len)
        return len;
 }
 
+static ssize_t comm_addr_list_read(struct dlm_comm *cm, char *buf)
+{
+       ssize_t s;
+       ssize_t allowance;
+       int i;
+       struct sockaddr_storage *addr;
+       struct sockaddr_in *addr_in;
+       struct sockaddr_in6 *addr_in6;
+       
+       /* Taken from ip6_addr_string() defined in lib/vsprintf.c */
+       char buf0[sizeof("AF_INET6      xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:255.255.255.255\n")];
+       
+
+       /* Derived from SIMPLE_ATTR_SIZE of fs/configfs/file.c */
+       allowance = 4096;
+       buf[0] = '\0';
+
+       for (i = 0; i < cm->addr_count; i++) {
+               addr = cm->addr[i];
+
+               switch(addr->ss_family) {
+               case AF_INET:
+                       addr_in = (struct sockaddr_in *)addr;
+                       s = sprintf(buf0, "AF_INET      %pI4\n", &addr_in->sin_addr.s_addr);
+                       break;
+               case AF_INET6:
+                       addr_in6 = (struct sockaddr_in6 *)addr;
+                       s = sprintf(buf0, "AF_INET6     %pI6\n", &addr_in6->sin6_addr);
+                       break;
+               default:
+                       s = sprintf(buf0, "%s\n", "<UNKNOWN>");
+                       break;
+               }
+               allowance -= s;
+               if (allowance >= 0)
+                       strcat(buf, buf0);
+               else {
+                       allowance += s;
+                       break;
+               }
+       }
+       return 4096 - allowance;
+}
+
 static ssize_t show_node(struct config_item *i, struct configfs_attribute *a,
                         char *buf)
 {
@@ -983,7 +1038,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
 #define DEFAULT_TCP_PORT       21064
 #define DEFAULT_BUFFER_SIZE     4096
 #define DEFAULT_RSBTBL_SIZE     1024
-#define DEFAULT_LKBTBL_SIZE     1024
 #define DEFAULT_DIRTBL_SIZE     1024
 #define DEFAULT_RECOVER_TIMER      5
 #define DEFAULT_TOSS_SECS         10
@@ -992,12 +1046,12 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
 #define DEFAULT_PROTOCOL           0
 #define DEFAULT_TIMEWARN_CS      500 /* 5 sec = 500 centiseconds */
 #define DEFAULT_WAITWARN_US       0
+#define DEFAULT_NEW_RSB_COUNT    128
 
 struct dlm_config_info dlm_config = {
        .ci_tcp_port = DEFAULT_TCP_PORT,
        .ci_buffer_size = DEFAULT_BUFFER_SIZE,
        .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
-       .ci_lkbtbl_size = DEFAULT_LKBTBL_SIZE,
        .ci_dirtbl_size = DEFAULT_DIRTBL_SIZE,
        .ci_recover_timer = DEFAULT_RECOVER_TIMER,
        .ci_toss_secs = DEFAULT_TOSS_SECS,
@@ -1005,6 +1059,7 @@ struct dlm_config_info dlm_config = {
        .ci_log_debug = DEFAULT_LOG_DEBUG,
        .ci_protocol = DEFAULT_PROTOCOL,
        .ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
-       .ci_waitwarn_us = DEFAULT_WAITWARN_US
+       .ci_waitwarn_us = DEFAULT_WAITWARN_US,
+       .ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT
 };
 
index dd0ce24..3099d0d 100644 (file)
@@ -20,7 +20,6 @@ struct dlm_config_info {
        int ci_tcp_port;
        int ci_buffer_size;
        int ci_rsbtbl_size;
-       int ci_lkbtbl_size;
        int ci_dirtbl_size;
        int ci_recover_timer;
        int ci_toss_secs;
@@ -29,6 +28,7 @@ struct dlm_config_info {
        int ci_protocol;
        int ci_timewarn_cs;
        int ci_waitwarn_us;
+       int ci_new_rsb_count;
 };
 
 extern struct dlm_config_info dlm_config;
index 0262451..fe2860c 100644 (file)
@@ -37,6 +37,7 @@
 #include <linux/jhash.h>
 #include <linux/miscdevice.h>
 #include <linux/mutex.h>
+#include <linux/idr.h>
 #include <asm/uaccess.h>
 
 #include <linux/dlm.h>
@@ -52,7 +53,6 @@ struct dlm_ls;
 struct dlm_lkb;
 struct dlm_rsb;
 struct dlm_member;
-struct dlm_lkbtable;
 struct dlm_rsbtable;
 struct dlm_dirtable;
 struct dlm_direntry;
@@ -108,11 +108,6 @@ struct dlm_rsbtable {
        spinlock_t              lock;
 };
 
-struct dlm_lkbtable {
-       struct list_head        list;
-       rwlock_t                lock;
-       uint16_t                counter;
-};
 
 /*
  * Lockspace member (per node in a ls)
@@ -248,17 +243,18 @@ struct dlm_lkb {
        int8_t                  lkb_wait_count;
        int                     lkb_wait_nodeid; /* for debugging */
 
-       struct list_head        lkb_idtbl_list; /* lockspace lkbtbl */
        struct list_head        lkb_statequeue; /* rsb g/c/w list */
        struct list_head        lkb_rsb_lookup; /* waiting for rsb lookup */
        struct list_head        lkb_wait_reply; /* waiting for remote reply */
-       struct list_head        lkb_astqueue;   /* need ast to be sent */
        struct list_head        lkb_ownqueue;   /* list of locks for a process */
        struct list_head        lkb_time_list;
        ktime_t                 lkb_timestamp;
        ktime_t                 lkb_wait_time;
        unsigned long           lkb_timeout_cs;
 
+       struct mutex            lkb_cb_mutex;
+       struct work_struct      lkb_cb_work;
+       struct list_head        lkb_cb_list; /* for ls_cb_delay or proc->asts */
        struct dlm_callback     lkb_callbacks[DLM_CALLBACKS_SIZE];
        struct dlm_callback     lkb_last_cast;
        struct dlm_callback     lkb_last_bast;
@@ -299,7 +295,7 @@ struct dlm_rsb {
        int                     res_recover_locks_count;
 
        char                    *res_lvbptr;
-       char                    res_name[1];
+       char                    res_name[DLM_RESNAME_MAXLEN+1];
 };
 
 /* find_rsb() flags */
@@ -465,12 +461,12 @@ struct dlm_ls {
        unsigned long           ls_scan_time;
        struct kobject          ls_kobj;
 
+       struct idr              ls_lkbidr;
+       spinlock_t              ls_lkbidr_spin;
+
        struct dlm_rsbtable     *ls_rsbtbl;
        uint32_t                ls_rsbtbl_size;
 
-       struct dlm_lkbtable     *ls_lkbtbl;
-       uint32_t                ls_lkbtbl_size;
-
        struct dlm_dirtable     *ls_dirtbl;
        uint32_t                ls_dirtbl_size;
 
@@ -483,6 +479,10 @@ struct dlm_ls {
        struct mutex            ls_timeout_mutex;
        struct list_head        ls_timeout;
 
+       spinlock_t              ls_new_rsb_spin;
+       int                     ls_new_rsb_count;
+       struct list_head        ls_new_rsb;     /* new rsb structs */
+
        struct list_head        ls_nodes;       /* current nodes in ls */
        struct list_head        ls_nodes_gone;  /* dead node list, recovery */
        int                     ls_num_nodes;   /* number of nodes in ls */
@@ -506,8 +506,12 @@ struct dlm_ls {
 
        struct miscdevice       ls_device;
 
+       struct workqueue_struct *ls_callback_wq;
+
        /* recovery related */
 
+       struct mutex            ls_cb_mutex;
+       struct list_head        ls_cb_delay; /* save for queue_work later */
        struct timer_list       ls_timer;
        struct task_struct      *ls_recoverd_task;
        struct mutex            ls_recoverd_active;
@@ -544,6 +548,7 @@ struct dlm_ls {
 #define LSFL_RCOM_WAIT         4
 #define LSFL_UEVENT_WAIT       5
 #define LSFL_TIMEWARN          6
+#define LSFL_CB_DELAY          7
 
 /* much of this is just saving user space pointers associated with the
    lock that we pass back to the user lib with an ast */
index f71d0b5..83b5e32 100644 (file)
@@ -305,7 +305,7 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
                rv = -EDEADLK;
        }
 
-       dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
+       dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
 }
 
 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -319,7 +319,7 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
        if (is_master_copy(lkb)) {
                send_bast(r, lkb, rqmode);
        } else {
-               dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
+               dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
        }
 }
 
@@ -327,19 +327,68 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
  * Basic operations on rsb's and lkb's
  */
 
-static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
+static int pre_rsb_struct(struct dlm_ls *ls)
+{
+       struct dlm_rsb *r1, *r2;
+       int count = 0;
+
+       spin_lock(&ls->ls_new_rsb_spin);
+       if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
+               spin_unlock(&ls->ls_new_rsb_spin);
+               return 0;
+       }
+       spin_unlock(&ls->ls_new_rsb_spin);
+
+       r1 = dlm_allocate_rsb(ls);
+       r2 = dlm_allocate_rsb(ls);
+
+       spin_lock(&ls->ls_new_rsb_spin);
+       if (r1) {
+               list_add(&r1->res_hashchain, &ls->ls_new_rsb);
+               ls->ls_new_rsb_count++;
+       }
+       if (r2) {
+               list_add(&r2->res_hashchain, &ls->ls_new_rsb);
+               ls->ls_new_rsb_count++;
+       }
+       count = ls->ls_new_rsb_count;
+       spin_unlock(&ls->ls_new_rsb_spin);
+
+       if (!count)
+               return -ENOMEM;
+       return 0;
+}
+
+/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
+   unlock any spinlocks, go back and call pre_rsb_struct again.
+   Otherwise, take an rsb off the list and return it. */
+
+static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
+                         struct dlm_rsb **r_ret)
 {
        struct dlm_rsb *r;
+       int count;
 
-       r = dlm_allocate_rsb(ls, len);
-       if (!r)
-               return NULL;
+       spin_lock(&ls->ls_new_rsb_spin);
+       if (list_empty(&ls->ls_new_rsb)) {
+               count = ls->ls_new_rsb_count;
+               spin_unlock(&ls->ls_new_rsb_spin);
+               log_debug(ls, "find_rsb retry %d %d %s",
+                         count, dlm_config.ci_new_rsb_count, name);
+               return -EAGAIN;
+       }
+
+       r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
+       list_del(&r->res_hashchain);
+       ls->ls_new_rsb_count--;
+       spin_unlock(&ls->ls_new_rsb_spin);
 
        r->res_ls = ls;
        r->res_length = len;
        memcpy(r->res_name, name, len);
        mutex_init(&r->res_mutex);
 
+       INIT_LIST_HEAD(&r->res_hashchain);
        INIT_LIST_HEAD(&r->res_lookup);
        INIT_LIST_HEAD(&r->res_grantqueue);
        INIT_LIST_HEAD(&r->res_convertqueue);
@@ -347,7 +396,8 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
        INIT_LIST_HEAD(&r->res_root_list);
        INIT_LIST_HEAD(&r->res_recover_list);
 
-       return r;
+       *r_ret = r;
+       return 0;
 }
 
 static int search_rsb_list(struct list_head *head, char *name, int len,
@@ -405,16 +455,6 @@ static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
        return error;
 }
 
-static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
-                     unsigned int flags, struct dlm_rsb **r_ret)
-{
-       int error;
-       spin_lock(&ls->ls_rsbtbl[b].lock);
-       error = _search_rsb(ls, name, len, b, flags, r_ret);
-       spin_unlock(&ls->ls_rsbtbl[b].lock);
-       return error;
-}
-
 /*
  * Find rsb in rsbtbl and potentially create/add one
  *
@@ -432,35 +472,48 @@ static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
                    unsigned int flags, struct dlm_rsb **r_ret)
 {
-       struct dlm_rsb *r = NULL, *tmp;
+       struct dlm_rsb *r = NULL;
        uint32_t hash, bucket;
-       int error = -EINVAL;
+       int error;
 
-       if (namelen > DLM_RESNAME_MAXLEN)
+       if (namelen > DLM_RESNAME_MAXLEN) {
+               error = -EINVAL;
                goto out;
+       }
 
        if (dlm_no_directory(ls))
                flags |= R_CREATE;
 
-       error = 0;
        hash = jhash(name, namelen, 0);
        bucket = hash & (ls->ls_rsbtbl_size - 1);
 
-       error = search_rsb(ls, name, namelen, bucket, flags, &r);
+ retry:
+       if (flags & R_CREATE) {
+               error = pre_rsb_struct(ls);
+               if (error < 0)
+                       goto out;
+       }
+
+       spin_lock(&ls->ls_rsbtbl[bucket].lock);
+
+       error = _search_rsb(ls, name, namelen, bucket, flags, &r);
        if (!error)
-               goto out;
+               goto out_unlock;
 
        if (error == -EBADR && !(flags & R_CREATE))
-               goto out;
+               goto out_unlock;
 
        /* the rsb was found but wasn't a master copy */
        if (error == -ENOTBLK)
-               goto out;
+               goto out_unlock;
 
-       error = -ENOMEM;
-       r = create_rsb(ls, name, namelen);
-       if (!r)
-               goto out;
+       error = get_rsb_struct(ls, name, namelen, &r);
+       if (error == -EAGAIN) {
+               spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+               goto retry;
+       }
+       if (error)
+               goto out_unlock;
 
        r->res_hash = hash;
        r->res_bucket = bucket;
@@ -474,18 +527,10 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
                        nodeid = 0;
                r->res_nodeid = nodeid;
        }
-
-       spin_lock(&ls->ls_rsbtbl[bucket].lock);
-       error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
-       if (!error) {
-               spin_unlock(&ls->ls_rsbtbl[bucket].lock);
-               dlm_free_rsb(r);
-               r = tmp;
-               goto out;
-       }
        list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
-       spin_unlock(&ls->ls_rsbtbl[bucket].lock);
        error = 0;
+ out_unlock:
+       spin_unlock(&ls->ls_rsbtbl[bucket].lock);
  out:
        *r_ret = r;
        return error;
@@ -580,9 +625,8 @@ static void detach_lkb(struct dlm_lkb *lkb)
 
 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
 {
-       struct dlm_lkb *lkb, *tmp;
-       uint32_t lkid = 0;
-       uint16_t bucket;
+       struct dlm_lkb *lkb;
+       int rv, id;
 
        lkb = dlm_allocate_lkb(ls);
        if (!lkb)
@@ -594,60 +638,42 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
        INIT_LIST_HEAD(&lkb->lkb_time_list);
-       INIT_LIST_HEAD(&lkb->lkb_astqueue);
+       INIT_LIST_HEAD(&lkb->lkb_cb_list);
+       mutex_init(&lkb->lkb_cb_mutex);
+       INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
 
-       get_random_bytes(&bucket, sizeof(bucket));
-       bucket &= (ls->ls_lkbtbl_size - 1);
-
-       write_lock(&ls->ls_lkbtbl[bucket].lock);
+ retry:
+       rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
+       if (!rv)
+               return -ENOMEM;
 
-       /* counter can roll over so we must verify lkid is not in use */
+       spin_lock(&ls->ls_lkbidr_spin);
+       rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
+       if (!rv)
+               lkb->lkb_id = id;
+       spin_unlock(&ls->ls_lkbidr_spin);
 
-       while (lkid == 0) {
-               lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
+       if (rv == -EAGAIN)
+               goto retry;
 
-               list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
-                                   lkb_idtbl_list) {
-                       if (tmp->lkb_id != lkid)
-                               continue;
-                       lkid = 0;
-                       break;
-               }
+       if (rv < 0) {
+               log_error(ls, "create_lkb idr error %d", rv);
+               return rv;
        }
 
-       lkb->lkb_id = lkid;
-       list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
-       write_unlock(&ls->ls_lkbtbl[bucket].lock);
-
        *lkb_ret = lkb;
        return 0;
 }
 
-static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
-{
-       struct dlm_lkb *lkb;
-       uint16_t bucket = (lkid >> 16);
-
-       list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
-               if (lkb->lkb_id == lkid)
-                       return lkb;
-       }
-       return NULL;
-}
-
 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 {
        struct dlm_lkb *lkb;
-       uint16_t bucket = (lkid >> 16);
-
-       if (bucket >= ls->ls_lkbtbl_size)
-               return -EBADSLT;
 
-       read_lock(&ls->ls_lkbtbl[bucket].lock);
-       lkb = __find_lkb(ls, lkid);
+       spin_lock(&ls->ls_lkbidr_spin);
+       lkb = idr_find(&ls->ls_lkbidr, lkid);
        if (lkb)
                kref_get(&lkb->lkb_ref);
-       read_unlock(&ls->ls_lkbtbl[bucket].lock);
+       spin_unlock(&ls->ls_lkbidr_spin);
 
        *lkb_ret = lkb;
        return lkb ? 0 : -ENOENT;
@@ -668,12 +694,12 @@ static void kill_lkb(struct kref *kref)
 
 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
-       uint16_t bucket = (lkb->lkb_id >> 16);
+       uint32_t lkid = lkb->lkb_id;
 
-       write_lock(&ls->ls_lkbtbl[bucket].lock);
+       spin_lock(&ls->ls_lkbidr_spin);
        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
-               list_del(&lkb->lkb_idtbl_list);
-               write_unlock(&ls->ls_lkbtbl[bucket].lock);
+               idr_remove(&ls->ls_lkbidr, lkid);
+               spin_unlock(&ls->ls_lkbidr_spin);
 
                detach_lkb(lkb);
 
@@ -683,7 +709,7 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
                dlm_free_lkb(lkb);
                return 1;
        } else {
-               write_unlock(&ls->ls_lkbtbl[bucket].lock);
+               spin_unlock(&ls->ls_lkbidr_spin);
                return 0;
        }
 }
@@ -849,9 +875,7 @@ void dlm_scan_waiters(struct dlm_ls *ls)
 
                if (!num_nodes) {
                        num_nodes = ls->ls_num_nodes;
-                       warned = kmalloc(GFP_KERNEL, num_nodes * sizeof(int));
-                       if (warned)
-                               memset(warned, 0, num_nodes * sizeof(int));
+                       warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
                }
                if (!warned)
                        continue;
@@ -863,9 +887,7 @@ void dlm_scan_waiters(struct dlm_ls *ls)
                          dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
        }
        mutex_unlock(&ls->ls_waiters_mutex);
-
-       if (warned)
-               kfree(warned);
+       kfree(warned);
 
        if (debug_expired)
                log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
@@ -2401,9 +2423,6 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
        if (deadlk) {
                /* it's left on the granted queue */
-               log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
-                         lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
-                         lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
                revert_lock(r, lkb);
                queue_cast(r, lkb, -EDEADLK);
                error = -EDEADLK;
@@ -3993,8 +4012,6 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
        default:
                log_error(ls, "unknown message type %d", ms->m_type);
        }
-
-       dlm_astd_wake();
 }
 
 /* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4133,7 +4150,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
        struct dlm_message *ms_stub;
        int wait_type, stub_unlock_result, stub_cancel_result;
 
-       ms_stub = kmalloc(GFP_KERNEL, sizeof(struct dlm_message));
+       ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
        if (!ms_stub) {
                log_error(ls, "dlm_recover_waiters_pre no mem");
                return;
@@ -4809,7 +4826,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
                goto out_put;
 
        spin_lock(&ua->proc->locks_spin);
-       /* dlm_user_add_ast() may have already taken lkb off the proc list */
+       /* dlm_user_add_cb() may have already taken lkb off the proc list */
        if (!list_empty(&lkb->lkb_ownqueue))
                list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
        spin_unlock(&ua->proc->locks_spin);
@@ -4946,7 +4963,7 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 
 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
    (which does lock_rsb) due to deadlock with receiving a message that does
-   lock_rsb followed by dlm_user_add_ast() */
+   lock_rsb followed by dlm_user_add_cb() */
 
 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
                                     struct dlm_user_proc *proc)
@@ -4969,7 +4986,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
        return lkb;
 }
 
-/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
+/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
    which we clear here. */
 
@@ -5011,10 +5028,10 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
                dlm_put_lkb(lkb);
        }
 
-       list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
+       list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
                memset(&lkb->lkb_callbacks, 0,
                       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
-               list_del_init(&lkb->lkb_astqueue);
+               list_del_init(&lkb->lkb_cb_list);
                dlm_put_lkb(lkb);
        }
 
@@ -5053,10 +5070,10 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
        spin_unlock(&proc->locks_spin);
 
        spin_lock(&proc->asts_spin);
-       list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
+       list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
                memset(&lkb->lkb_callbacks, 0,
                       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
-               list_del_init(&lkb->lkb_astqueue);
+               list_del_init(&lkb->lkb_cb_list);
                dlm_put_lkb(lkb);
        }
        spin_unlock(&proc->asts_spin);
index 14cbf40..a1d8f1a 100644 (file)
@@ -15,7 +15,6 @@
 #include "lockspace.h"
 #include "member.h"
 #include "recoverd.h"
-#include "ast.h"
 #include "dir.h"
 #include "lowcomms.h"
 #include "config.h"
@@ -24,6 +23,7 @@
 #include "recover.h"
 #include "requestqueue.h"
 #include "user.h"
+#include "ast.h"
 
 static int                     ls_count;
 static struct mutex            ls_lock;
@@ -359,17 +359,10 @@ static int threads_start(void)
 {
        int error;
 
-       /* Thread which process lock requests for all lockspace's */
-       error = dlm_astd_start();
-       if (error) {
-               log_print("cannot start dlm_astd thread %d", error);
-               goto fail;
-       }
-
        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
-               goto astd_fail;
+               goto fail;
        }
 
        /* Thread for sending/receiving messages for all lockspace's */
@@ -383,8 +376,6 @@ static int threads_start(void)
 
  scand_fail:
        dlm_scand_stop();
- astd_fail:
-       dlm_astd_stop();
  fail:
        return error;
 }
@@ -393,7 +384,6 @@ static void threads_stop(void)
 {
        dlm_scand_stop();
        dlm_lowcomms_stop();
-       dlm_astd_stop();
 }
 
 static int new_lockspace(const char *name, int namelen, void **lockspace,
@@ -463,7 +453,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;
 
-       ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_NOFS);
+       ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
@@ -472,22 +462,13 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }
 
-       size = dlm_config.ci_lkbtbl_size;
-       ls->ls_lkbtbl_size = size;
-
-       ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_NOFS);
-       if (!ls->ls_lkbtbl)
-               goto out_rsbfree;
-       for (i = 0; i < size; i++) {
-               INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
-               rwlock_init(&ls->ls_lkbtbl[i].lock);
-               ls->ls_lkbtbl[i].counter = 1;
-       }
+       idr_init(&ls->ls_lkbidr);
+       spin_lock_init(&ls->ls_lkbidr_spin);
 
        size = dlm_config.ci_dirtbl_size;
        ls->ls_dirtbl_size = size;
 
-       ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_NOFS);
+       ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
@@ -502,6 +483,9 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);
 
+       INIT_LIST_HEAD(&ls->ls_new_rsb);
+       spin_lock_init(&ls->ls_new_rsb_spin);
+
        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
@@ -520,6 +504,9 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;
 
+       mutex_init(&ls->ls_cb_mutex);
+       INIT_LIST_HEAD(&ls->ls_cb_delay);
+
        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
@@ -553,18 +540,26 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);
 
+       if (flags & DLM_LSFL_FS) {
+               error = dlm_callback_start(ls);
+               if (error) {
+                       log_error(ls, "can't start dlm_callback %d", error);
+                       goto out_delist;
+               }
+       }
+
        /* needs to find ls in lslist */
        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
-               goto out_delist;
+               goto out_callback;
        }
 
        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
-               goto out_stop;
+               goto out_recoverd;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 
        /* let kobject handle freeing of ls if there's an error */
@@ -578,7 +573,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
 
        error = do_uevent(ls, 1);
        if (error)
-               goto out_stop;
+               goto out_recoverd;
 
        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
@@ -595,19 +590,20 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
- out_stop:
+ out_recoverd:
        dlm_recoverd_stop(ls);
+ out_callback:
+       dlm_callback_stop(ls);
  out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        kfree(ls->ls_recover_buf);
  out_dirfree:
-       kfree(ls->ls_dirtbl);
+       vfree(ls->ls_dirtbl);
  out_lkbfree:
-       kfree(ls->ls_lkbtbl);
- out_rsbfree:
-       kfree(ls->ls_rsbtbl);
+       idr_destroy(&ls->ls_lkbidr);
+       vfree(ls->ls_rsbtbl);
  out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
@@ -641,50 +637,64 @@ int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
        return error;
 }
 
-/* Return 1 if the lockspace still has active remote locks,
- *        2 if the lockspace still has active local locks.
- */
-static int lockspace_busy(struct dlm_ls *ls)
-{
-       int i, lkb_found = 0;
-       struct dlm_lkb *lkb;
-
-       /* NOTE: We check the lockidtbl here rather than the resource table.
-          This is because there may be LKBs queued as ASTs that have been
-          unlinked from their RSBs and are pending deletion once the AST has
-          been delivered */
-
-       for (i = 0; i < ls->ls_lkbtbl_size; i++) {
-               read_lock(&ls->ls_lkbtbl[i].lock);
-               if (!list_empty(&ls->ls_lkbtbl[i].list)) {
-                       lkb_found = 1;
-                       list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
-                                           lkb_idtbl_list) {
-                               if (!lkb->lkb_nodeid) {
-                                       read_unlock(&ls->ls_lkbtbl[i].lock);
-                                       return 2;
-                               }
-                       }
-               }
-               read_unlock(&ls->ls_lkbtbl[i].lock);
+static int lkb_idr_is_local(int id, void *p, void *data)
+{
+       struct dlm_lkb *lkb = p;
+
+       if (!lkb->lkb_nodeid)
+               return 1;
+       return 0;
+}
+
+static int lkb_idr_is_any(int id, void *p, void *data)
+{
+       return 1;
+}
+
+static int lkb_idr_free(int id, void *p, void *data)
+{
+       struct dlm_lkb *lkb = p;
+
+       if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
+               dlm_free_lvb(lkb->lkb_lvbptr);
+
+       dlm_free_lkb(lkb);
+       return 0;
+}
+
+/* NOTE: We check the lkbidr here rather than the resource table.
+   This is because there may be LKBs queued as ASTs that have been unlinked
+   from their RSBs and are pending deletion once the AST has been delivered */
+
+static int lockspace_busy(struct dlm_ls *ls, int force)
+{
+       int rv;
+
+       spin_lock(&ls->ls_lkbidr_spin);
+       if (force == 0) {
+               rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
+       } else if (force == 1) {
+               rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
+       } else {
+               rv = 0;
        }
-       return lkb_found;
+       spin_unlock(&ls->ls_lkbidr_spin);
+       return rv;
 }
 
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
-       struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i, busy, rv;
 
-       busy = lockspace_busy(ls);
+       busy = lockspace_busy(ls, force);
 
        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
-               if (busy > force)
+               if (busy) {
                        rv = -EBUSY;
-               else {
+               else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
@@ -708,12 +718,12 @@ static int release_lockspace(struct dlm_ls *ls, int force)
 
        dlm_recoverd_stop(ls);
 
+       dlm_callback_stop(ls);
+
        remove_lockspace(ls);
 
        dlm_delete_debug_file(ls);
 
-       dlm_astd_suspend();
-
        kfree(ls->ls_recover_buf);
 
        /*
@@ -721,31 +731,15 @@ static int release_lockspace(struct dlm_ls *ls, int force)
         */
 
        dlm_dir_clear(ls);
-       kfree(ls->ls_dirtbl);
+       vfree(ls->ls_dirtbl);
 
        /*
-        * Free all lkb's on lkbtbl[] lists.
+        * Free all lkb's in idr
         */
 
-       for (i = 0; i < ls->ls_lkbtbl_size; i++) {
-               head = &ls->ls_lkbtbl[i].list;
-               while (!list_empty(head)) {
-                       lkb = list_entry(head->next, struct dlm_lkb,
-                                        lkb_idtbl_list);
-
-                       list_del(&lkb->lkb_idtbl_list);
-
-                       dlm_del_ast(lkb);
-
-                       if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
-                               dlm_free_lvb(lkb->lkb_lvbptr);
-
-                       dlm_free_lkb(lkb);
-               }
-       }
-       dlm_astd_resume();
-
-       kfree(ls->ls_lkbtbl);
+       idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
+       idr_remove_all(&ls->ls_lkbidr);
+       idr_destroy(&ls->ls_lkbidr);
 
        /*
         * Free all rsb's on rsbtbl[] lists
@@ -770,7 +764,14 @@ static int release_lockspace(struct dlm_ls *ls, int force)
                }
        }
 
-       kfree(ls->ls_rsbtbl);
+       vfree(ls->ls_rsbtbl);
+
+       while (!list_empty(&ls->ls_new_rsb)) {
+               rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
+                                      res_hashchain);
+               list_del(&rsb->res_hashchain);
+               dlm_free_rsb(rsb);
+       }
 
        /*
         * Free structures on any other lists
index 5e2c71f..990626e 100644 (file)
@@ -512,12 +512,10 @@ static void process_sctp_notification(struct connection *con,
                        }
                        make_sockaddr(&prim.ssp_addr, 0, &addr_len);
                        if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
-                               int i;
                                unsigned char *b=(unsigned char *)&prim.ssp_addr;
                                log_print("reject connect from unknown addr");
-                               for (i=0; i<sizeof(struct sockaddr_storage);i++)
-                                       printk("%02x ", b[i]);
-                               printk("\n");
+                               print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
+                                                    b, sizeof(struct sockaddr_storage));
                                sctp_send_shutdown(prim.ssp_assoc_id);
                                return;
                        }
@@ -748,7 +746,10 @@ static int tcp_accept_from_sock(struct connection *con)
        /* Get the new node's NODEID */
        make_sockaddr(&peeraddr, 0, &len);
        if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
+               unsigned char *b=(unsigned char *)&peeraddr;
                log_print("connect from non cluster node");
+               print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
+                                    b, sizeof(struct sockaddr_storage));
                sock_release(newsock);
                mutex_unlock(&con->sock_mutex);
                return -1;
index 8e0d00d..da64df7 100644 (file)
@@ -16,6 +16,7 @@
 #include "memory.h"
 
 static struct kmem_cache *lkb_cache;
+static struct kmem_cache *rsb_cache;
 
 
 int __init dlm_memory_init(void)
@@ -26,6 +27,14 @@ int __init dlm_memory_init(void)
                                __alignof__(struct dlm_lkb), 0, NULL);
        if (!lkb_cache)
                ret = -ENOMEM;
+
+       rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb),
+                               __alignof__(struct dlm_rsb), 0, NULL);
+       if (!rsb_cache) {
+               kmem_cache_destroy(lkb_cache);
+               ret = -ENOMEM;
+       }
+
        return ret;
 }
 
@@ -33,6 +42,8 @@ void dlm_memory_exit(void)
 {
        if (lkb_cache)
                kmem_cache_destroy(lkb_cache);
+       if (rsb_cache)
+               kmem_cache_destroy(rsb_cache);
 }
 
 char *dlm_allocate_lvb(struct dlm_ls *ls)
@@ -48,16 +59,11 @@ void dlm_free_lvb(char *p)
        kfree(p);
 }
 
-/* FIXME: have some minimal space built-in to rsb for the name and
-   kmalloc a separate name if needed, like dentries are done */
-
-struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls, int namelen)
+struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
 
-       DLM_ASSERT(namelen <= DLM_RESNAME_MAXLEN,);
-
-       r = kzalloc(sizeof(*r) + namelen, GFP_NOFS);
+       r = kmem_cache_zalloc(rsb_cache, GFP_NOFS);
        return r;
 }
 
@@ -65,7 +71,7 @@ void dlm_free_rsb(struct dlm_rsb *r)
 {
        if (r->res_lvbptr)
                dlm_free_lvb(r->res_lvbptr);
-       kfree(r);
+       kmem_cache_free(rsb_cache, r);
 }
 
 struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
index 485fb29..177c11c 100644 (file)
@@ -16,7 +16,7 @@
 
 int dlm_memory_init(void);
 void dlm_memory_exit(void);
-struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls, int namelen);
+struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls);
 void dlm_free_rsb(struct dlm_rsb *r);
 struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
 void dlm_free_lkb(struct dlm_lkb *l);
index fd677c8..774da3c 100644 (file)
@@ -58,13 +58,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
        mutex_lock(&ls->ls_recoverd_active);
 
-       /*
-        * Suspending and resuming dlm_astd ensures that no lkb's from this ls
-        * will be processed by dlm_astd during recovery.
-        */
-
-       dlm_astd_suspend();
-       dlm_astd_resume();
+       dlm_callback_suspend(ls);
 
        /*
         * Free non-master tossed rsb's.  Master rsb's are kept on toss
@@ -202,6 +196,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
        dlm_adjust_timeouts(ls);
 
+       dlm_callback_resume(ls);
+
        error = enable_locking(ls, rv->seq);
        if (error) {
                log_debug(ls, "enable_locking failed %d", error);
@@ -222,8 +218,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
        dlm_grant_after_purge(ls);
 
-       dlm_astd_wake();
-
        log_debug(ls, "recover %llx done: %u ms",
                  (unsigned long long)rv->seq,
                  jiffies_to_msecs(jiffies - start));
index e96bf3e..d8ea607 100644 (file)
@@ -213,9 +213,9 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
                goto out;
        }
 
-       if (list_empty(&lkb->lkb_astqueue)) {
+       if (list_empty(&lkb->lkb_cb_list)) {
                kref_get(&lkb->lkb_ref);
-               list_add_tail(&lkb->lkb_astqueue, &proc->asts);
+               list_add_tail(&lkb->lkb_cb_list, &proc->asts);
                wake_up_interruptible(&proc->wait);
        }
        spin_unlock(&proc->asts_spin);
@@ -832,24 +832,24 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
        }
 
        /* if we empty lkb_callbacks, we don't want to unlock the spinlock
-          without removing lkb_astqueue; so empty lkb_astqueue is always
+          without removing lkb_cb_list; so empty lkb_cb_list is always
           consistent with empty lkb_callbacks */
 
-       lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_astqueue);
+       lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_cb_list);
 
        rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid);
        if (rv < 0) {
                /* this shouldn't happen; lkb should have been removed from
                   list when resid was zero */
                log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
-               list_del_init(&lkb->lkb_astqueue);
+               list_del_init(&lkb->lkb_cb_list);
                spin_unlock(&proc->asts_spin);
                /* removes ref for proc->asts, may cause lkb to be freed */
                dlm_put_lkb(lkb);
                goto try_another;
        }
        if (!resid)
-               list_del_init(&lkb->lkb_astqueue);
+               list_del_init(&lkb->lkb_cb_list);
        spin_unlock(&proc->asts_spin);
 
        if (cb.flags & DLM_CB_SKIP) {