Uniffle stage retry rework

#uniffle #github

link: Rework Uniffle stage retry

After revisiting the stage retry feature, I found some obvious bugs that make it not ready for production use. This issue tracks the design, the existing problems, and the proposed solutions.

Background and design

Some background and design are covered in the previous design doc and GitHub issues.
You can search GitHub for them to learn more about this feature.

  1. https://github.com/apache/incubator-uniffle/issues/477
  2. https://docs.google.com/document/d/1OGswqDDQ52rpw5Lat-FpEGDfX1T6EFXYLC7L0G5O4WE/edit?usp=sharing
  3. https://github.com/apache/incubator-uniffle/issues/825

The whole stage retry design is based on Spark's FetchFailedException: once the Spark scheduler receives this exception from a task, it stops scheduling the remaining tasks in this stage and resubmits the stage. That means Uniffle uses this exception as the trigger whenever it encounters a fetch or write failure.

Building on this mechanism, Uniffle hooks into it to achieve stage retry and improve the stability of Spark jobs, especially when a shuffle-server shuts down abnormally.

Shuffle task failures can be split into 2 kinds: write failure and read (fetch) failure, which should be examined both separately and together. The description below reflects what I understand to be the correct solution.

All of the design below is based on the consensus that we want to keep Spark's original semantics of shuffleId.

Write failure

Once a task fails on a write failure (which may be caused by a shuffle-server hang, a shutdown, a network loss, ...), the task asks the shuffle manager whether to throw Spark's FetchFailedException.
At this time, the shuffle manager checks whether the task failures have reached Spark's max failure threshold. Once this condition is satisfied, the task throws the exception and fails.

To avoid data duplication, it is necessary to unregister the map outputs related to the shuffleId from the map output tracker.
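
The check above can be sketched as follows. This is a minimal illustration, not Uniffle's actual code: `WriteFailureTracker` and its method are hypothetical names, the threshold is assumed to mirror Spark's `spark.task.maxFailures`, and treating "reaches" as `count >= threshold` is also an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper kept by the shuffle manager: counts reported write
// failures per task and tells the task whether to throw FetchFailedException.
class WriteFailureTracker {
    // Assumed to mirror spark.task.maxFailures.
    private final int maxTaskFailures;
    private final Map<String, AtomicInteger> failures = new ConcurrentHashMap<>();

    WriteFailureTracker(int maxTaskFailures) {
        this.maxTaskFailures = maxTaskFailures;
    }

    // Records one failure for the task identified by (shuffleId, mapIndex) and
    // returns true once that task has failed maxTaskFailures times.
    boolean reportFailureAndShouldThrow(int shuffleId, int mapIndex) {
        String key = shuffleId + "_" + mapIndex;
        int count = failures.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
        return count >= maxTaskFailures;
    }
}
```

Counting per map index rather than per task attempt matches the later point that the shuffle manager only needs the reported index of task attempts.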

Fetch failure

Fetch failure follows similar logic to the write failure above.

The special point of fetch failure is that it reruns all the map tasks of the upstream stage, unlike the vanilla Spark shuffle service, which only reruns the map tasks executed on the broken servers.
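
The difference in rerun scope can be shown with a toy model. This is neither Spark nor Uniffle code; `RerunPlanner` and the map-index-to-host mapping are hypothetical and only illustrate which map tasks each approach reruns.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model of which upstream map tasks get rerun after a fetch failure.
class RerunPlanner {
    // Vanilla Spark shuffle service: only the maps that ran on the broken host.
    static Set<Integer> vanillaRerun(Map<Integer, String> mapIndexToHost, String brokenHost) {
        Set<Integer> rerun = new TreeSet<>();
        for (Map.Entry<Integer, String> entry : mapIndexToHost.entrySet()) {
            if (entry.getValue().equals(brokenHost)) {
                rerun.add(entry.getKey());
            }
        }
        return rerun;
    }

    // Stage retry as described here: every map task of the upstream stage.
    static Set<Integer> stageRetryRerun(Map<Integer, String> mapIndexToHost) {
        return new TreeSet<>(mapIndexToHost.keySet());
    }
}
```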

Write + fetch failure together

Write and fetch failures won't happen at the same time. Spark ensures this, because the data flow is not pipelined.

For the Uniffle shuffle manager, the problem with write and fetch failures lies in the retry condition. But I think the shuffle manager only needs to care about the reported index of task attempts, whatever the failure type.

QA

How to clear previous stage attempt data and reassign

Before answering this question, there is another question: do we really need to clear the previous stage data? I think this is necessary, for these reasons:

  1. The taskAttemptId in the current blockId layout is not a really unique task id, which means we can't depend on it to filter out the outdated data of retried tasks. (Although in the following QA I will reintroduce the real task attempt id into the blockId layout.)
  2. Too much data makes the client read unnecessary in-memory data, although the localfile data can be filtered out by the index file. This brings too much cost.
  3. Outdated data remaining on the shuffle-server wastes disk/memory space, which is unnecessary.

Based on the above thought, let's answer the initial question.

The unregister and reassign happen on the first request that reaches the retry condition, under a lock that makes the other tasks' requests wait.

Once the reassign is finished, the shuffle manager registers the shuffle of the next stage attempt to the reassigned shuffle-servers. At this time, the server accepts this request and then clears out the previous data, including the data in memory, on disk and on HDFS.

We hope this deletion is quick, but it probably is not. From my survey, deleting 200 GB of disk files with 3000 partitions costs 40s. This is not acceptable for a gRPC request.
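
A minimal sketch of this "first request wins" behaviour, assuming a hypothetical `StageRetryCoordinator` on the shuffle manager side. The names, the string stand-in for a server assignment, and the injected `reassigner` callback are all illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Hypothetical coordinator: the first failure report for a stage attempt
// triggers unregister + reassign; concurrent reporters wait on the lock and
// then reuse the result.
class StageRetryCoordinator {
    private final Map<Integer, Integer> stageAttemptByShuffle = new HashMap<>();
    private final Map<Integer, String> assignmentByShuffle = new HashMap<>();
    // Stand-in for "unregister map outputs and ask for new shuffle-servers".
    private final IntFunction<String> reassigner;

    StageRetryCoordinator(IntFunction<String> reassigner) {
        this.reassigner = reassigner;
    }

    // synchronized: other reporters block here until the reassign is done.
    synchronized String reportFailure(int shuffleId, int reportedStageAttempt) {
        int current = stageAttemptByShuffle.getOrDefault(shuffleId, 0);
        if (reportedStageAttempt == current) {
            // First reporter for this stage attempt: reassign exactly once.
            int next = current + 1;
            assignmentByShuffle.put(shuffleId, reassigner.apply(next));
            stageAttemptByShuffle.put(shuffleId, next);
        }
        // Reporters from an already retried attempt get the existing assignment.
        return assignmentByShuffle.get(shuffleId);
    }
}
```

Comparing the reported stage attempt with the current one is what keeps late reports from the old attempt from triggering a second reassign.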

So a two-phase deletion should be introduced to solve the problem of data visibility and deletion speed. It splits the deletion into 2 phases: a synchronous rename and an asynchronous delete.
This relies on rename being fast on disk and HDFS. For an object store, the first phase will not use the rename mechanism; it could instead store some metadata in memory and rename later. In any case, there are always ways to handle the OSS storage type.

And in the current codebase, the failure report and reassignOnStageRetry are split into 2 RPCs; it's necessary to unify them into one.
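
For local disk, the two phases could look like the sketch below. It uses only `java.nio.file`; the `TwoPhaseDeleter` class and the `.deleting.` suffix are hypothetical, and it assumes the renamed directory stays on the same filesystem so the move is a cheap rename. HDFS and object stores are not covered.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

class TwoPhaseDeleter {
    // Phase 1 (sync): rename the directory so the old data is invisible at once.
    // Phase 2 (async): delete the renamed directory in the background.
    static CompletableFuture<Void> delete(Path shuffleDir) throws IOException {
        Path trash = shuffleDir.resolveSibling(
            shuffleDir.getFileName() + ".deleting." + System.nanoTime());
        Files.move(shuffleDir, trash);
        return CompletableFuture.runAsync(() -> deleteRecursively(trash));
    }

    private static void deleteRecursively(Path root) {
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order so that children are deleted before their parents.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The RPC handler only has to wait for the rename; the returned future can be ignored or used for logging and metrics.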

How to ensure shuffle data consistency in different stage attempt

Everything looks fine now, but some corner cases are ignored, especially for highly concurrent jobs.

While the previous shuffle data is being deleted, another writer or reader may access the same partition data.

For the writer, it's hard to use a lock to ensure consistency, as it needs to keep write throughput high. So in this case, we hope that the partially leaked data can be filtered out on the client side by the task attempt id. Based on this, the real unique task attemptId will be introduced here to solve this problem. This may bring some limitations for huge jobs, but I think it is necessary and worthwhile.

For the reader, the premise of accuracy is obtaining the real blockId bitmap, so locking is required here, together with a stage attempt number check.
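
A sketch of the client-side filter, assuming the unique task attempt id is packed into the low bits of the blockId. The bit widths, the `BlockIdFilter` class and its methods are assumptions for illustration, not Uniffle's actual layout code. The fixed width also shows where the limitation for huge jobs comes from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class BlockIdFilter {
    // Assumed layout for illustration: [sequenceNo | partitionId | taskAttemptId].
    static final int TASK_ATTEMPT_BITS = 21;
    static final int PARTITION_BITS = 24;
    static final long TASK_ATTEMPT_MASK = (1L << TASK_ATTEMPT_BITS) - 1;

    static long blockId(long sequenceNo, long partitionId, long taskAttemptId) {
        if (taskAttemptId < 0 || taskAttemptId > TASK_ATTEMPT_MASK) {
            // Huge jobs can exceed the id space of a fixed-width field.
            throw new IllegalArgumentException(
                "taskAttemptId does not fit in " + TASK_ATTEMPT_BITS + " bits");
        }
        return (sequenceNo << (PARTITION_BITS + TASK_ATTEMPT_BITS))
            | (partitionId << TASK_ATTEMPT_BITS)
            | taskAttemptId;
    }

    static long taskAttemptId(long blockId) {
        return blockId & TASK_ATTEMPT_MASK;
    }

    // Keeps only blocks written by task attempts the reader considers valid,
    // dropping data leaked from a previous stage attempt.
    static List<Long> keepValid(List<Long> blockIds, Set<Long> validTaskAttemptIds) {
        List<Long> kept = new ArrayList<>();
        for (long id : blockIds) {
            if (validTaskAttemptIds.contains(taskAttemptId(id))) {
                kept.add(id);
            }
        }
        return kept;
    }
}
```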
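
One way to picture the reader-side check on the server is the sketch below. It is a sketch under stated assumptions: `ShuffleResultStore` is hypothetical, a `HashSet` stands in for the blockId bitmap, and silently dropping stale block reports is an assumed policy rather than something the design above specifies.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical per-partition store of reported block ids on the server.
class ShuffleResultStore {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Set<Long> blockIds = new HashSet<>();
    private int latestStageAttempt = 0;

    // Registering a newer stage attempt clears the previous attempt's block ids.
    void registerStageAttempt(int stageAttempt) {
        lock.writeLock().lock();
        try {
            if (stageAttempt > latestStageAttempt) {
                latestStageAttempt = stageAttempt;
                blockIds.clear();
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns false when the report comes from a stale stage attempt.
    boolean reportBlock(int stageAttempt, long blockId) {
        lock.writeLock().lock();
        try {
            if (stageAttempt != latestStageAttempt) {
                return false;
            }
            blockIds.add(blockId);
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers take the read lock and must present the latest stage attempt.
    Set<Long> getShuffleResult(int stageAttempt) {
        lock.readLock().lock();
        try {
            if (stageAttempt != latestStageAttempt) {
                throw new IllegalStateException("stale stage attempt " + stageAttempt);
            }
            return new HashSet<>(blockIds);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

The lock here guards only the block id metadata, not the data write path, so it does not conflict with keeping the writer lock-free.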

Next step

The current code has some bugs described in the Q&A above, so it is time to create some issues to solve them.

  1. Unify the fetch failure + write failure retry checking into the RssStageResubmitManager. The checking logic may differ, but we can distinguish the two and use different strategies.
  2. Merge the 2 RPCs, reportXXFailure and reassignOnStageRetry, into reportXXFailure, so that the reassign is included in one request.
  3. Introduce the real unique taskAttemptId into Uniffle behind an individual config option, so that the client filters out previous stage data correctly.
  4. Ensure that getting the shuffle result is safe.
  5. Some bugs