Enhancing Transparency and Accountability: Implementing Entity Audit Logging in Java [Part-2/2]

In the last post, we used io.ebean’s ChangeLog annotation to log entity changes in a synchronous blocking I/O application. In this post, we will see how that works in a multithreaded or async (non-blocking) I/O application.

If your system uses a sepa…


This content originally appeared on DEV Community and was authored by Anuj Singh

In the last post, we used io.ebean's ChangeLog annotation to log entity changes in a synchronous blocking I/O application. In this post, we will see how that works in a multithreaded or async (non-blocking) I/O application.

If your system uses a separate thread-pool or executor context for doing I/O tasks such as making an HTTP API request or a database transaction, then, the audit log prepare class will not be able to detect the user context on model entities that are updated, as the model entity will be saved in a different context or thread-pool. Hence, we need to ensure that the thread-context is not lost when switching the executor or when switching threads in the same thread-pool.

To do so, first we need to understand how to store some data in thread context. For this we will use ThreadLocals. ThreadLocal construct allows us to store data that will be accessible only by a specific thread. Each thread will have its own ThreadLocal instance, hence while switching the context, we need to ensure that the thread context of the current thread is propagated to the next thread.

STORING USER CONTEXT IN THREAD LOCALS

We can set ThreadLocals with user info and use it in change log prepare’s implementation to set it in change set.

ThreadLocalManager

public class ThreadLocalManager {
    private static ThreadLocal<Map<String, Object>> context = new ThreadLocal<>();
    public static void addToContext(String key, Object value) {
        Map<String, Object> currentContext = ThreadLocalManager.context.get() == null ? new HashMap<>() : context.get();
        currentContext.put(key, value);
        context.set(currentContext);
    }
    public static void setContext(Map<String, Object> contextMap) {
        context.set(contextMap);
    }
    public static Map<String, Object> getContext() {
        return context.get();
    }
    public static Object getFromContext(String key) {
        return Nullifier.get(() -> context.get().getOrDefault(key, "NA"));
    }
    public static void clearContext() {
        ThreadLocalManager.context.remove();
    }
}

We can add additional info before setting the thread local

ThreadLocalManager.addToContext("userContext", new HashMap<String, String>() {{
    put("userName", "some user");
    put("userEmail", "some.user@company.com");
}}
entity.save()

Now we'll modify our ChangeLogPrepare to read user data from thread context

Custom ChangeLogPrepare

public class AuditLogPrepare implements ChangeLogPrepare {
    private final play.Logger.ALogger logger = Logger.of(this.getClass());
    @Override
    public boolean prepare(ChangeSet changes) {
        Map<String, String> userContext = Nullifier.get(() -> (Map<String, String>) ThreadLocalManager.getContext().get("userContext"), new HashMap<>());
        if (userContext.isEmpty()) logger.warn("[ALERT] userContext is empty for changeset: " + changes.toString());
        changes.getUserContext().put("userName", authMap.getOrDefault("userName", DEFAULT_USER_NAME));
        changes.getUserContext().put("userEmail", authMap.getOrDefault("userEmail", DEFAULT_USER_EMAIL));
        changes.setSource("MyApp");
        return true;
    }
}

As we see, the user data is taken from thread local, we need to ensure that the thread context is maintained while switching the thread. For that, we'll create a utility class that helps us propagate the thread context to next runnable/callable.

ContextUtility

public class ContextUtility {
    public static <T> Callable<T> wrapWithContext(Callable<T> task) {
        Map<String, Object> previousContext = ThreadLocalManager.getContext();
        if (previousContext == null)
            return task;
        else
            return () -> {
                ThreadLocalManager.setContext(previousContext);
                try {
                    return task.call();
                } finally {
                    ThreadLocalManager.clearContext();
                }
            };
    }
    public static Runnable wrapWithContext(Runnable task) {
        Map<String, Object> previousContext = ThreadLocalManager.getContext();
        if (previousContext == null) {
            return task;
        } else
            return () -> {
                ThreadLocalManager.setContext(previousContext);
                try {
                    task.run();
                } finally {
                    ThreadLocalManager.clearContext();
                }
            };
    }
}

Using the methods from ContextUtility we create CustomThreadPoolExecutor to override methods to attach thread context before submitting/executing tasks

CustomThreadPoolExecutor

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    public CustomThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            @NotNull TimeUnit unit,
            @NotNull BlockingQueue<Runnable> workQueue,
            @NotNull ThreadFactory threadFactory,
            @NotNull RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
    @Override
    public <T> @NotNull Future<T> submit(@NotNull Callable<T> task) {
        return super.submit(ContextUtility.wrapWithContext(task));
    }
    @Override
    public <T> @NotNull Future<T> submit(@NotNull Runnable task, T result) {
        return super.submit(ContextUtility.wrapWithContext(task), result);
    }
    @Override
    public @NotNull Future<?> submit(@NotNull Runnable task) {
        return super.submit(ContextUtility.wrapWithContext(task));
    }
    @Override
    public void execute(@NotNull Runnable task) {
        super.execute(ContextUtility.wrapWithContext(task));
    }

Now, we will use this executor in our custom MDC to allow creating custom dispatchers.

CustomDispatcherConfigurator

public class CustomDispatcherConfigurator extends MessageDispatcherConfigurator {
    private final CustomDispatcher instance;
    public CustomDispatcherConfigurator(Config config, DispatcherPrerequisites prerequisites) {
        super(config, prerequisites);
        Config threadPoolConfig = config.getConfig("thread-pool-executor");
        int fixedPoolSize = threadPoolConfig.getInt("fixed-pool-size");
        instance = new CustomDispatcher(
                this,
                config.getString("id"),
                config.getInt("throughput"),
                Duration.create(config.getDuration("throughput-deadline-time", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS),
                (id, threadFactory) -> () -> new CustomThreadPoolExecutor(fixedPoolSize,
                                                                          fixedPoolSize,
                                                                          threadPoolConfig.getDuration("keep-alive-time", TimeUnit.MILLISECONDS),
                                                                          TimeUnit.MILLISECONDS,
                                                                          new LinkedBlockingDeque<>(),
                                                                          new ThreadFactory() {
                                                                              private int threadId = 1;
                                                                              @Override
                                                                              public Thread newThread(@NotNull Runnable r) {
                                                                                  Thread thread = new Thread(r);
                                                                                  thread.setName(config.getString("name") + "-" + threadId++);
                                                                                  return thread;
                                                                              }
                                                                          }),
                Duration.create(config.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
        );
    }
    @Override
    public MessageDispatcher dispatcher() {
        return instance;
    }
}
class CustomDispatcher extends Dispatcher {
    public CustomDispatcher(MessageDispatcherConfigurator _configurator,
                            String id,
                            int throughput,
                            Duration throughputDeadlineTime,
                            ExecutorServiceFactoryProvider executorServiceFactoryProvider,
                            scala.concurrent.duration.FiniteDuration shutdownTimeout) {
        super(_configurator, id, throughput, throughputDeadlineTime, executorServiceFactoryProvider, shutdownTimeout);
    }
}

Now we can create different actors in our actor system using this custom MDC and define their config in aplication.conf

db-io-dispatcher {
    type = "contexts.CustomDispatcherConfigurator"
    executor = "thread-pool-executor"
    thread-pool-executor {
        fixed-pool-size = 11
    }
    throughput = 1
    shutdown-timeout = 60s
}
web-io-dispatcher {
    type = "contexts.CustomDispatcherConfigurator"
    executor = "thread-pool-executor"
    thread-pool-executor {
        fixed-pool-size = 20
    }
    throughput = 1
    shutdown-timeout = 60s
}

To create an actor for db execution context

DatabaseIODispatcher

public class DatabaseIODispatcher extends CustomExecutionContext {
    @Inject
    public DatabaseIODispatcher(ActorSystem actorSystem) {
        super(actorSystem, "db-io-dispatcher");
    }
}

This allows us to switch context from one executor to another without losing thread’s context.

Points to Remember

  • In async implementations, if the thread is switched in-between from custom executors to default ThreadPoolExecutor or ForkJoinPool, the thread context will get lost, hence we need to ensure that the thread context is not getting lost if any library method is using default pools.

  • We need to clear thread context after the task is complete or else it can cause memory leaks or OOM issues.

Thanks for reading. I hope this helps the community to provide more transparency in their applications.


This content originally appeared on DEV Community and was authored by Anuj Singh


Print Share Comment Cite Upload Translate Updates
APA

Anuj Singh | Sciencx (2024-06-28T11:01:41+00:00) Enhancing Transparency and Accountability: Implementing Entity Audit Logging in Java [Part-2/2]. Retrieved from https://www.scien.cx/2024/06/28/enhancing-transparency-and-accountability-implementing-entity-audit-logging-in-java-part-2-2/

MLA
" » Enhancing Transparency and Accountability: Implementing Entity Audit Logging in Java [Part-2/2]." Anuj Singh | Sciencx - Friday June 28, 2024, https://www.scien.cx/2024/06/28/enhancing-transparency-and-accountability-implementing-entity-audit-logging-in-java-part-2-2/
HARVARD
Anuj Singh | Sciencx Friday June 28, 2024 » Enhancing Transparency and Accountability: Implementing Entity Audit Logging in Java [Part-2/2]., viewed ,<https://www.scien.cx/2024/06/28/enhancing-transparency-and-accountability-implementing-entity-audit-logging-in-java-part-2-2/>
VANCOUVER
Anuj Singh | Sciencx - » Enhancing Transparency and Accountability: Implementing Entity Audit Logging in Java [Part-2/2]. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2024/06/28/enhancing-transparency-and-accountability-implementing-entity-audit-logging-in-java-part-2-2/
CHICAGO
" » Enhancing Transparency and Accountability: Implementing Entity Audit Logging in Java [Part-2/2]." Anuj Singh | Sciencx - Accessed . https://www.scien.cx/2024/06/28/enhancing-transparency-and-accountability-implementing-entity-audit-logging-in-java-part-2-2/
IEEE
" » Enhancing Transparency and Accountability: Implementing Entity Audit Logging in Java [Part-2/2]." Anuj Singh | Sciencx [Online]. Available: https://www.scien.cx/2024/06/28/enhancing-transparency-and-accountability-implementing-entity-audit-logging-in-java-part-2-2/. [Accessed: ]
rf:citation
» Enhancing Transparency and Accountability: Implementing Entity Audit Logging in Java [Part-2/2] | Anuj Singh | Sciencx | https://www.scien.cx/2024/06/28/enhancing-transparency-and-accountability-implementing-entity-audit-logging-in-java-part-2-2/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.