Skip to content

Refactor thread filter mechanisms #209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

r1viollet
Copy link
Collaborator

@r1viollet r1viollet commented Apr 28, 2025

What does this PR do?:

Propose an alternative to the thread filter implementation.

Motivation:

This is called a large amount of times.
Using the benchmark. With 15 threads

Old: 261,109 ops / thread / sec
New: 371,809 ops / thread / sec

Todo:

  • Free list implementation
  • Other benchmarks including: Sparse threads, short lived threads
  • Memory usage bench

Additional Notes:

Although the performances are better. I think we should avoid JNI.
I am not sure how to achieve this.

How to test the change?:

For Datadog employees:

  • If this PR touches code that signs or publishes builds or packages, or handles
    credentials of any kind, I've requested a review from @DataDog/security-design-and-guidance.
  • This PR doesn't touch any of that.
  • JIRA: [JIRA-XXXX]

Unsure? Have a question? Request a review!

if (do_filter) {
Profiler::instance()->threadFilter()->collect(filtered_tids);
// Sort the TIDs for efficient lookup
std::sort(filtered_tids.begin(), filtered_tids.end());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: bench this part if useful. the collection is not idea. We could really randomize during the collection.

Comment on lines 21 to 32
int top = _free_list_top.load(std::memory_order_acquire);
for (int i = 0; i < top; ++i) {
int value = _free_list[i].load(std::memory_order_relaxed);
if (value >= 0) {
int expected = value;
if (_free_list[i].compare_exchange_strong(expected, -1, std::memory_order_acq_rel)) {
return value; // Successfully claimed a free slot
}
// If CAS fails, someone else claimed it, continue scanning
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good. Have you experimented with a custom stride or thread-local random starting offset to improve the contended situation? Basically, to prevent all contending threads starting at 0 and then competing for all subsequent slots until everyone is satisfied - forcing the most unlucky thread to do N checks, N being the number of competing threads.

Comment on lines +449 to +450
private native void filterThread_add();
private native void filterThread_remove();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, use the naming convention

Suggested change
private native void filterThread_add();
private native void filterThread_remove();
private native void filterThreadAdd0();
private native void filterThreadRemove0();

EXPECT_TRUE(filter.accept(tid));
step++;

const int num_threads = 10;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to also test with the number of threads exceeding the free list size?

Comment on lines +133 to +134
std::unordered_set<int> _thread_ids[CONCURRENCY_LEVEL][2];
std::atomic<int> _active_index{0}; // 0 or 1 globally
Copy link
Collaborator

@jbachorik jbachorik Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder - can this be factored out to a separate type?

// assumption is that we hold the lock (with lock_index)
void Recording::addThread(int lock_index, int tid) {
int active = _active_index.load(std::memory_order_acquire);
_thread_ids[lock_index][active].insert(tid);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder - does the ordering matter here?
Eg. if we have [active][lock_index] can it make things better/worse in terms of data locality?

threads.insert(_tid);

for (int i = 0; i < CONCURRENCY_LEVEL; ++i) {
// I can not use merge : cpp 17
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated - but we might try to bump the accepted C++ level to 17. It should be possible ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

@@ -1053,11 +1055,18 @@ void Recording::writeExecutionModes(Buffer *buf) {
}

void Recording::writeThreads(Buffer *buf) {
addThread(_tid);
Copy link
Collaborator

@jbachorik jbachorik Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to remove this call? We should make sure that the recording thread is also included as events can be associated with it - recording info, settings, config - all point to _tid thread.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I do (with the threads.insert(_tid);)
I had to move it down a few lines.

jboolean enable) {
int tid = ProfiledThread::currentTid();
if (tid < 0) {
Java_com_datadoghq_profiler_JavaProfiler_filterThread_1add(JNIEnv *env,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is actually a typo 1_add this will not be linked with the Java native method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably due to the mangling (I had to iterate a few times before I figured out how to link)

}

extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_filterThread_1remove(JNIEnv *env,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually work? The name does not correspond the to Java native method :(

// assumption is that we hold the lock (with lock_index)
void Recording::addThread(int lock_index, int tid) {
int active = _active_index.load(std::memory_order_acquire);
_thread_ids[lock_index][active].insert(tid);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not acceptable: no allocations can take place here.

This was referenced Jun 17, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants