Merge branch 'for-linus' of git://oss.sgi.com/xfs/xfs
authorLinus Torvalds <torvalds@linux-foundation.org>
Tue, 25 May 2010 15:17:01 +0000 (08:17 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Tue, 25 May 2010 15:17:01 +0000 (08:17 -0700)
* 'for-linus' of git://oss.sgi.com/xfs/xfs:
  xfs: Ensure inode allocation buffers are fully replayed
  xfs: enable background pushing of the CIL
  xfs: forced unmounts need to push the CIL
  xfs: Introduce delayed logging core code
  xfs: Delayed logging design documentation
  xfs: Improve scalability of busy extent tracking
  xfs: make the log ticket ID available outside the log infrastructure
  xfs: clean up log ticket overrun debug output
  xfs: Clean up XFS_BLI_* flag namespace
  xfs: modify buffer item reference counting
  xfs: allow log ticket allocation to take allocation flags
  xfs: Don't reuse the same transaction ID for duplicated transactions.

27 files changed:
Documentation/filesystems/xfs-delayed-logging-design.txt [new file with mode: 0644]
fs/xfs/Makefile
fs/xfs/linux-2.6/xfs_buf.c
fs/xfs/linux-2.6/xfs_quotaops.c
fs/xfs/linux-2.6/xfs_super.c
fs/xfs/linux-2.6/xfs_trace.h
fs/xfs/quota/xfs_dquot.c
fs/xfs/xfs_ag.h
fs/xfs/xfs_alloc.c
fs/xfs/xfs_alloc.h
fs/xfs/xfs_alloc_btree.c
fs/xfs/xfs_buf_item.c
fs/xfs/xfs_buf_item.h
fs/xfs/xfs_error.c
fs/xfs/xfs_log.c
fs/xfs/xfs_log.h
fs/xfs/xfs_log_cil.c [new file with mode: 0644]
fs/xfs/xfs_log_priv.h
fs/xfs/xfs_log_recover.c
fs/xfs/xfs_log_recover.h
fs/xfs/xfs_mount.h
fs/xfs/xfs_trans.c
fs/xfs/xfs_trans.h
fs/xfs/xfs_trans_buf.c
fs/xfs/xfs_trans_item.c
fs/xfs/xfs_trans_priv.h
fs/xfs/xfs_types.h

diff --git a/Documentation/filesystems/xfs-delayed-logging-design.txt b/Documentation/filesystems/xfs-delayed-logging-design.txt
new file mode 100644 (file)
index 0000000..d8119e9
--- /dev/null
@@ -0,0 +1,816 @@
+XFS Delayed Logging Design
+--------------------------
+
+Introduction to Re-logging in XFS
+---------------------------------
+
+XFS logging is a combination of logical and physical logging. Some objects,
+such as inodes and dquots, are logged in logical format where the details
+logged are made up of the changes to in-core structures rather than on-disk
+structures. Other objects - typically buffers - have their physical changes
+logged. The reason for these differences is to reduce the amount of log space
+required for objects that are frequently logged. Some parts of inodes are more
+frequently logged than others, and inodes are typically more frequently logged
+than any other object (except maybe the superblock buffer) so keeping the
+amount of metadata logged low is of prime importance.
+
+The reason that this is such a concern is that XFS allows multiple separate
+modifications to a single object to be carried in the log at any given time.
+This allows the log to avoid needing to flush each change to disk before
+recording a new change to the object. XFS does this via a method called
+"re-logging". Conceptually, this is quite simple - all it requires is that any
+new change to the object is recorded with a *new copy* of all the existing
+changes in the new transaction that is written to the log.
+
+That is, if we have a sequence of changes A through to F, and the object was
+written to disk after change D, we would see in the log the following series
+of transactions, their contents and the log sequence number (LSN) of the
+transaction:
+
+       Transaction             Contents        LSN
+          A                       A               X
+          B                      A+B             X+n
+          C                     A+B+C           X+n+m
+          D                    A+B+C+D         X+n+m+o
+           <object written to disk>
+          E                       E               Y (> X+n+m+o)
+          F                      E+F             Yٍ+p
+
+In other words, each time an object is relogged, the new transaction contains
+the aggregation of all the previous changes currently held only in the log.
+
+This relogging technique also allows objects to be moved forward in the log so
+that an object being relogged does not prevent the tail of the log from ever
+moving forward.  This can be seen in the table above by the changing
+(increasing) LSN of each subsquent transaction - the LSN is effectively a
+direct encoding of the location in the log of the transaction.
+
+This relogging is also used to implement long-running, multiple-commit
+transactions.  These transaction are known as rolling transactions, and require
+a special log reservation known as a permanent transaction reservation. A
+typical example of a rolling transaction is the removal of extents from an
+inode which can only be done at a rate of two extents per transaction because
+of reservation size limitations. Hence a rolling extent removal transaction
+keeps relogging the inode and btree buffers as they get modified in each
+removal operation. This keeps them moving forward in the log as the operation
+progresses, ensuring that current operation never gets blocked by itself if the
+log wraps around.
+
+Hence it can be seen that the relogging operation is fundamental to the correct
+working of the XFS journalling subsystem. From the above description, most
+people should be able to see why the XFS metadata operations writes so much to
+the log - repeated operations to the same objects write the same changes to
+the log over and over again. Worse is the fact that objects tend to get
+dirtier as they get relogged, so each subsequent transaction is writing more
+metadata into the log.
+
+Another feature of the XFS transaction subsystem is that most transactions are
+asynchronous. That is, they don't commit to disk until either a log buffer is
+filled (a log buffer can hold multiple transactions) or a synchronous operation
+forces the log buffers holding the transactions to disk. This means that XFS is
+doing aggregation of transactions in memory - batching them, if you like - to
+minimise the impact of the log IO on transaction throughput.
+
+The limitation on asynchronous transaction throughput is the number and size of
+log buffers made available by the log manager. By default there are 8 log
+buffers available and the size of each is 32kB - the size can be increased up
+to 256kB by use of a mount option.
+
+Effectively, this gives us the maximum bound of outstanding metadata changes
+that can be made to the filesystem at any point in time - if all the log
+buffers are full and under IO, then no more transactions can be committed until
+the current batch completes. It is now common for a single current CPU core to
+be to able to issue enough transactions to keep the log buffers full and under
+IO permanently. Hence the XFS journalling subsystem can be considered to be IO
+bound.
+
+Delayed Logging: Concepts
+-------------------------
+
+The key thing to note about the asynchronous logging combined with the
+relogging technique XFS uses is that we can be relogging changed objects
+multiple times before they are committed to disk in the log buffers. If we
+return to the previous relogging example, it is entirely possible that
+transactions A through D are committed to disk in the same log buffer.
+
+That is, a single log buffer may contain multiple copies of the same object,
+but only one of those copies needs to be there - the last one "D", as it
+contains all the changes from the previous changes. In other words, we have one
+necessary copy in the log buffer, and three stale copies that are simply
+wasting space. When we are doing repeated operations on the same set of
+objects, these "stale objects" can be over 90% of the space used in the log
+buffers. It is clear that reducing the number of stale objects written to the
+log would greatly reduce the amount of metadata we write to the log, and this
+is the fundamental goal of delayed logging.
+
+From a conceptual point of view, XFS is already doing relogging in memory (where
+memory == log buffer), only it is doing it extremely inefficiently. It is using
+logical to physical formatting to do the relogging because there is no
+infrastructure to keep track of logical changes in memory prior to physically
+formatting the changes in a transaction to the log buffer. Hence we cannot avoid
+accumulating stale objects in the log buffers.
+
+Delayed logging is the name we've given to keeping and tracking transactional
+changes to objects in memory outside the log buffer infrastructure. Because of
+the relogging concept fundamental to the XFS journalling subsystem, this is
+actually relatively easy to do - all the changes to logged items are already
+tracked in the current infrastructure. The big problem is how to accumulate
+them and get them to the log in a consistent, recoverable manner.
+Describing the problems and how they have been solved is the focus of this
+document.
+
+One of the key changes that delayed logging makes to the operation of the
+journalling subsystem is that it disassociates the amount of outstanding
+metadata changes from the size and number of log buffers available. In other
+words, instead of there only being a maximum of 2MB of transaction changes not
+written to the log at any point in time, there may be a much greater amount
+being accumulated in memory. Hence the potential for loss of metadata on a
+crash is much greater than for the existing logging mechanism.
+
+It should be noted that this does not change the guarantee that log recovery
+will result in a consistent filesystem. What it does mean is that as far as the
+recovered filesystem is concerned, there may be many thousands of transactions
+that simply did not occur as a result of the crash. This makes it even more
+important that applications that care about their data use fsync() where they
+need to ensure application level data integrity is maintained.
+
+It should be noted that delayed logging is not an innovative new concept that
+warrants rigorous proofs to determine whether it is correct or not. The method
+of accumulating changes in memory for some period before writing them to the
+log is used effectively in many filesystems including ext3 and ext4. Hence
+no time is spent in this document trying to convince the reader that the
+concept is sound. Instead it is simply considered a "solved problem" and as
+such implementing it in XFS is purely an exercise in software engineering.
+
+The fundamental requirements for delayed logging in XFS are simple:
+
+       1. Reduce the amount of metadata written to the log by at least
+          an order of magnitude.
+       2. Supply sufficient statistics to validate Requirement #1.
+       3. Supply sufficient new tracing infrastructure to be able to debug
+          problems with the new code.
+       4. No on-disk format change (metadata or log format).
+       5. Enable and disable with a mount option.
+       6. No performance regressions for synchronous transaction workloads.
+
+Delayed Logging: Design
+-----------------------
+
+Storing Changes
+
+The problem with accumulating changes at a logical level (i.e. just using the
+existing log item dirty region tracking) is that when it comes to writing the
+changes to the log buffers, we need to ensure that the object we are formatting
+is not changing while we do this. This requires locking the object to prevent
+concurrent modification. Hence flushing the logical changes to the log would
+require us to lock every object, format them, and then unlock them again.
+
+This introduces lots of scope for deadlocks with transactions that are already
+running. For example, a transaction has object A locked and modified, but needs
+the delayed logging tracking lock to commit the transaction. However, the
+flushing thread has the delayed logging tracking lock already held, and is
+trying to get the lock on object A to flush it to the log buffer. This appears
+to be an unsolvable deadlock condition, and it was solving this problem that
+was the barrier to implementing delayed logging for so long.
+
+The solution is relatively simple - it just took a long time to recognise it.
+Put simply, the current logging code formats the changes to each item into an
+vector array that points to the changed regions in the item. The log write code
+simply copies the memory these vectors point to into the log buffer during
+transaction commit while the item is locked in the transaction. Instead of
+using the log buffer as the destination of the formatting code, we can use an
+allocated memory buffer big enough to fit the formatted vector.
+
+If we then copy the vector into the memory buffer and rewrite the vector to
+point to the memory buffer rather than the object itself, we now have a copy of
+the changes in a format that is compatible with the log buffer writing code.
+that does not require us to lock the item to access. This formatting and
+rewriting can all be done while the object is locked during transaction commit,
+resulting in a vector that is transactionally consistent and can be accessed
+without needing to lock the owning item.
+
+Hence we avoid the need to lock items when we need to flush outstanding
+asynchronous transactions to the log. The differences between the existing
+formatting method and the delayed logging formatting can be seen in the
+diagram below.
+
+Current format log vector:
+
+Object    +---------------------------------------------+
+Vector 1      +----+
+Vector 2                    +----+
+Vector 3                                   +----------+
+
+After formatting:
+
+Log Buffer    +-V1-+-V2-+----V3----+
+
+Delayed logging vector:
+
+Object    +---------------------------------------------+
+Vector 1      +----+
+Vector 2                    +----+
+Vector 3                                   +----------+
+
+After formatting:
+
+Memory Buffer +-V1-+-V2-+----V3----+
+Vector 1      +----+
+Vector 2           +----+
+Vector 3                +----------+
+
+The memory buffer and associated vector need to be passed as a single object,
+but still need to be associated with the parent object so if the object is
+relogged we can replace the current memory buffer with a new memory buffer that
+contains the latest changes.
+
+The reason for keeping the vector around after we've formatted the memory
+buffer is to support splitting vectors across log buffer boundaries correctly.
+If we don't keep the vector around, we do not know where the region boundaries
+are in the item, so we'd need a new encapsulation method for regions in the log
+buffer writing (i.e. double encapsulation). This would be an on-disk format
+change and as such is not desirable.  It also means we'd have to write the log
+region headers in the formatting stage, which is problematic as there is per
+region state that needs to be placed into the headers during the log write.
+
+Hence we need to keep the vector, but by attaching the memory buffer to it and
+rewriting the vector addresses to point at the memory buffer we end up with a
+self-describing object that can be passed to the log buffer write code to be
+handled in exactly the same manner as the existing log vectors are handled.
+Hence we avoid needing a new on-disk format to handle items that have been
+relogged in memory.
+
+
+Tracking Changes
+
+Now that we can record transactional changes in memory in a form that allows
+them to be used without limitations, we need to be able to track and accumulate
+them so that they can be written to the log at some later point in time.  The
+log item is the natural place to store this vector and buffer, and also makes sense
+to be the object that is used to track committed objects as it will always
+exist once the object has been included in a transaction.
+
+The log item is already used to track the log items that have been written to
+the log but not yet written to disk. Such log items are considered "active"
+and as such are stored in the Active Item List (AIL) which is a LSN-ordered
+double linked list. Items are inserted into this list during log buffer IO
+completion, after which they are unpinned and can be written to disk. An object
+that is in the AIL can be relogged, which causes the object to be pinned again
+and then moved forward in the AIL when the log buffer IO completes for that
+transaction.
+
+Essentially, this shows that an item that is in the AIL can still be modified
+and relogged, so any tracking must be separate to the AIL infrastructure. As
+such, we cannot reuse the AIL list pointers for tracking committed items, nor
+can we store state in any field that is protected by the AIL lock. Hence the
+committed item tracking needs it's own locks, lists and state fields in the log
+item.
+
+Similar to the AIL, tracking of committed items is done through a new list
+called the Committed Item List (CIL).  The list tracks log items that have been
+committed and have formatted memory buffers attached to them. It tracks objects
+in transaction commit order, so when an object is relogged it is removed from
+it's place in the list and re-inserted at the tail. This is entirely arbitrary
+and done to make it easy for debugging - the last items in the list are the
+ones that are most recently modified. Ordering of the CIL is not necessary for
+transactional integrity (as discussed in the next section) so the ordering is
+done for convenience/sanity of the developers.
+
+
+Delayed Logging: Checkpoints
+
+When we have a log synchronisation event, commonly known as a "log force",
+all the items in the CIL must be written into the log via the log buffers.
+We need to write these items in the order that they exist in the CIL, and they
+need to be written as an atomic transaction. The need for all the objects to be
+written as an atomic transaction comes from the requirements of relogging and
+log replay - all the changes in all the objects in a given transaction must
+either be completely replayed during log recovery, or not replayed at all. If
+a transaction is not replayed because it is not complete in the log, then
+no later transactions should be replayed, either.
+
+To fulfill this requirement, we need to write the entire CIL in a single log
+transaction. Fortunately, the XFS log code has no fixed limit on the size of a
+transaction, nor does the log replay code. The only fundamental limit is that
+the transaction cannot be larger than just under half the size of the log.  The
+reason for this limit is that to find the head and tail of the log, there must
+be at least one complete transaction in the log at any given time. If a
+transaction is larger than half the log, then there is the possibility that a
+crash during the write of a such a transaction could partially overwrite the
+only complete previous transaction in the log. This will result in a recovery
+failure and an inconsistent filesystem and hence we must enforce the maximum
+size of a checkpoint to be slightly less than a half the log.
+
+Apart from this size requirement, a checkpoint transaction looks no different
+to any other transaction - it contains a transaction header, a series of
+formatted log items and a commit record at the tail. From a recovery
+perspective, the checkpoint transaction is also no different - just a lot
+bigger with a lot more items in it. The worst case effect of this is that we
+might need to tune the recovery transaction object hash size.
+
+Because the checkpoint is just another transaction and all the changes to log
+items are stored as log vectors, we can use the existing log buffer writing
+code to write the changes into the log. To do this efficiently, we need to
+minimise the time we hold the CIL locked while writing the checkpoint
+transaction. The current log write code enables us to do this easily with the
+way it separates the writing of the transaction contents (the log vectors) from
+the transaction commit record, but tracking this requires us to have a
+per-checkpoint context that travels through the log write process through to
+checkpoint completion.
+
+Hence a checkpoint has a context that tracks the state of the current
+checkpoint from initiation to checkpoint completion. A new context is initiated
+at the same time a checkpoint transaction is started. That is, when we remove
+all the current items from the CIL during a checkpoint operation, we move all
+those changes into the current checkpoint context. We then initialise a new
+context and attach that to the CIL for aggregation of new transactions.
+
+This allows us to unlock the CIL immediately after transfer of all the
+committed items and effectively allow new transactions to be issued while we
+are formatting the checkpoint into the log. It also allows concurrent
+checkpoints to be written into the log buffers in the case of log force heavy
+workloads, just like the existing transaction commit code does. This, however,
+requires that we strictly order the commit records in the log so that
+checkpoint sequence order is maintained during log replay.
+
+To ensure that we can be writing an item into a checkpoint transaction at
+the same time another transaction modifies the item and inserts the log item
+into the new CIL, then checkpoint transaction commit code cannot use log items
+to store the list of log vectors that need to be written into the transaction.
+Hence log vectors need to be able to be chained together to allow them to be
+detatched from the log items. That is, when the CIL is flushed the memory
+buffer and log vector attached to each log item needs to be attached to the
+checkpoint context so that the log item can be released. In diagrammatic form,
+the CIL would look like this before the flush:
+
+       CIL Head
+          |
+          V
+       Log Item <-> log vector 1       -> memory buffer
+          |                            -> vector array
+          V
+       Log Item <-> log vector 2       -> memory buffer
+          |                            -> vector array
+          V
+       ......
+          |
+          V
+       Log Item <-> log vector N-1     -> memory buffer
+          |                            -> vector array
+          V
+       Log Item <-> log vector N       -> memory buffer
+                                       -> vector array
+
+And after the flush the CIL head is empty, and the checkpoint context log
+vector list would look like:
+
+       Checkpoint Context
+          |
+          V
+       log vector 1    -> memory buffer
+          |            -> vector array
+          |            -> Log Item
+          V
+       log vector 2    -> memory buffer
+          |            -> vector array
+          |            -> Log Item
+          V
+       ......
+          |
+          V
+       log vector N-1  -> memory buffer
+          |            -> vector array
+          |            -> Log Item
+          V
+       log vector N    -> memory buffer
+                       -> vector array
+                       -> Log Item
+
+Once this transfer is done, the CIL can be unlocked and new transactions can
+start, while the checkpoint flush code works over the log vector chain to
+commit the checkpoint.
+
+Once the checkpoint is written into the log buffers, the checkpoint context is
+attached to the log buffer that the commit record was written to along with a
+completion callback. Log IO completion will call that callback, which can then
+run transaction committed processing for the log items (i.e. insert into AIL
+and unpin) in the log vector chain and then free the log vector chain and
+checkpoint context.
+
+Discussion Point: I am uncertain as to whether the log item is the most
+efficient way to track vectors, even though it seems like the natural way to do
+it. The fact that we walk the log items (in the CIL) just to chain the log
+vectors and break the link between the log item and the log vector means that
+we take a cache line hit for the log item list modification, then another for
+the log vector chaining. If we track by the log vectors, then we only need to
+break the link between the log item and the log vector, which means we should
+dirty only the log item cachelines. Normally I wouldn't be concerned about one
+vs two dirty cachelines except for the fact I've seen upwards of 80,000 log
+vectors in one checkpoint transaction. I'd guess this is a "measure and
+compare" situation that can be done after a working and reviewed implementation
+is in the dev tree....
+
+Delayed Logging: Checkpoint Sequencing
+
+One of the key aspects of the XFS transaction subsystem is that it tags
+committed transactions with the log sequence number of the transaction commit.
+This allows transactions to be issued asynchronously even though there may be
+future operations that cannot be completed until that transaction is fully
+committed to the log. In the rare case that a dependent operation occurs (e.g.
+re-using a freed metadata extent for a data extent), a special, optimised log
+force can be issued to force the dependent transaction to disk immediately.
+
+To do this, transactions need to record the LSN of the commit record of the
+transaction. This LSN comes directly from the log buffer the transaction is
+written into. While this works just fine for the existing transaction
+mechanism, it does not work for delayed logging because transactions are not
+written directly into the log buffers. Hence some other method of sequencing
+transactions is required.
+
+As discussed in the checkpoint section, delayed logging uses per-checkpoint
+contexts, and as such it is simple to assign a sequence number to each
+checkpoint. Because the switching of checkpoint contexts must be done
+atomically, it is simple to ensure that each new context has a monotonically
+increasing sequence number assigned to it without the need for an external
+atomic counter - we can just take the current context sequence number and add
+one to it for the new context.
+
+Then, instead of assigning a log buffer LSN to the transaction commit LSN
+during the commit, we can assign the current checkpoint sequence. This allows
+operations that track transactions that have not yet completed know what
+checkpoint sequence needs to be committed before they can continue. As a
+result, the code that forces the log to a specific LSN now needs to ensure that
+the log forces to a specific checkpoint.
+
+To ensure that we can do this, we need to track all the checkpoint contexts
+that are currently committing to the log. When we flush a checkpoint, the
+context gets added to a "committing" list which can be searched. When a
+checkpoint commit completes, it is removed from the committing list. Because
+the checkpoint context records the LSN of the commit record for the checkpoint,
+we can also wait on the log buffer that contains the commit record, thereby
+using the existing log force mechanisms to execute synchronous forces.
+
+It should be noted that the synchronous forces may need to be extended with
+mitigation algorithms similar to the current log buffer code to allow
+aggregation of multiple synchronous transactions if there are already
+synchronous transactions being flushed. Investigation of the performance of the
+current design is needed before making any decisions here.
+
+The main concern with log forces is to ensure that all the previous checkpoints
+are also committed to disk before the one we need to wait for. Therefore we
+need to check that all the prior contexts in the committing list are also
+complete before waiting on the one we need to complete. We do this
+synchronisation in the log force code so that we don't need to wait anywhere
+else for such serialisation - it only matters when we do a log force.
+
+The only remaining complexity is that a log force now also has to handle the
+case where the forcing sequence number is the same as the current context. That
+is, we need to flush the CIL and potentially wait for it to complete. This is a
+simple addition to the existing log forcing code to check the sequence numbers
+and push if required. Indeed, placing the current sequence checkpoint flush in
+the log force code enables the current mechanism for issuing synchronous
+transactions to remain untouched (i.e. commit an asynchronous transaction, then
+force the log at the LSN of that transaction) and so the higher level code
+behaves the same regardless of whether delayed logging is being used or not.
+
+Delayed Logging: Checkpoint Log Space Accounting
+
+The big issue for a checkpoint transaction is the log space reservation for the
+transaction. We don't know how big a checkpoint transaction is going to be
+ahead of time, nor how many log buffers it will take to write out, nor the
+number of split log vector regions are going to be used. We can track the
+amount of log space required as we add items to the commit item list, but we
+still need to reserve the space in the log for the checkpoint.
+
+A typical transaction reserves enough space in the log for the worst case space
+usage of the transaction. The reservation accounts for log record headers,
+transaction and region headers, headers for split regions, buffer tail padding,
+etc. as well as the actual space for all the changed metadata in the
+transaction. While some of this is fixed overhead, much of it is dependent on
+the size of the transaction and the number of regions being logged (the number
+of log vectors in the transaction).
+
+An example of the differences would be logging directory changes versus logging
+inode changes. If you modify lots of inode cores (e.g. chmod -R g+w *), then
+there are lots of transactions that only contain an inode core and an inode log
+format structure. That is, two vectors totaling roughly 150 bytes. If we modify
+10,000 inodes, we have about 1.5MB of metadata to write in 20,000 vectors. Each
+vector is 12 bytes, so the total to be logged is approximately 1.75MB. In
+comparison, if we are logging full directory buffers, they are typically 4KB
+each, so we in 1.5MB of directory buffers we'd have roughly 400 buffers and a
+buffer format structure for each buffer - roughly 800 vectors or 1.51MB total
+space.  From this, it should be obvious that a static log space reservation is
+not particularly flexible and is difficult to select the "optimal value" for
+all workloads.
+
+Further, if we are going to use a static reservation, which bit of the entire
+reservation does it cover? We account for space used by the transaction
+reservation by tracking the space currently used by the object in the CIL and
+then calculating the increase or decrease in space used as the object is
+relogged. This allows for a checkpoint reservation to only have to account for
+log buffer metadata used such as log header records.
+
+However, even using a static reservation for just the log metadata is
+problematic. Typically log record headers use at least 16KB of log space per
+1MB of log space consumed (512 bytes per 32k) and the reservation needs to be
+large enough to handle arbitrary sized checkpoint transactions. This
+reservation needs to be made before the checkpoint is started, and we need to
+be able to reserve the space without sleeping.  For a 8MB checkpoint, we need a
+reservation of around 150KB, which is a non-trivial amount of space.
+
+A static reservation needs to manipulate the log grant counters - we can take a
+permanent reservation on the space, but we still need to make sure we refresh
+the write reservation (the actual space available to the transaction) after
+every checkpoint transaction completion. Unfortunately, if this space is not
+available when required, then the regrant code will sleep waiting for it.
+
+The problem with this is that it can lead to deadlocks as we may need to commit
+checkpoints to be able to free up log space (refer back to the description of
+rolling transactions for an example of this).  Hence we *must* always have
+space available in the log if we are to use static reservations, and that is
+very difficult and complex to arrange. It is possible to do, but there is a
+simpler way.
+
+The simpler way of doing this is tracking the entire log space used by the
+items in the CIL and using this to dynamically calculate the amount of log
+space required by the log metadata. If this log metadata space changes as a
+result of a transaction commit inserting a new memory buffer into the CIL, then
+the difference in space required is removed from the transaction that causes
+the change. Transactions at this level will *always* have enough space
+available in their reservation for this as they have already reserved the
+maximal amount of log metadata space they require, and such a delta reservation
+will always be less than or equal to the maximal amount in the reservation.
+
+Hence we can grow the checkpoint transaction reservation dynamically as items
+are added to the CIL and avoid the need for reserving and regranting log space
+up front. This avoids deadlocks and removes a blocking point from the
+checkpoint flush code.
+
+As mentioned early, transactions can't grow to more than half the size of the
+log. Hence as part of the reservation growing, we need to also check the size
+of the reservation against the maximum allowed transaction size. If we reach
+the maximum threshold, we need to push the CIL to the log. This is effectively
+a "background flush" and is done on demand. This is identical to
+a CIL push triggered by a log force, only that there is no waiting for the
+checkpoint commit to complete. This background push is checked and executed by
+transaction commit code.
+
+If the transaction subsystem goes idle while we still have items in the CIL,
+they will be flushed by the periodic log force issued by the xfssyncd. This log
+force will push the CIL to disk, and if the transaction subsystem stays idle,
+allow the idle log to be covered (effectively marked clean) in exactly the same
+manner that is done for the existing logging method. A discussion point is
+whether this log force needs to be done more frequently than the current rate
+which is once every 30s.
+
+
+Delayed Logging: Log Item Pinning
+
+Currently log items are pinned during transaction commit while the items are
+still locked. This happens just after the items are formatted, though it could
+be done any time before the items are unlocked. The result of this mechanism is
+that items get pinned once for every transaction that is committed to the log
+buffers. Hence items that are relogged in the log buffers will have a pin count
+for every outstanding transaction they were dirtied in. When each of these
+transactions is completed, they will unpin the item once. As a result, the item
+only becomes unpinned when all the transactions complete and there are no
+pending transactions. Thus the pinning and unpinning of a log item is symmetric
+as there is a 1:1 relationship with transaction commit and log item completion.
+
+For delayed logging, however, we have an assymetric transaction commit to
+completion relationship. Every time an object is relogged in the CIL it goes
+through the commit process without a corresponding completion being registered.
+That is, we now have a many-to-one relationship between transaction commit and
+log item completion. The result of this is that pinning and unpinning of the
+log items becomes unbalanced if we retain the "pin on transaction commit, unpin
+on transaction completion" model.
+
+To keep pin/unpin symmetry, the algorithm needs to change to a "pin on
+insertion into the CIL, unpin on checkpoint completion". In other words, the
+pinning and unpinning becomes symmetric around a checkpoint context. We have to
+pin the object the first time it is inserted into the CIL - if it is already in
+the CIL during a transaction commit, then we do not pin it again. Because there
+can be multiple outstanding checkpoint contexts, we can still see elevated pin
+counts, but as each checkpoint completes the pin count will retain the correct
+value according to it's context.
+
+Just to make matters more slightly more complex, this checkpoint level context
+for the pin count means that the pinning of an item must take place under the
+CIL commit/flush lock. If we pin the object outside this lock, we cannot
+guarantee which context the pin count is associated with. This is because of
+the fact pinning the item is dependent on whether the item is present in the
+current CIL or not. If we don't pin the CIL first before we check and pin the
+object, we have a race with CIL being flushed between the check and the pin
+(or not pinning, as the case may be). Hence we must hold the CIL flush/commit
+lock to guarantee that we pin the items correctly.
+
+Delayed Logging: Concurrent Scalability
+
+A fundamental requirement for the CIL is that accesses through transaction
+commits must scale to many concurrent commits. The current transaction commit
+code does not break down even when there are transactions coming from 2048
+processors at once. The current transaction code does not go any faster than if
+there was only one CPU using it, but it does not slow down either.
+
+As a result, the delayed logging transaction commit code needs to be designed
+for concurrency from the ground up. It is obvious that there are serialisation
+points in the design - the three important ones are:
+
+       1. Locking out new transaction commits while flushing the CIL
+       2. Adding items to the CIL and updating item space accounting
+       3. Checkpoint commit ordering
+
+Looking at the transaction commit and CIL flushing interactions, it is clear
+that we have a many-to-one interaction here. That is, the only restriction on
+the number of concurrent transactions that can be trying to commit at once is
+the amount of space available in the log for their reservations. The practical
+limit here is in the order of several hundred concurrent transactions for a
+128MB log, which means that it is generally one per CPU in a machine.
+
+The amount of time a transaction commit needs to hold out a flush is a
+relatively long period of time - the pinning of log items needs to be done
+while we are holding out a CIL flush, so at the moment that means it is held
+across the formatting of the objects into memory buffers (i.e. while memcpy()s
+are in progress). Ultimately a two pass algorithm where the formatting is done
+separately to the pinning of objects could be used to reduce the hold time of
+the transaction commit side.
+
+Because of the number of potential transaction commit side holders, the lock
+really needs to be a sleeping lock - if the CIL flush takes the lock, we do not
+want every other CPU in the machine spinning on the CIL lock. Given that
+flushing the CIL could involve walking a list of tens of thousands of log
+items, it will get held for a significant time and so spin contention is a
+significant concern. Preventing lots of CPUs spinning doing nothing is the
+main reason for choosing a sleeping lock even though nothing in either the
+transaction commit or CIL flush side sleeps with the lock held.
+
+It should also be noted that CIL flushing is also a relatively rare operation
+compared to transaction commit for asynchronous transaction workloads - only
+time will tell if using a read-write semaphore for exclusion will limit
+transaction commit concurrency due to cache line bouncing of the lock on the
+read side.
+
+The second serialisation point is on the transaction commit side where items
+are inserted into the CIL. Because transactions can enter this code
+concurrently, the CIL needs to be protected separately from the above
+commit/flush exclusion. It also needs to be an exclusive lock but it is only
+held for a very short time and so a spin lock is appropriate here. It is
+possible that this lock will become a contention point, but given the short
+hold time once per transaction I think that contention is unlikely.
+
+The final serialisation point is the checkpoint commit record ordering code
+that is run as part of the checkpoint commit and log force sequencing. The code
+path that triggers a CIL flush (i.e. whatever triggers the log force) will enter
+an ordering loop after writing all the log vectors into the log buffers but
+before writing the commit record. This loop walks the list of committing
+checkpoints and needs to block waiting for checkpoints to complete their commit
+record write. As a result it needs a lock and a wait variable. Log force
+sequencing also requires the same lock, list walk, and blocking mechanism to
+ensure completion of checkpoints.
+
+These two sequencing operations can use the mechanism even though the
+events they are waiting for are different. The checkpoint commit record
+sequencing needs to wait until checkpoint contexts contain a commit LSN
+(obtained through completion of a commit record write) while log force
+sequencing needs to wait until previous checkpoint contexts are removed from
+the committing list (i.e. they've completed). A simple wait variable and
+broadcast wakeups (thundering herds) has been used to implement these two
+serialisation queues. They use the same lock as the CIL, too. If we see too
+much contention on the CIL lock, or too many context switches as a result of
+the broadcast wakeups these operations can be put under a new spinlock and
+given separate wait lists to reduce lock contention and the number of processes
+woken by the wrong event.
+
+
+Lifecycle Changes
+
+The existing log item life cycle is as follows:
+
+       1. Transaction allocate
+       2. Transaction reserve
+       3. Lock item
+       4. Join item to transaction
+               If not already attached,
+                       Allocate log item
+                       Attach log item to owner item
+               Attach log item to transaction
+       5. Modify item
+               Record modifications in log item
+       6. Transaction commit
+               Pin item in memory
+               Format item into log buffer
+               Write commit LSN into transaction
+               Unlock item
+               Attach transaction to log buffer
+
+       <log buffer IO dispatched>
+       <log buffer IO completes>
+
+       7. Transaction completion
+               Mark log item committed
+               Insert log item into AIL
+                       Write commit LSN into log item
+               Unpin log item
+       8. AIL traversal
+               Lock item
+               Mark log item clean
+               Flush item to disk
+
+       <item IO completion>
+
+       9. Log item removed from AIL
+               Moves log tail
+               Item unlocked
+
+Essentially, steps 1-6 operate independently from step 7, which is also
+independent of steps 8-9. An item can be locked in steps 1-6 or steps 8-9
+at the same time step 7 is occurring, but only steps 1-6 or 8-9 can occur
+at the same time. If the log item is in the AIL or between steps 6 and 7
+and steps 1-6 are re-entered, then the item is relogged. Only when steps 8-9
+are entered and completed is the object considered clean.
+
+With delayed logging, there are new steps inserted into the life cycle:
+
+       1. Transaction allocate
+       2. Transaction reserve
+       3. Lock item
+       4. Join item to transaction
+               If not already attached,
+                       Allocate log item
+                       Attach log item to owner item
+               Attach log item to transaction
+       5. Modify item
+               Record modifications in log item
+       6. Transaction commit
+               Pin item in memory if not pinned in CIL
+               Format item into log vector + buffer
+               Attach log vector and buffer to log item
+               Insert log item into CIL
+               Write CIL context sequence into transaction
+               Unlock item
+
+       <next log force>
+
+       7. CIL push
+               lock CIL flush
+               Chain log vectors and buffers together
+               Remove items from CIL
+               unlock CIL flush
+               write log vectors into log
+               sequence commit records
+               attach checkpoint context to log buffer
+
+       <log buffer IO dispatched>
+       <log buffer IO completes>
+
+       8. Checkpoint completion
+               Mark log item committed
+               Insert item into AIL
+                       Write commit LSN into log item
+               Unpin log item
+       9. AIL traversal
+               Lock item
+               Mark log item clean
+               Flush item to disk
+       <item IO completion>
+       10. Log item removed from AIL
+               Moves log tail
+               Item unlocked
+
+From this, it can be seen that the only life cycle differences between the two
+logging methods are in the middle of the life cycle - they still have the same
+beginning and end and execution constraints. The only differences are in the
+commiting of the log items to the log itself and the completion processing.
+Hence delayed logging should not introduce any constraints on log item
+behaviour, allocation or freeing that don't already exist.
+
+As a result of this zero-impact "insertion" of delayed logging infrastructure
+and the design of the internal structures to avoid on disk format changes, we
+can basically switch between delayed logging and the existing mechanism with a
+mount option. Fundamentally, there is no reason why the log manager would not
+be able to swap methods automatically and transparently depending on load
+characteristics, but this should not be necessary if delayed logging works as
+designed.
+
+Roadmap:
+
+2.6.35 Inclusion in mainline as an experimental mount option
+       => approximately 2-3 months to merge window
+       => needs to be in xfs-dev tree in 4-6 weeks
+       => code is nearing readiness for review
+
+2.6.37 Remove experimental tag from mount option
+       => should be roughly 6 months after initial merge
+       => enough time to:
+               => gain confidence and fix problems reported by early
+                  adopters (a.k.a. guinea pigs)
+               => address worst performance regressions and undesired
+                  behaviours
+               => start tuning/optimising code for parallelism
+               => start tuning/optimising algorithms consuming
+                  excessive CPU time
+
+2.6.39 Switch default mount option to use delayed logging
+       => should be roughly 12 months after initial merge
+       => enough time to shake out remaining problems before next round of
+          enterprise distro kernel rebases
index b4769e4..c8fb13f 100644 (file)
@@ -77,6 +77,7 @@ xfs-y                         += xfs_alloc.o \
                                   xfs_itable.o \
                                   xfs_dfrag.o \
                                   xfs_log.o \
+                                  xfs_log_cil.o \
                                   xfs_log_recover.o \
                                   xfs_mount.o \
                                   xfs_mru_cache.o \
index f01de3c..649ade8 100644 (file)
@@ -37,6 +37,7 @@
 
 #include "xfs_sb.h"
 #include "xfs_inum.h"
+#include "xfs_log.h"
 #include "xfs_ag.h"
 #include "xfs_dmapi.h"
 #include "xfs_mount.h"
@@ -850,6 +851,12 @@ xfs_buf_lock_value(
  *     Note that this in no way locks the underlying pages, so it is only
  *     useful for synchronizing concurrent use of buffer objects, not for
  *     synchronizing independent access to the underlying pages.
+ *
+ *     If we come across a stale, pinned, locked buffer, we know that we
+ *     are being asked to lock a buffer that has been reallocated. Because
+ *     it is pinned, we know that the log has not been pushed to disk and
+ *     hence it will still be locked. Rather than sleeping until someone
+ *     else pushes the log, push it ourselves before trying to get the lock.
  */
 void
 xfs_buf_lock(
@@ -857,6 +864,8 @@ xfs_buf_lock(
 {
        trace_xfs_buf_lock(bp, _RET_IP_);
 
+       if (atomic_read(&bp->b_pin_count) && (bp->b_flags & XBF_STALE))
+               xfs_log_force(bp->b_mount, 0);
        if (atomic_read(&bp->b_io_remaining))
                blk_run_address_space(bp->b_target->bt_mapping);
        down(&bp->b_sema);
index e31bf21..9ac8aea 100644 (file)
@@ -19,6 +19,7 @@
 #include "xfs_dmapi.h"
 #include "xfs_sb.h"
 #include "xfs_inum.h"
+#include "xfs_log.h"
 #include "xfs_ag.h"
 #include "xfs_mount.h"
 #include "xfs_quota.h"
index f24dbe5..f2d1718 100644 (file)
@@ -119,6 +119,8 @@ mempool_t *xfs_ioend_pool;
 #define MNTOPT_DMAPI   "dmapi"         /* DMI enabled (DMAPI / XDSM) */
 #define MNTOPT_XDSM    "xdsm"          /* DMI enabled (DMAPI / XDSM) */
 #define MNTOPT_DMI     "dmi"           /* DMI enabled (DMAPI / XDSM) */
+#define MNTOPT_DELAYLOG   "delaylog"   /* Delayed loging enabled */
+#define MNTOPT_NODELAYLOG "nodelaylog" /* Delayed loging disabled */
 
 /*
  * Table driven mount option parser.
@@ -374,6 +376,13 @@ xfs_parseargs(
                        mp->m_flags |= XFS_MOUNT_DMAPI;
                } else if (!strcmp(this_char, MNTOPT_DMI)) {
                        mp->m_flags |= XFS_MOUNT_DMAPI;
+               } else if (!strcmp(this_char, MNTOPT_DELAYLOG)) {
+                       mp->m_flags |= XFS_MOUNT_DELAYLOG;
+                       cmn_err(CE_WARN,
+                               "Enabling EXPERIMENTAL delayed logging feature "
+                               "- use at your own risk.\n");
+               } else if (!strcmp(this_char, MNTOPT_NODELAYLOG)) {
+                       mp->m_flags &= ~XFS_MOUNT_DELAYLOG;
                } else if (!strcmp(this_char, "ihashsize")) {
                        cmn_err(CE_WARN,
        "XFS: ihashsize no longer used, option is deprecated.");
@@ -535,6 +544,7 @@ xfs_showargs(
                { XFS_MOUNT_FILESTREAMS,        "," MNTOPT_FILESTREAM },
                { XFS_MOUNT_DMAPI,              "," MNTOPT_DMAPI },
                { XFS_MOUNT_GRPID,              "," MNTOPT_GRPID },
+               { XFS_MOUNT_DELAYLOG,           "," MNTOPT_DELAYLOG },
                { 0, NULL }
        };
        static struct proc_xfs_info xfs_info_unset[] = {
@@ -1755,7 +1765,7 @@ xfs_init_zones(void)
         * but it is much faster.
         */
        xfs_buf_item_zone = kmem_zone_init((sizeof(xfs_buf_log_item_t) +
-                               (((XFS_MAX_BLOCKSIZE / XFS_BLI_CHUNK) /
+                               (((XFS_MAX_BLOCKSIZE / XFS_BLF_CHUNK) /
                                  NBWORD) * sizeof(int))), "xfs_buf_item");
        if (!xfs_buf_item_zone)
                goto out_destroy_trans_zone;
index 8a319cf..ff6bc79 100644 (file)
@@ -1059,83 +1059,112 @@ TRACE_EVENT(xfs_bunmap,
 
 );
 
+#define XFS_BUSY_SYNC \
+       { 0,    "async" }, \
+       { 1,    "sync" }
+
 TRACE_EVENT(xfs_alloc_busy,
-       TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno, xfs_agblock_t agbno,
-                xfs_extlen_t len, int slot),
-       TP_ARGS(mp, agno, agbno, len, slot),
+       TP_PROTO(struct xfs_trans *trans, xfs_agnumber_t agno,
+                xfs_agblock_t agbno, xfs_extlen_t len, int sync),
+       TP_ARGS(trans, agno, agbno, len, sync),
        TP_STRUCT__entry(
                __field(dev_t, dev)
+               __field(struct xfs_trans *, tp)
+               __field(int, tid)
                __field(xfs_agnumber_t, agno)
                __field(xfs_agblock_t, agbno)
                __field(xfs_extlen_t, len)
-               __field(int, slot)
+               __field(int, sync)
        ),
        TP_fast_assign(
-               __entry->dev = mp->m_super->s_dev;
+               __entry->dev = trans->t_mountp->m_super->s_dev;
+               __entry->tp = trans;
+               __entry->tid = trans->t_ticket->t_tid;
                __entry->agno = agno;
                __entry->agbno = agbno;
                __entry->len = len;
-               __entry->slot = slot;
+               __entry->sync = sync;
        ),
-       TP_printk("dev %d:%d agno %u agbno %u len %u slot %d",
+       TP_printk("dev %d:%d trans 0x%p tid 0x%x agno %u agbno %u len %u %s",
                  MAJOR(__entry->dev), MINOR(__entry->dev),
+                 __entry->tp,
+                 __entry->tid,
                  __entry->agno,
                  __entry->agbno,
                  __entry->len,
-                 __entry->slot)
+                 __print_symbolic(__entry->sync, XFS_BUSY_SYNC))
 
 );
 
-#define XFS_BUSY_STATES \
-       { 0,    "found" }, \
-       { 1,    "missing" }
-
 TRACE_EVENT(xfs_alloc_unbusy,
        TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno,
-                int slot, int found),
-       TP_ARGS(mp, agno, slot, found),
+                xfs_agblock_t agbno, xfs_extlen_t len),
+       TP_ARGS(mp, agno, agbno, len),
        TP_STRUCT__entry(
                __field(dev_t, dev)
                __field(xfs_agnumber_t, agno)
-               __field(int, slot)
-               __field(int, found)
+               __field(xfs_agblock_t, agbno)
+               __field(xfs_extlen_t, len)
        ),
        TP_fast_assign(
                __entry->dev = mp->m_super->s_dev;
                __entry->agno = agno;
-               __entry->slot = slot;
-               __entry->found = found;
+               __entry->agbno = agbno;
+               __entry->len = len;
        ),
-       TP_printk("dev %d:%d agno %u slot %d %s",
+       TP_printk("dev %d:%d agno %u agbno %u len %u",
                  MAJOR(__entry->dev), MINOR(__entry->dev),
                  __entry->agno,
-                 __entry->slot,
-                 __print_symbolic(__entry->found, XFS_BUSY_STATES))
+                 __entry->agbno,
+                 __entry->len)
 );
 
+#define XFS_BUSY_STATES \
+       { 0,    "missing" }, \
+       { 1,    "found" }
+
 TRACE_EVENT(xfs_alloc_busysearch,
-       TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno, xfs_agblock_t agbno,
-                xfs_extlen_t len, xfs_lsn_t lsn),
-       TP_ARGS(mp, agno, agbno, len, lsn),
+       TP_PROTO(struct xfs_mount *mp, xfs_agnumber_t agno,
+                xfs_agblock_t agbno, xfs_extlen_t len, int found),
+       TP_ARGS(mp, agno, agbno, len, found),
        TP_STRUCT__entry(
                __field(dev_t, dev)
                __field(xfs_agnumber_t, agno)
                __field(xfs_agblock_t, agbno)
                __field(xfs_extlen_t, len)
-               __field(xfs_lsn_t, lsn)
+               __field(int, found)
        ),
        TP_fast_assign(
                __entry->dev = mp->m_super->s_dev;
                __entry->agno = agno;
                __entry->agbno = agbno;
                __entry->len = len;
-               __entry->lsn = lsn;
+               __entry->found = found;
        ),
-       TP_printk("dev %d:%d agno %u agbno %u len %u force lsn 0x%llx",
+       TP_printk("dev %d:%d agno %u agbno %u len %u %s",
                  MAJOR(__entry->dev), MINOR(__entry->dev),
                  __entry->agno,
                  __entry->agbno,
                  __entry->len,
+                 __print_symbolic(__entry->found, XFS_BUSY_STATES))
+);
+
+TRACE_EVENT(xfs_trans_commit_lsn,
+       TP_PROTO(struct xfs_trans *trans),
+       TP_ARGS(trans),
+       TP_STRUCT__entry(
+               __field(dev_t, dev)
+               __field(struct xfs_trans *, tp)
+               __field(xfs_lsn_t, lsn)
+       ),
+       TP_fast_assign(
+               __entry->dev = trans->t_mountp->m_super->s_dev;
+               __entry->tp = trans;
+               __entry->lsn = trans->t_commit_lsn;
+       ),
+       TP_printk("dev %d:%d trans 0x%p commit_lsn 0x%llx",
+                 MAJOR(__entry->dev), MINOR(__entry->dev),
+                 __entry->tp,
                  __entry->lsn)
 );
 
index b89ec5d..585e763 100644 (file)
@@ -344,9 +344,9 @@ xfs_qm_init_dquot_blk(
        for (i = 0; i < q->qi_dqperchunk; i++, d++, curid++)
                xfs_qm_dqinit_core(curid, type, d);
        xfs_trans_dquot_buf(tp, bp,
-                           (type & XFS_DQ_USER ? XFS_BLI_UDQUOT_BUF :
-                           ((type & XFS_DQ_PROJ) ? XFS_BLI_PDQUOT_BUF :
-                            XFS_BLI_GDQUOT_BUF)));
+                           (type & XFS_DQ_USER ? XFS_BLF_UDQUOT_BUF :
+                           ((type & XFS_DQ_PROJ) ? XFS_BLF_PDQUOT_BUF :
+                            XFS_BLF_GDQUOT_BUF)));
        xfs_trans_log_buf(tp, bp, 0, BBTOB(q->qi_dqchunklen) - 1);
 }
 
index abb8222..401f364 100644 (file)
@@ -175,14 +175,20 @@ typedef struct xfs_agfl {
 } xfs_agfl_t;
 
 /*
- * Busy block/extent entry.  Used in perag to mark blocks that have been freed
- * but whose transactions aren't committed to disk yet.
+ * Busy block/extent entry.  Indexed by a rbtree in perag to mark blocks that
+ * have been freed but whose transactions aren't committed to disk yet.
+ *
+ * Note that we use the transaction ID to record the transaction, not the
+ * transaction structure itself. See xfs_alloc_busy_insert() for details.
  */
-typedef struct xfs_perag_busy {
-       xfs_agblock_t   busy_start;
-       xfs_extlen_t    busy_length;
-       struct xfs_trans *busy_tp;      /* transaction that did the free */
-} xfs_perag_busy_t;
+struct xfs_busy_extent {
+       struct rb_node  rb_node;        /* ag by-bno indexed search tree */
+       struct list_head list;          /* transaction busy extent list */
+       xfs_agnumber_t  agno;
+       xfs_agblock_t   bno;
+       xfs_extlen_t    length;
+       xlog_tid_t      tid;            /* transaction that created this */
+};
 
 /*
  * Per-ag incore structure, copies of information in agf and agi,
@@ -216,7 +222,8 @@ typedef struct xfs_perag {
        xfs_agino_t     pagl_leftrec;
        xfs_agino_t     pagl_rightrec;
 #ifdef __KERNEL__
-       spinlock_t      pagb_lock;      /* lock for pagb_list */
+       spinlock_t      pagb_lock;      /* lock for pagb_tree */
+       struct rb_root  pagb_tree;      /* ordered tree of busy extents */
 
        atomic_t        pagf_fstrms;    /* # of filestreams active in this AG */
 
@@ -226,7 +233,6 @@ typedef struct xfs_perag {
        int             pag_ici_reclaimable;    /* reclaimable inodes */
 #endif
        int             pagb_count;     /* pagb slots in use */
-       xfs_perag_busy_t pagb_list[XFS_PAGB_NUM_SLOTS]; /* unstable blocks */
 } xfs_perag_t;
 
 /*
index 94cddbf..a7fbe8a 100644 (file)
 #define        XFSA_FIXUP_BNO_OK       1
 #define        XFSA_FIXUP_CNT_OK       2
 
-STATIC void
-xfs_alloc_search_busy(xfs_trans_t *tp,
-                   xfs_agnumber_t agno,
-                   xfs_agblock_t bno,
-                   xfs_extlen_t len);
+static int
+xfs_alloc_busy_search(struct xfs_mount *mp, xfs_agnumber_t agno,
+                   xfs_agblock_t bno, xfs_extlen_t len);
 
 /*
  * Prototypes for per-ag allocation routines
@@ -540,9 +538,16 @@ xfs_alloc_ag_vextent(
                                be32_to_cpu(agf->agf_length));
                        xfs_alloc_log_agf(args->tp, args->agbp,
                                                XFS_AGF_FREEBLKS);
-                       /* search the busylist for these blocks */
-                       xfs_alloc_search_busy(args->tp, args->agno,
-                                       args->agbno, args->len);
+                       /*
+                        * Search the busylist for these blocks and mark the
+                        * transaction as synchronous if blocks are found. This
+                        * avoids the need to block due to a synchronous log
+                        * force to ensure correct ordering as the synchronous
+                        * transaction will guarantee that for us.
+                        */
+                       if (xfs_alloc_busy_search(args->mp, args->agno,
+                                               args->agbno, args->len))
+                               xfs_trans_set_sync(args->tp);
                }
                if (!args->isfl)
                        xfs_trans_mod_sb(args->tp,
@@ -1693,7 +1698,7 @@ xfs_free_ag_extent(
         * when the iclog commits to disk.  If a busy block is allocated,
         * the iclog is pushed up to the LSN that freed the block.
         */
-       xfs_alloc_mark_busy(tp, agno, bno, len);
+       xfs_alloc_busy_insert(tp, agno, bno, len);
        return 0;
 
  error0:
@@ -1989,14 +1994,20 @@ xfs_alloc_get_freelist(
        *bnop = bno;
 
        /*
-        * As blocks are freed, they are added to the per-ag busy list
-        * and remain there until the freeing transaction is committed to
-        * disk.  Now that we have allocated blocks, this list must be
-        * searched to see if a block is being reused.  If one is, then
-        * the freeing transaction must be pushed to disk NOW by forcing
-        * to disk all iclogs up that transaction's LSN.
+        * As blocks are freed, they are added to the per-ag busy list and
+        * remain there until the freeing transaction is committed to disk.
+        * Now that we have allocated blocks, this list must be searched to see
+        * if a block is being reused.  If one is, then the freeing transaction
+        * must be pushed to disk before this transaction.
+        *
+        * We do this by setting the current transaction to a sync transaction
+        * which guarantees that the freeing transaction is on disk before this
+        * transaction. This is done instead of a synchronous log force here so
+        * that we don't sit and wait with the AGF locked in the transaction
+        * during the log force.
         */
-       xfs_alloc_search_busy(tp, be32_to_cpu(agf->agf_seqno), bno, 1);
+       if (xfs_alloc_busy_search(mp, be32_to_cpu(agf->agf_seqno), bno, 1))
+               xfs_trans_set_sync(tp);
        return 0;
 }
 
@@ -2201,7 +2212,7 @@ xfs_alloc_read_agf(
                        be32_to_cpu(agf->agf_levels[XFS_BTNUM_CNTi]);
                spin_lock_init(&pag->pagb_lock);
                pag->pagb_count = 0;
-               memset(pag->pagb_list, 0, sizeof(pag->pagb_list));
+               pag->pagb_tree = RB_ROOT;
                pag->pagf_init = 1;
        }
 #ifdef DEBUG
@@ -2479,127 +2490,263 @@ error0:
  * list is reused, the transaction that freed it must be forced to disk
  * before continuing to use the block.
  *
- * xfs_alloc_mark_busy - add to the per-ag busy list
- * xfs_alloc_clear_busy - remove an item from the per-ag busy list
+ * xfs_alloc_busy_insert - add to the per-ag busy list
+ * xfs_alloc_busy_clear - remove an item from the per-ag busy list
+ * xfs_alloc_busy_search - search for a busy extent
+ */
+
+/*
+ * Insert a new extent into the busy tree.
+ *
+ * The busy extent tree is indexed by the start block of the busy extent.
+ * there can be multiple overlapping ranges in the busy extent tree but only
+ * ever one entry at a given start block. The reason for this is that
+ * multi-block extents can be freed, then smaller chunks of that extent
+ * allocated and freed again before the first transaction commit is on disk.
+ * If the exact same start block is freed a second time, we have to wait for
+ * that busy extent to pass out of the tree before the new extent is inserted.
+ * There are two main cases we have to handle here.
+ *
+ * The first case is a transaction that triggers a "free - allocate - free"
+ * cycle. This can occur during btree manipulations as a btree block is freed
+ * to the freelist, then allocated from the free list, then freed again. In
+ * this case, the second extxpnet free is what triggers the duplicate and as
+ * such the transaction IDs should match. Because the extent was allocated in
+ * this transaction, the transaction must be marked as synchronous. This is
+ * true for all cases where the free/alloc/free occurs in the one transaction,
+ * hence the addition of the ASSERT(tp->t_flags & XFS_TRANS_SYNC) to this case.
+ * This serves to catch violations of the second case quite effectively.
+ *
+ * The second case is where the free/alloc/free occur in different
+ * transactions. In this case, the thread freeing the extent the second time
+ * can't mark the extent busy immediately because it is already tracked in a
+ * transaction that may be committing.  When the log commit for the existing
+ * busy extent completes, the busy extent will be removed from the tree. If we
+ * allow the second busy insert to continue using that busy extent structure,
+ * it can be freed before this transaction is safely in the log.  Hence our
+ * only option in this case is to force the log to remove the existing busy
+ * extent from the list before we insert the new one with the current
+ * transaction ID.
+ *
+ * The problem we are trying to avoid in the free-alloc-free in separate
+ * transactions is most easily described with a timeline:
+ *
+ *      Thread 1       Thread 2        Thread 3        xfslogd
+ *     xact alloc
+ *     free X
+ *     mark busy
+ *     commit xact
+ *     free xact
+ *                     xact alloc
+ *                     alloc X
+ *                     busy search
+ *                     mark xact sync
+ *                     commit xact
+ *                     free xact
+ *                     force log
+ *                     checkpoint starts
+ *                     ....
+ *                                     xact alloc
+ *                                     free X
+ *                                     mark busy
+ *                                     finds match
+ *                                     *** KABOOM! ***
+ *                                     ....
+ *                                                     log IO completes
+ *                                                     unbusy X
+ *                     checkpoint completes
+ *
+ * By issuing a log force in thread 3 @ "KABOOM", the thread will block until
+ * the checkpoint completes, and the busy extent it matched will have been
+ * removed from the tree when it is woken. Hence it can then continue safely.
+ *
+ * However, to ensure this matching process is robust, we need to use the
+ * transaction ID for identifying transaction, as delayed logging results in
+ * the busy extent and transaction lifecycles being different. i.e. the busy
+ * extent is active for a lot longer than the transaction.  Hence the
+ * transaction structure can be freed and reallocated, then mark the same
+ * extent busy again in the new transaction. In this case the new transaction
+ * will have a different tid but can have the same address, and hence we need
+ * to check against the tid.
+ *
+ * Future: for delayed logging, we could avoid the log force if the extent was
+ * first freed in the current checkpoint sequence. This, however, requires the
+ * ability to pin the current checkpoint in memory until this transaction
+ * commits to ensure that both the original free and the current one combine
+ * logically into the one checkpoint. If the checkpoint sequences are
+ * different, however, we still need to wait on a log force.
  */
 void
-xfs_alloc_mark_busy(xfs_trans_t *tp,
-                   xfs_agnumber_t agno,
-                   xfs_agblock_t bno,
-                   xfs_extlen_t len)
+xfs_alloc_busy_insert(
+       struct xfs_trans        *tp,
+       xfs_agnumber_t          agno,
+       xfs_agblock_t           bno,
+       xfs_extlen_t            len)
 {
-       xfs_perag_busy_t        *bsy;
+       struct xfs_busy_extent  *new;
+       struct xfs_busy_extent  *busyp;
        struct xfs_perag        *pag;
-       int                     n;
+       struct rb_node          **rbp;
+       struct rb_node          *parent;
+       int                     match;
 
-       pag = xfs_perag_get(tp->t_mountp, agno);
-       spin_lock(&pag->pagb_lock);
 
-       /* search pagb_list for an open slot */
-       for (bsy = pag->pagb_list, n = 0;
-            n < XFS_PAGB_NUM_SLOTS;
-            bsy++, n++) {
-               if (bsy->busy_tp == NULL) {
-                       break;
-               }
+       new = kmem_zalloc(sizeof(struct xfs_busy_extent), KM_MAYFAIL);
+       if (!new) {
+               /*
+                * No Memory!  Since it is now not possible to track the free
+                * block, make this a synchronous transaction to insure that
+                * the block is not reused before this transaction commits.
+                */
+               trace_xfs_alloc_busy(tp, agno, bno, len, 1);
+               xfs_trans_set_sync(tp);
+               return;
        }
 
-       trace_xfs_alloc_busy(tp->t_mountp, agno, bno, len, n);
+       new->agno = agno;
+       new->bno = bno;
+       new->length = len;
+       new->tid = xfs_log_get_trans_ident(tp);
 
-       if (n < XFS_PAGB_NUM_SLOTS) {
-               bsy = &pag->pagb_list[n];
-               pag->pagb_count++;
-               bsy->busy_start = bno;
-               bsy->busy_length = len;
-               bsy->busy_tp = tp;
-               xfs_trans_add_busy(tp, agno, n);
-       } else {
+       INIT_LIST_HEAD(&new->list);
+
+       /* trace before insert to be able to see failed inserts */
+       trace_xfs_alloc_busy(tp, agno, bno, len, 0);
+
+       pag = xfs_perag_get(tp->t_mountp, new->agno);
+restart:
+       spin_lock(&pag->pagb_lock);
+       rbp = &pag->pagb_tree.rb_node;
+       parent = NULL;
+       busyp = NULL;
+       match = 0;
+       while (*rbp && match >= 0) {
+               parent = *rbp;
+               busyp = rb_entry(parent, struct xfs_busy_extent, rb_node);
+
+               if (new->bno < busyp->bno) {
+                       /* may overlap, but exact start block is lower */
+                       rbp = &(*rbp)->rb_left;
+                       if (new->bno + new->length > busyp->bno)
+                               match = busyp->tid == new->tid ? 1 : -1;
+               } else if (new->bno > busyp->bno) {
+                       /* may overlap, but exact start block is higher */
+                       rbp = &(*rbp)->rb_right;
+                       if (bno < busyp->bno + busyp->length)
+                               match = busyp->tid == new->tid ? 1 : -1;
+               } else {
+                       match = busyp->tid == new->tid ? 1 : -1;
+                       break;
+               }
+       }
+       if (match < 0) {
+               /* overlap marked busy in different transaction */
+               spin_unlock(&pag->pagb_lock);
+               xfs_log_force(tp->t_mountp, XFS_LOG_SYNC);
+               goto restart;
+       }
+       if (match > 0) {
                /*
-                * The busy list is full!  Since it is now not possible to
-                * track the free block, make this a synchronous transaction
-                * to insure that the block is not reused before this
-                * transaction commits.
+                * overlap marked busy in same transaction. Update if exact
+                * start block match, otherwise combine the busy extents into
+                * a single range.
                 */
-               xfs_trans_set_sync(tp);
-       }
+               if (busyp->bno == new->bno) {
+                       busyp->length = max(busyp->length, new->length);
+                       spin_unlock(&pag->pagb_lock);
+                       ASSERT(tp->t_flags & XFS_TRANS_SYNC);
+                       xfs_perag_put(pag);
+                       kmem_free(new);
+                       return;
+               }
+               rb_erase(&busyp->rb_node, &pag->pagb_tree);
+               new->length = max(busyp->bno + busyp->length,
+                                       new->bno + new->length) -
+                               min(busyp->bno, new->bno);
+               new->bno = min(busyp->bno, new->bno);
+       } else
+               busyp = NULL;
 
+       rb_link_node(&new->rb_node, parent, rbp);
+       rb_insert_color(&new->rb_node, &pag->pagb_tree);
+
+       list_add(&new->list, &tp->t_busy);
        spin_unlock(&pag->pagb_lock);
        xfs_perag_put(pag);
+       kmem_free(busyp);
 }
 
-void
-xfs_alloc_clear_busy(xfs_trans_t *tp,
-                    xfs_agnumber_t agno,
-                    int idx)
+/*
+ * Search for a busy extent within the range of the extent we are about to
+ * allocate.  You need to be holding the busy extent tree lock when calling
+ * xfs_alloc_busy_search(). This function returns 0 for no overlapping busy
+ * extent, -1 for an overlapping but not exact busy extent, and 1 for an exact
+ * match. This is done so that a non-zero return indicates an overlap that
+ * will require a synchronous transaction, but it can still be
+ * used to distinguish between a partial or exact match.
+ */
+static int
+xfs_alloc_busy_search(
+       struct xfs_mount        *mp,
+       xfs_agnumber_t          agno,
+       xfs_agblock_t           bno,
+       xfs_extlen_t            len)
 {
        struct xfs_perag        *pag;
-       xfs_perag_busy_t        *list;
+       struct rb_node          *rbp;
+       struct xfs_busy_extent  *busyp;
+       int                     match = 0;
 
-       ASSERT(idx < XFS_PAGB_NUM_SLOTS);
-       pag = xfs_perag_get(tp->t_mountp, agno);
+       pag = xfs_perag_get(mp, agno);
        spin_lock(&pag->pagb_lock);
-       list = pag->pagb_list;
 
-       trace_xfs_alloc_unbusy(tp->t_mountp, agno, idx, list[idx].busy_tp == tp);
-
-       if (list[idx].busy_tp == tp) {
-               list[idx].busy_tp = NULL;
-               pag->pagb_count--;
+       rbp = pag->pagb_tree.rb_node;
+
+       /* find closest start bno overlap */
+       while (rbp) {
+               busyp = rb_entry(rbp, struct xfs_busy_extent, rb_node);
+               if (bno < busyp->bno) {
+                       /* may overlap, but exact start block is lower */
+                       if (bno + len > busyp->bno)
+                               match = -1;
+                       rbp = rbp->rb_left;
+               } else if (bno > busyp->bno) {
+                       /* may overlap, but exact start block is higher */
+                       if (bno < busyp->bno + busyp->length)
+                               match = -1;
+                       rbp = rbp->rb_right;
+               } else {
+                       /* bno matches busyp, length determines exact match */
+                       match = (busyp->length == len) ? 1 : -1;
+                       break;
+               }
        }
-
        spin_unlock(&pag->pagb_lock);
+       trace_xfs_alloc_busysearch(mp, agno, bno, len, !!match);
        xfs_perag_put(pag);
+       return match;
 }
 
-
-/*
- * If we find the extent in the busy list, force the log out to get the
- * extent out of the busy list so the caller can use it straight away.
- */
-STATIC void
-xfs_alloc_search_busy(xfs_trans_t *tp,
-                   xfs_agnumber_t agno,
-                   xfs_agblock_t bno,
-                   xfs_extlen_t len)
+void
+xfs_alloc_busy_clear(
+       struct xfs_mount        *mp,
+       struct xfs_busy_extent  *busyp)
 {
        struct xfs_perag        *pag;
-       xfs_perag_busy_t        *bsy;
-       xfs_agblock_t           uend, bend;
-       xfs_lsn_t               lsn = 0;
-       int                     cnt;
 
-       pag = xfs_perag_get(tp->t_mountp, agno);
-       spin_lock(&pag->pagb_lock);
-       cnt = pag->pagb_count;
+       trace_xfs_alloc_unbusy(mp, busyp->agno, busyp->bno,
+                                               busyp->length);
 
-       /*
-        * search pagb_list for this slot, skipping open slots. We have to
-        * search the entire array as there may be multiple overlaps and
-        * we have to get the most recent LSN for the log force to push out
-        * all the transactions that span the range.
-        */
-       uend = bno + len - 1;
-       for (cnt = 0; cnt < pag->pagb_count; cnt++) {
-               bsy = &pag->pagb_list[cnt];
-               if (!bsy->busy_tp)
-                       continue;
+       ASSERT(xfs_alloc_busy_search(mp, busyp->agno, busyp->bno,
+                                               busyp->length) == 1);
 
-               bend = bsy->busy_start + bsy->busy_length - 1;
-               if (bno > bend || uend < bsy->busy_start)
-                       continue;
+       list_del_init(&busyp->list);
 
-               /* (start1,length1) within (start2, length2) */
-               if (XFS_LSN_CMP(bsy->busy_tp->t_commit_lsn, lsn) > 0)
-                       lsn = bsy->busy_tp->t_commit_lsn;
-       }
+       pag = xfs_perag_get(mp, busyp->agno);
+       spin_lock(&pag->pagb_lock);
+       rb_erase(&busyp->rb_node, &pag->pagb_tree);
        spin_unlock(&pag->pagb_lock);
        xfs_perag_put(pag);
-       trace_xfs_alloc_busysearch(tp->t_mountp, agno, bno, len, lsn);
 
-       /*
-        * If a block was found, force the log through the LSN of the
-        * transaction that freed the block
-        */
-       if (lsn)
-               xfs_log_force_lsn(tp->t_mountp, lsn, XFS_LOG_SYNC);
+       kmem_free(busyp);
 }
index 599bffa..6d05199 100644 (file)
@@ -22,6 +22,7 @@ struct xfs_buf;
 struct xfs_mount;
 struct xfs_perag;
 struct xfs_trans;
+struct xfs_busy_extent;
 
 /*
  * Freespace allocation types.  Argument to xfs_alloc_[v]extent.
@@ -119,15 +120,13 @@ xfs_alloc_longest_free_extent(struct xfs_mount *mp,
 #ifdef __KERNEL__
 
 void
-xfs_alloc_mark_busy(xfs_trans_t *tp,
+xfs_alloc_busy_insert(xfs_trans_t *tp,
                xfs_agnumber_t agno,
                xfs_agblock_t bno,
                xfs_extlen_t len);
 
 void
-xfs_alloc_clear_busy(xfs_trans_t *tp,
-               xfs_agnumber_t ag,
-               int idx);
+xfs_alloc_busy_clear(struct xfs_mount *mp, struct xfs_busy_extent *busyp);
 
 #endif /* __KERNEL__ */
 
index b726e10..83f4942 100644 (file)
@@ -134,7 +134,7 @@ xfs_allocbt_free_block(
         * disk. If a busy block is allocated, the iclog is pushed up to the
         * LSN that freed the block.
         */
-       xfs_alloc_mark_busy(cur->bc_tp, be32_to_cpu(agf->agf_seqno), bno, 1);
+       xfs_alloc_busy_insert(cur->bc_tp, be32_to_cpu(agf->agf_seqno), bno, 1);
        xfs_trans_agbtree_delta(cur->bc_tp, -1);
        return 0;
 }
index 240340a..02a8098 100644 (file)
@@ -64,7 +64,7 @@ xfs_buf_item_log_debug(
        nbytes = last - first + 1;
        bfset(bip->bli_logged, first, nbytes);
        for (x = 0; x < nbytes; x++) {
-               chunk_num = byte >> XFS_BLI_SHIFT;
+               chunk_num = byte >> XFS_BLF_SHIFT;
                word_num = chunk_num >> BIT_TO_WORD_SHIFT;
                bit_num = chunk_num & (NBWORD - 1);
                wordp = &(bip->bli_format.blf_data_map[word_num]);
@@ -166,7 +166,7 @@ xfs_buf_item_size(
                 * cancel flag in it.
                 */
                trace_xfs_buf_item_size_stale(bip);
-               ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
+               ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
                return 1;
        }
 
@@ -197,9 +197,9 @@ xfs_buf_item_size(
                } else if (next_bit != last_bit + 1) {
                        last_bit = next_bit;
                        nvecs++;
-               } else if (xfs_buf_offset(bp, next_bit * XFS_BLI_CHUNK) !=
-                          (xfs_buf_offset(bp, last_bit * XFS_BLI_CHUNK) +
-                           XFS_BLI_CHUNK)) {
+               } else if (xfs_buf_offset(bp, next_bit * XFS_BLF_CHUNK) !=
+                          (xfs_buf_offset(bp, last_bit * XFS_BLF_CHUNK) +
+                           XFS_BLF_CHUNK)) {
                        last_bit = next_bit;
                        nvecs++;
                } else {
@@ -254,6 +254,20 @@ xfs_buf_item_format(
        vecp++;
        nvecs = 1;
 
+       /*
+        * If it is an inode buffer, transfer the in-memory state to the
+        * format flags and clear the in-memory state. We do not transfer
+        * this state if the inode buffer allocation has not yet been committed
+        * to the log as setting the XFS_BLI_INODE_BUF flag will prevent
+        * correct replay of the inode allocation.
+        */
+       if (bip->bli_flags & XFS_BLI_INODE_BUF) {
+               if (!((bip->bli_flags & XFS_BLI_INODE_ALLOC_BUF) &&
+                     xfs_log_item_in_current_chkpt(&bip->bli_item)))
+                       bip->bli_format.blf_flags |= XFS_BLF_INODE_BUF;
+               bip->bli_flags &= ~XFS_BLI_INODE_BUF;
+       }
+
        if (bip->bli_flags & XFS_BLI_STALE) {
                /*
                 * The buffer is stale, so all we need to log
@@ -261,7 +275,7 @@ xfs_buf_item_format(
                 * cancel flag in it.
                 */
                trace_xfs_buf_item_format_stale(bip);
-               ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
+               ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
                bip->bli_format.blf_size = nvecs;
                return;
        }
@@ -294,28 +308,28 @@ xfs_buf_item_format(
                 * keep counting and scanning.
                 */
                if (next_bit == -1) {
-                       buffer_offset = first_bit * XFS_BLI_CHUNK;
+                       buffer_offset = first_bit * XFS_BLF_CHUNK;
                        vecp->i_addr = xfs_buf_offset(bp, buffer_offset);
-                       vecp->i_len = nbits * XFS_BLI_CHUNK;
+                       vecp->i_len = nbits * XFS_BLF_CHUNK;
                        vecp->i_type = XLOG_REG_TYPE_BCHUNK;
                        nvecs++;
                        break;
                } else if (next_bit != last_bit + 1) {
-                       buffer_offset = first_bit * XFS_BLI_CHUNK;
+                       buffer_offset = first_bit * XFS_BLF_CHUNK;
                        vecp->i_addr = xfs_buf_offset(bp, buffer_offset);
-                       vecp->i_len = nbits * XFS_BLI_CHUNK;
+                       vecp->i_len = nbits * XFS_BLF_CHUNK;
                        vecp->i_type = XLOG_REG_TYPE_BCHUNK;
                        nvecs++;
                        vecp++;
                        first_bit = next_bit;
                        last_bit = next_bit;
                        nbits = 1;
-               } else if (xfs_buf_offset(bp, next_bit << XFS_BLI_SHIFT) !=
-                          (xfs_buf_offset(bp, last_bit << XFS_BLI_SHIFT) +
-                           XFS_BLI_CHUNK)) {
-                       buffer_offset = first_bit * XFS_BLI_CHUNK;
+               } else if (xfs_buf_offset(bp, next_bit << XFS_BLF_SHIFT) !=
+                          (xfs_buf_offset(bp, last_bit << XFS_BLF_SHIFT) +
+                           XFS_BLF_CHUNK)) {
+                       buffer_offset = first_bit * XFS_BLF_CHUNK;
                        vecp->i_addr = xfs_buf_offset(bp, buffer_offset);
-                       vecp->i_len = nbits * XFS_BLI_CHUNK;
+                       vecp->i_len = nbits * XFS_BLF_CHUNK;
                        vecp->i_type = XLOG_REG_TYPE_BCHUNK;
 /* You would think we need to bump the nvecs here too, but we do not
  * this number is used by recovery, and it gets confused by the boundary
@@ -341,10 +355,15 @@ xfs_buf_item_format(
 }
 
 /*
- * This is called to pin the buffer associated with the buf log
- * item in memory so it cannot be written out.  Simply call bpin()
- * on the buffer to do this.
+ * This is called to pin the buffer associated with the buf log item in memory
+ * so it cannot be written out.  Simply call bpin() on the buffer to do this.
+ *
+ * We also always take a reference to the buffer log item here so that the bli
+ * is held while the item is pinned in memory. This means that we can
+ * unconditionally drop the reference count a transaction holds when the
+ * transaction is completed.
  */
+
 STATIC void
 xfs_buf_item_pin(
        xfs_buf_log_item_t      *bip)
@@ -356,6 +375,7 @@ xfs_buf_item_pin(
        ASSERT(atomic_read(&bip->bli_refcount) > 0);
        ASSERT((bip->bli_flags & XFS_BLI_LOGGED) ||
               (bip->bli_flags & XFS_BLI_STALE));
+       atomic_inc(&bip->bli_refcount);
        trace_xfs_buf_item_pin(bip);
        xfs_bpin(bp);
 }
@@ -393,7 +413,7 @@ xfs_buf_item_unpin(
                ASSERT(XFS_BUF_VALUSEMA(bp) <= 0);
                ASSERT(!(XFS_BUF_ISDELAYWRITE(bp)));
                ASSERT(XFS_BUF_ISSTALE(bp));
-               ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
+               ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
                trace_xfs_buf_item_unpin_stale(bip);
 
                /*
@@ -489,20 +509,23 @@ xfs_buf_item_trylock(
 }
 
 /*
- * Release the buffer associated with the buf log item.
- * If there is no dirty logged data associated with the
- * buffer recorded in the buf log item, then free the
- * buf log item and remove the reference to it in the
- * buffer.
+ * Release the buffer associated with the buf log item.  If there is no dirty
+ * logged data associated with the buffer recorded in the buf log item, then
+ * free the buf log item and remove the reference to it in the buffer.
+ *
+ * This call ignores the recursion count.  It is only called when the buffer
+ * should REALLY be unlocked, regardless of the recursion count.
  *
- * This call ignores the recursion count.  It is only called
- * when the buffer should REALLY be unlocked, regardless
- * of the recursion count.
+ * We unconditionally drop the transaction's reference to the log item. If the
+ * item was logged, then another reference was taken when it was pinned, so we
+ * can safely drop the transaction reference now.  This also allows us to avoid
+ * potential races with the unpin code freeing the bli by not referencing the
+ * bli after we've dropped the reference count.
  *
- * If the XFS_BLI_HOLD flag is set in the buf log item, then
- * free the log item if necessary but do not unlock the buffer.
- * This is for support of xfs_trans_bhold(). Make sure the
- * XFS_BLI_HOLD field is cleared if we don't free the item.
+ * If the XFS_BLI_HOLD flag is set in the buf log item, then free the log item
+ * if necessary but do not unlock the buffer.  This is for support of
+ * xfs_trans_bhold(). Make sure the XFS_BLI_HOLD field is cleared if we don't
+ * free the item.
  */
 STATIC void
 xfs_buf_item_unlock(
@@ -514,73 +537,54 @@ xfs_buf_item_unlock(
 
        bp = bip->bli_buf;
 
-       /*
-        * Clear the buffer's association with this transaction.
-        */
+       /* Clear the buffer's association with this transaction. */
        XFS_BUF_SET_FSPRIVATE2(bp, NULL);
 
        /*
-        * If this is a transaction abort, don't return early.
-        * Instead, allow the brelse to happen.
-        * Normally it would be done for stale (cancelled) buffers
-        * at unpin time, but we'll never go through the pin/unpin
-        * cycle if we abort inside commit.
+        * If this is a transaction abort, don't return early.  Instead, allow
+        * the brelse to happen.  Normally it would be done for stale
+        * (cancelled) buffers at unpin time, but we'll never go through the
+        * pin/unpin cycle if we abort inside commit.
         */
        aborted = (bip->bli_item.li_flags & XFS_LI_ABORTED) != 0;
 
        /*
-        * If the buf item is marked stale, then don't do anything.
-        * We'll unlock the buffer and free the buf item when the
-        * buffer is unpinned for the last time.
+        * Before possibly freeing the buf item, determine if we should
+        * release the buffer at the end of this routine.
         */
-       if (bip->bli_flags & XFS_BLI_STALE) {
-               bip->bli_flags &= ~XFS_BLI_LOGGED;
-               trace_xfs_buf_item_unlock_stale(bip);
-               ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
-               if (!aborted)
-                       return;
-       }
+       hold = bip->bli_flags & XFS_BLI_HOLD;
+
+       /* Clear the per transaction state. */
+       bip->bli_flags &= ~(XFS_BLI_LOGGED | XFS_BLI_HOLD);
 
        /*
-        * Drop the transaction's reference to the log item if
-        * it was not logged as part of the transaction.  Otherwise
-        * we'll drop the reference in xfs_buf_item_unpin() when
-        * the transaction is really through with the buffer.
+        * If the buf item is marked stale, then don't do anything.  We'll
+        * unlock the buffer and free the buf item when the buffer is unpinned
+        * for the last time.
         */
-       if (!(bip->bli_flags & XFS_BLI_LOGGED)) {
-               atomic_dec(&bip->bli_refcount);
-       } else {
-               /*
-                * Clear the logged flag since this is per
-                * transaction state.
-                */
-               bip->bli_flags &= ~XFS_BLI_LOGGED;
+       if (bip->bli_flags & XFS_BLI_STALE) {
+               trace_xfs_buf_item_unlock_stale(bip);
+               ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
+               if (!aborted) {
+                       atomic_dec(&bip->bli_refcount);
+                       return;
+               }
        }
 
-       /*
-        * Before possibly freeing the buf item, determine if we should
-        * release the buffer at the end of this routine.
-        */
-       hold = bip->bli_flags & XFS_BLI_HOLD;
        trace_xfs_buf_item_unlock(bip);
 
        /*
-        * If the buf item isn't tracking any data, free it.
-        * Otherwise, if XFS_BLI_HOLD is set clear it.
+        * If the buf item isn't tracking any data, free it, otherwise drop the
+        * reference we hold to it.
         */
        if (xfs_bitmap_empty(bip->bli_format.blf_data_map,
-                            bip->bli_format.blf_map_size)) {
+                            bip->bli_format.blf_map_size))
                xfs_buf_item_relse(bp);
-       } else if (hold) {
-               bip->bli_flags &= ~XFS_BLI_HOLD;
-       }
+       else
+               atomic_dec(&bip->bli_refcount);
 
-       /*
-        * Release the buffer if XFS_BLI_HOLD was not set.
-        */
-       if (!hold) {
+       if (!hold)
                xfs_buf_relse(bp);
-       }
 }
 
 /*
@@ -717,12 +721,12 @@ xfs_buf_item_init(
        }
 
        /*
-        * chunks is the number of XFS_BLI_CHUNK size pieces
+        * chunks is the number of XFS_BLF_CHUNK size pieces
         * the buffer can be divided into. Make sure not to
         * truncate any pieces.  map_size is the size of the
         * bitmap needed to describe the chunks of the buffer.
         */
-       chunks = (int)((XFS_BUF_COUNT(bp) + (XFS_BLI_CHUNK - 1)) >> XFS_BLI_SHIFT);
+       chunks = (int)((XFS_BUF_COUNT(bp) + (XFS_BLF_CHUNK - 1)) >> XFS_BLF_SHIFT);
        map_size = (int)((chunks + NBWORD) >> BIT_TO_WORD_SHIFT);
 
        bip = (xfs_buf_log_item_t*)kmem_zone_zalloc(xfs_buf_item_zone,
@@ -790,8 +794,8 @@ xfs_buf_item_log(
        /*
         * Convert byte offsets to bit numbers.
         */
-       first_bit = first >> XFS_BLI_SHIFT;
-       last_bit = last >> XFS_BLI_SHIFT;
+       first_bit = first >> XFS_BLF_SHIFT;
+       last_bit = last >> XFS_BLF_SHIFT;
 
        /*
         * Calculate the total number of bits to be set.
index df44545..f20bb47 100644 (file)
@@ -41,22 +41,22 @@ typedef struct xfs_buf_log_format {
  * This flag indicates that the buffer contains on disk inodes
  * and requires special recovery handling.
  */
-#define        XFS_BLI_INODE_BUF       0x1
+#define        XFS_BLF_INODE_BUF       0x1
 /*
  * This flag indicates that the buffer should not be replayed
  * during recovery because its blocks are being freed.
  */
-#define        XFS_BLI_CANCEL          0x2
+#define        XFS_BLF_CANCEL          0x2
 /*
  * This flag indicates that the buffer contains on disk
  * user or group dquots and may require special recovery handling.
  */
-#define        XFS_BLI_UDQUOT_BUF      0x4
-#define XFS_BLI_PDQUOT_BUF     0x8
-#define        XFS_BLI_GDQUOT_BUF      0x10
+#define        XFS_BLF_UDQUOT_BUF      0x4
+#define XFS_BLF_PDQUOT_BUF     0x8
+#define        XFS_BLF_GDQUOT_BUF      0x10
 
-#define        XFS_BLI_CHUNK           128
-#define        XFS_BLI_SHIFT           7
+#define        XFS_BLF_CHUNK           128
+#define        XFS_BLF_SHIFT           7
 #define        BIT_TO_WORD_SHIFT       5
 #define        NBWORD                  (NBBY * sizeof(unsigned int))
 
@@ -69,6 +69,7 @@ typedef struct xfs_buf_log_format {
 #define        XFS_BLI_LOGGED          0x08
 #define        XFS_BLI_INODE_ALLOC_BUF 0x10
 #define XFS_BLI_STALE_INODE    0x20
+#define        XFS_BLI_INODE_BUF       0x40
 
 #define XFS_BLI_FLAGS \
        { XFS_BLI_HOLD,         "HOLD" }, \
@@ -76,7 +77,8 @@ typedef struct xfs_buf_log_format {
        { XFS_BLI_STALE,        "STALE" }, \
        { XFS_BLI_LOGGED,       "LOGGED" }, \
        { XFS_BLI_INODE_ALLOC_BUF, "INODE_ALLOC" }, \
-       { XFS_BLI_STALE_INODE,  "STALE_INODE" }
+       { XFS_BLI_STALE_INODE,  "STALE_INODE" }, \
+       { XFS_BLI_INODE_BUF,    "INODE_BUF" }
 
 
 #ifdef __KERNEL__
index ef96175..047b8a8 100644 (file)
@@ -170,7 +170,7 @@ xfs_cmn_err(int panic_tag, int level, xfs_mount_t *mp, char *fmt, ...)
        va_list ap;
 
 #ifdef DEBUG
-       xfs_panic_mask |= XFS_PTAG_SHUTDOWN_CORRUPT;
+       xfs_panic_mask |= (XFS_PTAG_SHUTDOWN_CORRUPT | XFS_PTAG_LOGRES);
 #endif
 
        if (xfs_panic_mask && (xfs_panic_mask & panic_tag)
index 3038dd5..5215abc 100644 (file)
@@ -54,9 +54,6 @@ STATIC xlog_t *  xlog_alloc_log(xfs_mount_t   *mp,
 STATIC int      xlog_space_left(xlog_t *log, int cycle, int bytes);
 STATIC int      xlog_sync(xlog_t *log, xlog_in_core_t *iclog);
 STATIC void     xlog_dealloc_log(xlog_t *log);
-STATIC int      xlog_write(struct log *log, struct xfs_log_vec *log_vector,
-                           struct xlog_ticket *tic, xfs_lsn_t *start_lsn,
-                           xlog_in_core_t **commit_iclog, uint flags);
 
 /* local state machine functions */
 STATIC void xlog_state_done_syncing(xlog_in_core_t *iclog, int);
@@ -86,14 +83,6 @@ STATIC int xlog_regrant_write_log_space(xlog_t               *log,
 STATIC void xlog_ungrant_log_space(xlog_t       *log,
                                   xlog_ticket_t *ticket);
 
-
-/* local ticket functions */
-STATIC xlog_ticket_t   *xlog_ticket_alloc(xlog_t *log,
-                                        int    unit_bytes,
-                                        int    count,
-                                        char   clientid,
-                                        uint   flags);
-
 #if defined(DEBUG)
 STATIC void    xlog_verify_dest_ptr(xlog_t *log, char *ptr);
 STATIC void    xlog_verify_grant_head(xlog_t *log, int equals);
@@ -360,6 +349,15 @@ xfs_log_reserve(
                ASSERT(flags & XFS_LOG_PERM_RESERV);
                internal_ticket = *ticket;
 
+               /*
+                * this is a new transaction on the ticket, so we need to
+                * change the transaction ID so that the next transaction has a
+                * different TID in the log. Just add one to the existing tid
+                * so that we can see chains of rolling transactions in the log
+                * easily.
+                */
+               internal_ticket->t_tid++;
+
                trace_xfs_log_reserve(log, internal_ticket);
 
                xlog_grant_push_ail(mp, internal_ticket->t_unit_res);
@@ -367,7 +365,8 @@ xfs_log_reserve(
        } else {
                /* may sleep if need to allocate more tickets */
                internal_ticket = xlog_ticket_alloc(log, unit_bytes, cnt,
-                                                 client, flags);
+                                                 client, flags,
+                                                 KM_SLEEP|KM_MAYFAIL);
                if (!internal_ticket)
                        return XFS_ERROR(ENOMEM);
                internal_ticket->t_trans_type = t_type;
@@ -452,6 +451,13 @@ xfs_log_mount(
        /* Normal transactions can now occur */
        mp->m_log->l_flags &= ~XLOG_ACTIVE_RECOVERY;
 
+       /*
+        * Now the log has been fully initialised and we know were our
+        * space grant counters are, we can initialise the permanent ticket
+        * needed for delayed logging to work.
+        */
+       xlog_cil_init_post_recovery(mp->m_log);
+
        return 0;
 
 out_destroy_ail:
@@ -658,6 +664,10 @@ xfs_log_item_init(
        item->li_ailp = mp->m_ail;
        item->li_type = type;
        item->li_ops = ops;
+       item->li_lv = NULL;
+
+       INIT_LIST_HEAD(&item->li_ail);
+       INIT_LIST_HEAD(&item->li_cil);
 }
 
 /*
@@ -1168,6 +1178,9 @@ xlog_alloc_log(xfs_mount_t        *mp,
        *iclogp = log->l_iclog;                 /* complete ring */
        log->l_iclog->ic_prev = prev_iclog;     /* re-write 1st prev ptr */
 
+       error = xlog_cil_init(log);
+       if (error)
+               goto out_free_iclog;
        return log;
 
 out_free_iclog:
@@ -1494,6 +1507,8 @@ xlog_dealloc_log(xlog_t *log)
        xlog_in_core_t  *iclog, *next_iclog;
        int             i;
 
+       xlog_cil_destroy(log);
+
        iclog = log->l_iclog;
        for (i=0; i<log->l_iclog_bufs; i++) {
                sv_destroy(&iclog->ic_force_wait);
@@ -1536,8 +1551,10 @@ xlog_state_finish_copy(xlog_t            *log,
  * print out info relating to regions written which consume
  * the reservation
  */
-STATIC void
-xlog_print_tic_res(xfs_mount_t *mp, xlog_ticket_t *ticket)
+void
+xlog_print_tic_res(
+       struct xfs_mount        *mp,
+       struct xlog_ticket      *ticket)
 {
        uint i;
        uint ophdr_spc = ticket->t_res_num_ophdrs * (uint)sizeof(xlog_op_header_t);
@@ -1637,6 +1654,10 @@ xlog_print_tic_res(xfs_mount_t *mp, xlog_ticket_t *ticket)
                            "bad-rtype" : res_type_str[r_type-1]),
                            ticket->t_res_arr[i].r_len);
        }
+
+       xfs_cmn_err(XFS_PTAG_LOGRES, CE_ALERT, mp,
+               "xfs_log_write: reservation ran out. Need to up reservation");
+       xfs_force_shutdown(mp, SHUTDOWN_CORRUPT_INCORE);
 }
 
 /*
@@ -1865,7 +1886,7 @@ xlog_write_copy_finish(
  *     we don't update ic_offset until the end when we know exactly how many
  *     bytes have been written out.
  */
-STATIC int
+int
 xlog_write(
        struct log              *log,
        struct xfs_log_vec      *log_vector,
@@ -1889,22 +1910,26 @@ xlog_write(
        *start_lsn = 0;
 
        len = xlog_write_calc_vec_length(ticket, log_vector);
-       if (ticket->t_curr_res < len) {
-               xlog_print_tic_res(log->l_mp, ticket);
-#ifdef DEBUG
-               xlog_panic(
-       "xfs_log_write: reservation ran out. Need to up reservation");
-#else
-               /* Customer configurable panic */
-               xfs_cmn_err(XFS_PTAG_LOGRES, CE_ALERT, log->l_mp,
-       "xfs_log_write: reservation ran out. Need to up reservation");
+       if (log->l_cilp) {
+               /*
+                * Region headers and bytes are already accounted for.
+                * We only need to take into account start records and
+                * split regions in this function.
+                */
+               if (ticket->t_flags & XLOG_TIC_INITED)
+                       ticket->t_curr_res -= sizeof(xlog_op_header_t);
 
-               /* If we did not panic, shutdown the filesystem */
-               xfs_force_shutdown(log->l_mp, SHUTDOWN_CORRUPT_INCORE);
-#endif
-       }
+               /*
+                * Commit record headers need to be accounted for. These
+                * come in as separate writes so are easy to detect.
+                */
+               if (flags & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS))
+                       ticket->t_curr_res -= sizeof(xlog_op_header_t);
+       } else
+               ticket->t_curr_res -= len;
 
-       ticket->t_curr_res -= len;
+       if (ticket->t_curr_res < 0)
+               xlog_print_tic_res(log->l_mp, ticket);
 
        index = 0;
        lv = log_vector;
@@ -3000,6 +3025,8 @@ _xfs_log_force(
 
        XFS_STATS_INC(xs_log_force);
 
+       xlog_cil_push(log, 1);
+
        spin_lock(&log->l_icloglock);
 
        iclog = log->l_iclog;
@@ -3149,6 +3176,12 @@ _xfs_log_force_lsn(
 
        XFS_STATS_INC(xs_log_force);
 
+       if (log->l_cilp) {
+               lsn = xlog_cil_push_lsn(log, lsn);
+               if (lsn == NULLCOMMITLSN)
+                       return 0;
+       }
+
 try_again:
        spin_lock(&log->l_icloglock);
        iclog = log->l_iclog;
@@ -3313,22 +3346,30 @@ xfs_log_ticket_get(
        return ticket;
 }
 
+xlog_tid_t
+xfs_log_get_trans_ident(
+       struct xfs_trans        *tp)
+{
+       return tp->t_ticket->t_tid;
+}
+
 /*
  * Allocate and initialise a new log ticket.
  */
-STATIC xlog_ticket_t *
+xlog_ticket_t *
 xlog_ticket_alloc(
        struct log      *log,
        int             unit_bytes,
        int             cnt,
        char            client,
-       uint            xflags)
+       uint            xflags,
+       int             alloc_flags)
 {
        struct xlog_ticket *tic;
        uint            num_headers;
        int             iclog_space;
 
-       tic = kmem_zone_zalloc(xfs_log_ticket_zone, KM_SLEEP|KM_MAYFAIL);
+       tic = kmem_zone_zalloc(xfs_log_ticket_zone, alloc_flags);
        if (!tic)
                return NULL;
 
@@ -3647,6 +3688,11 @@ xlog_state_ioerror(
  *     c. nothing new gets queued up after (a) and (b) are done.
  *     d. if !logerror, flush the iclogs to disk, then seal them off
  *        for business.
+ *
+ * Note: for delayed logging the !logerror case needs to flush the regions
+ * held in memory out to the iclogs before flushing them to disk. This needs
+ * to be done before the log is marked as shutdown, otherwise the flush to the
+ * iclogs will fail.
  */
 int
 xfs_log_force_umount(
@@ -3680,6 +3726,16 @@ xfs_log_force_umount(
                return 1;
        }
        retval = 0;
+
+       /*
+        * Flush the in memory commit item list before marking the log as
+        * being shut down. We need to do it in this order to ensure all the
+        * completed transactions are flushed to disk with the xfs_log_force()
+        * call below.
+        */
+       if (!logerror && (mp->m_flags & XFS_MOUNT_DELAYLOG))
+               xlog_cil_push(log, 1);
+
        /*
         * We must hold both the GRANT lock and the LOG lock,
         * before we mark the filesystem SHUTDOWN and wake
index 229d1f3..04c78e6 100644 (file)
@@ -19,7 +19,6 @@
 #define __XFS_LOG_H__
 
 /* get lsn fields */
-
 #define CYCLE_LSN(lsn) ((uint)((lsn)>>32))
 #define BLOCK_LSN(lsn) ((uint)(lsn))
 
@@ -114,6 +113,9 @@ struct xfs_log_vec {
        struct xfs_log_vec      *lv_next;       /* next lv in build list */
        int                     lv_niovecs;     /* number of iovecs in lv */
        struct xfs_log_iovec    *lv_iovecp;     /* iovec array */
+       struct xfs_log_item     *lv_item;       /* owner */
+       char                    *lv_buf;        /* formatted buffer */
+       int                     lv_buf_len;     /* size of formatted buffer */
 };
 
 /*
@@ -134,6 +136,7 @@ struct xlog_in_core;
 struct xlog_ticket;
 struct xfs_log_item;
 struct xfs_item_ops;
+struct xfs_trans;
 
 void   xfs_log_item_init(struct xfs_mount      *mp,
                        struct xfs_log_item     *item,
@@ -187,9 +190,16 @@ int          xfs_log_need_covered(struct xfs_mount *mp);
 
 void     xlog_iodone(struct xfs_buf *);
 
-struct xlog_ticket * xfs_log_ticket_get(struct xlog_ticket *ticket);
+struct xlog_ticket *xfs_log_ticket_get(struct xlog_ticket *ticket);
 void     xfs_log_ticket_put(struct xlog_ticket *ticket);
 
+xlog_tid_t xfs_log_get_trans_ident(struct xfs_trans *tp);
+
+int    xfs_log_commit_cil(struct xfs_mount *mp, struct xfs_trans *tp,
+                               struct xfs_log_vec *log_vector,
+                               xfs_lsn_t *commit_lsn, int flags);
+bool   xfs_log_item_in_current_chkpt(struct xfs_log_item *lip);
+
 #endif
 
 
diff --git a/fs/xfs/xfs_log_cil.c b/fs/xfs/xfs_log_cil.c
new file mode 100644 (file)
index 0000000..bb17cc0
--- /dev/null
@@ -0,0 +1,725 @@
+/*
+ * Copyright (c) 2010 Red Hat, Inc. All Rights Reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it would be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write the Free Software Foundation,
+ * Inc.,  51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ */
+
+#include "xfs.h"
+#include "xfs_fs.h"
+#include "xfs_types.h"
+#include "xfs_bit.h"
+#include "xfs_log.h"
+#include "xfs_inum.h"
+#include "xfs_trans.h"
+#include "xfs_trans_priv.h"
+#include "xfs_log_priv.h"
+#include "xfs_sb.h"
+#include "xfs_ag.h"
+#include "xfs_dir2.h"
+#include "xfs_dmapi.h"
+#include "xfs_mount.h"
+#include "xfs_error.h"
+#include "xfs_alloc.h"
+
+/*
+ * Perform initial CIL structure initialisation. If the CIL is not
+ * enabled in this filesystem, ensure the log->l_cilp is null so
+ * we can check this conditional to determine if we are doing delayed
+ * logging or not.
+ */
+int
+xlog_cil_init(
+       struct log      *log)
+{
+       struct xfs_cil  *cil;
+       struct xfs_cil_ctx *ctx;
+
+       log->l_cilp = NULL;
+       if (!(log->l_mp->m_flags & XFS_MOUNT_DELAYLOG))
+               return 0;
+
+       cil = kmem_zalloc(sizeof(*cil), KM_SLEEP|KM_MAYFAIL);
+       if (!cil)
+               return ENOMEM;
+
+       ctx = kmem_zalloc(sizeof(*ctx), KM_SLEEP|KM_MAYFAIL);
+       if (!ctx) {
+               kmem_free(cil);
+               return ENOMEM;
+       }
+
+       INIT_LIST_HEAD(&cil->xc_cil);
+       INIT_LIST_HEAD(&cil->xc_committing);
+       spin_lock_init(&cil->xc_cil_lock);
+       init_rwsem(&cil->xc_ctx_lock);
+       sv_init(&cil->xc_commit_wait, SV_DEFAULT, "cilwait");
+
+       INIT_LIST_HEAD(&ctx->committing);
+       INIT_LIST_HEAD(&ctx->busy_extents);
+       ctx->sequence = 1;
+       ctx->cil = cil;
+       cil->xc_ctx = ctx;
+
+       cil->xc_log = log;
+       log->l_cilp = cil;
+       return 0;
+}
+
+void
+xlog_cil_destroy(
+       struct log      *log)
+{
+       if (!log->l_cilp)
+               return;
+
+       if (log->l_cilp->xc_ctx) {
+               if (log->l_cilp->xc_ctx->ticket)
+                       xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
+               kmem_free(log->l_cilp->xc_ctx);
+       }
+
+       ASSERT(list_empty(&log->l_cilp->xc_cil));
+       kmem_free(log->l_cilp);
+}
+
+/*
+ * Allocate a new ticket. Failing to get a new ticket makes it really hard to
+ * recover, so we don't allow failure here. Also, we allocate in a context that
+ * we don't want to be issuing transactions from, so we need to tell the
+ * allocation code this as well.
+ *
+ * We don't reserve any space for the ticket - we are going to steal whatever
+ * space we require from transactions as they commit. To ensure we reserve all
+ * the space required, we need to set the current reservation of the ticket to
+ * zero so that we know to steal the initial transaction overhead from the
+ * first transaction commit.
+ */
+static struct xlog_ticket *
+xlog_cil_ticket_alloc(
+       struct log      *log)
+{
+       struct xlog_ticket *tic;
+
+       tic = xlog_ticket_alloc(log, 0, 1, XFS_TRANSACTION, 0,
+                               KM_SLEEP|KM_NOFS);
+       tic->t_trans_type = XFS_TRANS_CHECKPOINT;
+
+       /*
+        * set the current reservation to zero so we know to steal the basic
+        * transaction overhead reservation from the first transaction commit.
+        */
+       tic->t_curr_res = 0;
+       return tic;
+}
+
+/*
+ * After the first stage of log recovery is done, we know where the head and
+ * tail of the log are. We need this log initialisation done before we can
+ * initialise the first CIL checkpoint context.
+ *
+ * Here we allocate a log ticket to track space usage during a CIL push.  This
+ * ticket is passed to xlog_write() directly so that we don't slowly leak log
+ * space by failing to account for space used by log headers and additional
+ * region headers for split regions.
+ */
+void
+xlog_cil_init_post_recovery(
+       struct log      *log)
+{
+       if (!log->l_cilp)
+               return;
+
+       log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
+       log->l_cilp->xc_ctx->sequence = 1;
+       log->l_cilp->xc_ctx->commit_lsn = xlog_assign_lsn(log->l_curr_cycle,
+                                                               log->l_curr_block);
+}
+
+/*
+ * Insert the log item into the CIL and calculate the difference in space
+ * consumed by the item. Add the space to the checkpoint ticket and calculate
+ * if the change requires additional log metadata. If it does, take that space
+ * as well. Remove the amount of space we addded to the checkpoint ticket from
+ * the current transaction ticket so that the accounting works out correctly.
+ *
+ * If this is the first time the item is being placed into the CIL in this
+ * context, pin it so it can't be written to disk until the CIL is flushed to
+ * the iclog and the iclog written to disk.
+ */
+static void
+xlog_cil_insert(
+       struct log              *log,
+       struct xlog_ticket      *ticket,
+       struct xfs_log_item     *item,
+       struct xfs_log_vec      *lv)
+{
+       struct xfs_cil          *cil = log->l_cilp;
+       struct xfs_log_vec      *old = lv->lv_item->li_lv;
+       struct xfs_cil_ctx      *ctx = cil->xc_ctx;
+       int                     len;
+       int                     diff_iovecs;
+       int                     iclog_space;
+
+       if (old) {
+               /* existing lv on log item, space used is a delta */
+               ASSERT(!list_empty(&item->li_cil));
+               ASSERT(old->lv_buf && old->lv_buf_len && old->lv_niovecs);
+
+               len = lv->lv_buf_len - old->lv_buf_len;
+               diff_iovecs = lv->lv_niovecs - old->lv_niovecs;
+               kmem_free(old->lv_buf);
+               kmem_free(old);
+       } else {
+               /* new lv, must pin the log item */
+               ASSERT(!lv->lv_item->li_lv);
+               ASSERT(list_empty(&item->li_cil));
+
+               len = lv->lv_buf_len;
+               diff_iovecs = lv->lv_niovecs;
+               IOP_PIN(lv->lv_item);
+
+       }
+       len += diff_iovecs * sizeof(xlog_op_header_t);
+
+       /* attach new log vector to log item */
+       lv->lv_item->li_lv = lv;
+
+       spin_lock(&cil->xc_cil_lock);
+       list_move_tail(&item->li_cil, &cil->xc_cil);
+       ctx->nvecs += diff_iovecs;
+
+       /*
+        * If this is the first time the item is being committed to the CIL,
+        * store the sequence number on the log item so we can tell
+        * in future commits whether this is the first checkpoint the item is
+        * being committed into.
+        */
+       if (!item->li_seq)
+               item->li_seq = ctx->sequence;
+
+       /*
+        * Now transfer enough transaction reservation to the context ticket
+        * for the checkpoint. The context ticket is special - the unit
+        * reservation has to grow as well as the current reservation as we
+        * steal from tickets so we can correctly determine the space used
+        * during the transaction commit.
+        */
+       if (ctx->ticket->t_curr_res == 0) {
+               /* first commit in checkpoint, steal the header reservation */
+               ASSERT(ticket->t_curr_res >= ctx->ticket->t_unit_res + len);
+               ctx->ticket->t_curr_res = ctx->ticket->t_unit_res;
+               ticket->t_curr_res -= ctx->ticket->t_unit_res;
+       }
+
+       /* do we need space for more log record headers? */
+       iclog_space = log->l_iclog_size - log->l_iclog_hsize;
+       if (len > 0 && (ctx->space_used / iclog_space !=
+                               (ctx->space_used + len) / iclog_space)) {
+               int hdrs;
+
+               hdrs = (len + iclog_space - 1) / iclog_space;
+               /* need to take into account split region headers, too */
+               hdrs *= log->l_iclog_hsize + sizeof(struct xlog_op_header);
+               ctx->ticket->t_unit_res += hdrs;
+               ctx->ticket->t_curr_res += hdrs;
+               ticket->t_curr_res -= hdrs;
+               ASSERT(ticket->t_curr_res >= len);
+       }
+       ticket->t_curr_res -= len;
+       ctx->space_used += len;
+
+       spin_unlock(&cil->xc_cil_lock);
+}
+
+/*
+ * Format log item into a flat buffers
+ *
+ * For delayed logging, we need to hold a formatted buffer containing all the
+ * changes on the log item. This enables us to relog the item in memory and
+ * write it out asynchronously without needing to relock the object that was
+ * modified at the time it gets written into the iclog.
+ *
+ * This function builds a vector for the changes in each log item in the
+ * transaction. It then works out the length of the buffer needed for each log
+ * item, allocates them and formats the vector for the item into the buffer.
+ * The buffer is then attached to the log item are then inserted into the
+ * Committed Item List for tracking until the next checkpoint is written out.
+ *
+ * We don't set up region headers during this process; we simply copy the
+ * regions into the flat buffer. We can do this because we still have to do a
+ * formatting step to write the regions into the iclog buffer.  Writing the
+ * ophdrs during the iclog write means that we can support splitting large
+ * regions across iclog boundares without needing a change in the format of the
+ * item/region encapsulation.
+ *
+ * Hence what we need to do now is change the rewrite the vector array to point
+ * to the copied region inside the buffer we just allocated. This allows us to
+ * format the regions into the iclog as though they are being formatted
+ * directly out of the objects themselves.
+ */
+static void
+xlog_cil_format_items(
+       struct log              *log,
+       struct xfs_log_vec      *log_vector,
+       struct xlog_ticket      *ticket,
+       xfs_lsn_t               *start_lsn)
+{
+       struct xfs_log_vec *lv;
+
+       if (start_lsn)
+               *start_lsn = log->l_cilp->xc_ctx->sequence;
+
+       ASSERT(log_vector);
+       for (lv = log_vector; lv; lv = lv->lv_next) {
+               void    *ptr;
+               int     index;
+               int     len = 0;
+
+               /* build the vector array and calculate it's length */
+               IOP_FORMAT(lv->lv_item, lv->lv_iovecp);
+               for (index = 0; index < lv->lv_niovecs; index++)
+                       len += lv->lv_iovecp[index].i_len;
+
+               lv->lv_buf_len = len;
+               lv->lv_buf = kmem_zalloc(lv->lv_buf_len, KM_SLEEP|KM_NOFS);
+               ptr = lv->lv_buf;
+
+               for (index = 0; index < lv->lv_niovecs; index++) {
+                       struct xfs_log_iovec *vec = &lv->lv_iovecp[index];
+
+                       memcpy(ptr, vec->i_addr, vec->i_len);
+                       vec->i_addr = ptr;
+                       ptr += vec->i_len;
+               }
+               ASSERT(ptr == lv->lv_buf + lv->lv_buf_len);
+
+               xlog_cil_insert(log, ticket, lv->lv_item, lv);
+       }
+}
+
+static void
+xlog_cil_free_logvec(
+       struct xfs_log_vec      *log_vector)
+{
+       struct xfs_log_vec      *lv;
+
+       for (lv = log_vector; lv; ) {
+               struct xfs_log_vec *next = lv->lv_next;
+               kmem_free(lv->lv_buf);
+               kmem_free(lv);
+               lv = next;
+       }
+}
+
+/*
+ * Commit a transaction with the given vector to the Committed Item List.
+ *
+ * To do this, we need to format the item, pin it in memory if required and
+ * account for the space used by the transaction. Once we have done that we
+ * need to release the unused reservation for the transaction, attach the
+ * transaction to the checkpoint context so we carry the busy extents through
+ * to checkpoint completion, and then unlock all the items in the transaction.
+ *
+ * For more specific information about the order of operations in
+ * xfs_log_commit_cil() please refer to the comments in
+ * xfs_trans_commit_iclog().
+ *
+ * Called with the context lock already held in read mode to lock out
+ * background commit, returns without it held once background commits are
+ * allowed again.
+ */
+int
+xfs_log_commit_cil(
+       struct xfs_mount        *mp,
+       struct xfs_trans        *tp,
+       struct xfs_log_vec      *log_vector,
+       xfs_lsn_t               *commit_lsn,
+       int                     flags)
+{
+       struct log              *log = mp->m_log;
+       int                     log_flags = 0;
+       int                     push = 0;
+
+       if (flags & XFS_TRANS_RELEASE_LOG_RES)
+               log_flags = XFS_LOG_REL_PERM_RESERV;
+
+       if (XLOG_FORCED_SHUTDOWN(log)) {
+               xlog_cil_free_logvec(log_vector);
+               return XFS_ERROR(EIO);
+       }
+
+       /* lock out background commit */
+       down_read(&log->l_cilp->xc_ctx_lock);
+       xlog_cil_format_items(log, log_vector, tp->t_ticket, commit_lsn);
+
+       /* check we didn't blow the reservation */
+       if (tp->t_ticket->t_curr_res < 0)
+               xlog_print_tic_res(log->l_mp, tp->t_ticket);
+
+       /* attach the transaction to the CIL if it has any busy extents */
+       if (!list_empty(&tp->t_busy)) {
+               spin_lock(&log->l_cilp->xc_cil_lock);
+               list_splice_init(&tp->t_busy,
+                                       &log->l_cilp->xc_ctx->busy_extents);
+               spin_unlock(&log->l_cilp->xc_cil_lock);
+       }
+
+       tp->t_commit_lsn = *commit_lsn;
+       xfs_log_done(mp, tp->t_ticket, NULL, log_flags);
+       xfs_trans_unreserve_and_mod_sb(tp);
+
+       /* check for background commit before unlock */
+       if (log->l_cilp->xc_ctx->space_used > XLOG_CIL_SPACE_LIMIT(log))
+               push = 1;
+       up_read(&log->l_cilp->xc_ctx_lock);
+
+       /*
+        * We need to push CIL every so often so we don't cache more than we
+        * can fit in the log. The limit really is that a checkpoint can't be
+        * more than half the log (the current checkpoint is not allowed to
+        * overwrite the previous checkpoint), but commit latency and memory
+        * usage limit this to a smaller size in most cases.
+        */
+       if (push)
+               xlog_cil_push(log, 0);
+       return 0;
+}
+
+/*
+ * Mark all items committed and clear busy extents. We free the log vector
+ * chains in a separate pass so that we unpin the log items as quickly as
+ * possible.
+ */
+static void
+xlog_cil_committed(
+       void    *args,
+       int     abort)
+{
+       struct xfs_cil_ctx      *ctx = args;
+       struct xfs_log_vec      *lv;
+       int                     abortflag = abort ? XFS_LI_ABORTED : 0;
+       struct xfs_busy_extent  *busyp, *n;
+
+       /* unpin all the log items */
+       for (lv = ctx->lv_chain; lv; lv = lv->lv_next ) {
+               xfs_trans_item_committed(lv->lv_item, ctx->start_lsn,
+                                                       abortflag);
+       }
+
+       list_for_each_entry_safe(busyp, n, &ctx->busy_extents, list)
+               xfs_alloc_busy_clear(ctx->cil->xc_log->l_mp, busyp);
+
+       spin_lock(&ctx->cil->xc_cil_lock);
+       list_del(&ctx->committing);
+       spin_unlock(&ctx->cil->xc_cil_lock);
+
+       xlog_cil_free_logvec(ctx->lv_chain);
+       kmem_free(ctx);
+}
+
+/*
+ * Push the Committed Item List to the log. If the push_now flag is not set,
+ * then it is a background flush and so we can chose to ignore it.
+ */
+int
+xlog_cil_push(
+       struct log              *log,
+       int                     push_now)
+{
+       struct xfs_cil          *cil = log->l_cilp;
+       struct xfs_log_vec      *lv;
+       struct xfs_cil_ctx      *ctx;
+       struct xfs_cil_ctx      *new_ctx;
+       struct xlog_in_core     *commit_iclog;
+       struct xlog_ticket      *tic;
+       int                     num_lv;
+       int                     num_iovecs;
+       int                     len;
+       int                     error = 0;
+       struct xfs_trans_header thdr;
+       struct xfs_log_iovec    lhdr;
+       struct xfs_log_vec      lvhdr = { NULL };
+       xfs_lsn_t               commit_lsn;
+
+       if (!cil)
+               return 0;
+
+       new_ctx = kmem_zalloc(sizeof(*new_ctx), KM_SLEEP|KM_NOFS);
+       new_ctx->ticket = xlog_cil_ticket_alloc(log);
+
+       /* lock out transaction commit, but don't block on background push */
+       if (!down_write_trylock(&cil->xc_ctx_lock)) {
+               if (!push_now)
+                       goto out_free_ticket;
+               down_write(&cil->xc_ctx_lock);
+       }
+       ctx = cil->xc_ctx;
+
+       /* check if we've anything to push */
+       if (list_empty(&cil->xc_cil))
+               goto out_skip;
+
+       /* check for spurious background flush */
+       if (!push_now && cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log))
+               goto out_skip;
+
+       /*
+        * pull all the log vectors off the items in the CIL, and
+        * remove the items from the CIL. We don't need the CIL lock
+        * here because it's only needed on the transaction commit
+        * side which is currently locked out by the flush lock.
+        */
+       lv = NULL;
+       num_lv = 0;
+       num_iovecs = 0;
+       len = 0;
+       while (!list_empty(&cil->xc_cil)) {
+               struct xfs_log_item     *item;
+               int                     i;
+
+               item = list_first_entry(&cil->xc_cil,
+                                       struct xfs_log_item, li_cil);
+               list_del_init(&item->li_cil);
+               if (!ctx->lv_chain)
+                       ctx->lv_chain = item->li_lv;
+               else
+                       lv->lv_next = item->li_lv;
+               lv = item->li_lv;
+               item->li_lv = NULL;
+
+               num_lv++;
+               num_iovecs += lv->lv_niovecs;
+               for (i = 0; i < lv->lv_niovecs; i++)
+                       len += lv->lv_iovecp[i].i_len;
+       }
+
+       /*
+        * initialise the new context and attach it to the CIL. Then attach
+        * the current context to the CIL committing lsit so it can be found
+        * during log forces to extract the commit lsn of the sequence that
+        * needs to be forced.
+        */
+       INIT_LIST_HEAD(&new_ctx->committing);
+       INIT_LIST_HEAD(&new_ctx->busy_extents);
+       new_ctx->sequence = ctx->sequence + 1;
+       new_ctx->cil = cil;
+       cil->xc_ctx = new_ctx;
+
+       /*
+        * The switch is now done, so we can drop the context lock and move out
+        * of a shared context. We can't just go straight to the commit record,
+        * though - we need to synchronise with previous and future commits so
+        * that the commit records are correctly ordered in the log to ensure
+        * that we process items during log IO completion in the correct order.
+        *
+        * For example, if we get an EFI in one checkpoint and the EFD in the
+        * next (e.g. due to log forces), we do not want the checkpoint with
+        * the EFD to be committed before the checkpoint with the EFI.  Hence
+        * we must strictly order the commit records of the checkpoints so
+        * that: a) the checkpoint callbacks are attached to the iclogs in the
+        * correct order; and b) the checkpoints are replayed in correct order
+        * in log recovery.
+        *
+        * Hence we need to add this context to the committing context list so
+        * that higher sequences will wait for us to write out a commit record
+        * before they do.
+        */
+       spin_lock(&cil->xc_cil_lock);
+       list_add(&ctx->committing, &cil->xc_committing);
+       spin_unlock(&cil->xc_cil_lock);
+       up_write(&cil->xc_ctx_lock);
+
+       /*
+        * Build a checkpoint transaction header and write it to the log to
+        * begin the transaction. We need to account for the space used by the
+        * transaction header here as it is not accounted for in xlog_write().
+        *
+        * The LSN we need to pass to the log items on transaction commit is
+        * the LSN reported by the first log vector write. If we use the commit
+        * record lsn then we can move the tail beyond the grant write head.
+        */
+       tic = ctx->ticket;
+       thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
+       thdr.th_type = XFS_TRANS_CHECKPOINT;
+       thdr.th_tid = tic->t_tid;
+       thdr.th_num_items = num_iovecs;
+       lhdr.i_addr = (xfs_caddr_t)&thdr;
+       lhdr.i_len = sizeof(xfs_trans_header_t);
+       lhdr.i_type = XLOG_REG_TYPE_TRANSHDR;
+       tic->t_curr_res -= lhdr.i_len + sizeof(xlog_op_header_t);
+
+       lvhdr.lv_niovecs = 1;
+       lvhdr.lv_iovecp = &lhdr;
+       lvhdr.lv_next = ctx->lv_chain;
+
+       error = xlog_write(log, &lvhdr, tic, &ctx->start_lsn, NULL, 0);
+       if (error)
+               goto out_abort;
+
+       /*
+        * now that we've written the checkpoint into the log, strictly
+        * order the commit records so replay will get them in the right order.
+        */
+restart:
+       spin_lock(&cil->xc_cil_lock);
+       list_for_each_entry(new_ctx, &cil->xc_committing, committing) {
+               /*
+                * Higher sequences will wait for this one so skip them.
+                * Don't wait for own own sequence, either.
+                */
+               if (new_ctx->sequence >= ctx->sequence)
+                       continue;
+               if (!new_ctx->commit_lsn) {
+                       /*
+                        * It is still being pushed! Wait for the push to
+                        * complete, then start again from the beginning.
+                        */
+                       sv_wait(&cil->xc_commit_wait, 0, &cil->xc_cil_lock, 0);
+                       goto restart;
+               }
+       }
+       spin_unlock(&cil->xc_cil_lock);
+
+       commit_lsn = xfs_log_done(log->l_mp, tic, &commit_iclog, 0);
+       if (error || commit_lsn == -1)
+               goto out_abort;
+
+       /* attach all the transactions w/ busy extents to iclog */
+       ctx->log_cb.cb_func = xlog_cil_committed;
+       ctx->log_cb.cb_arg = ctx;
+       error = xfs_log_notify(log->l_mp, commit_iclog, &ctx->log_cb);
+       if (error)
+               goto out_abort;
+
+       /*
+        * now the checkpoint commit is complete and we've attached the
+        * callbacks to the iclog we can assign the commit LSN to the context
+        * and wake up anyone who is waiting for the commit to complete.
+        */
+       spin_lock(&cil->xc_cil_lock);
+       ctx->commit_lsn = commit_lsn;
+       sv_broadcast(&cil->xc_commit_wait);
+       spin_unlock(&cil->xc_cil_lock);
+
+       /* release the hounds! */
+       return xfs_log_release_iclog(log->l_mp, commit_iclog);
+
+out_skip:
+       up_write(&cil->xc_ctx_lock);
+out_free_ticket:
+       xfs_log_ticket_put(new_ctx->ticket);
+       kmem_free(new_ctx);
+       return 0;
+
+out_abort:
+       xlog_cil_committed(ctx, XFS_LI_ABORTED);
+       return XFS_ERROR(EIO);
+}
+
+/*
+ * Conditionally push the CIL based on the sequence passed in.
+ *
+ * We only need to push if we haven't already pushed the sequence
+ * number given. Hence the only time we will trigger a push here is
+ * if the push sequence is the same as the current context.
+ *
+ * We return the current commit lsn to allow the callers to determine if a
+ * iclog flush is necessary following this call.
+ *
+ * XXX: Initially, just push the CIL unconditionally and return whatever
+ * commit lsn is there. It'll be empty, so this is broken for now.
+ */
+xfs_lsn_t
+xlog_cil_push_lsn(
+       struct log      *log,
+       xfs_lsn_t       push_seq)
+{
+       struct xfs_cil          *cil = log->l_cilp;
+       struct xfs_cil_ctx      *ctx;
+       xfs_lsn_t               commit_lsn = NULLCOMMITLSN;
+
+restart:
+       down_write(&cil->xc_ctx_lock);
+       ASSERT(push_seq <= cil->xc_ctx->sequence);
+
+       /* check to see if we need to force out the current context */
+       if (push_seq == cil->xc_ctx->sequence) {
+               up_write(&cil->xc_ctx_lock);
+               xlog_cil_push(log, 1);
+               goto restart;
+       }
+
+       /*
+        * See if we can find a previous sequence still committing.
+        * We can drop the flush lock as soon as we have the cil lock
+        * because we are now only comparing contexts protected by
+        * the cil lock.
+        *
+        * We need to wait for all previous sequence commits to complete
+        * before allowing the force of push_seq to go ahead. Hence block
+        * on commits for those as well.
+        */
+       spin_lock(&cil->xc_cil_lock);
+       up_write(&cil->xc_ctx_lock);
+       list_for_each_entry(ctx, &cil->xc_committing, committing) {
+               if (ctx->sequence > push_seq)
+                       continue;
+               if (!ctx->commit_lsn) {
+                       /*
+                        * It is still being pushed! Wait for the push to
+                        * complete, then start again from the beginning.
+                        */
+                       sv_wait(&cil->xc_commit_wait, 0, &cil->xc_cil_lock, 0);
+                       goto restart;
+               }
+               if (ctx->sequence != push_seq)
+                       continue;
+               /* found it! */
+               commit_lsn = ctx->commit_lsn;
+       }
+       spin_unlock(&cil->xc_cil_lock);
+       return commit_lsn;
+}
+
+/*
+ * Check if the current log item was first committed in this sequence.
+ * We can't rely on just the log item being in the CIL, we have to check
+ * the recorded commit sequence number.
+ *
+ * Note: for this to be used in a non-racy manner, it has to be called with
+ * CIL flushing locked out. As a result, it should only be used during the
+ * transaction commit process when deciding what to format into the item.
+ */
+bool
+xfs_log_item_in_current_chkpt(
+       struct xfs_log_item *lip)
+{
+       struct xfs_cil_ctx *ctx;
+
+       if (!(lip->li_mountp->m_flags & XFS_MOUNT_DELAYLOG))
+               return false;
+       if (list_empty(&lip->li_cil))
+               return false;
+
+       ctx = lip->li_mountp->m_log->l_cilp->xc_ctx;
+
+       /*
+        * li_seq is written on the first commit of a log item to record the
+        * first checkpoint it is written to. Hence if it is different to the
+        * current sequence, we're in a new checkpoint.
+        */
+       if (XFS_LSN_CMP(lip->li_seq, ctx->sequence) != 0)
+               return false;
+       return true;
+}
index 9cf6951..8c07261 100644 (file)
@@ -152,8 +152,6 @@ static inline uint xlog_get_client_id(__be32 i)
 #define        XLOG_RECOVERY_NEEDED    0x4     /* log was recovered */
 #define XLOG_IO_ERROR          0x8     /* log hit an I/O error, and being
                                           shutdown */
-typedef __uint32_t xlog_tid_t;
-
 
 #ifdef __KERNEL__
 /*
@@ -378,6 +376,99 @@ typedef struct xlog_in_core {
 #define ic_header      ic_data->hic_header
 } xlog_in_core_t;
 
+/*
+ * The CIL context is used to aggregate per-transaction details as well be
+ * passed to the iclog for checkpoint post-commit processing.  After being
+ * passed to the iclog, another context needs to be allocated for tracking the
+ * next set of transactions to be aggregated into a checkpoint.
+ */
+struct xfs_cil;
+
+struct xfs_cil_ctx {
+       struct xfs_cil          *cil;
+       xfs_lsn_t               sequence;       /* chkpt sequence # */
+       xfs_lsn_t               start_lsn;      /* first LSN of chkpt commit */
+       xfs_lsn_t               commit_lsn;     /* chkpt commit record lsn */
+       struct xlog_ticket      *ticket;        /* chkpt ticket */
+       int                     nvecs;          /* number of regions */
+       int                     space_used;     /* aggregate size of regions */
+       struct list_head        busy_extents;   /* busy extents in chkpt */
+       struct xfs_log_vec      *lv_chain;      /* logvecs being pushed */
+       xfs_log_callback_t      log_cb;         /* completion callback hook. */
+       struct list_head        committing;     /* ctx committing list */
+};
+
+/*
+ * Committed Item List structure
+ *
+ * This structure is used to track log items that have been committed but not
+ * yet written into the log. It is used only when the delayed logging mount
+ * option is enabled.
+ *
+ * This structure tracks the list of committing checkpoint contexts so
+ * we can avoid the problem of having to hold out new transactions during a
+ * flush until we have a the commit record LSN of the checkpoint. We can
+ * traverse the list of committing contexts in xlog_cil_push_lsn() to find a
+ * sequence match and extract the commit LSN directly from there. If the
+ * checkpoint is still in the process of committing, we can block waiting for
+ * the commit LSN to be determined as well. This should make synchronous
+ * operations almost as efficient as the old logging methods.
+ */
+struct xfs_cil {
+       struct log              *xc_log;
+       struct list_head        xc_cil;
+       spinlock_t              xc_cil_lock;
+       struct xfs_cil_ctx      *xc_ctx;
+       struct rw_semaphore     xc_ctx_lock;
+       struct list_head        xc_committing;
+       sv_t                    xc_commit_wait;
+};
+
+/*
+ * The amount of log space we should the CIL to aggregate is difficult to size.
+ * Whatever we chose we have to make we can get a reservation for the log space
+ * effectively, that it is large enough to capture sufficient relogging to
+ * reduce log buffer IO significantly, but it is not too large for the log or
+ * induces too much latency when writing out through the iclogs. We track both
+ * space consumed and the number of vectors in the checkpoint context, so we
+ * need to decide which to use for limiting.
+ *
+ * Every log buffer we write out during a push needs a header reserved, which
+ * is at least one sector and more for v2 logs. Hence we need a reservation of
+ * at least 512 bytes per 32k of log space just for the LR headers. That means
+ * 16KB of reservation per megabyte of delayed logging space we will consume,
+ * plus various headers.  The number of headers will vary based on the num of
+ * io vectors, so limiting on a specific number of vectors is going to result
+ * in transactions of varying size. IOWs, it is more consistent to track and
+ * limit space consumed in the log rather than by the number of objects being
+ * logged in order to prevent checkpoint ticket overruns.
+ *
+ * Further, use of static reservations through the log grant mechanism is
+ * problematic. It introduces a lot of complexity (e.g. reserve grant vs write
+ * grant) and a significant deadlock potential because regranting write space
+ * can block on log pushes. Hence if we have to regrant log space during a log
+ * push, we can deadlock.
+ *
+ * However, we can avoid this by use of a dynamic "reservation stealing"
+ * technique during transaction commit whereby unused reservation space in the
+ * transaction ticket is transferred to the CIL ctx commit ticket to cover the
+ * space needed by the checkpoint transaction. This means that we never need to
+ * specifically reserve space for the CIL checkpoint transaction, nor do we
+ * need to regrant space once the checkpoint completes. This also means the
+ * checkpoint transaction ticket is specific to the checkpoint context, rather
+ * than the CIL itself.
+ *
+ * With dynamic reservations, we can basically make up arbitrary limits for the
+ * checkpoint size so long as they don't violate any other size rules.  Hence
+ * the initial maximum size for the checkpoint transaction will be set to a
+ * quarter of the log or 8MB, which ever is smaller. 8MB is an arbitrary limit
+ * right now based on the latency of writing out a large amount of data through
+ * the circular iclog buffers.
+ */
+
+#define XLOG_CIL_SPACE_LIMIT(log)      \
+       (min((log->l_logsize >> 2), (8 * 1024 * 1024)))
+
 /*
  * The reservation head lsn is not made up of a cycle number and block number.
  * Instead, it uses a cycle number and byte number.  Logs don't expect to
@@ -388,6 +479,7 @@ typedef struct log {
        /* The following fields don't need locking */
        struct xfs_mount        *l_mp;          /* mount point */
        struct xfs_ail          *l_ailp;        /* AIL log is working with */
+       struct xfs_cil          *l_cilp;        /* CIL log is working with */
        struct xfs_buf          *l_xbuf;        /* extra buffer for log
                                                 * wrapping */
        struct xfs_buftarg      *l_targ;        /* buftarg of log */
@@ -438,14 +530,17 @@ typedef struct log {
 
 #define XLOG_FORCED_SHUTDOWN(log)      ((log)->l_flags & XLOG_IO_ERROR)
 
-
 /* common routines */
 extern xfs_lsn_t xlog_assign_tail_lsn(struct xfs_mount *mp);
 extern int      xlog_recover(xlog_t *log);
 extern int      xlog_recover_finish(xlog_t *log);
 extern void     xlog_pack_data(xlog_t *log, xlog_in_core_t *iclog, int);
 
-extern kmem_zone_t     *xfs_log_ticket_zone;
+extern kmem_zone_t *xfs_log_ticket_zone;
+struct xlog_ticket *xlog_ticket_alloc(struct log *log, int unit_bytes,
+                               int count, char client, uint xflags,
+                               int alloc_flags);
+
 
 static inline void
 xlog_write_adv_cnt(void **ptr, int *len, int *off, size_t bytes)
@@ -455,6 +550,21 @@ xlog_write_adv_cnt(void **ptr, int *len, int *off, size_t bytes)
        *off += bytes;
 }
 
+void   xlog_print_tic_res(struct xfs_mount *mp, struct xlog_ticket *ticket);
+int    xlog_write(struct log *log, struct xfs_log_vec *log_vector,
+                               struct xlog_ticket *tic, xfs_lsn_t *start_lsn,
+                               xlog_in_core_t **commit_iclog, uint flags);
+
+/*
+ * Committed Item List interfaces
+ */
+int    xlog_cil_init(struct log *log);
+void   xlog_cil_init_post_recovery(struct log *log);
+void   xlog_cil_destroy(struct log *log);
+
+int    xlog_cil_push(struct log *log, int push_now);
+xfs_lsn_t xlog_cil_push_lsn(struct log *log, xfs_lsn_t push_sequence);
+
 /*
  * Unmount record type is used as a pseudo transaction type for the ticket.
  * It's value must be outside the range of XFS_TRANS_* values.
index 0de08e3..14a69ae 100644 (file)
@@ -1576,7 +1576,7 @@ xlog_recover_reorder_trans(
 
                switch (ITEM_TYPE(item)) {
                case XFS_LI_BUF:
-                       if (!(buf_f->blf_flags & XFS_BLI_CANCEL)) {
+                       if (!(buf_f->blf_flags & XFS_BLF_CANCEL)) {
                                trace_xfs_log_recover_item_reorder_head(log,
                                                        trans, item, pass);
                                list_move(&item->ri_list, &trans->r_itemq);
@@ -1638,7 +1638,7 @@ xlog_recover_do_buffer_pass1(
        /*
         * If this isn't a cancel buffer item, then just return.
         */
-       if (!(flags & XFS_BLI_CANCEL)) {
+       if (!(flags & XFS_BLF_CANCEL)) {
                trace_xfs_log_recover_buf_not_cancel(log, buf_f);
                return;
        }
@@ -1696,7 +1696,7 @@ xlog_recover_do_buffer_pass1(
  * Check to see whether the buffer being recovered has a corresponding
  * entry in the buffer cancel record table.  If it does then return 1
  * so that it will be cancelled, otherwise return 0.  If the buffer is
- * actually a buffer cancel item (XFS_BLI_CANCEL is set), then decrement
+ * actually a buffer cancel item (XFS_BLF_CANCEL is set), then decrement
  * the refcount on the entry in the table and remove it from the table
  * if this is the last reference.
  *
@@ -1721,7 +1721,7 @@ xlog_check_buffer_cancelled(
                 * There is nothing in the table built in pass one,
                 * so this buffer must not be cancelled.
                 */
-               ASSERT(!(flags & XFS_BLI_CANCEL));
+               ASSERT(!(flags & XFS_BLF_CANCEL));
                return 0;
        }
 
@@ -1733,7 +1733,7 @@ xlog_check_buffer_cancelled(
                 * There is no corresponding entry in the table built
                 * in pass one, so this buffer has not been cancelled.
                 */
-               ASSERT(!(flags & XFS_BLI_CANCEL));
+               ASSERT(!(flags & XFS_BLF_CANCEL));
                return 0;
        }
 
@@ -1752,7 +1752,7 @@ xlog_check_buffer_cancelled(
                         * one in the table and remove it if this is the
                         * last reference.
                         */
-                       if (flags & XFS_BLI_CANCEL) {
+                       if (flags & XFS_BLF_CANCEL) {
                                bcp->bc_refcount--;
                                if (bcp->bc_refcount == 0) {
                                        if (prevp == NULL) {
@@ -1772,7 +1772,7 @@ xlog_check_buffer_cancelled(
         * We didn't find a corresponding entry in the table, so
         * return 0 so that the buffer is NOT cancelled.
         */
-       ASSERT(!(flags & XFS_BLI_CANCEL));
+       ASSERT(!(flags & XFS_BLF_CANCEL));
        return 0;
 }
 
@@ -1874,8 +1874,8 @@ xlog_recover_do_inode_buffer(
                        nbits = xfs_contig_bits(data_map, map_size,
                                                         bit);
                        ASSERT(nbits > 0);
-                       reg_buf_offset = bit << XFS_BLI_SHIFT;
-                       reg_buf_bytes = nbits << XFS_BLI_SHIFT;
+                       reg_buf_offset = bit << XFS_BLF_SHIFT;
+                       reg_buf_bytes = nbits << XFS_BLF_SHIFT;
                        item_index++;
                }
 
@@ -1889,7 +1889,7 @@ xlog_recover_do_inode_buffer(
                }
 
                ASSERT(item->ri_buf[item_index].i_addr != NULL);
-               ASSERT((item->ri_buf[item_index].i_len % XFS_BLI_CHUNK) == 0);
+               ASSERT((item->ri_buf[item_index].i_len % XFS_BLF_CHUNK) == 0);
                ASSERT((reg_buf_offset + reg_buf_bytes) <= XFS_BUF_COUNT(bp));
 
                /*
@@ -1955,9 +1955,9 @@ xlog_recover_do_reg_buffer(
                nbits = xfs_contig_bits(data_map, map_size, bit);
                ASSERT(nbits > 0);
                ASSERT(item->ri_buf[i].i_addr != NULL);
-               ASSERT(item->ri_buf[i].i_len % XFS_BLI_CHUNK == 0);
+               ASSERT(item->ri_buf[i].i_len % XFS_BLF_CHUNK == 0);
                ASSERT(XFS_BUF_COUNT(bp) >=
-                      ((uint)bit << XFS_BLI_SHIFT)+(nbits<<XFS_BLI_SHIFT));
+                      ((uint)bit << XFS_BLF_SHIFT)+(nbits<<XFS_BLF_SHIFT));
 
                /*
                 * Do a sanity check if this is a dquot buffer. Just checking
@@ -1966,7 +1966,7 @@ xlog_recover_do_reg_buffer(
                 */
                error = 0;
                if (buf_f->blf_flags &
-                  (XFS_BLI_UDQUOT_BUF|XFS_BLI_PDQUOT_BUF|XFS_BLI_GDQUOT_BUF)) {
+                  (XFS_BLF_UDQUOT_BUF|XFS_BLF_PDQUOT_BUF|XFS_BLF_GDQUOT_BUF)) {
                        if (item->ri_buf[i].i_addr == NULL) {
                                cmn_err(CE_ALERT,
                                        "XFS: NULL dquot in %s.", __func__);
@@ -1987,9 +1987,9 @@ xlog_recover_do_reg_buffer(
                }
 
                memcpy(xfs_buf_offset(bp,
-                       (uint)bit << XFS_BLI_SHIFT),    /* dest */
+                       (uint)bit << XFS_BLF_SHIFT),    /* dest */
                        item->ri_buf[i].i_addr,         /* source */
-                       nbits<<XFS_BLI_SHIFT);          /* length */
+                       nbits<<XFS_BLF_SHIFT);          /* length */
  next:
                i++;
                bit += nbits;
@@ -2148,11 +2148,11 @@ xlog_recover_do_dquot_buffer(
        }
 
        type = 0;
-       if (buf_f->blf_flags & XFS_BLI_UDQUOT_BUF)
+       if (buf_f->blf_flags & XFS_BLF_UDQUOT_BUF)
                type |= XFS_DQ_USER;
-       if (buf_f->blf_flags & XFS_BLI_PDQUOT_BUF)
+       if (buf_f->blf_flags & XFS_BLF_PDQUOT_BUF)
                type |= XFS_DQ_PROJ;
-       if (buf_f->blf_flags & XFS_BLI_GDQUOT_BUF)
+       if (buf_f->blf_flags & XFS_BLF_GDQUOT_BUF)
                type |= XFS_DQ_GROUP;
        /*
         * This type of quotas was turned off, so ignore this buffer
@@ -2173,7 +2173,7 @@ xlog_recover_do_dquot_buffer(
  * here which overlaps that may be stale.
  *
  * When meta-data buffers are freed at run time we log a buffer item
- * with the XFS_BLI_CANCEL bit set to indicate that previous copies
+ * with the XFS_BLF_CANCEL bit set to indicate that previous copies
  * of the buffer in the log should not be replayed at recovery time.
  * This is so that if the blocks covered by the buffer are reused for
  * file data before we crash we don't end up replaying old, freed
@@ -2207,7 +2207,7 @@ xlog_recover_do_buffer_trans(
        if (pass == XLOG_RECOVER_PASS1) {
                /*
                 * In this pass we're only looking for buf items
-                * with the XFS_BLI_CANCEL bit set.
+                * with the XFS_BLF_CANCEL bit set.
                 */
                xlog_recover_do_buffer_pass1(log, buf_f);
                return 0;
@@ -2244,7 +2244,7 @@ xlog_recover_do_buffer_trans(
 
        mp = log->l_mp;
        buf_flags = XBF_LOCK;
-       if (!(flags & XFS_BLI_INODE_BUF))
+       if (!(flags & XFS_BLF_INODE_BUF))
                buf_flags |= XBF_MAPPED;
 
        bp = xfs_buf_read(mp->m_ddev_targp, blkno, len, buf_flags);
@@ -2257,10 +2257,10 @@ xlog_recover_do_buffer_trans(
        }
 
        error = 0;
-       if (flags & XFS_BLI_INODE_BUF) {
+       if (flags & XFS_BLF_INODE_BUF) {
                error = xlog_recover_do_inode_buffer(mp, item, bp, buf_f);
        } else if (flags &
-                 (XFS_BLI_UDQUOT_BUF|XFS_BLI_PDQUOT_BUF|XFS_BLI_GDQUOT_BUF)) {
+                 (XFS_BLF_UDQUOT_BUF|XFS_BLF_PDQUOT_BUF|XFS_BLF_GDQUOT_BUF)) {
                xlog_recover_do_dquot_buffer(mp, log, item, bp, buf_f);
        } else {
                xlog_recover_do_reg_buffer(mp, item, bp, buf_f);
index 75d7492..1c55ccb 100644 (file)
@@ -28,7 +28,7 @@
 #define XLOG_RHASH(tid)        \
        ((((__uint32_t)tid)>>XLOG_RHASH_SHIFT) & (XLOG_RHASH_SIZE-1))
 
-#define XLOG_MAX_REGIONS_IN_ITEM   (XFS_MAX_BLOCKSIZE / XFS_BLI_CHUNK / 2 + 1)
+#define XLOG_MAX_REGIONS_IN_ITEM   (XFS_MAX_BLOCKSIZE / XFS_BLF_CHUNK / 2 + 1)
 
 
 /*
index 9ff48a1..1d2c7ee 100644 (file)
@@ -268,6 +268,7 @@ typedef struct xfs_mount {
 #define XFS_MOUNT_WSYNC                (1ULL << 0)     /* for nfs - all metadata ops
                                                   must be synchronous except
                                                   for space allocations */
+#define XFS_MOUNT_DELAYLOG     (1ULL << 1)     /* delayed logging is enabled */
 #define XFS_MOUNT_DMAPI                (1ULL << 2)     /* dmapi is enabled */
 #define XFS_MOUNT_WAS_CLEAN    (1ULL << 3)
 #define XFS_MOUNT_FS_SHUTDOWN  (1ULL << 4)     /* atomic stop of all filesystem
index be578ec..ce558ef 100644 (file)
@@ -44,6 +44,7 @@
 #include "xfs_trans_priv.h"
 #include "xfs_trans_space.h"
 #include "xfs_inode_item.h"
+#include "xfs_trace.h"
 
 kmem_zone_t    *xfs_trans_zone;
 
@@ -243,9 +244,8 @@ _xfs_trans_alloc(
        tp->t_type = type;
        tp->t_mountp = mp;
        tp->t_items_free = XFS_LIC_NUM_SLOTS;
-       tp->t_busy_free = XFS_LBC_NUM_SLOTS;
        xfs_lic_init(&(tp->t_items));
-       XFS_LBC_INIT(&(tp->t_busy));
+       INIT_LIST_HEAD(&tp->t_busy);
        return tp;
 }
 
@@ -255,8 +255,13 @@ _xfs_trans_alloc(
  */
 STATIC void
 xfs_trans_free(
-       xfs_trans_t     *tp)
+       struct xfs_trans        *tp)
 {
+       struct xfs_busy_extent  *busyp, *n;
+
+       list_for_each_entry_safe(busyp, n, &tp->t_busy, list)
+               xfs_alloc_busy_clear(tp->t_mountp, busyp);
+
        atomic_dec(&tp->t_mountp->m_active_trans);
        xfs_trans_free_dqinfo(tp);
        kmem_zone_free(xfs_trans_zone, tp);
@@ -285,9 +290,8 @@ xfs_trans_dup(
        ntp->t_type = tp->t_type;
        ntp->t_mountp = tp->t_mountp;
        ntp->t_items_free = XFS_LIC_NUM_SLOTS;
-       ntp->t_busy_free = XFS_LBC_NUM_SLOTS;
        xfs_lic_init(&(ntp->t_items));
-       XFS_LBC_INIT(&(ntp->t_busy));
+       INIT_LIST_HEAD(&ntp->t_busy);
 
        ASSERT(tp->t_flags & XFS_TRANS_PERM_LOG_RES);
        ASSERT(tp->t_ticket != NULL);
@@ -423,7 +427,6 @@ undo_blocks:
        return error;
 }
 
-
 /*
  * Record the indicated change to the given field for application
  * to the file system's superblock when the transaction commits.
@@ -652,7 +655,7 @@ xfs_trans_apply_sb_deltas(
  * XFS_TRANS_SB_DIRTY will not be set when the transaction is updated but we
  * still need to update the incore superblock with the changes.
  */
-STATIC void
+void
 xfs_trans_unreserve_and_mod_sb(
        xfs_trans_t     *tp)
 {
@@ -880,7 +883,7 @@ xfs_trans_fill_vecs(
  * they could be immediately flushed and we'd have to race with the flusher
  * trying to pull the item from the AIL as we add it.
  */
-static void
+void
 xfs_trans_item_committed(
        struct xfs_log_item     *lip,
        xfs_lsn_t               commit_lsn,
@@ -930,26 +933,6 @@ xfs_trans_item_committed(
        IOP_UNPIN(lip);
 }
 
-/* Clear all the per-AG busy list items listed in this transaction */
-static void
-xfs_trans_clear_busy_extents(
-       struct xfs_trans        *tp)
-{
-       xfs_log_busy_chunk_t    *lbcp;
-       xfs_log_busy_slot_t     *lbsp;
-       int                     i;
-
-       for (lbcp = &tp->t_busy; lbcp != NULL; lbcp = lbcp->lbc_next) {
-               i = 0;
-               for (lbsp = lbcp->lbc_busy; i < lbcp->lbc_unused; i++, lbsp++) {
-                       if (XFS_LBC_ISFREE(lbcp, i))
-                               continue;
-                       xfs_alloc_clear_busy(tp, lbsp->lbc_ag, lbsp->lbc_idx);
-               }
-       }
-       xfs_trans_free_busy(tp);
-}
-
 /*
  * This is typically called by the LM when a transaction has been fully
  * committed to disk.  It needs to unpin the items which have
@@ -984,7 +967,6 @@ xfs_trans_committed(
                kmem_free(licp);
        }
 
-       xfs_trans_clear_busy_extents(tp);
        xfs_trans_free(tp);
 }
 
@@ -1012,8 +994,7 @@ xfs_trans_uncommit(
        xfs_trans_unreserve_and_mod_sb(tp);
        xfs_trans_unreserve_and_mod_dquots(tp);
 
-       xfs_trans_free_items(tp, flags);
-       xfs_trans_free_busy(tp);
+       xfs_trans_free_items(tp, NULLCOMMITLSN, flags);
        xfs_trans_free(tp);
 }
 
@@ -1075,6 +1056,8 @@ xfs_trans_commit_iclog(
        *commit_lsn = xfs_log_done(mp, tp->t_ticket, &commit_iclog, log_flags);
 
        tp->t_commit_lsn = *commit_lsn;
+       trace_xfs_trans_commit_lsn(tp);
+
        if (nvec > XFS_TRANS_LOGVEC_COUNT)
                kmem_free(log_vector);
 
@@ -1161,6 +1144,93 @@ xfs_trans_commit_iclog(
        return xfs_log_release_iclog(mp, commit_iclog);
 }
 
+/*
+ * Walk the log items and allocate log vector structures for
+ * each item large enough to fit all the vectors they require.
+ * Note that this format differs from the old log vector format in
+ * that there is no transaction header in these log vectors.
+ */
+STATIC struct xfs_log_vec *
+xfs_trans_alloc_log_vecs(
+       xfs_trans_t     *tp)
+{
+       xfs_log_item_desc_t     *lidp;
+       struct xfs_log_vec      *lv = NULL;
+       struct xfs_log_vec      *ret_lv = NULL;
+
+       lidp = xfs_trans_first_item(tp);
+
+       /* Bail out if we didn't find a log item.  */
+       if (!lidp) {
+               ASSERT(0);
+               return NULL;
+       }
+
+       while (lidp != NULL) {
+               struct xfs_log_vec *new_lv;
+
+               /* Skip items which aren't dirty in this transaction. */
+               if (!(lidp->lid_flags & XFS_LID_DIRTY)) {
+                       lidp = xfs_trans_next_item(tp, lidp);
+                       continue;
+               }
+
+               /* Skip items that do not have any vectors for writing */
+               lidp->lid_size = IOP_SIZE(lidp->lid_item);
+               if (!lidp->lid_size) {
+                       lidp = xfs_trans_next_item(tp, lidp);
+                       continue;
+               }
+
+               new_lv = kmem_zalloc(sizeof(*new_lv) +
+                               lidp->lid_size * sizeof(struct xfs_log_iovec),
+                               KM_SLEEP);
+
+               /* The allocated iovec region lies beyond the log vector. */
+               new_lv->lv_iovecp = (struct xfs_log_iovec *)&new_lv[1];
+               new_lv->lv_niovecs = lidp->lid_size;
+               new_lv->lv_item = lidp->lid_item;
+               if (!ret_lv)
+                       ret_lv = new_lv;
+               else
+                       lv->lv_next = new_lv;
+               lv = new_lv;
+               lidp = xfs_trans_next_item(tp, lidp);
+       }
+
+       return ret_lv;
+}
+
+static int
+xfs_trans_commit_cil(
+       struct xfs_mount        *mp,
+       struct xfs_trans        *tp,
+       xfs_lsn_t               *commit_lsn,
+       int                     flags)
+{
+       struct xfs_log_vec      *log_vector;
+       int                     error;
+
+       /*
+        * Get each log item to allocate a vector structure for
+        * the log item to to pass to the log write code. The
+        * CIL commit code will format the vector and save it away.
+        */
+       log_vector = xfs_trans_alloc_log_vecs(tp);
+       if (!log_vector)
+               return ENOMEM;
+
+       error = xfs_log_commit_cil(mp, tp, log_vector, commit_lsn, flags);
+       if (error)
+               return error;
+
+       current_restore_flags_nested(&tp->t_pflags, PF_FSTRANS);
+
+       /* xfs_trans_free_items() unlocks them first */
+       xfs_trans_free_items(tp, *commit_lsn, 0);
+       xfs_trans_free(tp);
+       return 0;
+}
 
 /*
  * xfs_trans_commit
@@ -1221,7 +1291,11 @@ _xfs_trans_commit(
                xfs_trans_apply_sb_deltas(tp);
        xfs_trans_apply_dquot_deltas(tp);
 
-       error = xfs_trans_commit_iclog(mp, tp, &commit_lsn, flags);
+       if (mp->m_flags & XFS_MOUNT_DELAYLOG)
+               error = xfs_trans_commit_cil(mp, tp, &commit_lsn, flags);
+       else
+               error = xfs_trans_commit_iclog(mp, tp, &commit_lsn, flags);
+
        if (error == ENOMEM) {
                xfs_force_shutdown(mp, SHUTDOWN_LOG_IO_ERROR);
                error = XFS_ERROR(EIO);
@@ -1259,8 +1333,7 @@ out_unreserve:
                        error = XFS_ERROR(EIO);
        }
        current_restore_flags_nested(&tp->t_pflags, PF_FSTRANS);
-       xfs_trans_free_items(tp, error ? XFS_TRANS_ABORT : 0);
-       xfs_trans_free_busy(tp);
+       xfs_trans_free_items(tp, NULLCOMMITLSN, error ? XFS_TRANS_ABORT : 0);
        xfs_trans_free(tp);
 
        XFS_STATS_INC(xs_trans_empty);
@@ -1338,8 +1411,7 @@ xfs_trans_cancel(
        /* mark this thread as no longer being in a transaction */
        current_restore_flags_nested(&tp->t_pflags, PF_FSTRANS);
 
-       xfs_trans_free_items(tp, flags);
-       xfs_trans_free_busy(tp);
+       xfs_trans_free_items(tp, NULLCOMMITLSN, flags);
        xfs_trans_free(tp);
 }
 
index c62beee..8c69e78 100644 (file)
@@ -106,7 +106,8 @@ typedef struct xfs_trans_header {
 #define        XFS_TRANS_GROWFSRT_FREE         39
 #define        XFS_TRANS_SWAPEXT               40
 #define        XFS_TRANS_SB_COUNT              41
-#define        XFS_TRANS_TYPE_MAX              41
+#define        XFS_TRANS_CHECKPOINT            42
+#define        XFS_TRANS_TYPE_MAX              42
 /* new transaction types need to be reflected in xfs_logprint(8) */
 
 #define XFS_TRANS_TYPES \
@@ -148,6 +149,7 @@ typedef struct xfs_trans_header {
        { XFS_TRANS_GROWFSRT_FREE,      "GROWFSRT_FREE" }, \
        { XFS_TRANS_SWAPEXT,            "SWAPEXT" }, \
        { XFS_TRANS_SB_COUNT,           "SB_COUNT" }, \
+       { XFS_TRANS_CHECKPOINT,         "CHECKPOINT" }, \
        { XFS_TRANS_DUMMY1,             "DUMMY1" }, \
        { XFS_TRANS_DUMMY2,             "DUMMY2" }, \
        { XLOG_UNMOUNT_REC_TYPE,        "UNMOUNT" }
@@ -813,6 +815,7 @@ struct xfs_log_item_desc;
 struct xfs_mount;
 struct xfs_trans;
 struct xfs_dquot_acct;
+struct xfs_busy_extent;
 
 typedef struct xfs_log_item {
        struct list_head                li_ail;         /* AIL pointers */
@@ -828,6 +831,11 @@ typedef struct xfs_log_item {
                                                        /* buffer item iodone */
                                                        /* callback func */
        struct xfs_item_ops             *li_ops;        /* function list */
+
+       /* delayed logging */
+       struct list_head                li_cil;         /* CIL pointers */
+       struct xfs_log_vec              *li_lv;         /* active log vector */
+       xfs_lsn_t                       li_seq;         /* CIL commit seq */
 } xfs_log_item_t;
 
 #define        XFS_LI_IN_AIL   0x1
@@ -871,34 +879,6 @@ typedef struct xfs_item_ops {
 #define        XFS_ITEM_LOCKED         2
 #define XFS_ITEM_PUSHBUF       3
 
-/*
- * This structure is used to maintain a list of block ranges that have been
- * freed in the transaction.  The ranges are listed in the perag[] busy list
- * between when they're freed and the transaction is committed to disk.
- */
-
-typedef struct xfs_log_busy_slot {
-       xfs_agnumber_t          lbc_ag;
-       ushort                  lbc_idx;        /* index in perag.busy[] */
-} xfs_log_busy_slot_t;
-
-#define XFS_LBC_NUM_SLOTS      31
-typedef struct xfs_log_busy_chunk {
-       struct xfs_log_busy_chunk       *lbc_next;
-       uint                            lbc_free;       /* free slots bitmask */
-       ushort                          lbc_unused;     /* first unused */
-       xfs_log_busy_slot_t             lbc_busy[XFS_LBC_NUM_SLOTS];
-} xfs_log_busy_chunk_t;
-
-#define        XFS_LBC_MAX_SLOT        (XFS_LBC_NUM_SLOTS - 1)
-#define        XFS_LBC_FREEMASK        ((1U << XFS_LBC_NUM_SLOTS) - 1)
-
-#define        XFS_LBC_INIT(cp)        ((cp)->lbc_free = XFS_LBC_FREEMASK)
-#define        XFS_LBC_CLAIM(cp, slot) ((cp)->lbc_free &= ~(1 << (slot)))
-#define        XFS_LBC_SLOT(cp, slot)  (&((cp)->lbc_busy[(slot)]))
-#define        XFS_LBC_VACANCY(cp)     (((cp)->lbc_free) & XFS_LBC_FREEMASK)
-#define        XFS_LBC_ISFREE(cp, slot) ((cp)->lbc_free & (1 << (slot)))
-
 /*
  * This is the type of function which can be given to xfs_trans_callback()
  * to be called upon the transaction's commit to disk.
@@ -950,8 +930,7 @@ typedef struct xfs_trans {
        unsigned int            t_items_free;   /* log item descs free */
        xfs_log_item_chunk_t    t_items;        /* first log item desc chunk */
        xfs_trans_header_t      t_header;       /* header for in-log trans */
-       unsigned int            t_busy_free;    /* busy descs free */
-       xfs_log_busy_chunk_t    t_busy;         /* busy/async free blocks */
+       struct list_head        t_busy;         /* list of busy extents */
        unsigned long           t_pflags;       /* saved process flags state */
 } xfs_trans_t;
 
@@ -1025,9 +1004,6 @@ int               _xfs_trans_commit(xfs_trans_t *,
 void           xfs_trans_cancel(xfs_trans_t *, int);
 int            xfs_trans_ail_init(struct xfs_mount *);
 void           xfs_trans_ail_destroy(struct xfs_mount *);
-xfs_log_busy_slot_t *xfs_trans_add_busy(xfs_trans_t *tp,
-                                       xfs_agnumber_t ag,
-                                       xfs_extlen_t idx);
 
 extern kmem_zone_t     *xfs_trans_zone;
 
index 9cd8090..63d81a2 100644 (file)
@@ -114,7 +114,7 @@ _xfs_trans_bjoin(
        xfs_buf_item_init(bp, tp->t_mountp);
        bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
        ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
-       ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
+       ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
        ASSERT(!(bip->bli_flags & XFS_BLI_LOGGED));
        if (reset_recur)
                bip->bli_recur = 0;
@@ -511,7 +511,7 @@ xfs_trans_brelse(xfs_trans_t        *tp,
        bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
        ASSERT(bip->bli_item.li_type == XFS_LI_BUF);
        ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
-       ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
+       ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
        ASSERT(atomic_read(&bip->bli_refcount) > 0);
 
        /*
@@ -619,7 +619,7 @@ xfs_trans_bhold(xfs_trans_t *tp,
 
        bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
        ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
-       ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
+       ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
        ASSERT(atomic_read(&bip->bli_refcount) > 0);
        bip->bli_flags |= XFS_BLI_HOLD;
        trace_xfs_trans_bhold(bip);
@@ -641,7 +641,7 @@ xfs_trans_bhold_release(xfs_trans_t *tp,
 
        bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
        ASSERT(!(bip->bli_flags & XFS_BLI_STALE));
-       ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_CANCEL));
+       ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_CANCEL));
        ASSERT(atomic_read(&bip->bli_refcount) > 0);
        ASSERT(bip->bli_flags & XFS_BLI_HOLD);
        bip->bli_flags &= ~XFS_BLI_HOLD;
@@ -704,7 +704,7 @@ xfs_trans_log_buf(xfs_trans_t       *tp,
                bip->bli_flags &= ~XFS_BLI_STALE;
                ASSERT(XFS_BUF_ISSTALE(bp));
                XFS_BUF_UNSTALE(bp);
-               bip->bli_format.blf_flags &= ~XFS_BLI_CANCEL;
+               bip->bli_format.blf_flags &= ~XFS_BLF_CANCEL;
        }
 
        lidp = xfs_trans_find_item(tp, (xfs_log_item_t*)bip);
@@ -762,8 +762,8 @@ xfs_trans_binval(
                ASSERT(!(XFS_BUF_ISDELAYWRITE(bp)));
                ASSERT(XFS_BUF_ISSTALE(bp));
                ASSERT(!(bip->bli_flags & (XFS_BLI_LOGGED | XFS_BLI_DIRTY)));
-               ASSERT(!(bip->bli_format.blf_flags & XFS_BLI_INODE_BUF));
-               ASSERT(bip->bli_format.blf_flags & XFS_BLI_CANCEL);
+               ASSERT(!(bip->bli_format.blf_flags & XFS_BLF_INODE_BUF));
+               ASSERT(bip->bli_format.blf_flags & XFS_BLF_CANCEL);
                ASSERT(lidp->lid_flags & XFS_LID_DIRTY);
                ASSERT(tp->t_flags & XFS_TRANS_DIRTY);
                return;
@@ -774,7 +774,7 @@ xfs_trans_binval(
         * in the buf log item.  The STALE flag will be used in
         * xfs_buf_item_unpin() to determine if it should clean up
         * when the last reference to the buf item is given up.
-        * We set the XFS_BLI_CANCEL flag in the buf log format structure
+        * We set the XFS_BLF_CANCEL flag in the buf log format structure
         * and log the buf item.  This will be used at recovery time
         * to determine that copies of the buffer in the log before
         * this should not be replayed.
@@ -792,9 +792,9 @@ xfs_trans_binval(
        XFS_BUF_UNDELAYWRITE(bp);
        XFS_BUF_STALE(bp);
        bip->bli_flags |= XFS_BLI_STALE;
-       bip->bli_flags &= ~(XFS_BLI_LOGGED | XFS_BLI_DIRTY);
-       bip->bli_format.blf_flags &= ~XFS_BLI_INODE_BUF;
-       bip->bli_format.blf_flags |= XFS_BLI_CANCEL;
+       bip->bli_flags &= ~(XFS_BLI_INODE_BUF | XFS_BLI_LOGGED | XFS_BLI_DIRTY);
+       bip->bli_format.blf_flags &= ~XFS_BLF_INODE_BUF;
+       bip->bli_format.blf_flags |= XFS_BLF_CANCEL;
        memset((char *)(bip->bli_format.blf_data_map), 0,
              (bip->bli_format.blf_map_size * sizeof(uint)));
        lidp->lid_flags |= XFS_LID_DIRTY;
@@ -802,16 +802,16 @@ xfs_trans_binval(
 }
 
 /*
- * This call is used to indicate that the buffer contains on-disk
- * inodes which must be handled specially during recovery.  They
- * require special handling because only the di_next_unlinked from
- * the inodes in the buffer should be recovered.  The rest of the
- * data in the buffer is logged via the inodes themselves.
+ * This call is used to indicate that the buffer contains on-disk inodes which
+ * must be handled specially during recovery.  They require special handling
+ * because only the di_next_unlinked from the inodes in the buffer should be
+ * recovered.  The rest of the data in the buffer is logged via the inodes
+ * themselves.
  *
- * All we do is set the XFS_BLI_INODE_BUF flag in the buffer's log
- * format structure so that we'll know what to do at recovery time.
+ * All we do is set the XFS_BLI_INODE_BUF flag in the items flags so it can be
+ * transferred to the buffer's log format structure so that we'll know what to
+ * do at recovery time.
  */
-/* ARGSUSED */
 void
 xfs_trans_inode_buf(
        xfs_trans_t     *tp,
@@ -826,7 +826,7 @@ xfs_trans_inode_buf(
        bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
        ASSERT(atomic_read(&bip->bli_refcount) > 0);
 
-       bip->bli_format.blf_flags |= XFS_BLI_INODE_BUF;
+       bip->bli_flags |= XFS_BLI_INODE_BUF;
 }
 
 /*
@@ -908,9 +908,9 @@ xfs_trans_dquot_buf(
        ASSERT(XFS_BUF_ISBUSY(bp));
        ASSERT(XFS_BUF_FSPRIVATE2(bp, xfs_trans_t *) == tp);
        ASSERT(XFS_BUF_FSPRIVATE(bp, void *) != NULL);
-       ASSERT(type == XFS_BLI_UDQUOT_BUF ||
-              type == XFS_BLI_PDQUOT_BUF ||
-              type == XFS_BLI_GDQUOT_BUF);
+       ASSERT(type == XFS_BLF_UDQUOT_BUF ||
+              type == XFS_BLF_PDQUOT_BUF ||
+              type == XFS_BLF_GDQUOT_BUF);
 
        bip = XFS_BUF_FSPRIVATE(bp, xfs_buf_log_item_t *);
        ASSERT(atomic_read(&bip->bli_refcount) > 0);
index eb3fc57..f11d37d 100644 (file)
@@ -299,6 +299,7 @@ xfs_trans_next_item(xfs_trans_t *tp, xfs_log_item_desc_t *lidp)
 void
 xfs_trans_free_items(
        xfs_trans_t     *tp,
+       xfs_lsn_t       commit_lsn,
        int             flags)
 {
        xfs_log_item_chunk_t    *licp;
@@ -311,7 +312,7 @@ xfs_trans_free_items(
         * Special case the embedded chunk so we don't free it below.
         */
        if (!xfs_lic_are_all_free(licp)) {
-               (void) xfs_trans_unlock_chunk(licp, 1, abort, NULLCOMMITLSN);
+               (void) xfs_trans_unlock_chunk(licp, 1, abort, commit_lsn);
                xfs_lic_all_free(licp);
                licp->lic_unused = 0;
        }
@@ -322,7 +323,7 @@ xfs_trans_free_items(
         */
        while (licp != NULL) {
                ASSERT(!xfs_lic_are_all_free(licp));
-               (void) xfs_trans_unlock_chunk(licp, 1, abort, NULLCOMMITLSN);
+               (void) xfs_trans_unlock_chunk(licp, 1, abort, commit_lsn);
                next_licp = licp->lic_next;
                kmem_free(licp);
                licp = next_licp;
@@ -438,112 +439,3 @@ xfs_trans_unlock_chunk(
 
        return freed;
 }
-
-
-/*
- * This is called to add the given busy item to the transaction's
- * list of busy items.  It must find a free busy item descriptor
- * or allocate a new one and add the item to that descriptor.
- * The function returns a pointer to busy descriptor used to point
- * to the new busy entry.  The log busy entry will now point to its new
- * descriptor with its ???? field.
- */
-xfs_log_busy_slot_t *
-xfs_trans_add_busy(xfs_trans_t *tp, xfs_agnumber_t ag, xfs_extlen_t idx)
-{
-       xfs_log_busy_chunk_t    *lbcp;
-       xfs_log_busy_slot_t     *lbsp;
-       int                     i=0;
-
-       /*
-        * If there are no free descriptors, allocate a new chunk
-        * of them and put it at the front of the chunk list.
-        */
-       if (tp->t_busy_free == 0) {
-               lbcp = (xfs_log_busy_chunk_t*)
-                      kmem_alloc(sizeof(xfs_log_busy_chunk_t), KM_SLEEP);
-               ASSERT(lbcp != NULL);
-               /*
-                * Initialize the chunk, and then
-                * claim the first slot in the newly allocated chunk.
-                */
-               XFS_LBC_INIT(lbcp);
-               XFS_LBC_CLAIM(lbcp, 0);
-               lbcp->lbc_unused = 1;
-               lbsp = XFS_LBC_SLOT(lbcp, 0);
-
-               /*
-                * Link in the new chunk and update the free count.
-                */
-               lbcp->lbc_next = tp->t_busy.lbc_next;
-               tp->t_busy.lbc_next = lbcp;
-               tp->t_busy_free = XFS_LIC_NUM_SLOTS - 1;
-
-               /*
-                * Initialize the descriptor and the generic portion
-                * of the log item.
-                *
-                * Point the new slot at this item and return it.
-                * Also point the log item at its currently active
-                * descriptor and set the item's mount pointer.
-                */
-               lbsp->lbc_ag = ag;
-               lbsp->lbc_idx = idx;
-               return lbsp;
-       }
-
-       /*
-        * Find the free descriptor. It is somewhere in the chunklist
-        * of descriptors.
-        */
-       lbcp = &tp->t_busy;
-       while (lbcp != NULL) {
-               if (XFS_LBC_VACANCY(lbcp)) {
-                       if (lbcp->lbc_unused <= XFS_LBC_MAX_SLOT) {
-                               i = lbcp->lbc_unused;
-                               break;
-                       } else {
-                               /* out-of-order vacancy */
-                               cmn_err(CE_DEBUG, "OOO vacancy lbcp 0x%p\n", lbcp);
-                               ASSERT(0);
-                       }
-               }
-               lbcp = lbcp->lbc_next;
-       }
-       ASSERT(lbcp != NULL);
-       /*
-        * If we find a free descriptor, claim it,
-        * initialize it, and return it.
-        */
-       XFS_LBC_CLAIM(lbcp, i);
-       if (lbcp->lbc_unused <= i) {
-               lbcp->lbc_unused = i + 1;
-       }
-       lbsp = XFS_LBC_SLOT(lbcp, i);
-       tp->t_busy_free--;
-       lbsp->lbc_ag = ag;
-       lbsp->lbc_idx = idx;
-       return lbsp;
-}
-
-
-/*
- * xfs_trans_free_busy
- * Free all of the busy lists from a transaction
- */
-void
-xfs_trans_free_busy(xfs_trans_t *tp)
-{
-       xfs_log_busy_chunk_t    *lbcp;
-       xfs_log_busy_chunk_t    *lbcq;
-
-       lbcp = tp->t_busy.lbc_next;
-       while (lbcp != NULL) {
-               lbcq = lbcp->lbc_next;
-               kmem_free(lbcp);
-               lbcp = lbcq;
-       }
-
-       XFS_LBC_INIT(&tp->t_busy);
-       tp->t_busy.lbc_unused = 0;
-}
index 73e2ad3..c6e4f2c 100644 (file)
@@ -35,13 +35,14 @@ struct xfs_log_item_desc    *xfs_trans_find_item(struct xfs_trans *,
 struct xfs_log_item_desc       *xfs_trans_first_item(struct xfs_trans *);
 struct xfs_log_item_desc       *xfs_trans_next_item(struct xfs_trans *,
                                             struct xfs_log_item_desc *);
-void                           xfs_trans_free_items(struct xfs_trans *, int);
-void                           xfs_trans_unlock_items(struct xfs_trans *,
-                                                       xfs_lsn_t);
-void                           xfs_trans_free_busy(xfs_trans_t *tp);
-xfs_log_busy_slot_t            *xfs_trans_add_busy(xfs_trans_t *tp,
-                                                   xfs_agnumber_t ag,
-                                                   xfs_extlen_t idx);
+
+void   xfs_trans_unlock_items(struct xfs_trans *tp, xfs_lsn_t commit_lsn);
+void   xfs_trans_free_items(struct xfs_trans *tp, xfs_lsn_t commit_lsn,
+                               int flags);
+
+void   xfs_trans_item_committed(struct xfs_log_item *lip,
+                               xfs_lsn_t commit_lsn, int aborted);
+void   xfs_trans_unreserve_and_mod_sb(struct xfs_trans *tp);
 
 /*
  * AIL traversal cursor.
index b099045..3207752 100644 (file)
@@ -75,6 +75,8 @@ typedef       __uint32_t      xfs_dahash_t;   /* dir/attr hash value */
 
 typedef __uint16_t     xfs_prid_t;     /* prid_t truncated to 16bits in XFS */
 
+typedef __uint32_t     xlog_tid_t;     /* transaction ID type */
+
 /*
  * These types are 64 bits on disk but are either 32 or 64 bits in memory.
  * Disk based types: