How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?

I’m trying to execute a method asynchronously within an existing reactive chain in my Project Reactor-based application. The method doUpdateLayoutInAsync is intended to perform a heavy background task, but it seems like my approach isn’t working as exp…


This content originally appeared on DEV Community and was authored by subrata71

I'm trying to execute a method asynchronously within an existing reactive chain in my Project Reactor-based application. The method doUpdateLayoutInAsync is intended to perform a heavy background task, but it seems like my approach isn't working as expected. Here's my current implementation:

public Mono<Boolean> publishPackage(String branchedPackageId) {
    PackagePublishingMetaDTO publishingMetaDTO = new PackagePublishingMetaDTO();
    publishingMetaDTO.setPublishEvent(true);

    return packageRepository
            .findById(branchedPackageId, packagePermission.getPublishPermission())
            .switchIfEmpty(Mono.error(new AppsmithException(
                    AppsmithError.ACL_NO_RESOURCE_FOUND, FieldName.PACKAGE_ID, branchedPackageId)))
            .flatMap(originalPackage -> {
                String nextVersion = PackageUtils.getNextVersion(originalPackage.getVersion());

                Package packageToBePublished = constructPackageToBePublished(originalPackage);

                originalPackage.setVersion(nextVersion);
                originalPackage.setLastPublishedAt(packageToBePublished.getLastPublishedAt());
                publishingMetaDTO.setOriginPackageId(branchedPackageId);
                publishingMetaDTO.setWorkspaceId(originalPackage.getWorkspaceId());

                Mono<Void> unsetCurrentLatestMono = packageRepository.unsetLatestPackageByOriginId(originalPackage.getId(), null);
                Mono<Package> saveOriginalPackage = packageRepository.save(originalPackage);
                Mono<Package> savePackageToBePublished = packageRepository.save(packageToBePublished);

                return unsetCurrentLatestMono
                        .then(Mono.zip(saveOriginalPackage, savePackageToBePublished))
                        .flatMap(tuple2 -> {
                            Package publishedPackage = tuple2.getT2();
                            publishingMetaDTO.setPublishedPackage(publishedPackage);

                            return modulePackagePublishableService
                                    .publishEntities(publishingMetaDTO)
                                    .flatMap(publishedModules -> {
                                        if (publishedModules.isEmpty()) {
                                            return Mono.error(new AppsmithException(
                                                    AppsmithError.PACKAGE_CANNOT_BE_PUBLISHED,
                                                    originalPackage.getUnpublishedPackage().getName()));
                                        }
                                        return moduleInstancePackagePublishableService
                                                .publishEntities(publishingMetaDTO)
                                                .then(Mono.defer(() ->
                                                        newActionPackagePublishableService.publishEntities(publishingMetaDTO))
                                                        .then(Mono.defer(() ->
                                                                actionCollectionPackagePublishableService
                                                                        .publishEntities(publishingMetaDTO))));
                                    })
                                    .then(Mono.defer(() -> autoUpgradeService.handleAutoUpgrade(publishingMetaDTO)));
                        })
                        .as(transactionalOperator::transactional)
                        .then(Mono.defer(() -> doUpdateLayoutInAsync(publishingMetaDTO)));
            });
}

private Mono<Boolean> doUpdateLayoutInAsync(PackagePublishingMetaDTO publishingMetaDTO) {
    Mono<List<String>> updateLayoutsMono = Flux.fromIterable(publishingMetaDTO.getAutoUpgradedPageIds())
            .flatMap(pageId -> updateLayoutService
                    .updatePageLayoutsByPageId(pageId)
                    .onErrorResume(throwable -> {
                        log.warn("Update layout failed for pageId: {} with error: {}", pageId, throwable.getMessage());
                        return Mono.just(pageId);
                    }))
            .collectList();

    // Running the updateLayoutsMono task asynchronously
    updateLayoutsMono.subscribeOn(Schedulers.boundedElastic()).subscribe();

    return Mono.just(Boolean.TRUE);
}

Issue: I want doUpdateLayoutInAsync to run in the background while the rest of the reactive chain completes. However, the method seems to execute synchronously, and the reactive chain does not continue as expected.

Question: How can I ensure that doUpdateLayoutInAsync runs asynchronously and does not block the continuation of the reactive chain?


This content originally appeared on DEV Community and was authored by subrata71


Print Share Comment Cite Upload Translate Updates
APA

subrata71 | Sciencx (2024-07-26T10:17:30+00:00) How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?. Retrieved from https://www.scien.cx/2024/07/26/how-to-run-a-method-asynchronously-in-a-reactive-chain-in-spring-webflux/

MLA
" » How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?." subrata71 | Sciencx - Friday July 26, 2024, https://www.scien.cx/2024/07/26/how-to-run-a-method-asynchronously-in-a-reactive-chain-in-spring-webflux/
HARVARD
subrata71 | Sciencx Friday July 26, 2024 » How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?., viewed ,<https://www.scien.cx/2024/07/26/how-to-run-a-method-asynchronously-in-a-reactive-chain-in-spring-webflux/>
VANCOUVER
subrata71 | Sciencx - » How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2024/07/26/how-to-run-a-method-asynchronously-in-a-reactive-chain-in-spring-webflux/
CHICAGO
" » How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?." subrata71 | Sciencx - Accessed . https://www.scien.cx/2024/07/26/how-to-run-a-method-asynchronously-in-a-reactive-chain-in-spring-webflux/
IEEE
" » How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?." subrata71 | Sciencx [Online]. Available: https://www.scien.cx/2024/07/26/how-to-run-a-method-asynchronously-in-a-reactive-chain-in-spring-webflux/. [Accessed: ]
rf:citation
» How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux? | subrata71 | Sciencx | https://www.scien.cx/2024/07/26/how-to-run-a-method-asynchronously-in-a-reactive-chain-in-spring-webflux/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.