This repository was archived by the owner on Feb 25, 2025. It is now read-only.

Commit ad582b5

Rework image & texture management to use concurrent message queues. (#9486)
This patch reworks image decompression and collection to fix misbehavior in the edge cases described below.

The current flow for realizing a texture on the GPU from a blob of compressed bytes is to first pass it to the IO thread for image decompression and upload to the GPU. The handle to the texture on the GPU is then passed back to the UI thread so that it can be included in subsequent layer trees for rendering. The GPU contexts on the Render & IO threads are in the same sharegroup, so the texture ends up being visible to the Render thread context during rendering. This works fine and does not block the UI thread.

All references to the image are owned on the UI thread by Dart objects. When the final reference to an image is dropped, the texture cannot be collected on the UI thread (because it has no GPU context). Instead, it must be passed to either the GPU or IO thread. The GPU thread is usually in the middle of a frame workload, so we redirect the collection to the IO thread. While texture collections are usually (comparatively) fast, texture decompression and upload are slow (on the order of frame intervals). For applications that end up creating (but not necessarily using) numerous large textures in straight-line execution, texture collection tasks could be left pending on the IO task runner behind all the image decompressions (and uploads). Put simply, the collection of the first image could be waiting on the decompression and upload of the last image in the queue.

This is exacerbated by two other hacks added to work around unrelated issues:

* First, creating a codec with a single image frame immediately kicks off decompression and upload of that frame (even if the frame was never requested from the codec). This hack was added because we wanted to get rid of the compressed image allocation as soon as possible. The expectation was that codecs would only be created for the sole purpose of getting the decompressed image bytes. However, for applications that only create codecs to get image sizes (but never actually decompress them), we would end up replacing the compressed image allocation with a larger allocation (device resident, no less) for no obvious use. This issue is particularly insidious when you consider that a codec is usually asked for the native image size before the frame is requested at a smaller size (usually via a new codec with the same data but a new target size). This caused the creation of a whole extra texture (at 1:1) when the caller was trying to "optimize" for memory use by requesting a texture of a smaller size.
* Second, all image collections were delayed in the unref queue by 250ms because of observations that the calling thread (the UI thread) was being descheduled unnecessarily when a task with a timeout of zero was posted from it (recall that a task has to be posted to the IO thread for the collection of a texture). 250ms is multiple frame intervals' worth of potentially unnecessary textures.

The net result of these issues is that we may end up creating textures when all the application needs is to ask its codec for details about an image (but not necessarily access its bytes). Texture collection could also be delayed behind other jobs decompressing textures on the IO thread, and all texture collections were delayed by an arbitrary amount of time. These issues make applications susceptible to OOM situations, which manifest in various ways. Host memory exhaustion causes the usual OOM issues. Device memory exhaustion seems to manifest differently on iOS and Android: on Android, allocation of a new texture seems to trigger an assertion (in the driver); on iOS, the call hangs (presumably waiting for another thread to release textures, which we won't do because those tasks are blocked behind the current task completing).

To address peak memory usage, the following changes have been made:

* Image decompression and upload/collection no longer happen on the same thread. All image decompression is now handled on a workqueue. The number of worker threads in this workqueue is equal to the number of processors on the device. These threads have a lower priority than either the UI or Render threads, and the workers are shared between all Flutter applications in the process.
* Both images and their codecs now report the correct allocation size to Dart for GC purposes; the Dart VM uses this to pick objects for collection. Previously, the image allocation was assumed to be 32bpp with no mipmapping overhead reported. Now the correct image size is reported and the mipmapping overhead is accounted for. Image codec sizes were not reported to the VM earlier and now are. Expect "External" VM allocations to be higher than previously reported, and the numbers in Observatory to line up more closely with actual memory usage (device and host).
* Decoding images to a specific size used to decode at 1:1 before resizing to the correct dimensions for texture upload. This has been reworked so that images are first decompressed to a smaller size supported natively by the codec before the final resize to the requested target size. The intermediate copy is now smaller and more promptly collected. Resizing also happens on a workqueue worker.
* The drain interval of the unref queue is now sub-frame-interval. I am hesitant to remove the delay entirely because I have not been able to instrument its performance overhead; that is next on my list. But multiple frame intervals' worth of textures no longer stick around.

The following issues have been addressed:

* flutter/flutter#34070. Since this was the first usage of the concurrent message loops, the number of idle wakes was determined to be too high, and the component has been rewritten to be simpler and to not use the existing task runner and MessageLoopImpl interface.
* Image decoding had no tests. A new `ui_unittests` harness has been added that sets up a GPU test harness on the host using SwiftShader. Tests have been added for image decompression, upload, and resizing.
* The device memory exhaustion in this benchmark has been addressed. That benchmark is still not viable for inclusion in any harness, however, because it creates 9 million codecs in straight-line execution. Because these codecs are destroyed in microtask callbacks, they are referenced until those callbacks are executed. So now, instead of device memory exhaustion, this leads to (slower) exhaustion of host memory. This is expected and working as intended.

This patch only addresses peak memory use and makes collection of unused images and textures more prompt. It does NOT address memory use by images referenced strongly by the application or framework.
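The decompression workqueue described above follows the classic worker-pool pattern: a fixed set of threads drains a single mutex-guarded task queue signaled by a condition variable. Below is a minimal self-contained sketch of that pattern; the `WorkQueue` name is illustrative and this is not the engine's `fml::ConcurrentMessageLoop`.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Sketch of a fixed-size worker pool: slow jobs (e.g. image decompression)
// are spread across worker_count threads instead of serializing behind one
// another on a single thread.
class WorkQueue {
 public:
  explicit WorkQueue(size_t worker_count) {
    for (size_t i = 0; i < worker_count; ++i) {
      workers_.emplace_back([this]() { WorkerMain(); });
    }
  }

  ~WorkQueue() {
    {
      std::scoped_lock lock(mutex_);
      shutdown_ = true;
    }
    condition_.notify_all();
    for (auto& worker : workers_) {
      worker.join();
    }
  }

  void PostTask(std::function<void()> task) {
    {
      std::scoped_lock lock(mutex_);
      tasks_.push(std::move(task));
    }
    // Notify outside the lock so the woken worker can acquire it immediately.
    condition_.notify_one();
  }

 private:
  void WorkerMain() {
    while (true) {
      std::unique_lock lock(mutex_);
      condition_.wait(lock, [this]() { return !tasks_.empty() || shutdown_; });
      if (tasks_.empty()) {
        break;  // Shutdown requested and the queue is fully drained.
      }
      auto task = std::move(tasks_.front());
      tasks_.pop();
      lock.unlock();  // Never run the task while holding the queue lock.
      task();
    }
  }

  std::vector<std::thread> workers_;
  std::mutex mutex_;
  std::condition_variable condition_;
  std::queue<std::function<void()>> tasks_;
  bool shutdown_ = false;
};
```

Destroying the queue drains any pending tasks before joining the workers, mirroring how the patch's loop refuses to drop already-posted tasks on shutdown.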
1 parent 1dcd5f5 commit ad582b5


48 files changed: +1680 −687 lines

BUILD.gn

Lines changed: 1 addition & 0 deletions

```diff
@@ -55,6 +55,7 @@ group("flutter") {
     public_deps += [
       "$flutter_root/flow:flow_unittests",
       "$flutter_root/fml:fml_unittests",
+      "$flutter_root/lib/ui:ui_unittests",
       "$flutter_root/runtime:runtime_unittests",
       "$flutter_root/shell/common:shell_unittests",
       "$flutter_root/shell/platform/common/cpp/client_wrapper:client_wrapper_unittests",
```

ci/licenses_golden/licenses_flutter

Lines changed: 9 additions & 0 deletions

```diff
@@ -316,6 +316,8 @@ FILE: ../../../flutter/lib/ui/dart_runtime_hooks.h
 FILE: ../../../flutter/lib/ui/dart_ui.cc
 FILE: ../../../flutter/lib/ui/dart_ui.h
 FILE: ../../../flutter/lib/ui/dart_wrapper.h
+FILE: ../../../flutter/lib/ui/fixtures/DashInNooglerHat.jpg
+FILE: ../../../flutter/lib/ui/fixtures/ui_test.dart
 FILE: ../../../flutter/lib/ui/geometry.dart
 FILE: ../../../flutter/lib/ui/hash_codes.dart
 FILE: ../../../flutter/lib/ui/hooks.dart
@@ -342,6 +344,9 @@ FILE: ../../../flutter/lib/ui/painting/gradient.cc
 FILE: ../../../flutter/lib/ui/painting/gradient.h
 FILE: ../../../flutter/lib/ui/painting/image.cc
 FILE: ../../../flutter/lib/ui/painting/image.h
+FILE: ../../../flutter/lib/ui/painting/image_decoder.cc
+FILE: ../../../flutter/lib/ui/painting/image_decoder.h
+FILE: ../../../flutter/lib/ui/painting/image_decoder_unittests.cc
 FILE: ../../../flutter/lib/ui/painting/image_encoding.cc
 FILE: ../../../flutter/lib/ui/painting/image_encoding.h
 FILE: ../../../flutter/lib/ui/painting/image_filter.cc
@@ -350,6 +355,8 @@ FILE: ../../../flutter/lib/ui/painting/image_shader.cc
 FILE: ../../../flutter/lib/ui/painting/image_shader.h
 FILE: ../../../flutter/lib/ui/painting/matrix.cc
 FILE: ../../../flutter/lib/ui/painting/matrix.h
+FILE: ../../../flutter/lib/ui/painting/multi_frame_codec.cc
+FILE: ../../../flutter/lib/ui/painting/multi_frame_codec.h
 FILE: ../../../flutter/lib/ui/painting/paint.cc
 FILE: ../../../flutter/lib/ui/painting/paint.h
 FILE: ../../../flutter/lib/ui/painting/path.cc
@@ -364,6 +371,8 @@ FILE: ../../../flutter/lib/ui/painting/rrect.cc
 FILE: ../../../flutter/lib/ui/painting/rrect.h
 FILE: ../../../flutter/lib/ui/painting/shader.cc
 FILE: ../../../flutter/lib/ui/painting/shader.h
+FILE: ../../../flutter/lib/ui/painting/single_frame_codec.cc
+FILE: ../../../flutter/lib/ui/painting/single_frame_codec.h
 FILE: ../../../flutter/lib/ui/painting/vertices.cc
 FILE: ../../../flutter/lib/ui/painting/vertices.h
 FILE: ../../../flutter/lib/ui/plugins.dart
```

fml/concurrent_message_loop.cc

Lines changed: 84 additions & 28 deletions

```diff
@@ -11,10 +11,14 @@
 
 namespace fml {
 
-ConcurrentMessageLoop::ConcurrentMessageLoop()
-    : worker_count_(std::max(std::thread::hardware_concurrency(), 1u)),
-      shutdown_latch_(worker_count_),
-      shutdown_(false) {
+std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create(
+    size_t worker_count) {
+  return std::shared_ptr<ConcurrentMessageLoop>{
+      new ConcurrentMessageLoop(worker_count)};
+}
+
+ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
+    : worker_count_(std::max<size_t>(worker_count, 1ul)) {
   for (size_t i = 0; i < worker_count_; ++i) {
     workers_.emplace_back([i, this]() {
       fml::Thread::SetCurrentThreadName(
@@ -26,45 +30,97 @@ ConcurrentMessageLoop::ConcurrentMessageLoop()
 
 ConcurrentMessageLoop::~ConcurrentMessageLoop() {
   Terminate();
-  shutdown_latch_.Wait();
   for (auto& worker : workers_) {
     worker.join();
   }
 }
 
-// |fml::MessageLoopImpl|
-void ConcurrentMessageLoop::Run() {
-  FML_CHECK(false);
+size_t ConcurrentMessageLoop::GetWorkerCount() const {
+  return worker_count_;
 }
 
-// |fml::MessageLoopImpl|
-void ConcurrentMessageLoop::Terminate() {
-  std::scoped_lock lock(wait_condition_mutex_);
-  shutdown_ = true;
-  wait_condition_.notify_all();
+std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
+  return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
 }
 
-// |fml::MessageLoopImpl|
-void ConcurrentMessageLoop::WakeUp(fml::TimePoint time_point) {
-  // Assume that the clocks are not the same.
-  const auto duration = std::chrono::nanoseconds(
-      (time_point - fml::TimePoint::Now()).ToNanoseconds());
-  next_wake_ = std::chrono::high_resolution_clock::now() + duration;
-  wait_condition_.notify_all();
+void ConcurrentMessageLoop::PostTask(fml::closure task) {
+  if (!task) {
+    return;
+  }
+
+  std::unique_lock lock(tasks_mutex_);
+
+  // Don't just drop tasks on the floor in case of shutdown.
+  if (shutdown_) {
+    FML_DLOG(WARNING)
+        << "Tried to post a task to shutdown concurrent message "
+           "loop. The task will be executed on the callers thread.";
+    lock.unlock();
+    task();
+    return;
+  }
+
+  tasks_.push(task);
+
+  // Unlock the mutex before notifying the condition variable because that
+  // mutex has to be acquired on the other thread anyway. Waiting in this
+  // scope till it is acquired there is a pessimization.
+  lock.unlock();
+
+  tasks_condition_.notify_one();
 }
 
 void ConcurrentMessageLoop::WorkerMain() {
-  while (!shutdown_) {
-    std::unique_lock<std::mutex> lock(wait_condition_mutex_);
-    if (!shutdown_) {
-      wait_condition_.wait(lock);
+  while (true) {
+    std::unique_lock lock(tasks_mutex_);
+    tasks_condition_.wait(lock,
+                          [&]() { return tasks_.size() > 0 || shutdown_; });
+
+    if (tasks_.size() == 0) {
+      // This can only be caused by shutdown.
+      FML_DCHECK(shutdown_);
+      break;
     }
-    TRACE_EVENT0("fml", "ConcurrentWorkerWake");
-    RunSingleExpiredTaskNow();
+
+    auto task = tasks_.front();
+    tasks_.pop();
+
+    // Don't hold onto the mutex while the task is being executed as it could
+    // itself try to post another task to this message loop.
+    lock.unlock();
+
+    TRACE_EVENT0("flutter", "ConcurrentWorkerWake");
+    // Execute the one task we woke up for.
+    task();
+  }
+}
+
+void ConcurrentMessageLoop::Terminate() {
+  std::scoped_lock lock(tasks_mutex_);
+  shutdown_ = true;
+  tasks_condition_.notify_all();
+}
+
+ConcurrentTaskRunner::ConcurrentTaskRunner(
+    std::weak_ptr<ConcurrentMessageLoop> weak_loop)
+    : weak_loop_(std::move(weak_loop)) {}
+
+ConcurrentTaskRunner::~ConcurrentTaskRunner() = default;
+
+void ConcurrentTaskRunner::PostTask(fml::closure task) {
+  if (!task) {
+    return;
+  }
+
+  if (auto loop = weak_loop_.lock()) {
+    loop->PostTask(task);
+    return;
   }
 
-  RunExpiredTasksNow();
-  shutdown_latch_.CountDown();
+  FML_DLOG(WARNING)
+      << "Tried to post to a concurrent message loop that has already died. "
+         "Executing the task on the callers thread.";
+  task();
 }
 
 }  // namespace fml
```

fml/concurrent_message_loop.h

Lines changed: 40 additions & 24 deletions

```diff
@@ -5,51 +5,67 @@
 #ifndef FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_
 #define FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_
 
-#include <atomic>
-#include <chrono>
 #include <condition_variable>
+#include <queue>
 #include <thread>
-#include <vector>
 
+#include "flutter/fml/closure.h"
 #include "flutter/fml/macros.h"
-#include "flutter/fml/message_loop_impl.h"
-#include "flutter/fml/synchronization/count_down_latch.h"
 #include "flutter/fml/synchronization/thread_annotations.h"
 
 namespace fml {
 
-class ConcurrentMessageLoop : public MessageLoopImpl {
- private:
-  const size_t worker_count_;
-  std::mutex wait_condition_mutex_;
-  std::condition_variable wait_condition_;
-  std::vector<std::thread> workers_;
-  CountDownLatch shutdown_latch_;
-  std::chrono::high_resolution_clock::time_point next_wake_;
-  std::atomic_bool shutdown_;
+class ConcurrentTaskRunner;
 
-  ConcurrentMessageLoop();
+class ConcurrentMessageLoop
+    : public std::enable_shared_from_this<ConcurrentMessageLoop> {
+ public:
+  static std::shared_ptr<ConcurrentMessageLoop> Create(
+      size_t worker_count = std::thread::hardware_concurrency());
 
   ~ConcurrentMessageLoop();
 
-  // |fml::MessageLoopImpl|
-  void Run() override;
+  size_t GetWorkerCount() const;
+
+  std::shared_ptr<ConcurrentTaskRunner> GetTaskRunner();
+
+  void Terminate();
 
-  // |fml::MessageLoopImpl|
-  void Terminate() override;
+ private:
+  friend ConcurrentTaskRunner;
 
-  // |fml::MessageLoopImpl|
-  void WakeUp(fml::TimePoint time_point) override;
+  size_t worker_count_ = 0;
+  std::vector<std::thread> workers_;
+  std::mutex tasks_mutex_;
+  std::condition_variable tasks_condition_;
+  std::queue<fml::closure> tasks_;
+  bool shutdown_ = false;
 
-  static void WorkerMain(ConcurrentMessageLoop* loop);
+  ConcurrentMessageLoop(size_t worker_count);
 
   void WorkerMain();
 
-  FML_FRIEND_MAKE_REF_COUNTED(ConcurrentMessageLoop);
-  FML_FRIEND_REF_COUNTED_THREAD_SAFE(ConcurrentMessageLoop);
+  void PostTask(fml::closure task);
+
   FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentMessageLoop);
 };
 
+class ConcurrentTaskRunner {
+ public:
+  ConcurrentTaskRunner(std::weak_ptr<ConcurrentMessageLoop> weak_loop);
+
+  ~ConcurrentTaskRunner();
+
+  void PostTask(fml::closure task);
+
+ private:
+  friend ConcurrentMessageLoop;
+
+  std::weak_ptr<ConcurrentMessageLoop> weak_loop_;
+
+  FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentTaskRunner);
+};
+
 }  // namespace fml
 
 #endif  // FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_
```
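A key design choice in this header is that `ConcurrentTaskRunner` holds only a `std::weak_ptr` to its loop, so a runner can safely outlive the loop it was created from. The following is a minimal standalone sketch of that lifetime pattern; the `Loop` and `TaskRunner` names are illustrative, not the engine's API, and the stand-in `Loop::PostTask` runs tasks immediately rather than queueing them.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Stand-in for a message loop; a real one would enqueue onto workers.
class Loop {
 public:
  void PostTask(std::function<void()> task) { task(); }
};

// The runner observes the loop weakly: it never extends the loop's lifetime,
// and posting to a dead loop falls back to running the task inline on the
// caller's thread instead of silently dropping it.
class TaskRunner {
 public:
  explicit TaskRunner(std::weak_ptr<Loop> loop) : loop_(std::move(loop)) {}

  void PostTask(std::function<void()> task) {
    if (auto loop = loop_.lock()) {
      loop->PostTask(std::move(task));  // The loop is still alive.
      return;
    }
    task();  // The loop died; execute on the caller's thread.
  }

 private:
  std::weak_ptr<Loop> loop_;
};
```

This mirrors the fallback in the `.cc` diff above, where a task posted to a dead loop is executed on the caller's thread with a warning.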

fml/message_loop.cc

Lines changed: 0 additions & 8 deletions

```diff
@@ -6,7 +6,6 @@
 
 #include <utility>
 
-#include "flutter/fml/concurrent_message_loop.h"
 #include "flutter/fml/memory/ref_counted.h"
 #include "flutter/fml/memory/ref_ptr.h"
 #include "flutter/fml/message_loop_impl.h"
@@ -44,13 +43,6 @@ MessageLoop::MessageLoop()
   FML_CHECK(task_runner_);
 }
 
-MessageLoop::MessageLoop(Type)
-    : loop_(fml::MakeRefCounted<ConcurrentMessageLoop>()),
-      task_runner_(fml::MakeRefCounted<fml::TaskRunner>(loop_)) {
-  FML_CHECK(loop_);
-  FML_CHECK(task_runner_);
-}
-
 MessageLoop::~MessageLoop() = default;
 
 void MessageLoop::Run() {
```

fml/message_loop.h

Lines changed: 0 additions & 4 deletions

```diff
@@ -18,10 +18,6 @@ class MessageLoop {
   FML_EMBEDDER_ONLY
   static MessageLoop& GetCurrent();
 
-  enum class Type { kConcurrent };
-
-  MessageLoop(Type type);
-
   bool IsValid() const;
 
   void Run();
```

fml/message_loop_unittests.cc

Lines changed: 17 additions & 4 deletions

```diff
@@ -7,6 +7,7 @@
 #include <iostream>
 #include <thread>
 
+#include "flutter/fml/concurrent_message_loop.h"
 #include "flutter/fml/message_loop.h"
 #include "flutter/fml/synchronization/count_down_latch.h"
 #include "flutter/fml/synchronization/waitable_event.h"
@@ -281,19 +282,31 @@ TEST(MessageLoop, TaskObserverFire) {
   ASSERT_TRUE(terminated);
 }
 
+TEST(MessageLoop, CanCreateAndShutdownConcurrentMessageLoopsOverAndOver) {
+  for (size_t i = 0; i < 10; ++i) {
+    auto loop = fml::ConcurrentMessageLoop::Create(i + 1);
+    ASSERT_EQ(loop->GetWorkerCount(), i + 1);
+  }
+}
+
 TEST(MessageLoop, CanCreateConcurrentMessageLoop) {
-  fml::MessageLoop loop(fml::MessageLoop::Type::kConcurrent);
-  auto task_runner = loop.GetTaskRunner();
+  auto loop = fml::ConcurrentMessageLoop::Create();
+  auto task_runner = loop->GetTaskRunner();
   const size_t kCount = 10;
   fml::CountDownLatch latch(kCount);
+  std::mutex thread_ids_mutex;
+  std::set<std::thread::id> thread_ids;
   for (size_t i = 0; i < kCount; ++i) {
-    task_runner->PostTask([&latch]() {
-      std::this_thread::sleep_for(std::chrono::milliseconds(5));
+    task_runner->PostTask([&]() {
+      std::this_thread::sleep_for(std::chrono::seconds(1));
       std::cout << "Ran on thread: " << std::this_thread::get_id() << std::endl;
+      std::scoped_lock lock(thread_ids_mutex);
+      thread_ids.insert(std::this_thread::get_id());
       latch.CountDown();
     });
   }
   latch.Wait();
+  ASSERT_GE(thread_ids.size(), 1u);
 }
 
 TEST(MessageLoop, CanSwapMessageLoopsAndPreserveThreadConfiguration) {
```

fml/trace_event.h

Lines changed: 35 additions & 0 deletions

```diff
@@ -244,6 +244,41 @@ class ScopedInstantEnd {
   FML_DISALLOW_COPY_AND_ASSIGN(ScopedInstantEnd);
 };
 
+// A move-only utility object that creates a new flow with a unique ID and
+// automatically ends it when it goes out of scope. When tracing using multiple
+// overlapping flows, it often gets hard to make sure to end the flow
+// (especially with early returns), or, end/step on the wrong flow. This
+// leads to corrupted or missing traces in the UI.
+class TraceFlow {
+ public:
+  TraceFlow(const char* label) : label_(label), nonce_(TraceNonce()) {
+    TraceEventFlowBegin0("flutter", label_, nonce_);
+  }
+
+  ~TraceFlow() { End(label_); }
+
+  TraceFlow(TraceFlow&& other) : label_(other.label_), nonce_(other.nonce_) {
+    other.nonce_ = 0;
+  }
+
+  void Step(const char* label) const {
+    TraceEventFlowStep0("flutter", label, nonce_);
+  }
+
+  void End(const char* label = nullptr) {
+    if (nonce_ != 0) {
+      TraceEventFlowEnd0("flutter", label == nullptr ? label_ : label, nonce_);
+      nonce_ = 0;
+    }
+  }
+
+ private:
+  const char* label_;
+  size_t nonce_;
+
+  FML_DISALLOW_COPY_AND_ASSIGN(TraceFlow);
+};
+
 }  // namespace tracing
 }  // namespace fml
```
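The `TraceFlow` addition above relies on a common RAII idiom: begin the event in the constructor, guarantee exactly one matching end, and zero the nonce on move so only the surviving owner emits the end event. The following is an illustrative standalone sketch of that idiom (not the engine's code); `g_open_flows` and the `FlowBegin`/`FlowEnd` functions stand in for the real trace recorder and `TraceEventFlow*` macros.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>

static int g_open_flows = 0;  // Stand-in trace recorder: begin/end balance.

inline void FlowBegin(std::size_t) { ++g_open_flows; }
inline void FlowEnd(std::size_t) { --g_open_flows; }

inline std::size_t NextNonce() {
  static std::size_t nonce = 0;
  return ++nonce;  // Never returns 0; 0 is reserved for "already ended".
}

class ScopedFlow {
 public:
  ScopedFlow() : nonce_(NextNonce()) { FlowBegin(nonce_); }

  ~ScopedFlow() { End(); }

  // Move-only: the moved-from object forgets its nonce and won't double-end.
  ScopedFlow(ScopedFlow&& other) : nonce_(other.nonce_) { other.nonce_ = 0; }
  ScopedFlow(const ScopedFlow&) = delete;
  ScopedFlow& operator=(const ScopedFlow&) = delete;

  void End() {
    if (nonce_ != 0) {
      FlowEnd(nonce_);
      nonce_ = 0;  // Idempotent: the destructor becomes a no-op afterwards.
    }
  }

 private:
  std::size_t nonce_;
};
```

Early returns, explicit `End()` calls, and moves across threads all remain balanced, because the end event is emitted exactly once per nonce.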
