[jira] Created: (HADOOP-830) Improve the performance of the Merge phase

Improve the performance of the Merge phase
------------------------------------------

                 Key: HADOOP-830
                 URL: http://issues.apache.org/jira/browse/HADOOP-830
             Project: Hadoop
          Issue Type: Improvement
          Components: mapred
            Reporter: Devaraj Das
         Assigned To: Devaraj Das


This issue is about improving the performance of the merge phase (especially on the reduces). Currently, all the map outputs are copied to disk and only then does the merge phase start (note that sorting happens on the maps).

The first optimization that I plan to implement is in-memory merging of the map outputs. Two buffers are maintained -
1) a scratch buffer for writing map outputs (directly off the socket). This is a first-come-first-served buffer (as opposed to strategies like best fit). The map output copier copies the map output off the socket and puts it here (assuming there is sufficient space in the buffer).
2) a merge buffer - when the scratch buffer cannot accommodate any more map output, the roles of the buffers are switched - that is, the scratch buffer becomes the merge buffer and the merge buffer becomes the scratch buffer. This switch of roles avoids copying. The copier threads can continue writing data from the socket buffer to the current scratch buffer (access to the scratch buffer is synchronized).

Both buffers are of equal size, with a default of 100MB each.

Before switching roles, a check is done to see whether the merge buffer is still in use (i.e., the merge algorithm is working on its data), and we wait until it is free. The hope is that since the merge reads key/value data from an in-memory buffer, it will be fast enough that we won't see client timeouts on the servers serving the map outputs. If a fetch really does time out, the client sees an exception and resubmits the request to the server.

With the above, copying and merging proceed in parallel.
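
As an illustration of the role switch, here is a minimal Java sketch of the double-buffer idea (this is not code from the patch; the class and method names are hypothetical):

  // Hypothetical sketch of the proposed scratch/merge double buffering.
  // Copier threads fill 'scratch'; the merge thread drains 'merge'.
  public class ScratchMergeBuffers {
    private byte[] scratch = new byte[100 * 1024 * 1024];
    private byte[] merge = new byte[100 * 1024 * 1024];
    private int scratchUsed = 0;
    private boolean merging = false;

    /** Copier threads append a map output; returns false if it does not fit. */
    public synchronized boolean append(byte[] mapOutput, int len) {
      if (scratchUsed + len > scratch.length) {
        return false;                      // caller triggers a role switch
      }
      System.arraycopy(mapOutput, 0, scratch, scratchUsed, len);
      scratchUsed += len;
      return true;
    }

    /** Swap roles: the full scratch buffer becomes the merge input.
     *  No data is copied; only the references are exchanged. */
    public synchronized void switchRoles() throws InterruptedException {
      while (merging) {
        wait();                            // wait for the previous merge
      }
      byte[] tmp = scratch;
      scratch = merge;
      merge = tmp;
      scratchUsed = 0;
      merging = true;                      // merge thread now owns 'merge'
    }

    /** Called by the merge thread after spilling the merged data to disk. */
    public synchronized void mergeDone() {
      merging = false;
      notifyAll();
    }
  }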

Each in-memory merge is followed by a spill to disk. By the end of the in-memory merging of all the map outputs, we will end up with files of roughly 100MB each on disk that still need to be merged. The in-memory merge is triggered either when the in-memory scratch buffer has been idle for too long (around 30 seconds) or when the number of outputs copied so far equals the number of maps in the job, whichever happens first. We can then proceed with the regular merge for these on-disk files, and maybe we can do some optimizations there too (I haven't put much thought into that yet).
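
As a rough illustration of that trigger check (the parameter names are hypothetical bookkeeping, not from the patch):

  // Checked periodically by the merge thread.
  boolean shouldStartInMemoryMerge(long idleMillis, int numCopied, int numMaps) {
    return idleMillis >= 30 * 1000   // scratch buffer idle too long (~30s)
        || numCopied == numMaps;     // all map outputs have arrived
  }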

If a map output can never fit in the buffer (because it is, say, 200MB), it is spilled directly to disk.

To implement the above, I am planning to extend the FileSystem class to provide an InMemoryFileSystem class that will ease the integration of the in-memory scratch/merge buffers with the existing APIs (like SequenceFile and MapOutputCopier), since all of them work with the FileSystem and Input/Output stream abstractions.
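
To make the idea concrete, here is a hypothetical skeleton of such a filesystem (the real class would extend org.apache.hadoop.fs.FileSystem and implement its full contract; only the byte-array storage idea is shown, and the names are illustrative):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.util.HashMap;
  import java.util.Map;

  // Hypothetical sketch: the whole "filesystem" lives on the heap.
  public class InMemoryFsSketch {
    // File contents keyed by path name.
    private final Map<String, byte[]> files = new HashMap<String, byte[]>();

    public synchronized OutputStream create(final String path) {
      return new ByteArrayOutputStream() {
        public void close() {
          synchronized (InMemoryFsSketch.this) {
            files.put(path, toByteArray());  // publish the file on close
          }
        }
      };
    }

    public synchronized InputStream open(String path) {
      byte[] data = files.get(path);
      if (data == null) {
        throw new IllegalArgumentException("No such file: " + path);
      }
      return new ByteArrayInputStream(data);
    }
  }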

Comments?

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

    [ http://issues.apache.org/jira/browse/HADOOP-830?page=comments#action_12458875 ]
           
Sameer Paranjpye commented on HADOOP-830:
-----------------------------------------

I'd be a little more conservative with large map outputs. The scratch and merge buffers add value when the map outputs are substantially smaller than the buffers. You should probably spill map outputs larger than 25% of the buffer space, or something similar.

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

    [ http://issues.apache.org/jira/browse/HADOOP-830?page=comments#action_12459187 ]
           
Devaraj Das commented on HADOOP-830:
------------------------------------

In the original proposal, I suggested that we could have one buffer for all the files that the InMemoryFileSystem manages. After a discussion with Owen, it seems the alternative arrangement of one byte[] per file in the InMemoryFileSystem is a better approach, given that we know the lengths of the files before we allocate the byte[] buffers for them.

The original proposal assumed two equal-sized buffers, with a merge happening once 50% of the total buffer space was filled with map outputs. This maps directly onto the multiple-buffer (one byte[] per file) case, where we consider the total size of all the small buffers instead.

Yes, Sameer, we can and should spill a map output directly to disk if its size is more than a certain fraction of the total buffer space.
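
A short sketch of per-file allocation under this scheme (illustrative names only; since each map output's length is known before the copy starts, a right-sized byte[] can be handed out per file):

  // Hypothetical allocator for the one-byte[]-per-file arrangement.
  class PerFileAllocator {
    private final long totalFsSize;   // total in-memory filesystem budget
    private long bytesInUse = 0;

    PerFileAllocator(long totalFsSize) {
      this.totalFsSize = totalFsSize;
    }

    /** Returns a buffer sized to the file, or null if it will not fit. */
    synchronized byte[] reserve(long declaredLength) {
      if (declaredLength > totalFsSize - bytesInUse
          || declaredLength > Integer.MAX_VALUE) {
        return null;                  // caller spills this output to disk
      }
      bytesInUse += declaredLength;
      return new byte[(int) declaredLength];
    }
  }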

[jira] Updated: (HADOOP-830) Improve the performance of the Merge phase

     [ http://issues.apache.org/jira/browse/HADOOP-830?page=all ]

Devaraj Das updated HADOOP-830:
-------------------------------

    Attachment: 830-for-review.patch

Implemented as discussed. Map output files are buffered in an InMemoryFileSystem. Map outputs are spilled directly to disk if the file size is more than 25% of the total filesystem size (75MB by default). When more than 50% of the filesystem space is in use, a merge is triggered and its output goes directly to disk; merging and copying proceed in parallel during that time. The InMemoryFileSystem (URI: mem://<name>) extends FileSystem. Although it has a few special methods, it appears to the rest of the framework as a regular FileSystem for the most part. It keeps file data as byte arrays, with a map from filenames to byte[]. The figures 25% and 50% are hardcoded constants for now.
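
The decision logic described above reduces to two threshold checks. A hedged sketch (the 25% and 50% figures come from the description above; the class and method names are illustrative):

  // Mirrors the two hardcoded thresholds described above.
  class InMemMergePolicy {
    static final float SPILL_FRACTION = 0.25f;         // per-file size limit
    static final float MERGE_TRIGGER_FRACTION = 0.5f;  // used-space limit

    /** A map output bigger than 25% of the fs size goes straight to disk. */
    static boolean spillDirectly(long outputSize, long fsSize) {
      return outputSize > SPILL_FRACTION * fsSize;
    }

    /** Once more than half the fs is used, merge the in-memory files to disk. */
    static boolean triggerMerge(long usedBytes, long fsSize) {
      return usedBytes > MERGE_TRIGGER_FRACTION * fsSize;
    }
  }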

[jira] Updated: (HADOOP-830) Improve the performance of the Merge phase


     [ http://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Devaraj Das updated HADOOP-830:
-------------------------------

    Attachment: 830-for-review.new.patch

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase


    [ http://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12461673 ]

Devaraj Das commented on HADOOP-830:
------------------------------------

This patch does better progress reporting.

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase


    [ http://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12461800 ]

Owen O'Malley commented on HADOOP-830:
--------------------------------------

This is looking good. Comments:
  1. I think that "ramfs:" would be less confusing than "mem:" as a prefix for the ram file system.
  2. The float constants should be written as "0.25f" rather than "(float) 0.25".
  3. This patch introduces some methods that take extra FileSystems in the parameter list, when the correct approach is to use getFileSystem on the Paths. So cloneFileAttributes should look like (see the sketch after this list):
  public Writer cloneFileAttributes(Path inputFile, Path outputFile, Progressable prog) throws IOException
  4. The old cloneFileAttributes should be marked as deprecated.
  5. The javadoc for the "factor" parameter should mention that it is the maximum merge fan-in.
  6. The patch looks for the file's existence in the local and ram file systems to see where it was placed. Since we have had issues in the past with files getting deleted via race conditions, it seems better to remember where each file was placed. I suggest that MapOutputLocation.getFile return a Path saying where the file was stored. CopyResult should keep that instead of the size, which was only used as an error flag when it was -1. Then the reduce task runner can keep a list of the files that are in each file system.
  7. getFile also has a catch block where Throwable is caught and ignored. That will cause lots of errors to go unreported. I'd rather have the ram FileSystem catch OutOfMemoryError explicitly when it is creating the new file and return null, like it does when the ram FileSystem is full.
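A sketch of items 3 and 4 (the old parameter list is assumed here for illustration; the key point is that the new overload takes only Paths and recovers each FileSystem via Path.getFileSystem):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.util.Progressable;

  // Hypothetical host class; in the real patch these methods would live
  // in the SequenceFile sorter code.
  class CloneAttributesSketch {
    private final Configuration conf = new Configuration();

    /** @deprecated Use the Path-only overload below (review item 4). */
    @Deprecated
    public SequenceFile.Writer cloneFileAttributes(FileSystem fs,
        Path inputFile, Path outputFile, Progressable prog)
        throws IOException {
      // Old form kept for compatibility; the FileSystem argument is redundant.
      return cloneFileAttributes(inputFile, outputFile, prog);
    }

    /** New form (review item 3): each Path resolves its own FileSystem. */
    public SequenceFile.Writer cloneFileAttributes(Path inputFile,
        Path outputFile, Progressable prog) throws IOException {
      FileSystem inFs = inputFile.getFileSystem(conf);
      FileSystem outFs = outputFile.getFileSystem(conf);
      // ... open inputFile on inFs and create outputFile on outFs with the
      // same key/value classes and compression; body omitted ...
      return null;  // placeholder for the configured Writer
    }
  }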

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase


    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12462560 ]

Devaraj Das commented on HADOOP-830:
------------------------------------

Thanks for the review, Owen.
I incorporated all the comments except for part of comment #6. For comment #6, I made the getFile change: using the return value of getFile (a fully qualified path name that says exactly which filesystem the map output was downloaded to), I now rename the temp filename to the final filename on the correct filesystem. But I think I am missing something about why the ReduceTaskRunner should keep the list of files created in each filesystem. Isn't it sufficient to rename accurately, as I am doing now? I believe (if I remember right) that the race condition we saw earlier was about correctly creating a map output file in the rare case where multiple threads end up downloading the same map output (HADOOP-723).
I don't think we will have that race condition in the new patch (which I am going to submit now). Please let me know if I am missing something.
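
A sketch of the renaming step described above (the names are illustrative; tmpFile stands for the fully qualified Path returned by getFile):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  class RenameOnCorrectFs {
    /** Rename the temp map output to its final name on whichever
     *  filesystem (ramfs or local) the copy actually landed on. */
    static void finalizeMapOutput(Path tmpFile, Path finalFile,
        Configuration conf) throws IOException {
      // The fully qualified Path resolves to the InMemoryFileSystem or the
      // local filesystem, so the rename happens on the right one.
      FileSystem fs = tmpFile.getFileSystem(conf);
      fs.rename(tmpFile, finalFile);
    }
  }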

[jira] Updated: (HADOOP-830) Improve the performance of the Merge phase


     [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Devaraj Das updated HADOOP-830:
-------------------------------

    Attachment: 830-after-comments.patch

[jira] Updated: (HADOOP-830) Improve the performance of the Merge phase


     [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Devaraj Das updated HADOOP-830:
-------------------------------

    Attachment: 830-initial9.patch

[jira] Updated: (HADOOP-830) Improve the performance of the Merge phase


     [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Devaraj Das updated HADOOP-830:
-------------------------------

    Status: Patch Available  (was: Open)

A bug-fixed patch. This has been tested quite well.

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase


    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12463952 ]

Hadoop QA commented on HADOOP-830:
----------------------------------

-1, because the javadoc command appears to have generated warning messages when testing the latest attachment (http://issues.apache.org/jira/secure/attachment/12348746/830-initial9.patch) against trunk revision r495045. Please note that this message is automatically generated and may represent a problem with the automation system and not the patch.

[jira] Updated: (HADOOP-830) Improve the performance of the Merge phase


     [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Devaraj Das updated HADOOP-830:
-------------------------------

    Attachment: 830-with-javadoc-fix.patch

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase


    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12463962 ]

Devaraj Das commented on HADOOP-830:
------------------------------------

The patch 830-with-javadoc-fix.patch is the correct one.

> Improve the performance of the Merge phase
> ------------------------------------------
>
>                 Key: HADOOP-830
>                 URL: https://issues.apache.org/jira/browse/HADOOP-830
>             Project: Hadoop
>          Issue Type: Improvement
>          Components: mapred
>            Reporter: Devaraj Das
>         Assigned To: Devaraj Das
>         Attachments: 830-after-comments.patch, 830-for-review.new.patch, 830-for-review.patch, 830-initial9.patch, 830-with-javadoc-fix.patch
>
>
> This issue is about improving the performance of the merge phase (especially on the reduces). Currently, all the map outputs are copied to disk and only then does the merge phase start (note that sorting happens on the maps).
> The first optimization I plan to implement is in-memory merging of the map outputs. Two buffers are maintained:
> 1) a scratch buffer for writing map outputs (directly off the socket). This is a first-come, first-served buffer (as opposed to strategies like best fit). The map output copier copies the map output off the socket and puts it here (assuming there is sufficient space in the buffer).
> 2) a merge buffer - when the scratch buffer cannot accommodate any more map output, the roles of the buffers are switched: the scratch buffer becomes the merge buffer and the merge buffer becomes the scratch buffer. This switch of roles avoids copying. The copier threads can continue writing data from the socket buffer to the current scratch buffer (access to the scratch buffer is synchronized).
> Both buffers are of equal size, configured with a default of 100 MB each.
> Before switching roles, a check is done to see whether the merge buffer is in use (i.e., the merge algorithm is working on its data); we wait until the merge buffer is free. The hope is that, since the merge reads key/value data from an in-memory buffer, it will be fast enough that clients won't time out against the server serving the map output. If a request really does time out, the client sees an exception and resubmits the request to the server.
> With the above, copying and merging proceed in parallel.
> Each in-memory merge ends with a spill to disk, so after all the map outputs have been merged in memory we will end up with a set of ~100 MB files on disk that still need to be merged. The in-memory merge is triggered either when the scratch buffer has been idle for too long (e.g., 30 seconds) or when the number of outputs copied so far equals the number of maps in the job, whichever comes first. The regular merge can then proceed over these on-disk files, and there may be room for optimization there too (I haven't given that much thought).
> If a map output can never fit in the buffer (because it is, say, 200 MB), it is spilled directly to disk.
> To implement the above, I plan to extend the FileSystem class with an InMemoryFileSystem class that will ease the integration of the in-memory scratch/merge with the existing APIs (such as SequenceFile and MapOutputCopier), since all of them work with the FileSystem and Input/Output stream abstractions.
> Comments?
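
As a concrete illustration of the two-buffer scheme quoted above, here is a minimal Java sketch. Everything in it (the class name, the byte-array backing, the helper methods) is a hypothetical stand-in for this discussion, not code from the attached patches:

    import java.io.ByteArrayOutputStream;

    /**
     * Hypothetical sketch of the scratch/merge double-buffer scheme.
     * One buffer accepts map outputs off the socket while the other is
     * being merged; when the scratch buffer fills up, the two buffers
     * swap roles instead of copying data between them.
     */
    public class DoubleBufferSketch {

        // 100 MB default for each buffer, as in the proposal.
        private static final int BUFFER_SIZE = 100 * 1024 * 1024;

        private ByteArrayOutputStream scratch = new ByteArrayOutputStream();
        private ByteArrayOutputStream merge = new ByteArrayOutputStream();
        private boolean merging = false;  // true while the merge thread reads 'merge'

        /** Called by a copier thread with one map output read off the socket. */
        public synchronized void collect(byte[] mapOutput) throws InterruptedException {
            if (mapOutput.length > BUFFER_SIZE) {
                spillToDisk(mapOutput);  // an output that can never fit goes straight to disk
                return;
            }
            if (scratch.size() + mapOutput.length > BUFFER_SIZE) {
                // Scratch is full: wait for any in-flight merge, then swap roles.
                while (merging) {
                    wait();              // released by mergeDone()
                }
                ByteArrayOutputStream tmp = scratch;
                scratch = merge;         // the drained merge buffer becomes the new scratch
                merge = tmp;             // the full scratch becomes the next merge input
                scratch.reset();
                merging = true;
                notifyAll();             // wake the merge thread (not shown here)
            }
            scratch.write(mapOutput, 0, mapOutput.length);
        }

        /** Called by the merge thread after spilling one merged run to disk. */
        public synchronized void mergeDone() {
            merge.reset();
            merging = false;
            notifyAll();
        }

        private void spillToDisk(byte[] mapOutput) {
            // Writing a local file is elided in this sketch.
        }
    }

The real trigger would also fire on the idle timeout and on having copied all the map outputs, as described above; those conditions are omitted to keep the sketch short.

The InMemoryFileSystem idea can likewise be illustrated without reproducing the full FileSystem contract (which has many abstract methods). The toy class below (again a hypothetical name, not the patch's class) shows the core point: if in-memory buffers are exposed through the same InputStream/OutputStream abstractions, code written against streams, such as SequenceFile readers and writers, can run over memory unchanged:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.FileNotFoundException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /**
     * Toy illustration of an in-memory "file system": a map from path
     * to bytes, exposed through the standard stream abstractions.
     */
    public class InMemoryFsSketch {

        private final Map<String, byte[]> files = new ConcurrentHashMap<String, byte[]>();

        /** "Create" a file; the bytes become visible under 'path' on close(). */
        public OutputStream create(final String path) {
            return new ByteArrayOutputStream() {
                @Override
                public void close() {
                    files.put(path, toByteArray());
                }
            };
        }

        /** "Open" a file previously written with create(). */
        public InputStream open(String path) throws FileNotFoundException {
            byte[] data = files.get(path);
            if (data == null) {
                throw new FileNotFoundException(path);
            }
            return new ByteArrayInputStream(data);
        }

        public boolean delete(String path) {
            return files.remove(path) != null;
        }
    }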

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

David Eric Pugh (Jira)
In reply to this post by David Eric Pugh (Jira)

    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12463972 ]

Hadoop QA commented on HADOOP-830:
----------------------------------

-1, because 1 attempt failed to build and test the latest attachment (http://issues.apache.org/jira/secure/attachment/12348750/830-with-javadoc-fix.patch) against trunk revision r495045. Please note that this message is automatically generated and may represent a problem with the automation system and not the patch.

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

David Eric Pugh (Jira)
In reply to this post by David Eric Pugh (Jira)

    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12463982 ]

Hadoop QA commented on HADOOP-830:
----------------------------------

-1, because the javadoc command appears to have generated warning messages when testing the latest attachment (http://issues.apache.org/jira/secure/attachment/12348750/830-with-javadoc-fix.patch) against trunk revision r495045. Please note that this message is automatically generated and may represent a problem with the automation system and not the patch.

[jira] Updated: (HADOOP-830) Improve the performance of the Merge phase

David Eric Pugh (Jira)
In reply to this post by David Eric Pugh (Jira)

     [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Devaraj Das updated HADOOP-830:
-------------------------------

    Attachment: 830-with-real-javadoc-fix.patch

[jira] Updated: (HADOOP-830) Improve the performance of the Merge phase

David Eric Pugh (Jira)
In reply to this post by David Eric Pugh (Jira)

     [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Devaraj Das updated HADOOP-830:
-------------------------------

    Attachment:     (was: 830-initial9.patch)

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

David Eric Pugh (Jira)
In reply to this post by David Eric Pugh (Jira)

    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12464140 ]

Devaraj Das commented on HADOOP-830:
------------------------------------

Sorry, I had uploaded the wrong patch (which is why the javadoc generation gave warnings). 830-with-real-javadoc-fix.patch hopefully doesn't have the javadoc issues.

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

David Eric Pugh (Jira)
In reply to this post by David Eric Pugh (Jira)

    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12464254 ]

Hadoop QA commented on HADOOP-830:
----------------------------------

+1, because http://issues.apache.org/jira/secure/attachment/12348794/830-with-real-javadoc-fix.patch applied and successfully tested against trunk revision r495045.
