We have also been working on a Lucene-based distributed index architecture.
Our design differs from the above proposals in the way it leverages Hadoop
as much as possible. In particular, HDFS is used to reliably store Lucene
instances, Map/Reduce is used to analyze documents and update Lucene
in parallel, and Hadoop's IPC framework is used for communication between the
components. Our design is geared for applications that require a highly
scalable index and where batch updates to each Lucene instance are acceptable
(versus finer-grained document-at-a-time updates).
We have a working implementation of our design and are in the process
of evaluating its performance. An overview of our design is provided below.
We welcome feedback and would like to know if you are interested in working
on it. If so, we would be happy to make the code publicly available. At the
same time, we would like to collaborate with people working on existing
proposals and see if we can consolidate our efforts.
A distributed "index" is partitioned into "shards". Each shard corresponds to
a Lucene instance and contains a disjoint subset of the documents in the index.
Each shard is stored in HDFS and served by one or more "shard servers". Here
we only talk about a single distributed index, but in practice multiple
indexes can be supported.
A "master" keeps track of the shard servers and the shards being served by
them. An "application" updates and queries the global index through an
"index client". An index client communicates with the shard servers to
execute a query.
KEY RPC METHODS
This section lists the key RPC methods in our design. To simplify the
discussion, some of their parameters have been omitted.
On the Shard Servers
// Execute a query on this shard server's Lucene instance.
// This method is called by an index client.
SearchResults search(Query query);
On the Master
// Tell the master to update the shards, i.e., Lucene instances.
// This method is called by an index client.
boolean updateShards(Configuration conf);
// Ask the master where the shards are located.
// This method is called by an index client.
ShardLocations getShardLocations();
// Send a heartbeat to the master. This method is called by a
// shard server. In the response, the master informs the
// shard server when to switch to a newer version of the index.
HeartbeatResponse heartbeat();
QUERYING THE INDEX
To query the index, an application sends a search request to an index client.
The index client then calls the shard server search() method for each shard
of the index, merges the results and returns them to the application. The
index client caches the mapping between shards and shard servers by
periodically calling the master's getShardLocations() method.
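To illustrate the scatter-gather step, here is a minimal sketch in plain Java; `ShardServer`, `Hit`, and `IndexClient` are hypothetical stand-ins for the RPC types, not the actual implementation. Because shards are disjoint, merging partial results is just a sort by score with no deduplication:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-ins for the real RPC interfaces; names are ours.
interface ShardServer {
    List<Hit> search(String query); // the per-shard search() RPC
}

class Hit {
    final String docId;
    final float score;
    Hit(String docId, float score) { this.docId = docId; this.score = score; }
}

public class IndexClient {
    private final List<ShardServer> shardServers;

    public IndexClient(List<ShardServer> shardServers) {
        this.shardServers = shardServers;
    }

    // Fan the query out to every shard, then merge partial results by score.
    // Disjoint shards mean a simple merge sort suffices; no deduplication.
    public List<Hit> search(String query, int topN) {
        List<Hit> merged = new ArrayList<>();
        for (ShardServer server : shardServers) {
            merged.addAll(server.search(query));
        }
        merged.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed());
        return merged.subList(0, Math.min(topN, merged.size()));
    }
}
```

In the real design the calls would go out in parallel over Hadoop IPC; the sequential loop above is only for clarity.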
UPDATING THE INDEX USING MAP/REDUCE
To update the index, an application sends an update request to an index client.
The index client then calls the master's updateShards() method, which starts
a Map/Reduce job to update the index. The Map/Reduce job updates the shards in
parallel and copies the new index files of each shard (i.e., Lucene instance)
back to HDFS.
The updateShards() method includes a "configuration", which provides
information for updating the shards. More specifically, the configuration
includes the following information:
- Input path. This provides the location of updated documents, e.g., HDFS
files or directories, or HBase tables.
- Input formatter. This specifies how to format the input documents.
- Analysis. This defines the analyzer to use on the input. The analyzer
determines whether a document is being inserted, updated, or deleted.
For inserts or updates, the analyzer also converts each input document into
a Lucene document.
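As a sketch only, such a configuration could be assembled like this; the key names and the analyzer class are illustrative assumptions, not the design's actual Configuration keys:

```java
import java.util.Properties;

public class UpdateConf {
    // Build the "configuration" passed to updateShards(). Every key name and
    // value below is illustrative; the real design's keys are not spelled out.
    public static Properties example() {
        Properties conf = new Properties();
        // Input path: where the updated documents live.
        conf.setProperty("index.update.input.path", "hdfs:///data/docs/2008-02-06/");
        // Input formatter: how to parse the input documents.
        conf.setProperty("index.update.input.format", "SequenceFileInputFormat");
        // Analysis: the analyzer deciding insert/update/delete and building
        // Lucene documents (hypothetical class name).
        conf.setProperty("index.update.analyzer", "org.example.BlogPostAnalyzer");
        return conf;
    }
}
```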
The Map phase of the Map/Reduce job formats and analyzes the input (in
parallel), while the Reduce phase collects and applies the updates to each
Lucene instance (again in parallel). The updates are applied using the local
file system where a Reduce task runs and then copied back to HDFS. For
example, if the updates caused a new Lucene segment to be created, the new
segment would be created on the local file system first, and then copied back
to HDFS.
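The local-then-copy pattern can be sketched in plain Java; `java.nio` stands in for Hadoop's FileSystem API here, and all names are illustrative:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class LocalThenCopy {
    // Sketch of the Reduce-side pattern: apply the update on the task's local
    // file system first (fast I/O), then copy the finished files to shared
    // storage. A plain directory stands in for the HDFS shard directory.
    public static Path publishSegment(Path localWorkDir, Path sharedShardDir,
                                      String segmentName, byte[] segmentBytes)
            throws IOException {
        // 1. Write the new segment locally, where writes are cheap.
        Path local = localWorkDir.resolve(segmentName);
        Files.write(local, segmentBytes);
        // 2. Copy the completed segment to the shared (HDFS-like) shard dir.
        Files.createDirectories(sharedShardDir);
        Path remote = sharedShardDir.resolve(segmentName);
        Files.copy(local, remote, StandardCopyOption.REPLACE_EXISTING);
        return remote;
    }
}
```

With real HDFS, step 2 would use Hadoop's FileSystem copy facilities rather than `java.nio`.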
When the Map/Reduce job completes, a "new version" of the index is ready to be
queried. It is important to note that the new version of the index is not
derived from scratch. By leveraging Lucene's update algorithm, the new version
of each Lucene instance will share as many files as possible with the previous
version.
ENSURING INDEX CONSISTENCY
At any point in time, an index client always has a consistent view of the
shards in the index. The results of a search query include either all or none
of a recent update to the index. The details of the algorithm to accomplish
this are omitted here, but the basic flow is pretty simple.
After the Map/Reduce job to update the shards completes, the master will tell
each shard server to "prepare" the new version of the index. After all the
shard servers have responded affirmatively to the "prepare" message, the new
index is ready to be queried. An index client will then lazily learn about
the new index when it makes its next getShardLocations() call to the master.
In essence, a lazy two-phase commit protocol is used, with "prepare" and
"commit" messages piggybacked on heartbeats. After a shard has switched to
the new index, the Lucene files in the old index that are no longer needed
can safely be deleted.
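A minimal sketch of this lazy two-phase commit, with plain Java classes standing in for the master and the heartbeat RPC (all names here are ours, not the design's actual types):

```java
import java.util.HashSet;
import java.util.Set;

public class CommitProtocol {
    static final class Response {
        final int prepareVersion; // version to prepare, or -1 if none pending
        final int serveVersion;   // committed version the server should serve
        Response(int prepareVersion, int serveVersion) {
            this.prepareVersion = prepareVersion;
            this.serveVersion = serveVersion;
        }
    }

    private final Set<String> servers;
    private final Set<String> prepared = new HashSet<>();
    private int committedVersion = 1;
    private int pendingVersion = -1;

    public CommitProtocol(Set<String> servers) {
        this.servers = new HashSet<>(servers);
    }

    // Called by the master when a Map/Reduce update job has produced version v.
    public void newVersionReady(int v) {
        pendingVersion = v;
        prepared.clear();
    }

    // Each heartbeat reports whether this shard server has prepared the
    // pending version. Once every server has prepared, the master commits;
    // servers then learn the new version lazily from later heartbeat responses.
    public Response heartbeat(String server, boolean hasPreparedPending) {
        if (pendingVersion >= 0) {
            if (hasPreparedPending) {
                prepared.add(server);
            }
            if (prepared.containsAll(servers)) {
                committedVersion = pendingVersion; // the commit point
                pendingVersion = -1;
            }
        }
        return new Response(pendingVersion, committedVersion);
    }
}
```

The point of piggybacking on heartbeats is that no extra round of RPCs is needed: "prepare" and "commit" ride on messages the servers send anyway.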
We rely on the fault-tolerance of Map/Reduce to guarantee that an index update
will eventually succeed. All shards are stored in HDFS and can be read by any
shard server in a cluster. For a given shard, if one of its shard servers dies,
new search requests are handled by its surviving shard servers. To ensure that
there is always enough coverage for a shard, the master will instruct other
shard servers to take over the shards of a dead shard server.
Currently, each shard server reads a shard directly from HDFS. Experiments
have shown that this approach does not perform very well, with HDFS causing
Lucene to slow down fairly dramatically (by well over 5x when data blocks are
accessed over the network). Consequently, we are exploring different ways to
leverage the fault tolerance of HDFS and, at the same time, work around its
performance problems. One simple alternative is to add a local file system
cache on each shard server. Another alternative is to modify HDFS so that an
application has more control over where to store the primary and replicas of
an HDFS block. This feature may be useful for other HDFS applications (e.g.,
HBase). We would like to collaborate with other people who are interested in
adding this feature to HDFS.
Re: Lucene-based Distributed Index Leveraging Hadoop
Clay Webster wrote:
> There seem to be a few other players in this space too.
> Are you from Rackspace?
> (http://highscalability.com/how-rackspace-now-uses-mapreduce-and-hadoop-query-terabytes-data)
> AOL also has a Hadoop/Solr project going on.
> CNET does not have much brewing there. Although Yonik and I had
> talked about it a bunch -- but that was long ago.
AOL has a couple of projects going on in the lucene/hadoop/solr space,
and we will be pushing more stuff out as we can. We don't have anything
going with solr over hadoop at the moment.
I'm not sure if this would be better than what SOLR-303 does, but you
should have a look at the work being done there.
One of the things you mentioned is that the data sets are disjoint.
SOLR-303 doesn't require this, and allows us to have a document stored
in multiple shards (with different caching/update characteristics).
I work for IBM Research. I read the Rackspace article. Rackspace's Mailtrust
has a similar design. Happy to see an existing application on such a system.
Do they plan to open-source it? Is the AOL project an open source project?
One main focus is to provide fault-tolerance in this distributed index
system. Correct me if I'm wrong, but I think SOLR-303 is focused on merging
results from multiple shards right now. We'd like to start an open source
project for a fault-tolerant distributed index system (or join if one
already exists) if there is enough interest. Making Solr work on top of such
a system could be an important goal, and SOLR-303 is a big part of it in that
case. I should have made it clear that disjoint data sets are not a
requirement of our design.
Doug Cutting wrote:
> I am also interested in starting a new project in this area. The
> approach I have in mind is slightly different, but hopefully we can come
> to some agreement and collaborate.
I'm interested in this too.
> My current thinking is that the Solr search API is the appropriate
> model. Solr's facets are an important feature that require low-level
> support to be practical. Thus a useful distributed search system should
> support facets from the outset, rather than attempt to graft them on
> later. In particular, I believe this requirement mandates disjoint shards.
I agree - shards should be disjoint also because if we eventually want
to manage multiple replicas of each shard across the cluster (for
reliability and performance) then overlapping documents would complicate
both the query dispatching process and the merging of partial result sets.
> My primary difference with your proposal is that I would like to support
> online indexing. Documents could be inserted and removed directly, and
> shards would synchronize changes amongst replicas, with an "eventual
> consistency" model. Indexes would not be stored in HDFS, but directly
> on the local disk of each node. Hadoop would perhaps not play a role.
> In many ways this would resemble CouchDB, but with explicit support for
> sharding and failover from the outset.
It's true that searching over HDFS is slow - but I'd hate to lose all
other HDFS benefits and have to start from scratch ... I wonder what
would be the performance of FsDirectory over an HDFS index that is
"pinned" to a local disk, i.e. a full local replica is available, with
block size of each index file equal to the file size.
> A particular client should be able to provide a consistent read/write
> view by bonding to particular replicas of a shard. Thus a user who
> makes a modification should be able to generally see that modification
> in results immediately, while other users, talking to different
> replicas, may not see it until synchronization is complete.
This requires that we use versioning, and that we have a "shard manager"
that knows the latest versions of each shard among the whole active set
- or that clients discover this dynamically by querying the shard
servers every now and then.
Andrzej Bialecki
http://www.sigram.com  Contact: info at sigram dot com
In continuation with our offline conversation, here is a public
expression of interest in your work and a description of our work. Sorry in
advance for the length, and I hope that folks will be able to collaborate
and/or share experiences and/or give us some pointers...
1) We are trying to leverage Lucene on Hadoop for blog archiving and
searching i.e. ever-increasing data (in terabytes) on commodity hardware
in a generic LAN. These machines are neither hi-spec nor dedicated, but are
actually used within the lab for day-to-day tasks. Unfortunately, Nutch and
Solr are not applicable to our situation - at least not directly. Think of us
as an academically oriented Technorati.
2) There are two aspects. One is that we want to archive the blogposts that
we hit under a UUID/timestamp taxonomy. This archive can be used for many
things like cached copies, diffing, surf acceleration, etc. The other aspect
is to archive the indexes. You see, the indexes have a lifecycle. For
simplicity's sake, an index consists of one day's worth of blogposts (roughly
15MM documents) and follows the <YYYYMMDD> taxonomy. Ideally, we want to store
an indefinite archive of blogposts and their indexes side-by-side, but 1 year
or 365 days is a start.
3) We want to use the taxonomical name of the post as a specific ID
field in the Lucene index and want to get away with not storing the
content of the post at all but only a file pointer/reference to it. This
we hope will keep the index sizes low, but the fact remains that this is a
case of multiple threads on multiple JVMs handling multiple indexes on
multiple machines. Further, the posts and indexes are mostly WORM, but there
may be situations where they have to be updated - for example, if some blog
posts have edited content, have to be removed for copyright, or are updated
with metadata like rank. There is some duplication-detection work that has to
be done here, but it is out of scope for now. And oh, the lab is a
Linux-Windows environment.
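As a toy illustration of point (3): a tiny in-memory inverted index that stores only an ID and a file pointer per post, with the content fetched from the archive on demand. This stands in for Lucene, and all names are illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PointerIndex {
    // Postings: term -> ids of posts containing it.
    private final Map<String, List<String>> term2ids = new HashMap<>();
    // Stored fields: only the taxonomical id and a file pointer, never content.
    private final Map<String, String> id2path = new HashMap<>();

    // Index a post: its tokens go into the postings, but only the path to the
    // archived copy is stored, keeping the index small.
    public void add(String id, String path, String content) {
        id2path.put(id, path);
        for (String token : content.toLowerCase().split("\\W+")) {
            term2ids.computeIfAbsent(token, t -> new ArrayList<>()).add(id);
        }
    }

    // A hit resolves to a file pointer; the caller fetches the content from
    // the blogpost archive (e.g., HDFS) only when it actually needs it.
    public List<String> searchPaths(String term) {
        List<String> paths = new ArrayList<>();
        for (String id : term2ids.getOrDefault(term.toLowerCase(), List.of())) {
            paths.add(id2path.get(id));
        }
        return paths;
    }
}
```

In Lucene terms this corresponds to indexing the content field without storing it, and storing only the ID and pointer fields.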
4) Our first port of call is to have Hadoop running on this group of
machines (without clustering or load balancing or grid or master/slave
mumbo jumbo) in the simplest way possible. The goal is to make applications
see the bunch of machines as a reliable, scalable, fault-tolerant,
average-performing file store with simple file CRUD operations. For example,
the blog crawler should be able to put the blogposts into this HDFS live or in
batch mode. With about 20 machines, each installed with a 240GB drive for the
experiment, we have about 4.5 TB of storage available.
5) Next we want to handle Lucene and exploit the structure of its index
and the algorithms behind it. Since a Lucene index is a directory of
files, we intend to 'tag' the files as belonging to one index and store
them on the HDFS. At any instant in time, an index can be regenerated
and used. The regenerated index is, however, not used directly from HDFS but
copied into the local filesystem of the indexer/searcher. This copy is subject
to change, and every once in a while the constituent files in the HDFS are
overwritten with the latest files. Hence, naming is quite important to us.
Even so, we feel that the number of files that have to be updated is quite
small, and that we can use MD5 sums to make sure we only update the files
whose content changed. However, this means that out of the 4.5 TB available,
we use half for archival and the other half for searching. Even so, we should
be able to store a year's worth of posts and indexes. Disks are no problem.
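The MD5-based sync in point (5) can be sketched as follows; the two maps are stand-ins for the local index directory and the checksums recorded for the HDFS copies:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ChangedFiles {
    // Hex-encoded MD5 of a file's content.
    public static String md5(byte[] content) {
        try {
            StringBuilder hex = new StringBuilder();
            for (byte b : MessageDigest.getInstance("MD5").digest(content)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is always available
        }
    }

    // localFiles: file name -> current local content;
    // remoteSums: file name -> MD5 recorded for the copy on HDFS.
    // Returns only the files that are new or whose content changed.
    public static List<String> filesToUpload(Map<String, byte[]> localFiles,
                                             Map<String, String> remoteSums) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, byte[]> e : localFiles.entrySet()) {
            if (!md5(e.getValue()).equals(remoteSums.get(e.getKey()))) {
                changed.add(e.getKey());
            }
        }
        return changed;
    }
}
```

Since Lucene segment files are immutable once written, in practice most files match their recorded sums and only a handful need to be copied back.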
6) Right then. So, we have (365*15MM) posts and (365*LFi) Lucene file
segments on the HDFS. Suppose there are N machines online, then each
machine will have to own 365/N indexes. N constantly keeps changing but
at any instant the 365 indexes should be live and we are working on the
best way to achieve this kind of 'fair' autonomic computing cloud where
when a machine goes down, the other machines will add some indexes to
their kitty. If a machine is added, then it relieves other machines of
some indexes. The searcher runs on each of these machines and is a
service (IP:port), and queries are served using a ParallelMultiSearcher
[across the machines] and a MultiSearcher [within a machine] so that we need
not have an unmanageable number of JVMs per machine: at most one for Hadoop,
one for Cloud and one for Search. We are wondering if Solr can be used for
search if it supports multiple indexes on the same machine.
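A sketch of the 'fair' assignment in point (6), assuming shards are identified by day number and machines by hostname; this is a simple round-robin for illustration, not the actual cloud logic:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FairAssignment {
    // Distribute numShards day-indexes over the machines currently online,
    // so each machine owns roughly numShards / N of them.
    public static Map<String, List<Integer>> assign(List<String> machines,
                                                    int numShards) {
        List<String> sorted = new ArrayList<>(machines);
        Collections.sort(sorted); // deterministic ownership across runs
        Map<String, List<Integer>> owned = new TreeMap<>();
        for (String m : sorted) {
            owned.put(m, new ArrayList<>());
        }
        // Round-robin the shard ids over the sorted machine list.
        for (int shard = 0; shard < numShards; shard++) {
            owned.get(sorted.get(shard % sorted.size())).add(shard);
        }
        return owned;
    }
}
```

Re-running assign() on every membership change keeps all 365 indexes live, but moves more shards than a consistent-hashing scheme would; it is only a starting point.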
As you can see, this is not a simple endeavour, and it is obvious, I suppose,
that we are still in the theory stage and only now getting to know
the Lucene projects better. There is a huge body of work, albeit not
acknowledged in the scientific community as it should be, and I want to
say kudos to all who have been responsible for it.
We wish and hope to utilize the collective consciousness to mount our
challenge. Any pointers, code, help, collaboration et al. for any of the
6 points above - it goes without saying - are welcome, and we look forward to
sharing our experiences in a formal written discourse as and when we have
them.