struct ceph_msg *msg;
struct ceph_mds_session_head *h;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
- if (IS_ERR(msg)) {
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h));
+ if (!msg) {
pr_err("create_session_msg ENOMEM creating msg\n");
- return ERR_PTR(PTR_ERR(msg));
+ return NULL;
}
h = msg->front.iov_base;
h->op = cpu_to_le32(op);
struct ceph_msg *msg;
int mstate;
int mds = session->s_mds;
- int err = 0;
/* wait for mds to go active? */
mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
/* send connect message */
msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
- if (IS_ERR(msg)) {
- err = PTR_ERR(msg);
- goto out;
- }
+ if (!msg)
+ return -ENOMEM;
ceph_con_send(&session->s_con, msg);
-
-out:
return 0;
}
}
/*
- * Helper to safely iterate over all caps associated with a session.
+ * Helper to safely iterate over all caps associated with a session, with
+ * special care taken to handle a racing __ceph_remove_cap().
*
- * caller must hold session s_mutex
+ * Caller must hold session s_mutex.
*/
static int iterate_session_caps(struct ceph_mds_session *session,
int (*cb)(struct inode *, struct ceph_cap *,
ceph_mds_state_name(state));
msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
++session->s_renew_seq);
- if (IS_ERR(msg))
- return PTR_ERR(msg);
+ if (!msg)
+ return -ENOMEM;
ceph_con_send(&session->s_con, msg);
return 0;
}
struct ceph_mds_session *session)
{
struct ceph_msg *msg;
- int err = 0;
dout("request_close_session mds%d state %s seq %lld\n",
session->s_mds, session_state_name(session->s_state),
session->s_seq);
msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
- if (IS_ERR(msg))
- err = PTR_ERR(msg);
- else
- ceph_con_send(&session->s_con, msg);
- return err;
+ if (!msg)
+ return -ENOMEM;
+ ceph_con_send(&session->s_con, msg);
+ return 0;
}
/*
while (session->s_num_cap_releases < session->s_nr_caps + extra) {
spin_unlock(&session->s_cap_lock);
- msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
- 0, 0, NULL);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE);
if (!msg)
goto out_unlocked;
dout("add_cap_releases %p msg %p now %d\n", session, msg,
if (!req)
return ERR_PTR(-ENOMEM);
+ mutex_init(&req->r_fill_mutex);
req->r_started = jiffies;
req->r_resend_mds = -1;
INIT_LIST_HEAD(&req->r_unsafe_dir_item);
struct inode *inode = temp->d_inode;
if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
- dout("build_path_dentry path+%d: %p SNAPDIR\n",
+ dout("build_path path+%d: %p SNAPDIR\n",
pos, temp);
} else if (stop_on_nosnap && inode &&
ceph_snap(inode) == CEPH_NOSNAP) {
break;
strncpy(path + pos, temp->d_name.name,
temp->d_name.len);
- dout("build_path_dentry path+%d: %p '%.*s'\n",
- pos, temp, temp->d_name.len, path + pos);
}
if (pos)
path[--pos] = '/';
temp = temp->d_parent;
if (temp == NULL) {
- pr_err("build_path_dentry corrupt dentry\n");
+ pr_err("build_path corrupt dentry\n");
kfree(path);
return ERR_PTR(-EINVAL);
}
}
if (pos != 0) {
- pr_err("build_path_dentry did not end path lookup where "
+ pr_err("build_path did not end path lookup where "
"expected, namelen is %d, pos is %d\n", len, pos);
/* presumably this is only possible if racing with a
rename of one of the parent directories (we can not
*base = ceph_ino(temp->d_inode);
*plen = len;
- dout("build_path_dentry on %p %d built %llx '%.*s'\n",
+ dout("build_path on %p %d built %llx '%.*s'\n",
dentry, atomic_read(&dentry->d_count), *base, len, path);
return path;
}
if (req->r_old_dentry_drop)
len += req->r_old_dentry->d_name.len;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
- if (IS_ERR(msg))
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len);
+ if (!msg) {
+ msg = ERR_PTR(-ENOMEM);
goto out_free2;
+ }
msg->hdr.tid = cpu_to_le64(req->r_tid);
}
msg = create_request_message(mdsc, req, mds);
if (IS_ERR(msg)) {
- req->r_reply = ERR_PTR(PTR_ERR(msg));
+ req->r_err = PTR_ERR(msg);
complete_request(mdsc, req);
- return -PTR_ERR(msg);
+ return PTR_ERR(msg);
}
req->r_request = msg;
int mds = -1;
int err = -EAGAIN;
- if (req->r_reply)
+ if (req->r_err || req->r_got_result)
goto out;
if (req->r_timeout &&
return err;
finish:
- req->r_reply = ERR_PTR(err);
+ req->r_err = err;
complete_request(mdsc, req);
goto out;
}
__register_request(mdsc, req, dir);
__do_request(mdsc, req);
- /* wait */
- if (!req->r_reply) {
- mutex_unlock(&mdsc->mutex);
- if (req->r_timeout) {
- err = (long)wait_for_completion_interruptible_timeout(
- &req->r_completion, req->r_timeout);
- if (err == 0)
- req->r_reply = ERR_PTR(-EIO);
- else if (err < 0)
- req->r_reply = ERR_PTR(err);
- } else {
- err = wait_for_completion_interruptible(
- &req->r_completion);
- if (err)
- req->r_reply = ERR_PTR(err);
- }
- mutex_lock(&mdsc->mutex);
+ if (req->r_err) {
+ err = req->r_err;
+ __unregister_request(mdsc, req);
+ dout("do_request early error %d\n", err);
+ goto out;
}
- if (IS_ERR(req->r_reply)) {
- err = PTR_ERR(req->r_reply);
- req->r_reply = NULL;
+ /* wait */
+ mutex_unlock(&mdsc->mutex);
+ dout("do_request waiting\n");
+ if (req->r_timeout) {
+ err = (long)wait_for_completion_interruptible_timeout(
+ &req->r_completion, req->r_timeout);
+ if (err == 0)
+ err = -EIO;
+ } else {
+ err = wait_for_completion_interruptible(&req->r_completion);
+ }
+ dout("do_request waited, got %d\n", err);
+ mutex_lock(&mdsc->mutex);
- if (err == -ERESTARTSYS) {
- /* aborted */
- req->r_aborted = true;
+ /* only abort if we didn't race with a real reply */
+ if (req->r_got_result) {
+ err = le32_to_cpu(req->r_reply_info.head->result);
+ } else if (err < 0) {
+ dout("aborted request %lld with %d\n", req->r_tid, err);
- if (req->r_locked_dir &&
- (req->r_op & CEPH_MDS_OP_WRITE)) {
- struct ceph_inode_info *ci =
- ceph_inode(req->r_locked_dir);
+ /*
+ * ensure we aren't running concurrently with
+ * ceph_fill_trace or ceph_readdir_prepopulate, which
+ * rely on locks (dir mutex) held by our caller.
+ */
+ mutex_lock(&req->r_fill_mutex);
+ req->r_err = err;
+ req->r_aborted = true;
+ mutex_unlock(&req->r_fill_mutex);
- dout("aborted, clearing I_COMPLETE on %p\n",
- req->r_locked_dir);
- spin_lock(&req->r_locked_dir->i_lock);
- ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
- ci->i_release_count++;
- spin_unlock(&req->r_locked_dir->i_lock);
- }
- } else {
- /* clean up this request */
- __unregister_request(mdsc, req);
- if (!list_empty(&req->r_unsafe_item))
- list_del_init(&req->r_unsafe_item);
- complete(&req->r_safe_completion);
+ if (req->r_locked_dir &&
+ (req->r_op & CEPH_MDS_OP_WRITE)) {
+ struct ceph_inode_info *ci =
+ ceph_inode(req->r_locked_dir);
+
+ dout("aborted, clearing I_COMPLETE on %p, leases\n",
+ req->r_locked_dir);
+ spin_lock(&req->r_locked_dir->i_lock);
+ ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
+ ci->i_release_count++;
+ spin_unlock(&req->r_locked_dir->i_lock);
+
+ if (req->r_dentry)
+ ceph_invalidate_dentry_lease(req->r_dentry);
+ if (req->r_old_dentry)
+ ceph_invalidate_dentry_lease(req->r_old_dentry);
}
- } else if (req->r_err) {
- err = req->r_err;
} else {
- err = le32_to_cpu(req->r_reply_info.head->result);
+ err = req->r_err;
}
- mutex_unlock(&mdsc->mutex);
+out:
+ mutex_unlock(&mdsc->mutex);
dout("do_request %p done, result %d\n", req, err);
return err;
}
mutex_unlock(&mdsc->mutex);
goto out;
}
- }
-
- BUG_ON(req->r_reply);
-
- if (!head->safe) {
+ } else {
req->r_got_unsafe = true;
list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
}
}
/* insert trace into our cache */
+ mutex_lock(&req->r_fill_mutex);
err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
if (err == 0) {
if (result == 0 && rinfo->dir_nr)
ceph_readdir_prepopulate(req, req->r_session);
ceph_unreserve_caps(&req->r_caps_reservation);
}
+ mutex_unlock(&req->r_fill_mutex);
up_read(&mdsc->snap_rwsem);
out_err:
- if (err) {
- req->r_err = err;
+ mutex_lock(&mdsc->mutex);
+ if (!req->r_aborted) {
+ if (err) {
+ req->r_err = err;
+ } else {
+ req->r_reply = msg;
+ ceph_msg_get(msg);
+ req->r_got_result = true;
+ }
} else {
- req->r_reply = msg;
- ceph_msg_get(msg);
+ dout("reply arrived after request %lld was aborted\n", tid);
}
+ mutex_unlock(&mdsc->mutex);
add_cap_releases(mdsc, req->r_session, -1);
mutex_unlock(&session->s_mutex);
struct ceph_mds_session *session = NULL;
struct ceph_msg *reply;
struct rb_node *p;
- int err;
+ int err = -ENOMEM;
struct ceph_pagelist *pagelist;
pr_info("reconnect to recovering mds%d\n", mds);
goto fail_nopagelist;
ceph_pagelist_init(pagelist);
- reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL);
- if (IS_ERR(reply)) {
- err = PTR_ERR(reply);
+ err = -ENOMEM;
+ reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0);
+ if (!reply)
goto fail_nomsg;
- }
/* find session */
session = __ceph_lookup_mds_session(mdsc, mds);
goto fail;
err = iterate_session_caps(session, encode_caps_cb, pagelist);
if (err < 0)
- goto out;
+ goto fail;
/*
* snaprealms. we provide mds with the ino, seq (version), and
reply->nr_pages = calc_pages_for(0, pagelist->length);
ceph_con_send(&session->s_con, reply);
- if (session) {
- session->s_state = CEPH_MDS_SESSION_OPEN;
- __wake_requests(mdsc, &session->s_waiting);
- }
+ session->s_state = CEPH_MDS_SESSION_OPEN;
+ mutex_unlock(&session->s_mutex);
+
+ mutex_lock(&mdsc->mutex);
+ __wake_requests(mdsc, &session->s_waiting);
+ mutex_unlock(&mdsc->mutex);
+
+ ceph_put_mds_session(session);
-out:
up_read(&mdsc->snap_rwsem);
- if (session) {
- mutex_unlock(&session->s_mutex);
- ceph_put_mds_session(session);
- }
mutex_lock(&mdsc->mutex);
return;
fail:
ceph_msg_put(reply);
+ up_read(&mdsc->snap_rwsem);
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
fail_nomsg:
ceph_pagelist_release(pagelist);
kfree(pagelist);
fail_nopagelist:
- pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
- goto out;
+ pr_err("error %d preparing reconnect for mds%d\n", err, mds);
+ mutex_lock(&mdsc->mutex);
+ return;
}
dnamelen = dentry->d_name.len;
len += dnamelen;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
- if (IS_ERR(msg))
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len);
+ if (!msg)
return;
lease = msg->front.iov_base;
lease->action = action;
mdsc->client = client;
mutex_init(&mdsc->mutex);
mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
+ if (mdsc->mdsmap == NULL)
+ return -ENOMEM;
+
init_completion(&mdsc->safe_umount_waiters);
init_completion(&mdsc->session_close_waiters);
INIT_LIST_HEAD(&mdsc->waiting_for_map);
init_waitqueue_head(&mdsc->cap_flushing_wq);
spin_lock_init(&mdsc->dentry_lru_lock);
INIT_LIST_HEAD(&mdsc->dentry_lru);
+
return 0;
}
{
u64 want_tid, want_flush;
+ if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
+ return;
+
dout("sync\n");
mutex_lock(&mdsc->mutex);
want_tid = mdsc->last_tid;