public class ConcurrentMergeScheduler extends MergeScheduler
A MergeScheduler that runs each merge using a separate thread.
Specify the max number of threads that may run at
once, and the maximum number of simultaneous merges,
with setMaxMergesAndThreads(int, int).
If the number of merges exceeds the max number of threads then the largest merges are paused until one of the smaller merges completes.
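The rule above (only the smallest merges run when there are more merges than threads) can be modelled in a few lines. This is a minimal sketch, not Lucene's implementation: `MergeSelectionSketch`, `PendingMerge` and `runnable` are hypothetical names, and a merge is reduced to a name plus an estimated size.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative model only; not Lucene's implementation. */
final class MergeSelectionSketch {

    /** Hypothetical stand-in for a pending merge: a name and an estimated size in bytes. */
    static final class PendingMerge {
        final String name;
        final long sizeBytes;

        PendingMerge(String name, long sizeBytes) {
            this.name = name;
            this.sizeBytes = sizeBytes;
        }
    }

    /** Returns the names of the merges allowed to run: the smallest maxThreadCount of them. */
    static List<String> runnable(List<PendingMerge> merges, int maxThreadCount) {
        List<PendingMerge> sorted = new ArrayList<>(merges);
        sorted.sort(Comparator.comparingLong(m -> m.sizeBytes)); // smallest first
        List<String> names = new ArrayList<>();
        for (PendingMerge m : sorted.subList(0, Math.min(maxThreadCount, sorted.size()))) {
            names.add(m.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<PendingMerge> merges = new ArrayList<>();
        merges.add(new PendingMerge("big", 900L));
        merges.add(new PendingMerge("small", 10L));
        merges.add(new PendingMerge("medium", 200L));
        // With two threads, "big" is the merge left paused.
        System.out.println(runnable(merges, 2)); // prints [small, medium]
    }
}
```

Sorting a copy keeps the caller's list untouched, so the same list can be re-evaluated each time a merge starts or finishes.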
If more than getMaxMergeCount() merges are
requested, then this class will forcefully throttle the
incoming threads by pausing them until one or more merges
complete.
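The throttling of incoming threads can be pictured as a simple monitor that blocks callers while the merge limit is reached. The sketch below is an assumption-laden model rather than the class's actual code: `StallGateSketch`, `beginMerge` and `endMerge` are hypothetical names, and the gate stalls as soon as admitting another merge would exceed the limit.

```java
/** Illustrative model only; not Lucene's implementation. */
final class StallGateSketch {
    private final int maxMergeCount;
    private int running; // merges currently admitted

    StallGateSketch(int maxMergeCount) {
        this.maxMergeCount = maxMergeCount;
    }

    /** Blocks the calling thread while maxMergeCount merges are already admitted. */
    synchronized void beginMerge() throws InterruptedException {
        while (running >= maxMergeCount) {
            wait(); // stalled until some merge completes
        }
        running++;
    }

    /** Marks one merge as complete and wakes any stalled callers. */
    synchronized void endMerge() {
        running--;
        notifyAll();
    }

    synchronized int running() {
        return running;
    }

    /** Admits two merges, lets a third caller queue up, then completes one merge. */
    static int demo() {
        StallGateSketch gate = new StallGateSketch(2);
        try {
            gate.beginMerge();
            gate.beginMerge();
            Thread incoming = new Thread(() -> {
                try {
                    gate.beginMerge(); // stalls if both slots are still taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            incoming.start();
            gate.endMerge(); // frees a slot for the incoming thread
            incoming.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return gate.running();
    }

    public static void main(String[] args) {
        System.out.println("admitted merges: " + demo());
    }
}
```

The `while` loop around `wait()` re-checks the condition after every wake-up, which guards against spurious wake-ups and against another caller taking the freed slot first.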
This class attempts to detect whether the index is
on rotational storage (traditional hard drive) or not
(e.g. solid-state disk) and changes the default max merge
and thread count accordingly. This detection is currently
Linux-only, and relies on the OS to put the right value
into /sys/block/<dev>/queue/rotational. For all
other operating systems it currently assumes a rotational
disk for backwards compatibility. To enable default
settings for spinning or solid state disks for such
operating systems, use setDefaultMaxMergesAndThreads(boolean).
Modifier and Type | Class and Description |
---|---|
protected class | ConcurrentMergeScheduler.MergeThread - Runs a merge thread to execute a single merge, then exits. |
Modifier and Type | Field and Description |
---|---|
static int | AUTO_DETECT_MERGES_AND_THREADS - Dynamic default for maxThreadCount and maxMergeCount, used to detect whether the index is backed by an SSD or rotational disk and set maxThreadCount accordingly. |
static String | DEFAULT_CPU_CORE_COUNT_PROPERTY - Used for testing. |
static String | DEFAULT_SPINS_PROPERTY - Used for testing. |
protected int | mergeThreadCount - How many ConcurrentMergeScheduler.MergeThreads have kicked off (this is used to name them). |
protected List<ConcurrentMergeScheduler.MergeThread> | mergeThreads - List of currently active ConcurrentMergeScheduler.MergeThreads. |
protected double | targetMBPerSec - Current IO writes throttle rate |
Fields inherited from class MergeScheduler: infoStream
Constructor and Description |
---|
ConcurrentMergeScheduler() - Sole constructor, with all settings set to default values. |
Modifier and Type | Method and Description |
---|---|
void | close() - Close this MergeScheduler. |
void | disableAutoIOThrottle() - Turn off auto IO throttling. |
protected void | doMerge(IndexWriter writer, MergePolicy.OneMerge merge) - Does the actual merge, by calling IndexWriter.merge(MergePolicy.OneMerge) |
protected void | doStall() - Called from maybeStall(IndexWriter) to pause the calling thread for a bit. |
void | enableAutoIOThrottle() - Turn on dynamic IO throttling, to adaptively rate limit writes bytes/sec to the minimal rate necessary so merges do not fall behind. |
boolean | getAutoIOThrottle() - Returns true if auto IO throttling is currently enabled. |
double | getForceMergeMBPerSec() - Get the per-merge IO throttle rate for forced merges. |
double | getIORateLimitMBPerSec() - Returns the currently set per-merge IO writes rate limit, if enableAutoIOThrottle() was called, else Double.POSITIVE_INFINITY. |
int | getMaxMergeCount() |
int | getMaxThreadCount() - Returns maxThreadCount. |
protected ConcurrentMergeScheduler.MergeThread | getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) - Create and return a new MergeThread |
protected void | handleMergeException(Directory dir, Throwable exc) - Called when an exception is hit in a background merge thread |
protected boolean | maybeStall(IndexWriter writer) - This is invoked by merge(IndexWriter, MergeTrigger, boolean) to possibly stall the incoming thread when there are too many merges running or pending. |
void | merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) - Run the merges provided by IndexWriter.getNextMerge(). |
int | mergeThreadCount() - Returns the number of merge threads that are alive, ignoring the calling thread if it is a merge thread. |
void | setDefaultMaxMergesAndThreads(boolean spins) - Sets max merges and threads to proper defaults for rotational or non-rotational storage. |
void | setForceMergeMBPerSec(double v) - Set the per-merge IO throttle rate for forced merges (default: Double.POSITIVE_INFINITY). |
void | setMaxMergesAndThreads(int maxMergeCount, int maxThreadCount) - Expert: directly set the maximum number of merge threads and simultaneous merges allowed. |
void | sync() - Wait for any running merge threads to finish. |
protected void | targetMBPerSecChanged() - Subclass can override to tweak targetMBPerSec. |
String | toString() |
protected void | updateMergeThreads() - Called whenever the running merges have changed, to set merge IO limits. |
Directory | wrapForMerge(MergePolicy.OneMerge merge, Directory in) - Wraps the incoming Directory so that we can merge-throttle it using RateLimitedIndexOutput. |
Methods inherited from class MergeScheduler: message, verbose
public static final int AUTO_DETECT_MERGES_AND_THREADS
Dynamic default for maxThreadCount and maxMergeCount,
used to detect whether the index is backed by an SSD or rotational disk and
set maxThreadCount accordingly. If it's an SSD,
maxThreadCount is set to max(1, min(4, cpuCoreCount/2)),
otherwise 1. Note that detection currently only works on
Linux; other platforms will assume the index is not on an SSD.
public static final String DEFAULT_CPU_CORE_COUNT_PROPERTY
Used for testing.
public static final String DEFAULT_SPINS_PROPERTY
Used for testing.
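The default quoted for AUTO_DETECT_MERGES_AND_THREADS, max(1, min(4, cpuCoreCount/2)) on an SSD and 1 otherwise, is easy to check by hand. The sketch below only restates that arithmetic; `DefaultThreadCountSketch` and `defaultMaxThreadCount` are hypothetical names, and the core count is passed in rather than detected.

```java
/** Illustrative arithmetic only; not Lucene's implementation. */
final class DefaultThreadCountSketch {

    /**
     * Default maxThreadCount as documented: 1 for spinning disks,
     * otherwise max(1, min(4, cpuCoreCount / 2)).
     */
    static int defaultMaxThreadCount(boolean spins, int cpuCoreCount) {
        if (spins) {
            return 1;
        }
        return Math.max(1, Math.min(4, cpuCoreCount / 2)); // integer division
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("SSD default for " + cores + " cores: "
            + defaultMaxThreadCount(false, cores));
        System.out.println("Spinning-disk default: " + defaultMaxThreadCount(true, cores));
    }
}
```

For example, 8 cores on an SSD give 4 threads, 6 cores give 3, and a single core still gets 1 because of the outer max.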
protected final List<ConcurrentMergeScheduler.MergeThread> mergeThreads
List of currently active ConcurrentMergeScheduler.MergeThreads.
protected int mergeThreadCount
How many ConcurrentMergeScheduler.MergeThreads have kicked off (this is used
to name them).
protected double targetMBPerSec
Current IO writes throttle rate
public ConcurrentMergeScheduler()
public void setMaxMergesAndThreads(int maxMergeCount, int maxThreadCount)
Expert: directly set the maximum number of merge threads and
simultaneous merges allowed.
maxMergeCount - the max # simultaneous merges that are allowed.
If a merge is necessary yet we already have this many
threads running, the incoming thread (that is calling
add/updateDocument) will block until a merge thread
has completed. Note that we will only run the
smallest maxThreadCount merges at a time.
maxThreadCount - the max # simultaneous merge threads that should
be running at once. This must be <= maxMergeCount.
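The constraint between the two parameters can be expressed as a small argument check. This is only a sketch of the documented rule that maxThreadCount must be <= maxMergeCount: `MergeLimitsSketch` and `check` are hypothetical names, and both the lower bound of 1 and the use of IllegalArgumentException are assumptions, not documented behavior.

```java
/** Illustrative validation only; not Lucene's implementation. */
final class MergeLimitsSketch {

    /** Throws if the pair violates maxThreadCount <= maxMergeCount. */
    static void check(int maxMergeCount, int maxThreadCount) {
        if (maxThreadCount < 1) {
            // Assumption: at least one merge thread is required.
            throw new IllegalArgumentException("maxThreadCount should be at least 1");
        }
        if (maxThreadCount > maxMergeCount) {
            throw new IllegalArgumentException(
                "maxThreadCount (" + maxThreadCount + ") should be <= maxMergeCount ("
                    + maxMergeCount + ")");
        }
    }

    public static void main(String[] args) {
        check(6, 1); // accepted: one thread, up to six queued merges
        try {
            check(1, 2); // rejected: more threads than allowed merges
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```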
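public void setDefaultMaxMergesAndThreads(boolean spins)
Sets max merges and threads to proper defaults for rotational
or non-rotational storage.
spins - true to set defaults best for traditional rotational storage (spinning disks),
else false (e.g. for solid-state disks)
public void setForceMergeMBPerSec(double v)
Set the per-merge IO throttle rate for forced merges (default:
Double.POSITIVE_INFINITY).
public double getForceMergeMBPerSec()
Get the per-merge IO throttle rate for forced merges.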
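public void enableAutoIOThrottle()
Turn on dynamic IO throttling, to adaptively rate limit writes
bytes/sec to the minimal rate necessary so merges do not fall behind.
public void disableAutoIOThrottle()
Turn off auto IO throttling.
See also: enableAutoIOThrottle()
public boolean getAutoIOThrottle()
Returns true if auto IO throttling is currently enabled.
public double getIORateLimitMBPerSec()
Returns the currently set per-merge IO writes rate limit, if
enableAutoIOThrottle() was called, else Double.POSITIVE_INFINITY.
public int getMaxThreadCount()
Returns maxThreadCount.
See also: setMaxMergesAndThreads(int, int)
public int getMaxMergeCount()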
public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in)
Description copied from class: MergeScheduler
Wraps the incoming Directory so that we can merge-throttle it
using RateLimitedIndexOutput.
Overrides: wrapForMerge in class MergeScheduler
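Merge throttling of this kind comes down to deciding how long a given number of written bytes should take at the target rate. The sketch below shows only that arithmetic and is not the code of RateLimitedIndexOutput; `WriteThrottleSketch` and `targetNanos` are hypothetical names, and 1 MB is assumed to mean 1024 * 1024 bytes.

```java
/** Illustrative arithmetic only; not RateLimitedIndexOutput's implementation. */
final class WriteThrottleSketch {
    private static final double BYTES_PER_MB = 1024 * 1024; // assumed definition of a megabyte

    /**
     * Nanoseconds that writing the given number of bytes should take at mbPerSec.
     * A writer that finishes sooner would sleep for the difference.
     * An unlimited rate (Double.POSITIVE_INFINITY) yields 0, i.e. no pause.
     */
    static long targetNanos(long bytes, double mbPerSec) {
        double seconds = (bytes / BYTES_PER_MB) / mbPerSec;
        return (long) (seconds * 1_000_000_000L);
    }

    public static void main(String[] args) {
        long fiveMb = 5L * 1024 * 1024;
        System.out.println(targetNanos(fiveMb, 10.0)); // prints 500000000
        System.out.println(targetNanos(fiveMb, Double.POSITIVE_INFINITY)); // prints 0
    }
}
```

Returning 0 for an infinite rate matches the documented default for forced merges, where no throttle applies unless one is set.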
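protected void updateMergeThreads()
Called whenever the running merges have changed, to set merge IO limits.
public void close()
Description copied from class: MergeScheduler
Close this MergeScheduler.
Specified by: close in interface Closeable
Specified by: close in interface AutoCloseable
Specified by: close in class MergeScheduler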
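public void sync()
Wait for any running merge threads to finish. This call is not interruptible as used by close().
public int mergeThreadCount()
Returns the number of merge threads that are alive, ignoring the calling thread
if it is a merge thread. Note that this number is <= mergeThreads size.
public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException
Description copied from class: MergeScheduler
Run the merges provided by IndexWriter.getNextMerge().
Specified by: merge in class MergeScheduler
Parameters:
writer - the IndexWriter to obtain the merges from.
trigger - the MergeTrigger that caused this merge to happen
newMergesFound - true iff any new merges were found by the caller, otherwise false
Throws: IOException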
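The overall shape of merge() followed by sync(), pulling pending work until none is left, running each item on its own thread, then waiting for the threads, can be sketched with plain Java threads. Everything here is a stand-in: `MergeLoopSketch`, `startAll` and `demo` are hypothetical names, a `Queue<Runnable>` replaces the IndexWriter, and `poll()` returning null plays the role of "no more merges", which is an assumption about how the merge source signals exhaustion.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative model only; not Lucene's implementation. */
final class MergeLoopSketch {

    /** Takes tasks until the source returns null, starting one thread per task. */
    static List<Thread> startAll(Queue<Runnable> pending) {
        List<Thread> threads = new ArrayList<>();
        Runnable next;
        while ((next = pending.poll()) != null) {
            Thread t = new Thread(next, "merge-sketch-" + threads.size());
            t.start();
            threads.add(t);
        }
        return threads;
    }

    /** Waits for every started thread to finish, in the spirit of sync(). */
    static void sync(List<Thread> threads) {
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                return;
            }
        }
    }

    /** Runs three dummy "merges" and returns how many completed. */
    static int demo() {
        AtomicInteger completed = new AtomicInteger();
        Queue<Runnable> pending = new ArrayDeque<>();
        for (int i = 0; i < 3; i++) {
            pending.add(completed::incrementAndGet);
        }
        sync(startAll(pending));
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed merges: " + demo());
    }
}
```

Unlike this sketch, the documented sync() is described as not interruptible, and the real scheduler also applies the thread and merge limits before starting new work.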
protected boolean maybeStall(IndexWriter writer)
This is invoked by merge(IndexWriter, MergeTrigger, boolean)
to possibly stall the incoming
thread when there are too many merges running or pending. The
default behavior is to force this thread, which is producing too
many segments for merging to keep up, to wait until merges catch
up. Applications that can take other less drastic measures, such
as limiting how many threads are allowed to index, can do nothing
here and throttle elsewhere.
If this method wants to stall but the calling thread is a merge
thread, it should return false to tell the caller not to kick off
any new merges.
protected void doStall()
Called from maybeStall(IndexWriter)
to pause the calling thread for a bit.
protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException
Does the actual merge, by calling IndexWriter.merge(MergePolicy.OneMerge)
Throws: IOException
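protected ConcurrentMergeScheduler.MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException
Create and return a new MergeThread
Throws: IOException
protected void handleMergeException(Directory dir, Throwable exc)
Called when an exception is hit in a background merge thread
protected void targetMBPerSecChanged()
Subclass can override to tweak targetMBPerSec.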
Copyright © 2000-2019 Apache Software Foundation. All Rights Reserved.