static mempool_t *rpc_buffer_mempool __read_mostly;
static void __rpc_default_timer(struct rpc_task *task);
-static void rpciod_killall(void);
static void rpc_async_schedule(struct work_struct *);
static void rpc_release_task(struct rpc_task *task);
*/
static RPC_WAITQ(delay_queue, "delayq");
-/*
- * All RPC clients are linked into this list
- */
-static LIST_HEAD(all_clients);
-static DECLARE_WAIT_QUEUE_HEAD(client_kill_wait);
-
/*
* rpciod-related stuff
*/
-static DEFINE_MUTEX(rpciod_mutex);
-static unsigned int rpciod_users;
struct workqueue_struct *rpciod_workqueue;
-/*
- * Spinlock for other critical sections of code.
- */
-static DEFINE_SPINLOCK(rpc_sched_lock);
-
/*
* Disable the timer for a given RPC task. Should be called with
* queue->lock and bh_disabled in order to avoid races within
{
__rpc_init_priority_wait_queue(queue, qname, 0);
}
-EXPORT_SYMBOL(rpc_init_wait_queue);
+EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
static int rpc_wait_bit_interruptible(void *word)
{
return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
action, TASK_INTERRUPTIBLE);
}
-EXPORT_SYMBOL(__rpc_wait_for_completion_task);
+EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
/*
* Make an RPC task runnable.
__rpc_sleep_on(q, task, action, timer);
spin_unlock_bh(&q->lock);
}
+EXPORT_SYMBOL_GPL(rpc_sleep_on);
/**
* __rpc_do_wake_up_task - wake up a single rpc_task
}
rcu_read_unlock_bh();
}
+EXPORT_SYMBOL_GPL(rpc_wake_up_task);
/*
* Wake up the next task on a priority queue.
return task;
}
+EXPORT_SYMBOL_GPL(rpc_wake_up_next);
/**
* rpc_wake_up - wake up all rpc_tasks
spin_unlock(&queue->lock);
rcu_read_unlock_bh();
}
+EXPORT_SYMBOL_GPL(rpc_wake_up);
/**
* rpc_wake_up_status - wake up all rpc_tasks and set their status value.
spin_unlock(&queue->lock);
rcu_read_unlock_bh();
}
+EXPORT_SYMBOL_GPL(rpc_wake_up_status);
static void __rpc_atrun(struct rpc_task *task)
{
task->tk_timeout = delay;
rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}
+EXPORT_SYMBOL_GPL(rpc_delay);
/*
* Helper to call task->tk_ops->rpc_call_prepare
}
}
}
-EXPORT_SYMBOL(rpc_exit_task);
+EXPORT_SYMBOL_GPL(rpc_exit_task);
void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
rpc_set_running(task);
__rpc_execute(task);
}
+EXPORT_SYMBOL_GPL(rpc_execute);
static void rpc_async_schedule(struct work_struct *work)
{
task->tk_pid, size, buf);
return &buf->data;
}
+EXPORT_SYMBOL_GPL(rpc_malloc);
/**
* rpc_free - free buffer allocated via rpc_malloc
else
kfree(buf);
}
+EXPORT_SYMBOL_GPL(rpc_free);
/*
* Creation and deletion of RPC task structures
void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
memset(task, 0, sizeof(*task));
- init_timer(&task->tk_timer);
- task->tk_timer.data = (unsigned long) task;
- task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+ setup_timer(&task->tk_timer, (void (*)(unsigned long))rpc_run_timer,
+ (unsigned long)task);
atomic_set(&task->tk_count, 1);
task->tk_client = clnt;
task->tk_flags = flags;
task->tk_start = jiffies;
dprintk("RPC: new task initialized, procpid %u\n",
- current->pid);
+ task_pid_nr(current));
}
+EXPORT_SYMBOL_GPL(rpc_init_task);
static struct rpc_task *
rpc_alloc_task(void)
call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
rpc_release_calldata(tk_ops, calldata);
}
-EXPORT_SYMBOL(rpc_put_task);
+EXPORT_SYMBOL_GPL(rpc_put_task);
static void rpc_release_task(struct rpc_task *task)
{
rpc_put_task(task);
}
-/**
- * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
- * @clnt: pointer to RPC client
- * @flags: RPC flags
- * @ops: RPC call ops
- * @data: user call data
- */
-struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
- const struct rpc_call_ops *ops,
- void *data)
-{
- struct rpc_task *task;
- task = rpc_new_task(clnt, flags, ops, data);
- if (task == NULL) {
- rpc_release_calldata(ops, data);
- return ERR_PTR(-ENOMEM);
- }
- atomic_inc(&task->tk_count);
- rpc_execute(task);
- return task;
-}
-EXPORT_SYMBOL(rpc_run_task);
-
/*
* Kill all tasks for the given client.
* XXX: kill their descendants as well?
}
spin_unlock(&clnt->cl_lock);
}
+EXPORT_SYMBOL_GPL(rpc_killall_tasks);
-static void rpciod_killall(void)
+int rpciod_up(void)
{
- struct rpc_clnt *clnt;
- unsigned long flags;
-
- for(;;) {
- clear_thread_flag(TIF_SIGPENDING);
-
- spin_lock(&rpc_sched_lock);
- list_for_each_entry(clnt, &all_clients, cl_clients)
- rpc_killall_tasks(clnt);
- spin_unlock(&rpc_sched_lock);
- flush_workqueue(rpciod_workqueue);
- if (!list_empty(&all_clients))
- break;
- dprintk("RPC: rpciod_killall: waiting for tasks "
- "to exit\n");
- wait_event_timeout(client_kill_wait,
- list_empty(&all_clients), 1*HZ);
- }
-
- spin_lock_irqsave(¤t->sighand->siglock, flags);
- recalc_sigpending();
- spin_unlock_irqrestore(¤t->sighand->siglock, flags);
+ return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}
-void rpc_register_client(struct rpc_clnt *clnt)
+void rpciod_down(void)
{
- spin_lock(&rpc_sched_lock);
- list_add(&clnt->cl_clients, &all_clients);
- spin_unlock(&rpc_sched_lock);
-}
-
-void rpc_unregister_client(struct rpc_clnt *clnt)
-{
- spin_lock(&rpc_sched_lock);
- list_del(&clnt->cl_clients);
- if (list_empty(&all_clients))
- wake_up(&client_kill_wait);
- spin_unlock(&rpc_sched_lock);
+ module_put(THIS_MODULE);
}
/*
- * Start up the rpciod process if it's not already running.
+ * Start up the rpciod workqueue.
*/
-int
-rpciod_up(void)
+static int rpciod_start(void)
{
struct workqueue_struct *wq;
- int error = 0;
- mutex_lock(&rpciod_mutex);
- dprintk("RPC: rpciod_up: users %u\n", rpciod_users);
- rpciod_users++;
- if (rpciod_workqueue)
- goto out;
- /*
- * If there's no pid, we should be the first user.
- */
- if (rpciod_users > 1)
- printk(KERN_WARNING "rpciod_up: no workqueue, %u users??\n", rpciod_users);
/*
* Create the rpciod thread and wait for it to start.
*/
- error = -ENOMEM;
+ dprintk("RPC: creating workqueue rpciod\n");
wq = create_workqueue("rpciod");
- if (wq == NULL) {
- printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
- rpciod_users--;
- goto out;
- }
rpciod_workqueue = wq;
- error = 0;
-out:
- mutex_unlock(&rpciod_mutex);
- return error;
+ return rpciod_workqueue != NULL;
}
-void
-rpciod_down(void)
+static void rpciod_stop(void)
{
- mutex_lock(&rpciod_mutex);
- dprintk("RPC: rpciod_down sema %u\n", rpciod_users);
- if (rpciod_users) {
- if (--rpciod_users)
- goto out;
- } else
- printk(KERN_WARNING "rpciod_down: no users??\n");
+ struct workqueue_struct *wq = NULL;
- if (!rpciod_workqueue) {
- dprintk("RPC: rpciod_down: Nothing to do!\n");
- goto out;
- }
- rpciod_killall();
+ if (rpciod_workqueue == NULL)
+ return;
+ dprintk("RPC: destroying workqueue rpciod\n");
- destroy_workqueue(rpciod_workqueue);
+ wq = rpciod_workqueue;
rpciod_workqueue = NULL;
- out:
- mutex_unlock(&rpciod_mutex);
+ destroy_workqueue(wq);
}
-#ifdef RPC_DEBUG
-void rpc_show_tasks(void)
-{
- struct rpc_clnt *clnt;
- struct rpc_task *t;
-
- spin_lock(&rpc_sched_lock);
- if (list_empty(&all_clients))
- goto out;
- printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
- "-rpcwait -action- ---ops--\n");
- list_for_each_entry(clnt, &all_clients, cl_clients) {
- if (list_empty(&clnt->cl_tasks))
- continue;
- spin_lock(&clnt->cl_lock);
- list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
- const char *rpc_waitq = "none";
-
- if (RPC_IS_QUEUED(t))
- rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
-
- printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
- t->tk_pid,
- (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
- t->tk_flags, t->tk_status,
- t->tk_client,
- (t->tk_client ? t->tk_client->cl_prog : 0),
- t->tk_rqstp, t->tk_timeout,
- rpc_waitq,
- t->tk_action, t->tk_ops);
- }
- spin_unlock(&clnt->cl_lock);
- }
-out:
- spin_unlock(&rpc_sched_lock);
-}
-#endif
-
void
rpc_destroy_mempool(void)
{
+ rpciod_stop();
if (rpc_buffer_mempool)
mempool_destroy(rpc_buffer_mempool);
if (rpc_task_mempool)
rpc_task_slabp = kmem_cache_create("rpc_tasks",
sizeof(struct rpc_task),
0, SLAB_HWCACHE_ALIGN,
- NULL, NULL);
+ NULL);
if (!rpc_task_slabp)
goto err_nomem;
rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
RPC_BUFFER_MAXSIZE,
0, SLAB_HWCACHE_ALIGN,
- NULL, NULL);
+ NULL);
if (!rpc_buffer_slabp)
goto err_nomem;
rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
rpc_buffer_slabp);
if (!rpc_buffer_mempool)
goto err_nomem;
+ if (!rpciod_start())
+ goto err_nomem;
return 0;
err_nomem:
rpc_destroy_mempool();