Spark Structured Streaming - UI Storage Memory value growing

Keywords: apache-spark spark-structured-streaming

Question: 

I am migrating a DStreams Spark application to Structured Streaming. During testing, I noticed that the Storage Memory shown in the Executors tab of Spark's UI keeps growing. It eventually surpasses the allocated memory, although there is no spill to disk and the cached RDDs occupy only a few MB. I am using Spark 2.4.3 and consuming data from Kafka 2.1.

The example below shows an application with a driver and one executor. The driver is allocated 3 GB of memory and the executor is allocated 5 GB (and 3 cores).
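For reference, that allocation corresponds to a configuration along these lines (a sketch only; the app name is made up, and in practice these values are passed via spark-submit flags or spark-defaults.conf rather than set in code):

import org.apache.spark.sql.SparkSession

// Sketch of the resource configuration described above. Note that
// spark.driver.memory only takes effect if set before the driver JVM
// starts, i.e. via spark-submit or spark-defaults.conf, not in code.
val spark = SparkSession.builder()
  .appName("structured-streaming-app")   // hypothetical name
  .config("spark.driver.memory", "3g")
  .config("spark.executor.memory", "5g")
  .config("spark.executor.cores", "3")
  .getOrCreate()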

[Screenshot: Spark UI, Executors tab]

As you can see, the UI reports that each process (driver and executor) uses around 8 GB of memory, while the allocated values are much smaller, and that there is no spill to disk. The next screenshot shows that the cached RDDs occupy only around 100 MB:

[Screenshot: Spark UI, Storage tab]

I tried to verify the UI-reported memory usage against the system's values. Using the ps command, I see the driver consuming around 2 GB of memory and the executor around 5 GB, both within the allocated values.
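A minimal sketch of that check, driven from Scala (the PIDs are placeholders for the two JVM processes; RSS is reported in KB):

import scala.sys.process._

// Resident set size (RSS) of the driver and executor JVMs, in KB.
// 12345 and 12346 are placeholder PIDs.
val rss = Seq("ps", "-o", "pid,rss,comm", "-p", "12345,12346").!!
println(rss)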

I also used Spark's REST API to get the executors' status. The response shows that the "memoryUsed" value matches the one shown in the UI. Here is the JSON response:

[
{
  "id": "driver",
  "hostPort": "ip:41214",
  "isActive": true,
  "rddBlocks": 0,
  "memoryUsed": 7909526598,
  "diskUsed": 0,
  "totalCores": 0,
  "maxTasks": 0,
  "activeTasks": 0,
  "failedTasks": 0,
  "completedTasks": 0,
  "totalTasks": 0,
  "totalDuration": 0,
  "totalGCTime": 0,
  "totalInputBytes": 0,
  "totalShuffleRead": 0,
  "totalShuffleWrite": 0,
  "isBlacklisted": false,
  "maxMemory": 1529452953,
  "addTime": "2019-05-31T14:46:51.563GMT",
  "executorLogs": {},
  "memoryMetrics": {
    "usedOnHeapStorageMemory": 7909526598,
    "usedOffHeapStorageMemory": 0,
    "totalOnHeapStorageMemory": 1529452953,
    "totalOffHeapStorageMemory": 0
  },
  "blacklistedInStages": []
},
{
  "id": "0",
  "hostPort": "ip:40787",
  "isActive": true,
  "rddBlocks": 24,
  "memoryUsed": 7996553955,
  "diskUsed": 0,
  "totalCores": 3,
  "maxTasks": 3,
  "activeTasks": 0,
  "failedTasks": 0,
  "completedTasks": 710401,
  "totalTasks": 710401,
  "totalDuration": 306845440,
  "totalGCTime": 8128264,
  "totalInputBytes": 733395681216,
  "totalShuffleRead": 475652972265,
  "totalShuffleWrite": 354298278067,
  "isBlacklisted": false,
  "maxMemory": 2674812518,
  "addTime": "2019-05-31T14:46:53.680GMT",
  "executorLogs": {
    "stdout": "http://ip:8081/logPage/?appId=app-20190531164651-0027&executorId=0&logType=stdout",
    "stderr": "http://ip:8081/logPage/?appId=app-20190531164651-0027&executorId=0&logType=stderr"
  },
  "memoryMetrics": {
    "usedOnHeapStorageMemory": 7996553955,
    "usedOffHeapStorageMemory": 0,
    "totalOnHeapStorageMemory": 2674812518,
    "totalOffHeapStorageMemory": 0
  },
  "blacklistedInStages": []
}
]

The "memoryUsed" and "usedOnHeapStorageMemory" values match the value shown in the UI.
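For reference, a sketch of fetching these metrics from Spark's REST API (the host and the default UI port 4040 are assumptions; the application id is taken from the executor log URLs above):

import scala.io.Source

// Returns the JSON array shown above: one entry per executor plus the driver.
val appId = "app-20190531164651-0027"
val url   = s"http://localhost:4040/api/v1/applications/$appId/executors"
println(Source.fromURL(url).mkString)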

So, is there a bug in how Spark reports the used memory for Structured Streaming? The reported values do not agree with the system's values.

Please note that my application uses an aggregation with a watermark in append output mode. I suspected this might be the problem and that the state was not being cleaned up properly. However, I monitored the streaming query's state with the query.lastProgress method, and it shows that the state is indeed cleaned up. I even removed the aggregation so that the query (still in append mode) is stateless, and the behaviour was the same.
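A minimal sketch of such a check, based on the stateOperators metrics that lastProgress exposes (the helper function is hypothetical; only the Spark API calls are real):

import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: prints the state-store metrics that lastProgress
// exposes for each stateful operator in the query.
def logStateMetrics(query: StreamingQuery): Unit = {
  val progress = query.lastProgress   // null until the first batch completes
  if (progress != null) {
    progress.stateOperators.foreach { op =>
      println(s"state rows total=${op.numRowsTotal}, " +
              s"updated=${op.numRowsUpdated}, " +
              s"memory used=${op.memoryUsedBytes} bytes")
    }
  }
}

If the watermark is cleaning up state, numRowsTotal and memoryUsedBytes should plateau rather than grow across batches.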

Thank you in advance.

Answers: