[jira] Created: (HADOOP-830) Improve the performance of the Merge phase

[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

JIRA jira@apache.org

    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12465287 ]

Doug Cutting commented on HADOOP-830:
-------------------------------------

I note that the ramfs doesn't perform checksums.  Why is that?  Checksums are useful when data spends a long time in memory, as it may here.

Also, should we put RamFileSystem in the fs package instead of mapred?

> Improve the performance of the Merge phase
> ------------------------------------------
>
>                 Key: HADOOP-830
>                 URL: https://issues.apache.org/jira/browse/HADOOP-830
>             Project: Hadoop
>          Issue Type: Improvement
>          Components: mapred
>            Reporter: Devaraj Das
>         Assigned To: Devaraj Das
>         Attachments: 830-after-comments.patch, 830-for-review.new.patch, 830-for-review.patch, 830-with-javadoc-fix.patch, 830-with-real-javadoc-fix.patch
>
>
> This issue is about trying to improve the performance of the merge phase (especially on the reduces). Currently, all the map outputs are copied to disk and then the merge phase starts (just to make a note - sorting happens on the maps).
> The first optimization that I plan to implement is to do in-memory merging of the map outputs. There are two buffers maintained -
> 1) a scratch buffer for writing map outputs (directly off the socket). This is a first-come, first-served buffer (as opposed to strategies like best fit). The map output copier copies the map output off the socket and puts it here (assuming there is sufficient space in the buffer).
> 2) a merge buffer - when the scratch buffer cannot accommodate any more map output, the roles of the buffers are switched - that is, the scratch buffer becomes the merge buffer and the merge buffer becomes the scratch buffer. We avoid copying by doing this switch of roles. The copier threads can continue writing data from the socket buffer to the current scratch buffer (access to the scratch buffer is synchronized).
> Both buffers are of equal size, with a default of 100M each.
> Before switching roles, a check is done to see whether the merge buffer is in use (the merge algorithm is working on the data there); we wait till the merge buffer is free. The hope is that, since the merge reads key/value data from an in-memory buffer, it will be fast enough that we won't see client timeouts on the server serving the map output. However, if a timeout does occur, the client sees an exception and resubmits the request to the server.
> With the above we are doing copying/merging in parallel.
> The merge happens and then a spill to disk happens. At the end of the in-memory merge of all the map outputs, we will end up with files of roughly 100M each on disk that we will need to merge. The in-memory merge is also triggered when the in-memory scratch buffer has been idle for too long (around 30 seconds), or when the number of outputs copied so far equals the number of maps in the job, whichever comes first. We can proceed with the regular merge for these on-disk files and maybe do some optimizations there too (I haven't put much thought into that yet).
> If the map output can never fit in the buffer (because the map output is, say, 200M), it is spilled directly to disk.
> To implement the above, I am planning to extend the FileSystem class to provide an InMemoryFileSystem class that will ease the integration of the in-memory scratch/merge with the existing APIs (like SequenceFile, MapOutputCopier), since all of them work with the FileSystem and Input/Output stream abstractions.
> Comments?
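
For illustration, here is a minimal sketch of the double-buffer role switch described above, in Java. All names (DoubleBufferMerge, append, mergeDone, and so on) are hypothetical; the actual patch implements this behind the proposed InMemoryFileSystem abstraction rather than on raw byte arrays.

// Illustrative sketch only: the names below are hypothetical and the real
// patch works through the proposed InMemoryFileSystem rather than raw
// byte arrays.
public class DoubleBufferMerge {
  private byte[] scratch;            // copier threads append map outputs here
  private byte[] merge;              // the merge thread reads from here
  private int scratchUsed = 0;
  private boolean merging = false;   // true while the merge thread owns 'merge'

  public DoubleBufferMerge(int bufferSize) {
    scratch = new byte[bufferSize];  // 100M by default in the proposal
    merge = new byte[bufferSize];
  }

  /** Called by a copier thread; returns false if the output must go straight to disk. */
  public synchronized boolean append(byte[] mapOutput) throws InterruptedException {
    if (mapOutput.length > scratch.length) {
      return false;                            // bigger than the whole buffer: spill directly
    }
    if (scratchUsed + mapOutput.length > scratch.length) {
      while (merging) {
        wait();                                // wait until the merge buffer is free
      }
      byte[] tmp = scratch;                    // switch roles instead of copying
      scratch = merge;
      merge = tmp;
      int toMerge = scratchUsed;
      scratchUsed = 0;
      merging = true;
      startMergeAndSpill(merge, toMerge);      // merge runs in parallel with further copies
    }
    System.arraycopy(mapOutput, 0, scratch, scratchUsed, mapOutput.length);
    scratchUsed += mapOutput.length;
    return true;
  }

  /** Called by the merge thread once its spill to disk has completed. */
  public synchronized void mergeDone() {
    merging = false;
    notifyAll();
  }

  private void startMergeAndSpill(byte[] buf, int len) {
    // In the real patch this is an in-memory merge of SequenceFile data
    // followed by a spill to local disk; omitted here.
  }
}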


[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

JIRA jira@apache.org

    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12465387 ]

Owen O'Malley commented on HADOOP-830:
--------------------------------------

I agree with moving it to the fs package.

I think checksums for the ram file system are the wrong place for us to spend:
  1. program complexity
  2. memory
  3. CPU cycles
We have limited resources in all three, and RAM corruption is rare.

I used to work at NASA and talked to the guys who wrote flight software for satellites about how they dealt with Single Event Upsets (SEUs). They studied the effects (and mapped them across the Earth and over time) and still had very simple coping mechanisms. (They used one thread to sweep memory looking for ECC errors and rebooted the CPU if they found any.) The astronauts use a more straightforward approach and just reboot all of the laptops after flying over the South Atlantic. *smile*

So anyway, it is possible to design software to be resistant to random memory corruption errors, but it takes a lot of effort. Since our computers run at basically sea level, I really don't think the need justifies the cost. Also note that while this is a big buffer (100M or so), compared to the total RAM it is tiny. I don't know how it works out in Java, but in C/C++ most random memory errors lead to segmentation faults in very short order.


[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

JIRA jira@apache.org

    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12465462 ]

Doug Cutting commented on HADOOP-830:
-------------------------------------

>   1. program complexity

Checksums for ramfs add no complexity, since they're provided in the base class.

>   2. memory

Checksums add less than 1% to the storage requirements, by design.

>   3. cpu cycles

I won't accept this without benchmarks.  The CRC32 code is native and should be quite fast.

> ram corruption is rare

When sorting (and ramfs is used in the sorting code), data is RAM-resident for long periods.  We don't really know how rare RAM corruption is, but when slinging terabytes on thousands of nodes, we know it happens.  Most of our memory is used for caching sort I/O.  We should checksum this when possible.
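
As a point of reference, a rough micro-benchmark of the native CRC32 code is easy to write; the sketch below is illustrative only, not part of any patch, and the class name is hypothetical.

import java.util.Random;
import java.util.zip.CRC32;

// Rough, illustrative micro-benchmark of java.util.zip.CRC32 throughput over
// a 100M buffer; results will vary by JVM and hardware.
public class Crc32Bench {
  public static void main(String[] args) {
    byte[] data = new byte[100 * 1024 * 1024];     // roughly the in-memory buffer size
    new Random(0).nextBytes(data);

    CRC32 crc = new CRC32();
    long start = System.currentTimeMillis();
    crc.update(data, 0, data.length);
    long elapsed = Math.max(System.currentTimeMillis() - start, 1);

    System.out.println("crc=" + Long.toHexString(crc.getValue())
        + " time=" + elapsed + "ms"
        + " throughput=" + (100L * 1000 / elapsed) + " MB/s");
  }
}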


[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

JIRA jira@apache.org

    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12465492 ]

Owen O'Malley commented on HADOOP-830:
--------------------------------------

The ramfs seems like a very strange place to insist on CRCs. We don't have CRCs on any other RAM buffers. Furthermore, the time period is not that long: the ramfs is used while fetching and doing the initial merge. If the shuffle is happening at a reasonable rate, we shouldn't take longer than a minute to fill and in-memory sort 100MB.

If we really need to protect ourselves down at that level, we should compute a value-level or key/value-level CRC when the record comes out of the map and keep it with the record all the way through the framework until it hits the reduce. THAT would protect you from a wide variety of problems from network, disk, RAM, or bugs.
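
A sketch of what such a record-level checksum might look like: compute a CRC over the serialized key and value bytes as the record leaves the map, carry it with the record, and verify it at the reduce. The class name and layout below are assumptions for illustration, not an existing framework API.

import java.io.IOException;
import java.util.zip.CRC32;

// Illustrative only: a per-record CRC carried with each serialized key/value
// pair from map output to reduce input. Not an existing framework API.
public class RecordChecksum {

  /** Compute a CRC32 over the serialized key bytes followed by the value bytes. */
  public static long compute(byte[] keyBytes, byte[] valueBytes) {
    CRC32 crc = new CRC32();
    crc.update(keyBytes, 0, keyBytes.length);
    crc.update(valueBytes, 0, valueBytes.length);
    return crc.getValue();
  }

  /** Verify on the reduce side; a mismatch means corruption somewhere along the way. */
  public static void verify(byte[] keyBytes, byte[] valueBytes, long expected)
      throws IOException {
    long actual = compute(keyBytes, valueBytes);
    if (actual != expected) {
      throw new IOException("record checksum mismatch: expected "
          + Long.toHexString(expected) + ", got " + Long.toHexString(actual));
    }
  }
}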


[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

JIRA jira@apache.org

    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12465530 ]

Doug Cutting commented on HADOOP-830:
-------------------------------------

> We don't have crcs on any other ram buffers.

That's not quite true, and to the degree it is true, it's not a feature.  Right now we checksum data as it exits the Java output stream buffer and enters the filesystem's buffer cache, roughly speaking.  We check the checksums again as data returns from the filesystem's buffer cache into the Java stream buffer.  So data in the filesystem's buffer cache is checksummed.  Arguably we should instead compute checksums earlier and check them later: we could compute them as data enters the output stream's buffer, as objects are serialized, and check them as data exits the input stream, as objects are deserialized.  This should be a minor change with no performance impact.

My original question was more to the effect: since this is already built into the base class, why explicitly turn it off?  The only substantive reason that's been stated is performance, yet no benchmarks have been provided indicating how much slower I/O is when bytes are passed through a native CRC32 implementation.
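
To make the "compute earlier, check later" idea concrete in isolation, the standard java.util.zip checked streams show the pattern of checksumming bytes as they enter a buffer and verifying them as they are read back out. This is only an illustration of the pattern, not the Hadoop ChecksumFileSystem code itself.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

// Illustration of the write-time/read-time checksum pattern using the standard
// java.util.zip checked streams; not the Hadoop ChecksumFileSystem itself.
public class CheckedRoundTrip {
  public static void main(String[] args) throws IOException {
    byte[] record = "some serialized key/value bytes".getBytes("UTF-8");

    // Compute the checksum as the bytes enter the (in-memory) buffer.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    CheckedOutputStream out = new CheckedOutputStream(buffer, new CRC32());
    out.write(record);
    out.flush();
    long writeCrc = out.getChecksum().getValue();

    // Later, check it again as the bytes leave the buffer.
    CheckedInputStream in =
        new CheckedInputStream(new ByteArrayInputStream(buffer.toByteArray()), new CRC32());
    byte[] copy = new byte[record.length];
    int off = 0;
    while (off < copy.length) {
      int n = in.read(copy, off, copy.length - off);
      if (n < 0) {
        break;
      }
      off += n;
    }
    long readCrc = in.getChecksum().getValue();

    if (writeCrc != readCrc) {
      throw new IOException("data was corrupted while buffered in memory");
    }
  }
}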


[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

JIRA jira@apache.org

    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12465841 ]

Doug Cutting commented on HADOOP-830:
-------------------------------------

To be clear: I'd prefer we commit this first with checksums on, then do some performance analysis comparing checksums with no checksums, and, if the cost appears significant, add a flag to disable them.  To do otherwise seems like premature optimization.


[jira] Updated: (HADOOP-830) Improve the performance of the Merge phase

JIRA jira@apache.org

     [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Devaraj Das updated HADOOP-830:
-------------------------------

    Attachment: 830-with-checksum.patch


[jira] Commented: (HADOOP-830) Improve the performance of the Merge phase

JIRA jira@apache.org

    [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12465987 ]

Devaraj Das commented on HADOOP-830:
------------------------------------

Doug, the patch 830-with-checksum.patch uses the checksum implementation from the base FileSystem class. There are two major changes here:
1) Space is reserved in advance for both files (the actual file and the checksum file). This is needed since I am now using the regular FileSystem.create API, which doesn't take a "filesize" argument (and we need to tell the ram fs a priori the length of the files we are going to create there). This is in contrast to the earlier patch, where I had a custom create API in the ram fs that took the length of the file as an argument.
2) Added an API to compute the length of the checksum file given the size of the actual file.

I haven't noticed any performance degradation with my tests (sort benchmark on a small cluster of 14 nodes) so far. Hope this patch makes you happy :)
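
As a concrete illustration of change (2), the checksum-file length can be derived from the data-file length once the chunk size is fixed. The layout assumed below (a small fixed header plus one 4-byte CRC32 per bytesPerChecksum chunk) and the class name are assumptions for the sketch, not necessarily the exact format used in the patch.

// Sketch only: assumes a checksum file made up of a fixed-size header followed
// by one 4-byte CRC32 per bytesPerChecksum-sized chunk of the data file. The
// real patch may use different constants or a different header.
public class ChecksumSizing {
  private static final int CRC_SIZE = 4;      // CRC32 is 4 bytes per chunk
  private static final int HEADER_SIZE = 8;   // assumed magic + bytesPerChecksum field

  /** Length of the checksum file for a data file of the given size. */
  public static long getChecksumFileLength(long dataSize, int bytesPerChecksum) {
    long chunks = (dataSize + bytesPerChecksum - 1) / bytesPerChecksum;  // ceiling division
    return HEADER_SIZE + chunks * CRC_SIZE;
  }

  public static void main(String[] args) {
    // e.g. a 100M map output with 512-byte checksum chunks
    long dataSize = 100L * 1024 * 1024;
    System.out.println(getChecksumFileLength(dataSize, 512) + " bytes of checksum data");
  }
}

With both lengths known up front, the ram fs can reserve space for the data file and its checksum file before the copy starts.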


[jira] Updated: (HADOOP-830) Improve the performance of the Merge phase

JIRA jira@apache.org

     [ https://issues.apache.org/jira/browse/HADOOP-830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Doug Cutting updated HADOOP-830:
--------------------------------

       Resolution: Fixed
    Fix Version/s: 0.11.0
           Status: Resolved  (was: Patch Available)

I just committed this.  Thanks, Devaraj!
