-
Notifications
You must be signed in to change notification settings - Fork 6k
Rework image & texture management to use concurrent message queues. #9486
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,10 +11,14 @@ | |
|
||
namespace fml { | ||
|
||
ConcurrentMessageLoop::ConcurrentMessageLoop() | ||
: worker_count_(std::max(std::thread::hardware_concurrency(), 1u)), | ||
shutdown_latch_(worker_count_), | ||
shutdown_(false) { | ||
std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create( | ||
size_t worker_count) { | ||
return std::shared_ptr<ConcurrentMessageLoop>{ | ||
new ConcurrentMessageLoop(worker_count)}; | ||
} | ||
|
||
ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count) | ||
: worker_count_(std::max<size_t>(worker_count, 1ul)) { | ||
for (size_t i = 0; i < worker_count_; ++i) { | ||
workers_.emplace_back([i, this]() { | ||
fml::Thread::SetCurrentThreadName( | ||
|
@@ -26,45 +30,97 @@ ConcurrentMessageLoop::ConcurrentMessageLoop() | |
|
||
ConcurrentMessageLoop::~ConcurrentMessageLoop() { | ||
Terminate(); | ||
shutdown_latch_.Wait(); | ||
for (auto& worker : workers_) { | ||
worker.join(); | ||
} | ||
} | ||
|
||
// |fml::MessageLoopImpl| | ||
void ConcurrentMessageLoop::Run() { | ||
FML_CHECK(false); | ||
size_t ConcurrentMessageLoop::GetWorkerCount() const { | ||
return worker_count_; | ||
} | ||
|
||
// |fml::MessageLoopImpl| | ||
void ConcurrentMessageLoop::Terminate() { | ||
std::scoped_lock lock(wait_condition_mutex_); | ||
shutdown_ = true; | ||
wait_condition_.notify_all(); | ||
std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() { | ||
return std::make_shared<ConcurrentTaskRunner>(weak_from_this()); | ||
} | ||
|
||
// |fml::MessageLoopImpl| | ||
void ConcurrentMessageLoop::WakeUp(fml::TimePoint time_point) { | ||
// Assume that the clocks are not the same. | ||
const auto duration = std::chrono::nanoseconds( | ||
(time_point - fml::TimePoint::Now()).ToNanoseconds()); | ||
next_wake_ = std::chrono::high_resolution_clock::now() + duration; | ||
wait_condition_.notify_all(); | ||
void ConcurrentMessageLoop::PostTask(fml::closure task) { | ||
if (!task) { | ||
return; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should probably log this in debug cases since asking for a task to post and nothing happening is probably a logical error. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The task does not exist. So the call not doing anything makes sense I think. |
||
} | ||
|
||
std::unique_lock lock(tasks_mutex_); | ||
|
||
// Don't just drop tasks on the floor in case of shutdown. | ||
if (shutdown_) { | ||
FML_DLOG(WARNING) | ||
<< "Tried to post a task to shutdown concurrent message " | ||
"loop. The task will be executed on the callers thread."; | ||
lock.unlock(); | ||
task(); | ||
return; | ||
} | ||
|
||
tasks_.push(task); | ||
|
||
// Unlock the mutex before notifying the condition variable because that mutex | ||
// has to be acquired on the other thread anyway. Waiting in this scope till | ||
// it is acquired there is a pessimization. | ||
lock.unlock(); | ||
|
||
tasks_condition_.notify_one(); | ||
} | ||
|
||
void ConcurrentMessageLoop::WorkerMain() { | ||
while (!shutdown_) { | ||
std::unique_lock<std::mutex> lock(wait_condition_mutex_); | ||
if (!shutdown_) { | ||
wait_condition_.wait(lock); | ||
while (true) { | ||
std::unique_lock lock(tasks_mutex_); | ||
tasks_condition_.wait(lock, | ||
[&]() { return tasks_.size() > 0 || shutdown_; }); | ||
|
||
if (tasks_.size() == 0) { | ||
// This can only be caused by shutdown. | ||
FML_DCHECK(shutdown_); | ||
break; | ||
} | ||
TRACE_EVENT0("fml", "ConcurrentWorkerWake"); | ||
RunSingleExpiredTaskNow(); | ||
|
||
auto task = tasks_.front(); | ||
tasks_.pop(); | ||
|
||
// Don't hold onto the mutex while the task is being executed as it could | ||
// itself try to post another tasks to this message loop. | ||
lock.unlock(); | ||
|
||
TRACE_EVENT0("flutter", "ConcurrentWorkerWake"); | ||
// Execute the one tasks we woke up for. | ||
task(); | ||
} | ||
} | ||
|
||
void ConcurrentMessageLoop::Terminate() { | ||
std::scoped_lock lock(tasks_mutex_); | ||
shutdown_ = true; | ||
tasks_condition_.notify_all(); | ||
} | ||
|
||
ConcurrentTaskRunner::ConcurrentTaskRunner( | ||
std::weak_ptr<ConcurrentMessageLoop> weak_loop) | ||
: weak_loop_(std::move(weak_loop)) {} | ||
|
||
ConcurrentTaskRunner::~ConcurrentTaskRunner() = default; | ||
|
||
void ConcurrentTaskRunner::PostTask(fml::closure task) { | ||
if (!task) { | ||
return; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably should log in debug build here too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same reasoning as above. |
||
} | ||
|
||
if (auto loop = weak_loop_.lock()) { | ||
loop->PostTask(task); | ||
return; | ||
} | ||
|
||
RunExpiredTasksNow(); | ||
shutdown_latch_.CountDown(); | ||
FML_DLOG(WARNING) | ||
<< "Tried to post to a concurrent message loop that has already died. " | ||
"Executing the task on the callers thread."; | ||
task(); | ||
} | ||
|
||
} // namespace fml |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,51 +5,67 @@ | |
#ifndef FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_ | ||
#define FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_ | ||
|
||
#include <atomic> | ||
#include <chrono> | ||
#include <condition_variable> | ||
#include <queue> | ||
#include <thread> | ||
#include <vector> | ||
|
||
#include "flutter/fml/closure.h" | ||
#include "flutter/fml/macros.h" | ||
#include "flutter/fml/message_loop_impl.h" | ||
#include "flutter/fml/synchronization/count_down_latch.h" | ||
#include "flutter/fml/synchronization/thread_annotations.h" | ||
|
||
namespace fml { | ||
|
||
class ConcurrentMessageLoop : public MessageLoopImpl { | ||
private: | ||
const size_t worker_count_; | ||
std::mutex wait_condition_mutex_; | ||
std::condition_variable wait_condition_; | ||
std::vector<std::thread> workers_; | ||
CountDownLatch shutdown_latch_; | ||
std::chrono::high_resolution_clock::time_point next_wake_; | ||
std::atomic_bool shutdown_; | ||
class ConcurrentTaskRunner; | ||
|
||
ConcurrentMessageLoop(); | ||
class ConcurrentMessageLoop | ||
: public std::enable_shared_from_this<ConcurrentMessageLoop> { | ||
public: | ||
static std::shared_ptr<ConcurrentMessageLoop> Create( | ||
size_t worker_count = std::thread::hardware_concurrency()); | ||
|
||
~ConcurrentMessageLoop(); | ||
|
||
// |fml::MessageLoopImpl| | ||
void Run() override; | ||
size_t GetWorkerCount() const; | ||
|
||
std::shared_ptr<ConcurrentTaskRunner> GetTaskRunner(); | ||
|
||
void Terminate(); | ||
|
||
// |fml::MessageLoopImpl| | ||
void Terminate() override; | ||
private: | ||
friend ConcurrentTaskRunner; | ||
|
||
// |fml::MessageLoopImpl| | ||
void WakeUp(fml::TimePoint time_point) override; | ||
size_t worker_count_ = 0; | ||
std::vector<std::thread> workers_; | ||
std::mutex tasks_mutex_; | ||
std::condition_variable tasks_condition_; | ||
std::queue<fml::closure> tasks_; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no threadsafe queue class available to us? Maybe this is an opportunity to make one? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class is thread safe. Is there a specific issue I missed? Unfortunately, I could not add thread safety annotation here because the the condition variable wait from libcxx does not have the appropriate annotations. If you meant a lockless queue instead, I didn't want to write one because we needed the mutex for the condition variable anyway. Authoring, testing and then not needing the lockless nature of the queue seemed unnecessary in this instance. |
||
bool shutdown_ = false; | ||
|
||
static void WorkerMain(ConcurrentMessageLoop* loop); | ||
ConcurrentMessageLoop(size_t worker_count); | ||
|
||
void WorkerMain(); | ||
|
||
FML_FRIEND_MAKE_REF_COUNTED(ConcurrentMessageLoop); | ||
FML_FRIEND_REF_COUNTED_THREAD_SAFE(ConcurrentMessageLoop); | ||
void PostTask(fml::closure task); | ||
|
||
FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentMessageLoop); | ||
}; | ||
|
||
class ConcurrentTaskRunner { | ||
public: | ||
ConcurrentTaskRunner(std::weak_ptr<ConcurrentMessageLoop> weak_loop); | ||
|
||
~ConcurrentTaskRunner(); | ||
|
||
void PostTask(fml::closure task); | ||
|
||
private: | ||
friend ConcurrentMessageLoop; | ||
|
||
std::weak_ptr<ConcurrentMessageLoop> weak_loop_; | ||
|
||
FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentTaskRunner); | ||
}; | ||
|
||
} // namespace fml | ||
|
||
#endif // FLUTTER_FML_CONCURRENT_MESSAGE_LOOP_H_ |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -244,6 +244,41 @@ class ScopedInstantEnd { | |
FML_DISALLOW_COPY_AND_ASSIGN(ScopedInstantEnd); | ||
}; | ||
|
||
// A move-only utility object that creates a new flow with a unique ID and | ||
// automatically ends it when it goes out of scope. When tracing using multiple | ||
// overlapping flows, it often gets hard to make sure to end the flow | ||
// (especially with early returns), or, end/step on the wrong flow. This | ||
// leads to corrupted or missing traces in the UI. | ||
class TraceFlow { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add docstring for class. Why is this inlined? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lgtm, just make sure it's it doxygen format ( http://www.doxygen.nl/manual/docblocks.html ) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're using triple slashes in other places of the code methinks. |
||
public: | ||
TraceFlow(const char* label) : label_(label), nonce_(TraceNonce()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't you memcpy the label? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, all strings using in tracing need to be const. Or, more precisely, the lifecycle of the labels must extend past the shutdown on the Dart VM which is used as the trace processor. Using a memcpy here would make the VM read from garbage memory as it is constructing the buffer used to collect trace information. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trace event labels are passed to the Dart timeline API, which requires that the label buffer remain valid until the Dart VM is shut down. Effectively this means that the labels need to be string literals that are not allocated dynamically. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I think what you want to use is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I played around with this and I'm mistaken, it isn't straightforward to do: https://stackoverflow.com/questions/7613528/restrict-passed-parameter-to-a-string-literal, feel free to ignore my suggestion =). |
||
TraceEventFlowBegin0("flutter", label_, nonce_); | ||
} | ||
|
||
~TraceFlow() { End(label_); } | ||
|
||
TraceFlow(TraceFlow&& other) : label_(other.label_), nonce_(other.nonce_) { | ||
other.nonce_ = 0; | ||
} | ||
|
||
void Step(const char* label) const { | ||
TraceEventFlowStep0("flutter", label, nonce_); | ||
} | ||
|
||
void End(const char* label = nullptr) { | ||
if (nonce_ != 0) { | ||
TraceEventFlowEnd0("flutter", label == nullptr ? label_ : label, nonce_); | ||
nonce_ = 0; | ||
} | ||
} | ||
|
||
private: | ||
const char* label_; | ||
size_t nonce_; | ||
|
||
FML_DISALLOW_COPY_AND_ASSIGN(TraceFlow); | ||
}; | ||
|
||
} // namespace tracing | ||
} // namespace fml | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.