Skip to content

Zgu/threadfilter #224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions ddprof-lib/src/main/cpp/javaApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,24 @@ Java_com_datadoghq_profiler_JVMAccess_healthCheck0(JNIEnv *env,
jobject unused) {
return true;
}

extern "C" DLLEXPORT jlong JNICALL
Java_com_datadoghq_profiler_ActiveBitmaps_bitmapAddressFor0(JNIEnv *env,
jclass unused,
jint tid) {
u64* bitmap = Profiler::instance()->threadFilter()->bitmapAddressFor((int)tid);
return (jlong)bitmap;
}

extern "C" DLLEXPORT jboolean JNICALL
Java_com_datadoghq_profiler_ActiveBitmaps_isActive(JNIEnv *env,
jclass unused,
jint tid) {
return Profiler::instance()->threadFilter()->accept((int)tid) ? JNI_TRUE : JNI_FALSE;
}

extern "C" DLLEXPORT jlong JNICALL
Java_com_datadoghq_profiler_ActiveBitmaps_getActiveCountAddr0(JNIEnv *env,
jclass unused) {
return (jlong)Profiler::instance()->threadFilter()->addressOfSize();
}
23 changes: 23 additions & 0 deletions ddprof-lib/src/main/cpp/reverse_bits.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// Borrow the implementation from openjdk
// https://github.com/openjdk/jdk/blob/master/src/hotspot/share/utilities/reverse_bits.hpp
//

#ifndef REVERSE_BITS_H
#define REVERSE_BITS_H
#include "arch_dd.h"
#include <stdint.h>

static constexpr u32 rep_5555 = static_cast<u32>(UINT64_C(0x5555555555555555));
static constexpr u32 rep_3333 = static_cast<u32>(UINT64_C(0x3333333333333333));
static constexpr u32 rep_0F0F = static_cast<u32>(UINT64_C(0x0F0F0F0F0F0F0F0F));

inline u16 reverse16(u16 v) {
u32 x = static_cast<u32>(v);
x = ((x & rep_5555) << 1) | ((x >> 1) & rep_5555);
x = ((x & rep_3333) << 2) | ((x >> 2) & rep_3333);
x = ((x & rep_0F0F) << 4) | ((x >> 4) & rep_0F0F);
return __builtin_bswap16(static_cast<u16>(x));
}

#endif //REVERSE_BITS_H
44 changes: 37 additions & 7 deletions ddprof-lib/src/main/cpp/threadFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "threadFilter.h"
#include "counters.h"
#include "os.h"
#include "reverse_bits.h"
#include <cassert>
#include <stdlib.h>
#include <string.h>

Expand Down Expand Up @@ -85,32 +87,57 @@ void ThreadFilter::clear() {
_size = 0;
}

bool ThreadFilter::accept(int thread_id) {
u64 *b = bitmap(thread_id);
return b != NULL && (word(b, thread_id) & (1ULL << (thread_id & 0x3f)));
int ThreadFilter::mapThreadId(int thread_id) {
// We want to map the thread_id inside the same bitmap
static_assert(BITMAP_SIZE >= (u16)0xffff, "Potential verflow");
u16 lower16 = (u16)(thread_id & 0xffff);
lower16 = reverse16(lower16);
int tid = (thread_id & ~0xffff) | lower16;
return tid;
}

void ThreadFilter::add(int thread_id) {
u64 *b = bitmap(thread_id);

u64* ThreadFilter::getBitmapFor(int thread_id) {
int index = static_cast<u32>(thread_id) / BITMAP_CAPACITY;
u64* b = _bitmap[index];
if (b == NULL) {
b = (u64 *)OS::safeAlloc(BITMAP_SIZE);
u64 *oldb = __sync_val_compare_and_swap(
&_bitmap[(u32)thread_id / BITMAP_CAPACITY], NULL, b);
&_bitmap[index], NULL, b);
if (oldb != NULL) {
OS::safeFree(b, BITMAP_SIZE);
b = oldb;
} else {
trackPage();
}
}
return b;
}

u64* ThreadFilter::bitmapAddressFor(int thread_id) {
u64* bitmap = getBitmapFor(thread_id);
thread_id = mapThreadId(thread_id);
return wordAddress(bitmap, thread_id);
}

bool ThreadFilter::accept(int thread_id) {
u64 *b = bitmap(thread_id);
thread_id = mapThreadId(thread_id);
return b != NULL && (word(b, thread_id) & (1ULL << (thread_id & 0x3f)));
}

void ThreadFilter::add(int thread_id) {
u64 *b = getBitmapFor(thread_id);
assert(b != NULL);
thread_id = mapThreadId(thread_id);
u64 bit = 1ULL << (thread_id & 0x3f);
if (!(__sync_fetch_and_or(&word(b, thread_id), bit) & bit)) {
atomicInc(_size);
}
}

void ThreadFilter::remove(int thread_id) {
thread_id = mapThreadId(thread_id);
u64 *b = bitmap(thread_id);
if (b == NULL) {
return;
Expand All @@ -132,7 +159,10 @@ void ThreadFilter::collect(std::vector<int> &v) {
// order here
u64 word = __atomic_load_n(&b[j], __ATOMIC_ACQUIRE);
while (word != 0) {
v.push_back(start_id + j * 64 + __builtin_ctzl(word));
int tid = start_id + j * 64 + __builtin_ctzl(word);
// restore thread id
tid = mapThreadId(tid);
v.push_back(tid);
word &= (word - 1);
}
}
Expand Down
10 changes: 10 additions & 0 deletions ddprof-lib/src/main/cpp/threadFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,19 @@ class ThreadFilter {
__ATOMIC_ACQUIRE);
}

static int mapThreadId(int thread_id);

u64 &word(u64 *bitmap, int thread_id) {
// todo: add thread safe APIs
return bitmap[((u32)thread_id % BITMAP_CAPACITY) >> 6];
}

u64* wordAddress(u64 *bitmap, int thread_id) {
return &bitmap[((u32)thread_id % BITMAP_CAPACITY) >> 6];
}

u64* getBitmapFor(int thread_id);

public:
ThreadFilter();
ThreadFilter(ThreadFilter &threadFilter) = delete;
Expand All @@ -58,13 +66,15 @@ class ThreadFilter {
bool enabled() { return _enabled; }

int size() { return _size; }
const volatile int* addressOfSize() const { return &_size; }

void init(const char *filter);
void clear();

bool accept(int thread_id);
void add(int thread_id);
void remove(int thread_id);
u64* bitmapAddressFor(int thread_id);

void collect(std::vector<int> &v);
};
Expand Down
80 changes: 80 additions & 0 deletions ddprof-lib/src/main/java/com/datadoghq/profiler/ActiveBitmaps.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.datadoghq.profiler;

import sun.misc.Unsafe;
import java.lang.reflect.Field;


class ActiveBitmaps {
private static final Unsafe UNSAFE;
static {
Unsafe unsafe = null;
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe) f.get(null);
} catch (Exception ignore) { }
UNSAFE = unsafe;
}

private static long activeCountAddr = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Code Quality Violation

Suggested change
private static long activeCountAddr = 0;
private static long activeCountAddr;
Remove initialization, this is already the default value. (...read more)

When initializing fields, prevent initializing fields to the default value. Any additional initialization means more bytecode instructions, and allocating many of these objects may impact your application performance.

If you initialize to a default value, remove the initialization.

View in Datadog  Leave us feedback  Documentation


private static final ThreadLocal<Long> Address = new ThreadLocal<Long>() {
@Override protected Long initialValue() {
return -1L;
}
};

public static void initialize() {
activeCountAddr = getActiveCountAddr0();
}

// Set bitmap to native code
static native long bitmapAddressFor0(int tid);

static long getBitmask(int tid) {
int tmp = (tid >> 8) & 0xff ;
int bits = 0;
for (int index = 0; index < 7 ; index++) {
if ((tmp & 0x01) == 0x01) {
bits |= 0x01;
}
tmp >>= 1;
bits <<= 1;
}
return 1L << (bits & 0x3f);
}

static void setActive(int tid, boolean active) {
long addr = Address.get();
if (addr == -1) {
addr = bitmapAddressFor0(tid);
Address.set(addr);
}
long bitmask = getBitmask(tid);
long value = UNSAFE.getLong(addr);
long newVal;
if (active) {
newVal = value | bitmask;
} else {
newVal = value & ~bitmask;
}
while (!UNSAFE.compareAndSwapLong(null, addr, value, newVal)) {
value = UNSAFE.getLong(addr);
newVal = active ? (value | bitmask) : (value & ~bitmask);
}
int delta = active ? 1 : -1;
assert activeCountAddr != 0;
UNSAFE.getAndAddInt(null, activeCountAddr, delta);
// if (isActive(tid) != active) {
// throw new RuntimeException("SetActive failed");
// }

assert isActive(tid) == active;
}

// For verification
static native boolean isActive(int tid);

static native long getActiveCountAddr0();
}

Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public static synchronized JavaProfiler getInstance(String libLocation, String s
throw new IOException("Failed to load Datadog Java profiler library", result.error);
}
init0();
ActiveBitmaps.initialize();

profiler.initializeContextStorage();
instance = profiler;
Expand Down Expand Up @@ -208,15 +209,15 @@ public boolean recordTraceRoot(long rootSpanId, String endpoint, int sizeLimit)
* 'filter' option must be enabled to use this method.
*/
public void addThread() {
filterThread0(true);
ActiveBitmaps.setActive(TID.get(), true);
}

/**
* Remove the given thread to the set of profiled threads.
* 'filter' option must be enabled to use this method.
*/
public void removeThread() {
filterThread0(false);
ActiveBitmaps.setActive(TID.get(), false);
}


Expand Down
Loading
Loading