4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 # include <linux/module.h>
41 #include <linux/lustre_intent.h>
43 #include <obd_class.h>
44 #include <lustre_dlm.h>
45 #include <lustre_fid.h> /* fid_res_name_eq() */
46 #include <lustre_mdc.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include "mdc_internal.h"
/*
 * Per-request context for the asynchronous getattr intent.
 *
 * mdc_intent_getattr_async() stores ga_minfo and ga_einfo in the request's
 * async-args area, and mdc_intent_getattr_async_interpret() reads ga_exp,
 * ga_minfo and ga_einfo back when the reply is interpreted.
 */
51 struct mdc_getattr_args {
52 struct obd_export *ga_exp;
53 struct md_enqueue_info *ga_minfo;
54 struct ldlm_enqueue_info *ga_einfo;
/*
 * Test disposition bits of an intent.
 *
 * Returns it->d.lustre.it_disposition masked with flag, i.e. non-zero when
 * at least one of the requested bits is set.
 */
57 int it_disposition(struct lookup_intent *it, int flag)
59 return it->d.lustre.it_disposition & flag;
61 EXPORT_SYMBOL(it_disposition);
/*
 * Set the bits given in flag in it->d.lustre.it_disposition; other bits are
 * left unchanged.
 */
63 void it_set_disposition(struct lookup_intent *it, int flag)
65 it->d.lustre.it_disposition |= flag;
67 EXPORT_SYMBOL(it_set_disposition);
/*
 * Clear the bits given in flag from it->d.lustre.it_disposition; other bits
 * are left unchanged.
 */
69 void it_clear_disposition(struct lookup_intent *it, int flag)
71 it->d.lustre.it_disposition &= ~flag;
73 EXPORT_SYMBOL(it_clear_disposition);
/*
 * Report the intent status that applies to a given phase of an open.
 *
 * The disposition bits are examined in the order DISP_OPEN_LEASE,
 * DISP_OPEN_OPEN, DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For a
 * bit that is set, it->d.lustre.it_status is returned when phase is greater
 * than or equal to that bit.  When control reaches the end of the checks the
 * current disposition and status are reported through CERROR().
 *
 * NOTE(review): what happens when phase is below a matched bit, and what
 * follows the CERROR() call, is on lines that are not visible in this view
 * of the file -- confirm against the complete source.
 */
75 int it_open_error(int phase, struct lookup_intent *it)
77 if (it_disposition(it, DISP_OPEN_LEASE)) {
78 if (phase >= DISP_OPEN_LEASE)
79 return it->d.lustre.it_status;
83 if (it_disposition(it, DISP_OPEN_OPEN)) {
84 if (phase >= DISP_OPEN_OPEN)
85 return it->d.lustre.it_status;
90 if (it_disposition(it, DISP_OPEN_CREATE)) {
91 if (phase >= DISP_OPEN_CREATE)
92 return it->d.lustre.it_status;
97 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
98 if (phase >= DISP_LOOKUP_EXECD)
99 return it->d.lustre.it_status;
104 if (it_disposition(it, DISP_IT_EXECD)) {
105 if (phase >= DISP_IT_EXECD)
106 return it->d.lustre.it_status;
110 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
111 it->d.lustre.it_status);
115 EXPORT_SYMBOL(it_open_error);
117 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach an inode to the resource of the lock named by lockh and report the
 * lock's inodebits.
 *
 * exp:   not referenced by the visible statements
 * lockh: lock handle cookie, cast to struct lustre_handle * for
 *        ldlm_handle2lock(); the lookup is asserted to succeed
 * data:  inode to store in lock->l_resource->lr_lvb_inode
 * bits:  written with lock->l_policy_data.l_inodebits.bits
 *
 * The resource is inspected and updated between lock_res_and_lock() and
 * unlock_res_and_lock().  If the resource already refers to a different
 * inode, the LASSERTF requires that old inode to have I_FREEING set.
 *
 * NOTE(review): the remaining parameter declaration, any NULL check on
 * bits, early exits, and the release of the lock obtained from
 * ldlm_handle2lock() are on lines not visible in this view -- confirm.
 */
118 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
121 struct ldlm_lock *lock;
122 struct inode *new_inode = data;
130 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
132 LASSERT(lock != NULL);
133 lock_res_and_lock(lock);
134 if (lock->l_resource->lr_lvb_inode &&
135 lock->l_resource->lr_lvb_inode != data) {
136 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
137 LASSERTF(old_inode->i_state & I_FREEING,
138 "Found existing inode %p/%lu/%u state %lu in lock: "
139 "setting data to %p/%lu/%u\n", old_inode,
140 old_inode->i_ino, old_inode->i_generation,
142 new_inode, new_inode->i_ino, new_inode->i_generation);
144 lock->l_resource->lr_lvb_inode = new_inode;
146 *bits = lock->l_policy_data.l_inodebits.bits;
148 unlock_res_and_lock(lock);
/*
 * Look for an already-held lock on the resource named after fid.
 *
 * The resource name is built from fid with fid_build_reg_res_name().  The
 * caller's policy is modified in place: its inodebits are masked with
 * exp_connect_ibits(exp) so that bits the server does not support are not
 * requested (LU-4405).  The match itself is delegated to ldlm_lock_match()
 * on the namespace of the export's obd device, and its result is stored
 * in rc.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this view; presumably rc is returned -- confirm.
 */
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
155 const struct lu_fid *fid, ldlm_type_t type,
156 ldlm_policy_data_t *policy, ldlm_mode_t mode,
157 struct lustre_handle *lockh)
159 struct ldlm_res_id res_id;
162 fid_build_reg_res_name(fid, &res_id);
163 /* LU-4405: Clear bits not supported by server */
164 policy->l_inodebits.bits &= exp_connect_ibits(exp);
165 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel unused locks on the resource named after fid.
 *
 * Builds the resource name from fid and forwards policy, mode, flags and
 * opaque to ldlm_cli_cancel_unused_resource() on the namespace of the
 * export's obd device; the result is stored in rc.
 *
 * NOTE(review): the declarations of the mode and opaque parameters and the
 * return statement are on lines not visible in this view -- confirm.
 */
170 int mdc_cancel_unused(struct obd_export *exp,
171 const struct lu_fid *fid,
172 ldlm_policy_data_t *policy,
174 ldlm_cancel_flags_t flags,
177 struct ldlm_res_id res_id;
178 struct obd_device *obd = class_exp2obd(exp);
181 fid_build_reg_res_name(fid, &res_id);
182 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
183 policy, mode, flags, opaque);
/*
 * Drop the inode pointer cached on the resource named after fid.
 *
 * Asserts that the export's obd device has a namespace, builds the resource
 * name from fid, looks the resource up with ldlm_resource_get(), sets its
 * lr_lvb_inode to NULL and releases the reference with
 * ldlm_resource_putref().
 *
 * NOTE(review): handling of a failed ldlm_resource_get(), any locking
 * around the store to lr_lvb_inode, and the return value are on lines not
 * visible in this view -- confirm.
 */
187 int mdc_null_inode(struct obd_export *exp,
188 const struct lu_fid *fid)
190 struct ldlm_res_id res_id;
191 struct ldlm_resource *res;
192 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
194 LASSERTF(ns != NULL, "no namespace passed\n");
196 fid_build_reg_res_name(fid, &res_id);
198 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
203 res->lr_lvb_inode = NULL;
206 ldlm_resource_putref(res);
210 /* find any ldlm lock of the inode in mdc
/*
 * Run the iterator callback it (with its argument data) over the locks of
 * the resource named after fid.
 *
 * The result of ldlm_resource_iterate() is then examined for
 * LDLM_ITER_STOP and LDLM_ITER_CONTINUE.
 *
 * NOTE(review): the remaining arguments of ldlm_resource_iterate(), the
 * value chosen in each branch and the return statement are on lines not
 * visible in this view -- confirm.
 */
214 int mdc_find_cbdata(struct obd_export *exp,
215 const struct lu_fid *fid,
216 ldlm_iterator_t it, void *data)
218 struct ldlm_res_id res_id;
/* NOTE(review): the cast drops const from fid; the other callers of
 * fid_build_reg_res_name() in this file pass a const fid without a cast,
 * so the cast looks unnecessary -- confirm against the prototype. */
221 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
222 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
224 if (rc == LDLM_ITER_STOP)
226 else if (rc == LDLM_ITER_CONTINUE)
/*
 * Stop holding a request for replay and complain if the server assigned a
 * transaction number to a failed request.
 *
 * req: request to update; when req->rq_replay is set, req->rq_lock is taken
 *      around the update
 * rc:  result of the operation; when it is non-zero and req->rq_transno is
 *      also non-zero, a D_ERROR message is logged for the request
 *
 * NOTE(review): the statement executed while rq_lock is held and anything
 * following the DEBUG_REQ() call are on lines not visible in this view.
 */
231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
233 /* Don't hold error requests for replay. */
234 if (req->rq_replay) {
235 spin_lock(&req->rq_lock);
237 spin_unlock(&req->rq_lock);
239 if (rc && req->rq_transno != 0) {
240 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
245 /* Save a large LOV EA into the request buffer so that it is available
246 * for replay. We don't do this in the initial request because the
247 * original request doesn't need this buffer (at most it sends just the
248 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249 * buffer and may also be difficult to allocate and save a very large
250 * request buffer for each open. (bug 5707)
252 * OOM here may cause recovery failure if lmm is needed (only for the
253 * original open if the MDS crashed just when this client also OOM'd)
254 * but this is incredibly unlikely, and questionable whether the client
255 * could do MDS recovery under OOM anyways... */
/*
 * Grow the request buffer segment that holds the LOV EA of an open request.
 *
 * Asks sptlrpc_cli_enlarge_reqbuf() to enlarge segment
 * DLM_INTENT_REC_OFF + 4 of req.  On the path that logs "Can't enlarge
 * segment", the EA is dropped from the reply body instead: OBD_MD_FLEASIZE
 * is cleared from body->valid and body->eadatasize is set to 0.
 *
 * NOTE(review): the size argument passed to sptlrpc_cli_enlarge_reqbuf()
 * and the condition guarding the error path are on lines not visible in
 * this view; presumably the error path is taken when rc is non-zero.
 */
256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257 struct mdt_body *body)
261 /* FIXME: remove this explicit offset. */
262 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
265 CERROR("Can't enlarge segment %d size to %d\n",
266 DLM_INTENT_REC_OFF + 4, body->eadatasize);
267 body->valid &= ~OBD_MD_FLEASIZE;
268 body->eadatasize = 0;
/*
 * Build the LDLM enqueue request that carries an OPEN intent.
 *
 * Visible steps:
 *  - the create mode in the intent is forced to a regular file (S_IFREG);
 *  - when op_fid2 is sane, unused locks on the child are collected with
 *    mdc_resource_get_unused(); the lock mode used depends on the lease,
 *    write/truncate and exec flags in it->it_flags;
 *  - unused MDS_INODELOCK_UPDATE locks on the parent (op_fid1) are collected
 *    as well;
 *  - the request is allocated with the RQF_LDLM_INTENT_OPEN format; on the
 *    failure path the collected locks are released with
 *    ldlm_lock_list_put() and ERR_PTR(-ENOMEM) is returned;
 *  - both capability buffers are sized from op_capa1, the name buffer from
 *    op_namelen + 1, and the client EA buffer from the larger of lmmsize
 *    and cl_default_mds_easize;
 *  - ldlm_prep_enqueue_req() receives the collected cancel list;
 *  - rq_replay is set from the import's imp_replayable under rq_lock;
 *  - the intent opcode and the open body are packed, the server ACL buffer
 *    is sized for a remote client, and the reply length is set.
 *
 * NOTE(review): the last parameter, the declarations of count, mode, rc and
 * the cancel list, the mode assignments, the effect of the IT_CREAT test,
 * and the final return are on lines not visible in this view -- confirm.
 */
272 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
273 struct lookup_intent *it,
274 struct md_op_data *op_data,
275 void *lmm, int lmmsize,
278 struct ptlrpc_request *req;
279 struct obd_device *obddev = class_exp2obd(exp);
280 struct ldlm_intent *lit;
286 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
288 /* XXX: openlock is not cancelled for cross-refs. */
289 /* If inode is known, cancel conflicting OPEN locks. */
290 if (fid_is_sane(&op_data->op_fid2)) {
291 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
292 if (it->it_flags & FMODE_WRITE)
297 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
300 else if (it->it_flags & FMODE_EXEC)
306 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
311 /* If CREATE, cancel parent's UPDATE lock. */
312 if (it->it_op & IT_CREAT)
316 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
318 MDS_INODELOCK_UPDATE);
320 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
321 &RQF_LDLM_INTENT_OPEN);
323 ldlm_lock_list_put(&cancels, l_bl_ast, count);
324 return ERR_PTR(-ENOMEM);
327 /* parent capability */
328 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
329 /* child capability, reserve the size according to parent capa, it will
330 * be filled after we get the reply */
331 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
333 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
334 op_data->op_namelen + 1);
335 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
336 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
338 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
340 ptlrpc_request_free(req);
344 spin_lock(&req->rq_lock);
345 req->rq_replay = req->rq_import->imp_replayable;
346 spin_unlock(&req->rq_lock);
348 /* pack the intent */
349 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
350 lit->opc = (__u64)it->it_op;
352 /* pack the intended request */
353 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
356 /* for remote client, fetch remote perm for current user */
357 if (client_is_remote(exp))
358 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
359 sizeof(struct mdt_remote_perm));
360 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM enqueue request that carries a GETXATTR intent.
 *
 * Allocates a request with the RQF_LDLM_INTENT_GETXATTR format (returning
 * ERR_PTR(-ENOMEM) on the failure path), sizes the capability buffer from
 * op_capa1, prepares the enqueue with ldlm_prep_enqueue_req(), and packs
 * IT_GETXATTR as the intent opcode.  The body is packed for op_fid1 with
 * the import's ocd_max_easize as the EA size, and the same value is used to
 * size the server-side RMF_EADATA, RMF_EAVALS and RMF_EAVALS_LENS buffers
 * before the reply length is set.
 *
 * The it parameter is not referenced by the visible statements.
 *
 * NOTE(review): the declaration of the cancel list, the error check after
 * ldlm_prep_enqueue_req() and the final return are on lines not visible in
 * this view -- confirm.
 */
364 static struct ptlrpc_request *
365 mdc_intent_getxattr_pack(struct obd_export *exp,
366 struct lookup_intent *it,
367 struct md_op_data *op_data)
369 struct ptlrpc_request *req;
370 struct ldlm_intent *lit;
371 int rc, count = 0, maxdata;
376 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
377 &RQF_LDLM_INTENT_GETXATTR);
379 return ERR_PTR(-ENOMEM);
381 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
383 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
385 ptlrpc_request_free(req);
389 /* pack the intent */
390 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
391 lit->opc = IT_GETXATTR;
393 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
395 /* pack the intended request */
396 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
397 op_data->op_valid, maxdata, -1, 0);
399 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
400 RCL_SERVER, maxdata);
402 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
403 RCL_SERVER, maxdata);
405 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
406 RCL_SERVER, maxdata);
408 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM enqueue request that carries an UNLINK intent.
 *
 * Allocates a request with the RQF_LDLM_INTENT_UNLINK format (returning
 * ERR_PTR(-ENOMEM) on the failure path), sizes the capability buffer from
 * op_capa1 and the name buffer from op_namelen + 1, prepares the enqueue
 * with no cancel list, packs the intent opcode from it->it_op and the
 * unlink body from op_data.  For the reply, RMF_MDT_MD is sized with
 * cl_max_mds_easize and RMF_ACL with cl_max_mds_cookiesize before the reply
 * length is set.
 *
 * NOTE(review): the declaration of rc, the error check after
 * ldlm_prep_enqueue_req() and the final return are on lines not visible in
 * this view -- confirm.
 */
413 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
414 struct lookup_intent *it,
415 struct md_op_data *op_data)
417 struct ptlrpc_request *req;
418 struct obd_device *obddev = class_exp2obd(exp);
419 struct ldlm_intent *lit;
422 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
423 &RQF_LDLM_INTENT_UNLINK);
425 return ERR_PTR(-ENOMEM);
427 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
428 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
429 op_data->op_namelen + 1);
431 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
433 ptlrpc_request_free(req);
437 /* pack the intent */
438 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
439 lit->opc = (__u64)it->it_op;
441 /* pack the intended request */
442 mdc_unlink_pack(req, op_data);
444 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
445 obddev->u.cli.cl_max_mds_easize);
446 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
447 obddev->u.cli.cl_max_mds_cookiesize);
448 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM enqueue request that carries a GETATTR/LOOKUP intent.
 *
 * The set of attributes requested (valid) always contains the getattr, EA
 * size, mode EA size, directory EA, MDS capability and MEA flags, plus
 * OBD_MD_FLRMTPERM for a remote client or OBD_MD_FLACL otherwise.
 *
 * Allocates a request with the RQF_LDLM_INTENT_GETATTR format (returning
 * ERR_PTR(-ENOMEM) on the failure path), sizes the capability and name
 * buffers, prepares the enqueue with no cancel list, packs the intent
 * opcode from it->it_op and the getattr body with cl_max_mds_easize.  For
 * the reply, RMF_MDT_MD is sized with cl_max_mds_easize and, for a remote
 * client, RMF_ACL is sized for a struct mdt_remote_perm.
 *
 * NOTE(review): the declaration of rc, the error check after
 * ldlm_prep_enqueue_req() and the final return are on lines not visible in
 * this view -- confirm.
 */
452 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
453 struct lookup_intent *it,
454 struct md_op_data *op_data)
456 struct ptlrpc_request *req;
457 struct obd_device *obddev = class_exp2obd(exp);
458 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
459 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
460 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
461 (client_is_remote(exp) ?
462 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
463 struct ldlm_intent *lit;
466 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
467 &RQF_LDLM_INTENT_GETATTR);
469 return ERR_PTR(-ENOMEM);
471 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
472 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
473 op_data->op_namelen + 1);
475 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
477 ptlrpc_request_free(req);
481 /* pack the intent */
482 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
483 lit->opc = (__u64)it->it_op;
485 /* pack the intended request */
486 mdc_getattr_pack(req, valid, it->it_flags, op_data,
487 obddev->u.cli.cl_max_mds_easize);
489 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
490 obddev->u.cli.cl_max_mds_easize);
491 if (client_is_remote(exp))
492 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
493 sizeof(struct mdt_remote_perm));
494 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM enqueue request that carries a LAYOUT intent.
 *
 * Allocates a request with the RQF_LDLM_INTENT_LAYOUT format (returning
 * ERR_PTR(-ENOMEM) on the failure path), sets the client EA buffer to zero
 * length, prepares the enqueue with no cancel list, packs the intent opcode
 * from it->it_op and sets the layout intent operation to
 * LAYOUT_INTENT_ACCESS.  The server-side RMF_DLM_LVB buffer is sized with
 * cl_max_mds_easize before the reply length is set.
 *
 * The md_op_data parameter is unused, as its name says.
 *
 * NOTE(review): the declaration of rc, the error check after
 * ldlm_prep_enqueue_req() and the final return are on lines not visible in
 * this view -- confirm.
 */
498 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
499 struct lookup_intent *it,
500 struct md_op_data *unused)
502 struct obd_device *obd = class_exp2obd(exp);
503 struct ptlrpc_request *req;
504 struct ldlm_intent *lit;
505 struct layout_intent *layout;
508 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
509 &RQF_LDLM_INTENT_LAYOUT);
511 return ERR_PTR(-ENOMEM);
513 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
514 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
516 ptlrpc_request_free(req);
520 /* pack the intent */
521 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
522 lit->opc = (__u64)it->it_op;
524 /* pack the layout intent request */
525 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
526 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
527 * set for replication */
528 layout->li_opc = LAYOUT_INTENT_ACCESS;
530 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
531 obd->u.cli.cl_max_mds_easize);
532 ptlrpc_request_set_replen(req);
/*
 * Build a plain LDLM enqueue request without an intent.
 *
 * Allocates a request with the RQF_LDLM_ENQUEUE format (returning
 * ERR_PTR(-ENOMEM) on the failure path), prepares the enqueue with no
 * cancel list, sizes the server-side RMF_DLM_LVB buffer to lvb_len bytes
 * and sets the reply length.
 *
 * NOTE(review): the declaration of rc, the error check after
 * ldlm_prep_enqueue_req() and the final return are on lines not visible in
 * this view -- confirm.
 */
536 static struct ptlrpc_request *
537 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
539 struct ptlrpc_request *req;
542 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
544 return ERR_PTR(-ENOMEM);
546 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
548 ptlrpc_request_free(req);
552 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
553 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an intent enqueue and fill in the intent.
 *
 * Visible steps, in order:
 *  - if the request has a transno or is marked for replay, the lock request
 *    in the request buffer gets LDLM_FL_INTENT_ONLY so that a replay only
 *    performs the intent;
 *  - when rc is ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise the
 *    lock is looked up and, if the server granted a mode different from
 *    einfo->ei_mode, the reference is moved to the granted mode and
 *    einfo->ei_mode is updated;
 *  - disposition and status are copied from lock_policy_res1 and
 *    lock_policy_res2 of the reply into the intent, together with the lock
 *    mode, the lock handle cookie and the request pointer;
 *  - the replay flag is cleared when the request has no transno or the
 *    status is negative, and for an IT_OPEN that did not end in a
 *    successful open;
 *  - for IT_OPEN, IT_UNLINK, IT_LOOKUP and IT_GETATTR the mdt_body is
 *    fetched from the reply; a successful open installs its replay data,
 *    EA data announced in body->valid is fetched and, for a replayable
 *    open, copied into the request's RMF_EADATA buffer; remote permission
 *    and capability buffers are fetched when flagged in body->valid;
 *  - for IT_LAYOUT the LVB buffer of the reply is fetched;
 *  - finally, if the lock has a layout and LVB data was found, the data is
 *    copied into a newly allocated buffer that is installed as
 *    lock->l_lvb_data (under lock_res_and_lock()) when the lock has none.
 *
 * NOTE(review): the last parameter, several local declarations, all error
 * returns, and the conditions guarding some of the calls above are on lines
 * not visible in this view; the summary covers the visible statements only.
 */
557 static int mdc_finish_enqueue(struct obd_export *exp,
558 struct ptlrpc_request *req,
559 struct ldlm_enqueue_info *einfo,
560 struct lookup_intent *it,
561 struct lustre_handle *lockh,
564 struct req_capsule *pill = &req->rq_pill;
565 struct ldlm_request *lockreq;
566 struct ldlm_reply *lockrep;
567 struct lustre_intent_data *intent = &it->d.lustre;
568 struct ldlm_lock *lock;
569 void *lvb_data = NULL;
573 /* Similarly, if we're going to replay this request, we don't want to
574 * actually get a lock, just perform the intent. */
575 if (req->rq_transno || req->rq_replay) {
576 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
577 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
580 if (rc == ELDLM_LOCK_ABORTED) {
582 memset(lockh, 0, sizeof(*lockh));
584 } else { /* rc = 0 */
585 lock = ldlm_handle2lock(lockh);
586 LASSERT(lock != NULL);
588 /* If the server gave us back a different lock mode, we should
589 * fix up our variables. */
590 if (lock->l_req_mode != einfo->ei_mode) {
591 ldlm_lock_addref(lockh, lock->l_req_mode);
592 ldlm_lock_decref(lockh, einfo->ei_mode);
593 einfo->ei_mode = lock->l_req_mode;
598 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
599 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
601 intent->it_disposition = (int)lockrep->lock_policy_res1;
602 intent->it_status = (int)lockrep->lock_policy_res2;
603 intent->it_lock_mode = einfo->ei_mode;
604 intent->it_lock_handle = lockh->cookie;
605 intent->it_data = req;
607 /* Technically speaking rq_transno must already be zero if
608 * it_status is in error, so the check is a bit redundant */
609 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
610 mdc_clear_replay_flag(req, intent->it_status);
612 /* If we're doing an IT_OPEN which did not result in an actual
613 * successful open, then we need to remove the bit which saves
614 * this request for unconditional replay.
616 * It's important that we do this first! Otherwise we might exit the
617 * function without doing so, and try to replay a failed create
619 if (it->it_op & IT_OPEN && req->rq_replay &&
620 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
621 mdc_clear_replay_flag(req, intent->it_status);
623 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
624 it->it_op, intent->it_disposition, intent->it_status);
626 /* We know what to expect, so we do any byte flipping required here */
627 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
628 struct mdt_body *body;
630 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
632 CERROR ("Can't swab mdt_body\n");
636 if (it_disposition(it, DISP_OPEN_OPEN) &&
637 !it_open_error(DISP_OPEN_OPEN, it)) {
639 * If this is a successful OPEN request, we need to set
640 * replay handler and data early, so that if replay
641 * happens immediately after swabbing below, new reply
642 * is swabbed by that handler correctly.
644 mdc_set_open_replay_data(NULL, NULL, it);
647 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
650 mdc_update_max_ea_from_body(exp, body);
653 * The eadata is opaque; just check that it is there.
654 * Eventually, obd_unpackmd() will check the contents.
656 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
661 /* save lvb data and length in case this is for layout
664 lvb_len = body->eadatasize;
667 * We save the reply LOV EA in case we have to replay a
668 * create for recovery. If we didn't allocate a large
669 * enough request buffer above we need to reallocate it
670 * here to hold the actual LOV EA.
672 * To not save LOV EA if request is not going to replay
673 * (for example error one).
675 if ((it->it_op & IT_OPEN) && req->rq_replay) {
677 if (req_capsule_get_size(pill, &RMF_EADATA,
680 mdc_realloc_openmsg(req, body);
682 req_capsule_shrink(pill, &RMF_EADATA,
686 req_capsule_set_size(pill, &RMF_EADATA,
690 lmm = req_capsule_client_get(pill, &RMF_EADATA);
692 memcpy(lmm, eadata, body->eadatasize);
696 if (body->valid & OBD_MD_FLRMTPERM) {
697 struct mdt_remote_perm *perm;
699 LASSERT(client_is_remote(exp));
700 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
701 lustre_swab_mdt_remote_perm);
705 if (body->valid & OBD_MD_FLMDSCAPA) {
706 struct lustre_capa *capa, *p;
708 capa = req_capsule_server_get(pill, &RMF_CAPA1);
712 if (it->it_op & IT_OPEN) {
713 /* client fid capa will be checked in replay */
714 p = req_capsule_client_get(pill, &RMF_CAPA2);
719 if (body->valid & OBD_MD_FLOSSCAPA) {
720 struct lustre_capa *capa;
722 capa = req_capsule_server_get(pill, &RMF_CAPA2);
726 } else if (it->it_op & IT_LAYOUT) {
727 /* maybe the lock was granted right away and layout
728 * is packed into RMF_DLM_LVB of req */
729 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
731 lvb_data = req_capsule_server_sized_get(pill,
732 &RMF_DLM_LVB, lvb_len);
733 if (lvb_data == NULL)
738 /* fill in stripe data for layout lock */
739 lock = ldlm_handle2lock(lockh);
740 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
743 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
744 ldlm_it2str(it->it_op), lvb_len);
746 OBD_ALLOC_LARGE(lmm, lvb_len);
751 memcpy(lmm, lvb_data, lvb_len);
753 /* install lvb_data */
754 lock_res_and_lock(lock);
755 if (lock->l_lvb_data == NULL) {
756 lock->l_lvb_type = LVB_T_LAYOUT;
757 lock->l_lvb_data = lmm;
758 lock->l_lvb_len = lvb_len;
761 unlock_res_and_lock(lock);
763 OBD_FREE_LARGE(lmm, lvb_len);
771 /* We always reserve enough space in the reply packet for a stripe MD, because
772 * we don't know in advance the file type. */
/*
 * Enqueue a lock on the resource named after op_data->op_fid1, optionally
 * carrying an intent.
 *
 * Visible behaviour:
 *  - the inodebits policy defaults to LOOKUP and is switched to UPDATE,
 *    LAYOUT or XATTR according to it->it_op; LDLM_FL_HAS_INTENT is added to
 *    the saved flags on that path;
 *  - on the path without an intent (per the FLOCK comment in the body) lmm
 *    is reused as the policy, lmmsize must be 0, the lock type must be
 *    LDLM_FLOCK and res_id.name[3] is set to LDLM_FLOCK;
 *  - otherwise the request is built by the packer matching it->it_op
 *    (open, unlink, getattr/lookup, readdir, layout, getxattr); the open
 *    path also forces the UPDATE policy and clears einfo->ei_cbdata;
 *  - requests for IT_CREAT get rq_no_retry_einprogress, and the import
 *    generation sampled earlier plus a send time of "now + resends" are
 *    recorded in the request;
 *  - the rpc lock is taken before mdc_enter_request(), the enqueue is done
 *    by ldlm_cli_enqueue(), and both are released afterwards;
 *  - lock_policy_res2 of the reply is converted with ptlrpc_status_ntoh();
 *    an IT_CREAT that got -EINPROGRESS drops the request and, while the
 *    import generation is unchanged, goes around again;
 *  - the reply is handed to mdc_finish_enqueue(); on the path after it a
 *    used lock handle is dereferenced and zeroed and the request is
 *    released.
 *
 * NOTE(review): the declaration of rc, the resend label and jump, most
 * error checks and all return statements are on lines not visible in this
 * view; the summary covers the visible statements only.
 */
773 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
774 struct lookup_intent *it, struct md_op_data *op_data,
775 struct lustre_handle *lockh, void *lmm, int lmmsize,
776 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
778 struct obd_device *obddev = class_exp2obd(exp);
779 struct ptlrpc_request *req = NULL;
780 __u64 flags, saved_flags = extra_lock_flags;
782 struct ldlm_res_id res_id;
783 static const ldlm_policy_data_t lookup_policy =
784 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
785 static const ldlm_policy_data_t update_policy =
786 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
787 static const ldlm_policy_data_t layout_policy =
788 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
789 static const ldlm_policy_data_t getxattr_policy = {
790 .l_inodebits = { MDS_INODELOCK_XATTR } };
791 ldlm_policy_data_t const *policy = &lookup_policy;
792 int generation, resends = 0;
793 struct ldlm_reply *lockrep;
794 enum lvb_type lvb_type = 0;
796 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
799 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
802 saved_flags |= LDLM_FL_HAS_INTENT;
803 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
804 policy = &update_policy;
805 else if (it->it_op & IT_LAYOUT)
806 policy = &layout_policy;
807 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
808 policy = &getxattr_policy;
811 LASSERT(reqp == NULL);
813 generation = obddev->u.cli.cl_import->imp_generation;
817 /* The only way right now is FLOCK, in this case we hide flock
818 policy as lmm, but lmmsize is 0 */
819 LASSERT(lmm && lmmsize == 0);
820 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
822 policy = (ldlm_policy_data_t *)lmm;
823 res_id.name[3] = LDLM_FLOCK;
824 } else if (it->it_op & IT_OPEN) {
825 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
827 policy = &update_policy;
828 einfo->ei_cbdata = NULL;
830 } else if (it->it_op & IT_UNLINK) {
831 req = mdc_intent_unlink_pack(exp, it, op_data);
832 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
833 req = mdc_intent_getattr_pack(exp, it, op_data);
834 } else if (it->it_op & IT_READDIR) {
835 req = mdc_enqueue_pack(exp, 0);
836 } else if (it->it_op & IT_LAYOUT) {
837 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
839 req = mdc_intent_layout_pack(exp, it, op_data);
840 lvb_type = LVB_T_LAYOUT;
841 } else if (it->it_op & IT_GETXATTR) {
842 req = mdc_intent_getxattr_pack(exp, it, op_data);
851 if (req != NULL && it && it->it_op & IT_CREAT)
852 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
854 req->rq_no_retry_einprogress = 1;
857 req->rq_generation_set = 1;
858 req->rq_import_generation = generation;
859 req->rq_sent = cfs_time_current_sec() + resends;
862 /* It is important to obtain rpc_lock first (if applicable), so that
863 * threads that are serialised with rpc_lock are not polluting our
864 * rpcs in flight counter. We do not do flock request limiting, though*/
866 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
867 rc = mdc_enter_request(&obddev->u.cli);
869 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
870 mdc_clear_replay_flag(req, 0);
871 ptlrpc_req_finished(req);
876 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
877 0, lvb_type, lockh, 0);
879 /* For flock requests we immediately return without further
880 delay and let caller deal with the rest, since rest of
881 this function metadata processing makes no sense for flock
882 requests anyway. But in case of problem during comms with
883 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
884 can not rely on caller and this mainly for F_UNLCKs
885 (explicits or automatically generated by Kernel to clean
886 current FLocks upon exit) that can't be trashed */
887 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
892 mdc_exit_request(&obddev->u.cli);
893 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
896 CERROR("ldlm_cli_enqueue: %d\n", rc);
897 mdc_clear_replay_flag(req, rc);
898 ptlrpc_req_finished(req);
902 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
903 LASSERT(lockrep != NULL);
905 lockrep->lock_policy_res2 =
906 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
908 /* Retry the create infinitely when we get -EINPROGRESS from
909 * server. This is required by the new quota design. */
910 if (it && it->it_op & IT_CREAT &&
911 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
912 mdc_clear_replay_flag(req, rc);
913 ptlrpc_req_finished(req);
916 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
917 obddev->obd_name, resends, it->it_op,
918 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
920 if (generation == obddev->u.cli.cl_import->imp_generation) {
923 CDEBUG(D_HA, "resend cross eviction\n");
928 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
930 if (lustre_handle_is_used(lockh)) {
931 ldlm_lock_decref(lockh, einfo->ei_mode);
932 memset(lockh, 0, sizeof(*lockh));
934 ptlrpc_req_finished(req);
/*
 * Second stage of intent processing: interpret the reply of an intent
 * enqueue on behalf of the caller of mdc_intent_lock() or of the async
 * getattr interpreter.
 *
 * Visible steps:
 *  - if the server never started executing the intent (DISP_IT_EXECD not
 *    set), the non-zero it_status is returned;
 *  - it_open_error() is consulted for the DISP_IT_EXECD and
 *    DISP_LOOKUP_EXECD phases;
 *  - when revalidating (op_fid2 sane, M_CHECK_STALE set in it_create_mode,
 *    operation other than IT_GETATTR), the fid in the reply body is
 *    compared with op_fid2 and op_fid3 and a "Found stale data" debug
 *    message is emitted when it matches neither;
 *  - an extra reference on the request is taken once for a successful
 *    create (DISP_ENQ_CREATE_REF) and once for a successful open
 *    (DISP_ENQ_OPEN_REF);
 *  - the intent operation is sanity-checked with LASSERT;
 *  - if another granted lock matching the new lock's policy already
 *    exists, the new lock is dropped with ldlm_lock_decref_and_cancel()
 *    and lockh and it_lock_handle are switched to the existing lock.
 *
 * NOTE(review): the declaration of rc, the checks on the it_open_error()
 * results, the stale-data return value, the NULL check on the lock and the
 * final return are on lines not visible in this view -- confirm.
 */
939 static int mdc_finish_intent_lock(struct obd_export *exp,
940 struct ptlrpc_request *request,
941 struct md_op_data *op_data,
942 struct lookup_intent *it,
943 struct lustre_handle *lockh)
945 struct lustre_handle old_lock;
946 struct mdt_body *mdt_body;
947 struct ldlm_lock *lock;
950 LASSERT(request != NULL);
951 LASSERT(request != LP_POISON);
952 LASSERT(request->rq_repmsg != LP_POISON);
954 if (!it_disposition(it, DISP_IT_EXECD)) {
955 /* The server failed before it even started executing the
956 * intent, i.e. because it couldn't unpack the request. */
957 LASSERT(it->d.lustre.it_status != 0);
958 return it->d.lustre.it_status;
960 rc = it_open_error(DISP_IT_EXECD, it);
964 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
965 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
967 /* If we were revalidating a fid/name pair, mark the intent in
968 * case we fail and get called again from lookup */
969 if (fid_is_sane(&op_data->op_fid2) &&
970 it->it_create_mode & M_CHECK_STALE &&
971 it->it_op != IT_GETATTR) {
973 /* Also: did we find the same inode? */
974 /* server can return one of two fids:
975 * op_fid2 - new allocated fid - if file is created.
976 * op_fid3 - existent fid - if file only open.
977 * op_fid3 is saved in lmv_intent_open */
978 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
979 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* NOTE(review): op_fid2 is printed twice below although the test above
 * compares against both op_fid2 and op_fid3; the second argument may have
 * been meant to be op_fid3 -- confirm before changing. */
980 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
981 "\n", PFID(&op_data->op_fid2),
982 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
987 rc = it_open_error(DISP_LOOKUP_EXECD, it);
991 /* keep requests around for the multiple phases of the call
992 * this shows the DISP_XX must guarantee we make it into the call
994 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
995 it_disposition(it, DISP_OPEN_CREATE) &&
996 !it_open_error(DISP_OPEN_CREATE, it)) {
997 it_set_disposition(it, DISP_ENQ_CREATE_REF);
998 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1000 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1001 it_disposition(it, DISP_OPEN_OPEN) &&
1002 !it_open_error(DISP_OPEN_OPEN, it)) {
1003 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1004 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1005 /* BUG 11546 - eviction in the middle of open rpc processing */
1006 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1009 if (it->it_op & IT_CREAT) {
1010 /* XXX this belongs in ll_create_it */
1011 } else if (it->it_op == IT_OPEN) {
1012 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1014 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1017 /* If we already have a matching lock, then cancel the new
1018 * one. We have to set the data here instead of in
1019 * mdc_enqueue, because we need to use the child's inode as
1020 * the l_ast_data to match, and that's not available until
1021 * intent_finish has performed the iget().) */
1022 lock = ldlm_handle2lock(lockh);
1024 ldlm_policy_data_t policy = lock->l_policy_data;
1025 LDLM_DEBUG(lock, "matching against this");
1027 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1028 &lock->l_resource->lr_name),
1029 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1030 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1031 LDLM_LOCK_PUT(lock);
1033 memcpy(&old_lock, lockh, sizeof(*lockh));
1034 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1035 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1036 ldlm_lock_decref_and_cancel(lockh,
1037 it->d.lustre.it_lock_mode);
1038 memcpy(lockh, &old_lock, sizeof(old_lock));
1039 it->d.lustre.it_lock_handle = lockh->cookie;
1042 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1043 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1044 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * Check whether a lock covering fid for the intent's operation is already
 * held, and record it in the intent.
 *
 * If the intent already carries a lock handle, that handle is revalidated
 * with ldlm_revalidate_lock_handle(), which also receives bits.  Otherwise
 * the inodebits to look for are chosen from it->it_op (UPDATE and LOOKUP
 * plus possibly more for one case, LAYOUT for another, LOOKUP for a third)
 * and mdc_lock_match() is asked for a granted CR, CW, PR or PW lock.
 *
 * Afterwards the intent's it_lock_handle and it_lock_mode are set either
 * to the matched handle and mode, or both to 0.
 *
 * NOTE(review): the case labels of the switch, the rest of the first
 * inodebits expression, the condition selecting between the two final
 * assignments and the return value are on lines not visible in this view
 * -- confirm.
 */
1048 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1049 struct lu_fid *fid, __u64 *bits)
1051 /* We could just return 1 immediately, but since we should only
1052 * be called in revalidate_it if we already have a lock, let's
1054 struct ldlm_res_id res_id;
1055 struct lustre_handle lockh;
1056 ldlm_policy_data_t policy;
1059 if (it->d.lustre.it_lock_handle) {
1060 lockh.cookie = it->d.lustre.it_lock_handle;
1061 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1063 fid_build_reg_res_name(fid, &res_id);
1064 switch (it->it_op) {
1066 /* File attributes are held under multiple bits:
1067 * nlink is under lookup lock, size and times are
1068 * under UPDATE lock and recently we've also got
1069 * a separate permissions lock for owner/group/acl that
1070 * were protected by lookup lock before.
1071 * Getattr must provide all of that information,
1072 * so we need to ensure we have all of those locks.
1073 * Unfortunately, if the bits are split across multiple
1074 * locks, there's no easy way to match all of them here,
1075 * so an extra RPC would be performed to fetch all
1076 * of those bits at once for now. */
1077 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1078 * but for old MDTs (< 2.4), permission is covered
1079 * by LOOKUP lock, so it needs to match all bits here.*/
1080 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1081 MDS_INODELOCK_LOOKUP |
1085 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1088 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1092 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1093 LDLM_IBITS, &policy,
1094 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1099 it->d.lustre.it_lock_handle = lockh.cookie;
1100 it->d.lustre.it_lock_mode = mode;
1102 it->d.lustre.it_lock_handle = 0;
1103 it->d.lustre.it_lock_mode = 0;
1110 * This long block is all about fixing up the lock and request state
1111 * so that it is correct as of the moment _before_ the operation was
1112 * applied; that way, the VFS will think that everything is normal and
1113 * call Lustre's regular VFS methods.
1115 * If we're performing a creation, that means that unless the creation
1116 * failed with EEXIST, we should fake up a negative dentry.
1118 * For everything else, we want to lookup to succeed.
1120 * One additional note: if CREATE or OPEN succeeded, we add an extra
1121 * reference to the request because we need to keep it around until
1122 * ll_create/ll_open gets called.
1124 * The server will return to us, in it_disposition, an indication of
1125 * exactly what d.lustre.it_status refers to.
1127 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1128 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1129 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1130 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1133 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * Entry point for intent locking: obtain (or reuse) a lock for the intent
 * and hand the reply back to the caller.
 *
 * Visible steps:
 *  - an LDLM_IBITS enqueue descriptor is prepared with the mode from
 *    it_to_lock_mode(it), the caller's blocking callback and
 *    ldlm_completion_ast as completion callback;
 *  - for IT_LOOKUP or IT_GETATTR on a sane op_fid2, the intent's lock
 *    handle is reset and mdc_revalidate_lock() is tried first; its result
 *    together with op_namelen decides whether the function can finish
 *    without an RPC;
 *  - for IT_CREAT without a sane op_fid2 a fid is allocated with
 *    mdc_fid_alloc();
 *  - the lock is enqueued through mdc_enqueue(), *reqp is set to the
 *    request saved in the intent, and mdc_finish_intent_lock() completes
 *    the processing.
 *
 * The lookup_flags parameter is not referenced by the visible statements.
 *
 * NOTE(review): the declaration of rc, the remaining arguments of the
 * first CDEBUG() and of mdc_enqueue(), the error checks and all return
 * statements are on lines not visible in this view -- confirm.
 */
1136 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1137 void *lmm, int lmmsize, struct lookup_intent *it,
1138 int lookup_flags, struct ptlrpc_request **reqp,
1139 ldlm_blocking_callback cb_blocking,
1140 __u64 extra_lock_flags)
1142 struct ldlm_enqueue_info einfo = {
1143 .ei_type = LDLM_IBITS,
1144 .ei_mode = it_to_lock_mode(it),
1145 .ei_cb_bl = cb_blocking,
1146 .ei_cb_cp = ldlm_completion_ast,
1148 struct lustre_handle lockh;
1153 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1154 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1155 op_data->op_name, PFID(&op_data->op_fid2),
1156 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1160 if (fid_is_sane(&op_data->op_fid2) &&
1161 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1162 /* We could just return 1 immediately, but since we should only
1163 * be called in revalidate_it if we already have a lock, let's
1165 it->d.lustre.it_lock_handle = 0;
1166 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1167 /* Only return failure if it was not GETATTR by cfid
1168 (from inode_revalidate) */
1169 if (rc || op_data->op_namelen != 0)
1173 /* For case if upper layer did not alloc fid, do it now. */
1174 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1175 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1177 CERROR("Can't alloc new fid, rc %d\n", rc);
1181 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1186 *reqp = it->d.lustre.it_data;
1187 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpreter for requests queued by mdc_intent_getattr_async().
 *
 * args is the struct mdc_getattr_args saved in the request.  The function
 * leaves the in-flight accounting with mdc_exit_request(), completes the
 * enqueue with ldlm_cli_enqueue_fini(), converts lock_policy_res2 of the
 * reply with ptlrpc_status_ntoh(), and then runs mdc_finish_enqueue() and
 * mdc_finish_intent_lock() on the result.  Finally the enqueue descriptor
 * einfo is freed and the caller's callback minfo->mi_cb is invoked with
 * the request, minfo and the resulting rc.
 *
 * NOTE(review): the remaining parameters, the assignment of it, the effect
 * of the OBD_FAIL_MDC_GETATTR_ENQUEUE check, the error checks between the
 * stages and the return value are on lines not visible in this view --
 * confirm.
 */
1191 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1192 struct ptlrpc_request *req,
1195 struct mdc_getattr_args *ga = args;
1196 struct obd_export *exp = ga->ga_exp;
1197 struct md_enqueue_info *minfo = ga->ga_minfo;
1198 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1199 struct lookup_intent *it;
1200 struct lustre_handle *lockh;
1201 struct obd_device *obddev;
1202 struct ldlm_reply *lockrep;
1203 __u64 flags = LDLM_FL_HAS_INTENT;
1206 lockh = &minfo->mi_lockh;
1208 obddev = class_exp2obd(exp);
1210 mdc_exit_request(&obddev->u.cli);
1211 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1214 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1215 &flags, NULL, 0, lockh, rc);
1217 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1218 mdc_clear_replay_flag(req, rc);
1222 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1223 LASSERT(lockrep != NULL);
1225 lockrep->lock_policy_res2 =
1226 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1228 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1232 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1235 OBD_FREE_PTR(einfo);
1236 minfo->mi_cb(req, minfo, rc);
/*
 * Start an asynchronous getattr intent enqueue.
 *
 * The request is built with mdc_intent_getattr_pack() for the intent and
 * operation data embedded in minfo; PTR_ERR(req) is returned on the
 * packer's failure path.  After mdc_enter_request(), the lock is enqueued
 * with ldlm_cli_enqueue() using a LOOKUP|UPDATE inodebits policy, with the
 * handle stored in minfo->mi_lockh.  On the visible failure paths the
 * request is released with ptlrpc_req_finished() (and the in-flight slot
 * with mdc_exit_request() after a failed enqueue).
 *
 * On success minfo and einfo are saved in the request's async args,
 * mdc_intent_getattr_async_interpret() is installed as the reply
 * interpreter and the request is handed to ptlrpcd_add_req().
 *
 * NOTE(review): the declaration of rc, the opening of the CDEBUG() call,
 * the store to ga->ga_exp, the error checks and the return statements are
 * on lines not visible in this view -- confirm.
 */
1240 int mdc_intent_getattr_async(struct obd_export *exp,
1241 struct md_enqueue_info *minfo,
1242 struct ldlm_enqueue_info *einfo)
1244 struct md_op_data *op_data = &minfo->mi_data;
1245 struct lookup_intent *it = &minfo->mi_it;
1246 struct ptlrpc_request *req;
1247 struct mdc_getattr_args *ga;
1248 struct obd_device *obddev = class_exp2obd(exp);
1249 struct ldlm_res_id res_id;
1250 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1251 * for statahead currently. Consider CMD in future, such two bits
1252 * maybe managed by different MDS, should be adjusted then. */
1253 ldlm_policy_data_t policy = {
1254 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1255 MDS_INODELOCK_UPDATE }
1258 __u64 flags = LDLM_FL_HAS_INTENT;
1261 "name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1262 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1263 ldlm_it2str(it->it_op), it->it_flags);
1265 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1266 req = mdc_intent_getattr_pack(exp, it, op_data);
1268 return PTR_ERR(req);
1270 rc = mdc_enter_request(&obddev->u.cli);
1272 ptlrpc_req_finished(req);
1276 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1277 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1279 mdc_exit_request(&obddev->u.cli);
1280 ptlrpc_req_finished(req);
1284 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1285 ga = ptlrpc_req_async_args(req);
1287 ga->ga_minfo = minfo;
1288 ga->ga_einfo = einfo;
1290 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1291 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);