dlm: don't limit active work items
[pandora-kernel.git] / fs / dlm / ast.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lock.h"
16 #include "user.h"
17
18 static uint64_t                 dlm_cb_seq;
19 static spinlock_t               dlm_cb_seq_spin;
20
21 static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
22 {
23         int i;
24
25         log_print("last_bast %x %llu flags %x mode %d sb %d %x",
26                   lkb->lkb_id,
27                   (unsigned long long)lkb->lkb_last_bast.seq,
28                   lkb->lkb_last_bast.flags,
29                   lkb->lkb_last_bast.mode,
30                   lkb->lkb_last_bast.sb_status,
31                   lkb->lkb_last_bast.sb_flags);
32
33         log_print("last_cast %x %llu flags %x mode %d sb %d %x",
34                   lkb->lkb_id,
35                   (unsigned long long)lkb->lkb_last_cast.seq,
36                   lkb->lkb_last_cast.flags,
37                   lkb->lkb_last_cast.mode,
38                   lkb->lkb_last_cast.sb_status,
39                   lkb->lkb_last_cast.sb_flags);
40
41         for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
42                 log_print("cb %x %llu flags %x mode %d sb %d %x",
43                           lkb->lkb_id,
44                           (unsigned long long)lkb->lkb_callbacks[i].seq,
45                           lkb->lkb_callbacks[i].flags,
46                           lkb->lkb_callbacks[i].mode,
47                           lkb->lkb_callbacks[i].sb_status,
48                           lkb->lkb_callbacks[i].sb_flags);
49         }
50 }
51
52 int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
53                          int status, uint32_t sbflags, uint64_t seq)
54 {
55         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
56         uint64_t prev_seq;
57         int prev_mode;
58         int i, rv;
59
60         for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
61                 if (lkb->lkb_callbacks[i].seq)
62                         continue;
63
64                 /*
65                  * Suppress some redundant basts here, do more on removal.
66                  * Don't even add a bast if the callback just before it
67                  * is a bast for the same mode or a more restrictive mode.
68                  * (the addional > PR check is needed for PR/CW inversion)
69                  */
70
71                 if ((i > 0) && (flags & DLM_CB_BAST) &&
72                     (lkb->lkb_callbacks[i-1].flags & DLM_CB_BAST)) {
73
74                         prev_seq = lkb->lkb_callbacks[i-1].seq;
75                         prev_mode = lkb->lkb_callbacks[i-1].mode;
76
77                         if ((prev_mode == mode) ||
78                             (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
79
80                                 log_debug(ls, "skip %x add bast %llu mode %d "
81                                           "for bast %llu mode %d",
82                                           lkb->lkb_id,
83                                           (unsigned long long)seq,
84                                           mode,
85                                           (unsigned long long)prev_seq,
86                                           prev_mode);
87                                 rv = 0;
88                                 goto out;
89                         }
90                 }
91
92                 lkb->lkb_callbacks[i].seq = seq;
93                 lkb->lkb_callbacks[i].flags = flags;
94                 lkb->lkb_callbacks[i].mode = mode;
95                 lkb->lkb_callbacks[i].sb_status = status;
96                 lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
97                 rv = 0;
98                 break;
99         }
100
101         if (i == DLM_CALLBACKS_SIZE) {
102                 log_error(ls, "no callbacks %x %llu flags %x mode %d sb %d %x",
103                           lkb->lkb_id, (unsigned long long)seq,
104                           flags, mode, status, sbflags);
105                 dlm_dump_lkb_callbacks(lkb);
106                 rv = -1;
107                 goto out;
108         }
109  out:
110         return rv;
111 }
112
113 int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
114                          struct dlm_callback *cb, int *resid)
115 {
116         int i, rv;
117
118         *resid = 0;
119
120         if (!lkb->lkb_callbacks[0].seq) {
121                 rv = -ENOENT;
122                 goto out;
123         }
124
125         /* oldest undelivered cb is callbacks[0] */
126
127         memcpy(cb, &lkb->lkb_callbacks[0], sizeof(struct dlm_callback));
128         memset(&lkb->lkb_callbacks[0], 0, sizeof(struct dlm_callback));
129
130         /* shift others down */
131
132         for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
133                 if (!lkb->lkb_callbacks[i].seq)
134                         break;
135                 memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
136                        sizeof(struct dlm_callback));
137                 memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
138                 (*resid)++;
139         }
140
141         /* if cb is a bast, it should be skipped if the blocking mode is
142            compatible with the last granted mode */
143
144         if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
145                 if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
146                         cb->flags |= DLM_CB_SKIP;
147
148                         log_debug(ls, "skip %x bast %llu mode %d "
149                                   "for cast %llu mode %d",
150                                   lkb->lkb_id,
151                                   (unsigned long long)cb->seq,
152                                   cb->mode,
153                                   (unsigned long long)lkb->lkb_last_cast.seq,
154                                   lkb->lkb_last_cast.mode);
155                         rv = 0;
156                         goto out;
157                 }
158         }
159
160         if (cb->flags & DLM_CB_CAST) {
161                 memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
162                 lkb->lkb_last_cast_time = ktime_get();
163         }
164
165         if (cb->flags & DLM_CB_BAST) {
166                 memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
167                 lkb->lkb_last_bast_time = ktime_get();
168         }
169         rv = 0;
170  out:
171         return rv;
172 }
173
174 void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
175                 uint32_t sbflags)
176 {
177         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
178         uint64_t new_seq, prev_seq;
179         int rv;
180
181         spin_lock(&dlm_cb_seq_spin);
182         new_seq = ++dlm_cb_seq;
183         spin_unlock(&dlm_cb_seq_spin);
184
185         if (lkb->lkb_flags & DLM_IFL_USER) {
186                 dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
187                 return;
188         }
189
190         mutex_lock(&lkb->lkb_cb_mutex);
191         prev_seq = lkb->lkb_callbacks[0].seq;
192
193         rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
194         if (rv < 0)
195                 goto out;
196
197         if (!prev_seq) {
198                 kref_get(&lkb->lkb_ref);
199
200                 if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
201                         mutex_lock(&ls->ls_cb_mutex);
202                         list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
203                         mutex_unlock(&ls->ls_cb_mutex);
204                 } else {
205                         queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
206                 }
207         }
208  out:
209         mutex_unlock(&lkb->lkb_cb_mutex);
210 }
211
212 void dlm_callback_work(struct work_struct *work)
213 {
214         struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
215         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
216         void (*castfn) (void *astparam);
217         void (*bastfn) (void *astparam, int mode);
218         struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
219         int i, rv, resid;
220
221         memset(&callbacks, 0, sizeof(callbacks));
222
223         mutex_lock(&lkb->lkb_cb_mutex);
224         if (!lkb->lkb_callbacks[0].seq) {
225                 /* no callback work exists, shouldn't happen */
226                 log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
227                 dlm_print_lkb(lkb);
228                 dlm_dump_lkb_callbacks(lkb);
229         }
230
231         for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
232                 rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
233                 if (rv < 0)
234                         break;
235         }
236
237         if (resid) {
238                 /* cbs remain, loop should have removed all, shouldn't happen */
239                 log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
240                           resid);
241                 dlm_print_lkb(lkb);
242                 dlm_dump_lkb_callbacks(lkb);
243         }
244         mutex_unlock(&lkb->lkb_cb_mutex);
245
246         castfn = lkb->lkb_astfn;
247         bastfn = lkb->lkb_bastfn;
248
249         for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
250                 if (!callbacks[i].seq)
251                         break;
252                 if (callbacks[i].flags & DLM_CB_SKIP) {
253                         continue;
254                 } else if (callbacks[i].flags & DLM_CB_BAST) {
255                         bastfn(lkb->lkb_astparam, callbacks[i].mode);
256                 } else if (callbacks[i].flags & DLM_CB_CAST) {
257                         lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
258                         lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
259                         castfn(lkb->lkb_astparam);
260                 }
261         }
262
263         /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
264         dlm_put_lkb(lkb);
265 }
266
267 int dlm_callback_start(struct dlm_ls *ls)
268 {
269         ls->ls_callback_wq = alloc_workqueue("dlm_callback",
270                                              WQ_UNBOUND |
271                                              WQ_MEM_RECLAIM |
272                                              WQ_NON_REENTRANT,
273                                              0);
274         if (!ls->ls_callback_wq) {
275                 log_print("can't start dlm_callback workqueue");
276                 return -ENOMEM;
277         }
278         return 0;
279 }
280
281 void dlm_callback_stop(struct dlm_ls *ls)
282 {
283         if (ls->ls_callback_wq)
284                 destroy_workqueue(ls->ls_callback_wq);
285 }
286
287 void dlm_callback_suspend(struct dlm_ls *ls)
288 {
289         set_bit(LSFL_CB_DELAY, &ls->ls_flags);
290
291         if (ls->ls_callback_wq)
292                 flush_workqueue(ls->ls_callback_wq);
293 }
294
295 void dlm_callback_resume(struct dlm_ls *ls)
296 {
297         struct dlm_lkb *lkb, *safe;
298         int count = 0;
299
300         clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
301
302         if (!ls->ls_callback_wq)
303                 return;
304
305         mutex_lock(&ls->ls_cb_mutex);
306         list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
307                 list_del_init(&lkb->lkb_cb_list);
308                 queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
309                 count++;
310         }
311         mutex_unlock(&ls->ls_cb_mutex);
312
313         log_debug(ls, "dlm_callback_resume %d", count);
314 }
315