Skip to content

Conversation

suyanNone
Copy link
Contributor

Some memory not count into memory used by memoryStore or unrollMemory.
Thread A after unrollsafely memory, it will release 40MB unrollMemory(40MB will used by other threads). then ThreadA wait get accountingLock to tryToPut blockA(30MB). before Thread A get accountingLock, blockA memory size is not counting into unrollMemory or memoryStore.currentMemory.
IIUC, freeMemory should minus that block memory

So, put this release memory into pending, and release it in tryToPut before ensureSpace

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@andrewor14
Copy link
Contributor

add to whitelist

@SparkQA
Copy link

SparkQA commented Dec 10, 2014

Test build #24321 has started for PR 3629 at commit 4f46dd7.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 10, 2014

Test build #24321 has finished for PR 3629 at commit 4f46dd7.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24321/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Dec 11, 2014

Test build #24345 has started for PR 3629 at commit 42dfa9b.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 11, 2014

Test build #24345 has finished for PR 3629 at commit 42dfa9b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24345/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Dec 11, 2014

Test build #24352 has started for PR 3629 at commit 34cfbe8.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 11, 2014

Test build #24352 has finished for PR 3629 at commit 34cfbe8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24352/
Test PASSed.

@suyanNone
Copy link
Contributor Author

It is already resolved in [SPARK-3000][CORE] drop old blocks to disk in parallel when free memory is not enough for caching new blocks #2134

@suyanNone suyanNone closed this Dec 16, 2014
@liyezhang556520
Copy link
Contributor

Hi @suyanNone , you don't need to close this PR, since #2134 is not merged yet. And the bug still exists in current code. You can reopen this PR.

@suyanNone
Copy link
Contributor Author

@liyezhang556520 I not familiar with the process about pull request = =, ok, I will reopen it...

@suyanNone
Copy link
Contributor Author

Reopen

@suyanNone suyanNone reopened this Dec 16, 2014
@SparkQA
Copy link

SparkQA commented Dec 16, 2014

Test build #24491 has started for PR 3629 at commit 34cfbe8.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 16, 2014

Test build #24491 has finished for PR 3629 at commit 34cfbe8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24491/
Test FAILed.

@andrewor14
Copy link
Contributor

@suyanNone Having taken a detailed look over the patch, I believe the correct thing to do here is to just remove the memory release in unrollSafely. Even after we successfully unroll the entire iterator into an array, we still hold onto the array so it doesn't make sense (in my original code) to release the memory prematurely. For this reason, I believe much of the changes in this PR is unnecessary and adds further complexity to the already complex logic.

@suyanNone
Copy link
Contributor Author

@andrewor14
Hi, yes, but the valueA (unrollmemoryForThisThread) can't used by others even after this value already added to memoryStore part. it will be double count.

others unrollSafely, will get all threads reservedUnrollMemoryForThisThread , it will count valueA.(due to valueA will be released after the task is completed)

In MemoryStore, it will also count ValueA

Did I miss sth in the code?

@andrewor14
Copy link
Contributor

Sorry, I don't understand what you're saying. Why will there be double counting? Can you list the steps that causes this? (What is Value A?)

@liyezhang556520
Copy link
Contributor

Hi @andrewor14 , I think @suyanNone 's explain is correct. What @suyanNone wants to say that if we just remove the memory release in unrollSafely, then the memory that marked as unrolled will be never released, even after the the corresponding block is actually put into the memory. You can check in ensureFreeMemory method, the atcualMemory is calculated as val actualFreeMemory = freeMemory - currentUnrollMemory. In which, the currentUnrollMemory will be never decreased. That is the double counting of unrollmemory in @suyanNone 's explanation (one copy already put in memory, one copy unrolled in unrollMemory).

@suyanNone
Copy link
Contributor Author

@andrewor14 sorry for my poor english, and @liyezhang556520 has explained well.

now, I just talk the situation if just remove the memory release in unrollSafely.

In MemoryStore, the memory has 2 use, one for unroll and the other for entrys.Every thread has a preserved unroll memory in current design, which will be stored in unrollMemoryMap.

actual free memory in the memory store = maxMemory - entrys.size - currenUnrollMemory(=unrollMemoryMap.size).

Thread A: Thread A's preserved unroll memory, let suppose it equals = 20MB

  1. after Thread A unrollSafely(), 20MB will still hold in currentUnrollMemory
  2. atter thread A tryToPut this block, 20MB will be hold in entrys
  3. after thread A done the task, it will release the preserved unroll memory in currentUnrollMemory, 20MB.

if thread A now between step 2 and step 3. 20MB be counted both in entrys and currentMemoryMap

Thread B begin to put a block in memory, he will have 3 places to be affected by the Thread A unreleased unroll Memory(20MB), where use freeMemory or currentUnrollMemory.

we can says
trueFreeMemory should = freeMemory + 20MB

trueCurrentUnrollMemory should = currentUnrollMemory - 20MB

first place:
reserveUnrollMemoryForThisThread, to judge current memoryStore whether have enough free memory to allocate preserved memory for current thread.

  def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
    accountingLock.synchronized {
      val granted = freeMemory > currentUnrollMemory + memory
      if (granted) {
        val threadId = Thread.currentThread().getId
        unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory
      }
      granted
    }
  }

second place
ensureFreeSpace: to get actual free memory to judge if need to drop old blocks.

    // Take into account the amount of memory currently occupied by unrolling blocks
    val actualFreeMemory = freeMemory - currentUnrollMemory

    if (actualFreeMemory < space) {

third place
unrollSafely: To make sure if currentUnrollMemory is larger than max, will not allocate more memory for current thread.

   val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
   if (spaceToEnsure > 0) {
     ....
   }

@andrewor14
Copy link
Contributor

I see, thanks for your detailed explanations @suyanNone @liyezhang556520. If the problem is that we double count after we put the block in memory, shouldn't we release the pending memory after we actually put the block (i.e. after this line), not before?

@andrewor14
Copy link
Contributor

Also, the other issue with this patch is that unrollSafely is not used exclusively with tryToPut; it is also used in CacheManager#putInBlockManager. In the CacheManager case, if we acquire pending memory in unrollSafely, then we will never release the pending memory because we never call trytoPut.

@suyanNone
Copy link
Contributor Author

@andrewor14

shouldn't we release the pending memory after we actually put the block (i.e. after this line), not before?
Agree. I think this line is better. Make sure we always release the pending unroll memory never mind ensureFreeSpace is true or false.

CacheManager.putInBlockManager,

do sth like:

  1. after unroll, may the block can put in memoryStore, it will return a unrolled data(array), and then call blockManager.putArray, it will finally to call memoryStore.tryToPut, because we call unrollSafelly in CacheManager only if the block level have memory_level.
  2. in contrast, it can't put in memoryStore, then return an iterator, and unrollMemory will be preserved until the task is completed.
        }
      } finally {
        // Release memory used by this thread for shuffles
        env.shuffleMemoryManager.releaseMemoryForThisThread()
        // Release memory used by this thread for unrolling blocks
        env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
        // Release memory used by this thread for accumulators
        Accumulators.clear()
        runningTasks.remove(taskId)
      }

In my code:

  1. I only pending the unroll memory if we unroll a block successful. and in current spark code, if a block unroll successful, it always will call memoryStore.tryToPut.

right?

// we release the memory claimed by this thread later on when the task finishes.
if (keepUnrolling) {
val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
releaseUnrollMemoryForThisThread(amountToRelease)
releaseUnrollMemoryForThisThread(amountToRelease, true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of introducing a random boolean flag here, I would just move the acquire pending memory code into unrollSafely:

if (keepUnrolling) {
  val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
  // Mark the unroll memory as pending so that we release
  // it later as soon as we finish caching the block
  releaseUnrollMemoryForThisThread(amountToRelease)
  reservePendingUnrollMemoryForThisThread(amountToRelease)
}

then somewhere down there you'll have to define reservePendingUnrollMemoryForThisThread

@andrewor14
Copy link
Contributor

@suyanNone I have left mostly documentation and code style comments. The only slightly less trivial thing is to revert the pending flag added to releaseMemoryForThisThread. Once you address all of the comments I will merge this. Thanks for your patience and hard work all this time!

@suyanNone
Copy link
Contributor Author

@andrewor14 , Already refine according comments, please review~

@SparkQA
Copy link

SparkQA commented Feb 27, 2015

Test build #28042 has started for PR 3629 at commit 28408f2.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Feb 27, 2015

Test build #28042 has finished for PR 3629 at commit 28408f2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28042/
Test PASSed.

@@ -381,6 +395,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}

// Take into account the amount of memory currently occupied by unrolling blocks
// and minus the pending unroll memory for that block on current thread.
val threadId = Thread.currentThread().getId
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this variable used?

@suyanNone
Copy link
Contributor Author

@andrewor14
If put it at the end of accountingLock.synchronized, as I described above,

accountingLock.synchronized {
  1. ensureFreeSpace() //here will call the currentUnrollMemory, it will re-count the pendingUnrollmemmory to get freeMemory, then to judge whether free space or not.
   2. entries.put(blockId, entry)
}

If still need to put it at the end of accountingLock.synchronized, for get free space in ensureFreeSpace.

may change the current code val actualFreeMemory = freeMemory - currentUnrollMemory to val actualFreeMemory = freeMemory - currentUnrollMemory + unrollPendingMmeory(threadId)

@SparkQA
Copy link

SparkQA commented Feb 27, 2015

Test build #28065 has started for PR 3629 at commit 809cc41.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Feb 27, 2015

Test build #28065 has finished for PR 3629 at commit 809cc41.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28065/
Test PASSed.

@suyanNone
Copy link
Contributor Author

ping @andrewor14

@andrewor14
Copy link
Contributor

Ok LGTM I'm merging into master finally. Thanks @suyanNone and @liyezhang556520 for uncovering this tricky issue.

@asfgit asfgit closed this in e3a88d1 Mar 3, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants