org.objectweb.howl.log
Class LogBufferManager

java.lang.Object
  extended by org.objectweb.howl.log.LogObject
      extended by org.objectweb.howl.log.LogBufferManager

 class LogBufferManager
extends LogObject

Provides a generalized buffer manager for journals and loggers.

log records are written to disk as blocks of data. Block size is a multiple of 512 data to assure optimum disk performance.


Nested Class Summary
 class LogBufferManager.BufferPoolStats
           
static interface LogBufferManager.BufferPoolStatsMBean
           
(package private)  class LogBufferManager.FlushManager
          helper thread to flush buffers that have threads waiting longer than configured maximum.
 class LogBufferManager.ForceStats
           
static interface LogBufferManager.ForceStatsMBean
           
 class LogBufferManager.WriteStats
           
static interface LogBufferManager.WriteStatsMBean
           
 
Field Summary
private  LogBuffer[] bufferList
          array of all LogBuffer objects allocated.
private  Object bufferManagerLock
          mutex for synchronizing access to buffers list.
(package private)  int buffersWaitingForce
          number of buffers waiting to be forced.
private  LogBuffer fillBuffer
          The LogBuffer that is currently being filled.
(package private)  LogBufferManager.FlushManager flushManager
          thread used to flush long waiting buffers
private static String flushManagerName
          name of flush manager thread
private  boolean flushPartialBuffers
           
private  long forceCount
          number of times channel.force() called.
(package private)  long forceHalfOfBuffers
           
private  Object forceManagerLock
          mutex for synchronizing threads through the portion of force() that forces the channel.
(package private)  long forceMaxWaitingThreads
           
(package private)  long forceNoWaitingThreads
           
(package private)  long forceOnFileSwitch
           
(package private)  long forceOnTimeout
           
private  LogBuffer[] forceQueue
          queue of buffers waiting to be written.
(package private)  boolean forceRequired
          indicates if a force() must be called in the flush() method.
private  int fqGet
          next get workerID from forceQueue .
private  int fqPut
          next put workerID into forceQueue .
private  LogBuffer[] freeBuffer
          array of LogBuffer objects available for filling
private  int growPoolCounter
          number of times buffer size was increased because of threads waiting for buffers.
private  boolean haveIOException
          boolean is set true when an IOException is returned by a write or force to a log file.
private  IOException ioexception
          The last IOException returned to the logger
(package private)  int lastForceBSN
          last BSN forced to log.
private  long lastForceTOD
          time of last force used to compute totalTimeBetweenForce
private  LogFileManager lfm
          reference to LogFileManager that owns this Buffer Manager instance.
private  int maxBuffersForced
          maximum number of buffers forced by channel.force()
private  int maxThreadsWaitingForce
           
private  long maxTimeBetweenForce
           
private  long maxWriteTime
          maximum time (ms) for any single write
private  int minBuffersForced
          minimum number of buffers forced by channel.force().
private  long minTimeBetweenForce
           
(package private)  int nextFillBSN
          next block sequence number for fillBuffer.
(package private)  short nextIndex
          workerID into freeBuffer list maintained in getBuffer.
(package private)  int nextWriteBSN
          next BSN to be written to log.
private  long noRoomInBuffer
          number of times buffer was forced because it is full.
(package private)  long prevWriteTOD
          LogBuffer.tod from previous buffer written.
private  int threadsWaitingForce
          number of threads waiting for a force
private  int threadsWaitingForceThreshold
           
private  long totalForceTime
          total amount of time spent in channel.force();
private  long totalThreadsWaitingForce
           
private  long totalTimeBetweenForce
          total time between channel.force() calls
private  long totalWaitForWriteLockTime
          total amount of time (ms) spent waiting for the forceMangerLock
private  long totalWriteTime
          total amount of time spent in channel.write();
private  long waitForBuffer
          number of times there were no buffers available.
private  long writeCount
          number of times channel.write() called.
 
Fields inherited from class org.objectweb.howl.log.LogObject
config
 
Constructor Summary
LogBufferManager(Configuration config)
           
 
Method Summary
(package private)  int bsnFromMark(long mark)
          returns the BSN value portion of a log key mark .
(package private)  void close()
          Shutdown any threads that are started by this LogBufferManager instance.
private  String doubleToString(double val, int decimalPlaces)
          convert a double to String with fixed number of decimal places
(package private)  long elapsedTime(long startTime)
          compute elapsed time for an event
(package private)  void flushAll()
          flush active buffers to disk and wait for all LogBuffers to be returned to the freeBuffer pool.
private  void force(boolean timeout)
          forces buffer to disk.
(package private)  void forceCurrentBuffer()
          Force the current buffer to disk before starting a replay().
(package private)  void fqAdd(LogBuffer buffer)
          Add a buffer to the forceQueue.
private  LogBuffer getFillBuffer()
          returns a LogBuffer to be filled.
(package private)  LogBuffer getLogBuffer(int index)
          return a new instance of LogBuffer.
(package private)  String getStats()
          Returns an XML node containing statistics for the LogBufferManager.
 long getWaitForBuffer()
          provides synchronized access to waitForBuffer
(package private)  void init(LogFileManager lfm, int bsn)
          perform initialization following reposition of LogFileManager.
(package private)  long markFromBsn(int bsn, int offset)
          generate a log mark (log key).
(package private)  void open()
          Allocate pool of IO buffers for Logger.
(package private)  long put(short type, byte[][] data, boolean sync)
          writes data byte[][] to log and returns a log key.
private  void releaseBuffer(LogBuffer buffer)
          decrements count of threads waiting on this buffer.
(package private)  void replay(ReplayListener listener, long mark, boolean replayCtrlRecords)
          Replays log from requested mark forward to end of log.
private  void sync(LogBuffer logBuffer)
          Waits for logBuffer to be forced to disk.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

flushPartialBuffers

private final boolean flushPartialBuffers
See Also:
Configuration.flushPartialBuffers

haveIOException

private boolean haveIOException
boolean is set true when an IOException is returned by a write or force to a log file.

Any attempt to write or force after haveIOException becomes true should result in an IOException being returned to the caller.


ioexception

private IOException ioexception
The last IOException returned to the logger


bufferManagerLock

private final Object bufferManagerLock
mutex for synchronizing access to buffers list.

also synchronizes access to fqPut in routines that put LogBuffers into the forceQueue[].


forceManagerLock

private final Object forceManagerLock
mutex for synchronizing threads through the portion of force() that forces the channel.


lfm

private LogFileManager lfm
reference to LogFileManager that owns this Buffer Manager instance.

See Also:
LogFileManager.getLogFileForWrite(LogBuffer)

forceRequired

final boolean forceRequired
indicates if a force() must be called in the flush() method.

Set false in constructor if config.getLogFileMode() is "rwd".


fillBuffer

private LogBuffer fillBuffer
The LogBuffer that is currently being filled.


freeBuffer

private LogBuffer[] freeBuffer
array of LogBuffer objects available for filling


bufferList

private LogBuffer[] bufferList
array of all LogBuffer objects allocated.

Used to find and debug buffers that are not in the freeBuffer list if logger hangs waiting for buffers to be returned to the freeBuffer pool.


nextIndex

short nextIndex
workerID into freeBuffer list maintained in getBuffer.


waitForBuffer

private long waitForBuffer
number of times there were no buffers available.

The FlushManager thread monitors this field to determine if the buffer pool needs to be grown.


noRoomInBuffer

private long noRoomInBuffer
number of times buffer was forced because it is full.


growPoolCounter

private int growPoolCounter
number of times buffer size was increased because of threads waiting for buffers.


nextFillBSN

int nextFillBSN
next block sequence number for fillBuffer.


nextWriteBSN

int nextWriteBSN
next BSN to be written to log.

synchronized by forceManagerLock


prevWriteTOD

long prevWriteTOD
LogBuffer.tod from previous buffer written.

maintained in force() method. Used to check against decrement in TOD field. Added to help investigate BUG 303907


buffersWaitingForce

int buffersWaitingForce
number of buffers waiting to be forced.

synchronized by bufferManagerLock.

incremented in put() and decremented in releaseBuffer(). When a thread calls put() with sync parameter set true, and buffersWaitingForce is also zero, then put() causes the buffer to be forced immediately. This strategy minimizes latency in situations of low load, such as a single thread running.


lastForceBSN

int lastForceBSN
last BSN forced to log.

synchronized by forceManagerLock


forceCount

private long forceCount
number of times channel.force() called.


writeCount

private long writeCount
number of times channel.write() called.


minBuffersForced

private int minBuffersForced
minimum number of buffers forced by channel.force().


maxBuffersForced

private int maxBuffersForced
maximum number of buffers forced by channel.force()


totalForceTime

private long totalForceTime
total amount of time spent in channel.force();


totalWriteTime

private long totalWriteTime
total amount of time spent in channel.write();


maxWriteTime

private long maxWriteTime
maximum time (ms) for any single write


totalWaitForWriteLockTime

private long totalWaitForWriteLockTime
total amount of time (ms) spent waiting for the forceMangerLock


totalTimeBetweenForce

private long totalTimeBetweenForce
total time between channel.force() calls


minTimeBetweenForce

private long minTimeBetweenForce

maxTimeBetweenForce

private long maxTimeBetweenForce

lastForceTOD

private long lastForceTOD
time of last force used to compute totalTimeBetweenForce


threadsWaitingForce

private int threadsWaitingForce
number of threads waiting for a force


maxThreadsWaitingForce

private int maxThreadsWaitingForce

totalThreadsWaitingForce

private long totalThreadsWaitingForce

threadsWaitingForceThreshold

private int threadsWaitingForceThreshold

forceOnTimeout

long forceOnTimeout

forceNoWaitingThreads

long forceNoWaitingThreads

forceHalfOfBuffers

long forceHalfOfBuffers

forceMaxWaitingThreads

long forceMaxWaitingThreads

forceOnFileSwitch

long forceOnFileSwitch

flushManager

final LogBufferManager.FlushManager flushManager
thread used to flush long waiting buffers


flushManagerName

private static final String flushManagerName
name of flush manager thread

See Also:
Constant Field Values

forceQueue

private LogBuffer[] forceQueue
queue of buffers waiting to be written. The queue guarantees that buffers are written to disk in BSN order. Buffers are placed into the forceQueue using the fqPut workerID, and removed from the forceQueue using the fqGet workerID. Access to these two workerID members is synchronized using separate objects to allow most threads to be storing log records while a single thread is blocked waiting for a physical force.

Buffers are added to the queue when put() detects the buffer is full, and when the FlushManager thread detects that a buffer has waited too long to be written. The fqPut member is the workerID of the next location in forceQueue to put a LogBuffer that is to be written. fqPut is protected by bufferManagerLock .

Buffers are removed from the queue in force() and written to disk. The fqGet member is an workerID to the next buffer to remove from the forceQueue. fqGet is protected by forceManagerLock .

The size of forceQueue[] is one larger than the size of freeBuffer[] so that fqPut == fqGet always means the queue is empty.


fqPut

private int fqPut
next put workerID into forceQueue .

synchronized by bufferManagerLock.


fqGet

private int fqGet
next get workerID from forceQueue .

synchronized by forceManagerLock.

Constructor Detail

LogBufferManager

LogBufferManager(Configuration config)
Parameters:
config - Configuration object
Method Detail

elapsedTime

final long elapsedTime(long startTime)
compute elapsed time for an event

Parameters:
startTime - time event began
Returns:
elapsed time (System.currentTimeMillis() - startTime)

force

private void force(boolean timeout)
            throws IOException,
                   InterruptedException
forces buffer to disk.

batches multiple buffers into a single force when possible.

Design Note:
It was suggested that using forceManagerLock to control writes from the forceQueue[] and forces would reduce overlap due to the amount of time that forceManagerLock is shut while channel.force() is active.

Experimented with using two separate locks to manage the channel.write() and the channel.force() calls, but it appears that thread calling channel.force() will block another thread trying to call channel.write() so both locks end up being shut anyway. Since two locks did not provide any measurable benefit, it seems best to use a single forceManagerLock to keep the code simple.

Throws:
IOException
InterruptedException

sync

private void sync(LogBuffer logBuffer)
           throws IOException,
                  InterruptedException
Waits for logBuffer to be forced to disk.

No monitors are owned when routine is entered.

Prior to calling sync(), the thread called put() with sync param set true to register the fact that the thread would wait for the force.

Throws:
IOException
InterruptedException

releaseBuffer

private void releaseBuffer(LogBuffer buffer)
decrements count of threads waiting on this buffer.

If count goes to zero, buffer is returned to the freeBuffer list, and any threads waiting for a free buffer are notified.

Parameters:
buffer - LogBuffer to be released
See Also:
buffersWaitingForce

getFillBuffer

private LogBuffer getFillBuffer()
                         throws LogFileOverflowException
returns a LogBuffer to be filled.

PRECONDITION: caller holds bufferManagerLock monitor.

Returns:
a LogBuffer to be filled.
Throws:
LogFileOverflowException

getLogBuffer

LogBuffer getLogBuffer(int index)
                 throws ClassNotFoundException
return a new instance of LogBuffer.

Actual LogBuffer implementation class is specified by configuration.

Returns:
a new instance of LogBuffer
Throws:
ClassNotFoundException

fqAdd

void fqAdd(LogBuffer buffer)
Add a buffer to the forceQueue.

PRECONDITION: bufferManagerLock owned by caller

Parameters:
buffer - LogBuffer to be added to the forceQueue

put

long put(short type,
         byte[][] data,
         boolean sync)
   throws LogRecordSizeException,
          LogFileOverflowException,
          InterruptedException,
          IOException
writes data byte[][] to log and returns a log key.

waits for IO to complete if sync is true.

MG 27/Jan/05 modified code to force buffer if caller has set sync == true, and there are no buffers waiting to be written. This causes buffers to be written immediately in a single threaded and/or low volume situation. Change suggested by developers at ApacheCon and at ObjectWebCon. This feature is disabled by default and is enabled by setting the log configuration property XXX to true.

Returns:
token reference (log key) for record just written
Throws:
LogRecordSizeException - when size of byte[] is larger than the maximum possible record for the configured buffer size.
LogFileOverflowException
InterruptedException
IOException
See Also:
buffersWaitingForce

forceCurrentBuffer

void forceCurrentBuffer()
                  throws IOException
Force the current buffer to disk before starting a replay().

Throws:
IOException

replay

void replay(ReplayListener listener,
            long mark,
            boolean replayCtrlRecords)
      throws LogConfigurationException,
             InvalidLogKeyException
Replays log from requested mark forward to end of log.

Blocks caller until replay completes due to end of log, or an exception is passed to listener.onError().

Parameters:
listener - ReplayListener to receive notifications for each log record.
mark - log key for the first record to be replayed.

If mark is zero then the entire active log is replayed.

replayCtrlRecords - indicates whether to return control records.

used by utility routines such as CopyLog.

Throws:
InvalidLogKeyException - if the requested key is not found in the log.
LogConfigurationException
See Also:
Logger.replay(ReplayListener, long)

open

void open()
    throws ClassNotFoundException
Allocate pool of IO buffers for Logger.

The LogBufferManager class is a generalized manager for any type of LogBuffer. The class name for the type of LogBuffer to use is specified by configuration parameters.

Throws:
ClassNotFoundException - if the configured LogBuffer class cannot be found.

close

void close()
Shutdown any threads that are started by this LogBufferManager instance.


init

void init(LogFileManager lfm,
          int bsn)
perform initialization following reposition of LogFileManager.

Parameters:
lfm - LogFileManager used by the buffer manager to obtain log files for writing buffers.
bsn - last Block Sequence Number written by Logger.

flushAll

void flushAll()
        throws IOException
flush active buffers to disk and wait for all LogBuffers to be returned to the freeBuffer pool.

May be called multiple times.

Throws:
IOException

doubleToString

private String doubleToString(double val,
                              int decimalPlaces)
convert a double to String with fixed number of decimal places

Parameters:
val - double to be converted
decimalPlaces - number of decimal places in output
Returns:
String result of conversion

getStats

String getStats()
Returns an XML node containing statistics for the LogBufferManager.

The nested element contains entries for each LogBuffer object in the buffer pool.

Returns:
a String containing statistics.

bsnFromMark

int bsnFromMark(long mark)
returns the BSN value portion of a log key mark .

Parameters:
mark - log key or log mark to extract BSN from.
Returns:
BSN portion of mark

markFromBsn

long markFromBsn(int bsn,
                 int offset)
generate a log mark (log key).

Parameters:
bsn - Block Sequence Number.
offset - offset within block.

May be zero to allow access to the beginning of a block.

Returns:
a log key.

getWaitForBuffer

public final long getWaitForBuffer()
provides synchronized access to waitForBuffer

Returns:
the current value of waitForBuffer