Case (III) – KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki

Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-C…


This content originally appeared on DEV Community and was authored by Aceld

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki

Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection

Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines

Download KisFlow Source

$go get github.com/aceld/kis-flow

KisFlow Developer Documentation

Source Code Example

https://github.com/aceld/kis-flow-usage/tree/main/6-flow_in_goroutines

If you need the same Flow to run concurrently in multiple Goroutines, you can use the flow.Fork() function to clone a Flow instance with isolated memory but the same configuration. Each Flow instance can then be executed in different Goroutines to compute their respective data streams.

Goroutines

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "sync"
)

func main() {
    ctx := context.Background()
    // Get a WaitGroup
    var wg sync.WaitGroup

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flow1 := kis.Pool().GetFlow("CalStuAvgScore")
    if flow1 == nil {
        panic("flow1 is nil")
    }
    // Fork the flow
    flowClone1 := flow1.Fork(ctx)

    // Add to WaitGroup
    wg.Add(2)

    // Run Flow1
    go func() {
        defer wg.Done()
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
        // Submit a string
        _ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`)

        // Run the flow
        if err := flow1.Run(ctx); err != nil {
            fmt.Println("err: ", err)
        }
    }()

    // Run FlowClone1
    go func() {
        defer wg.Done()
        // Submit a string
        _ = flowClone1.CommitRow(`{"stu_id":201, "score_1":100, "score_2":90, "score_3":80}`)
        // Submit a string
        _ = flowClone1.CommitRow(`{"stu_id":2001, "score_1":100, "score_2":70, "score_3":60}`)

        if err := flowClone1.Run(ctx); err != nil {
            fmt.Println("err: ", err)
        }
    }()

    // Wait for Goroutines to finish
    wg.Wait()

    fmt.Println("All flows completed.")

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

In this code snippet, we start two Goroutines to run Flow1 and its clone (FlowClone1) concurrently to calculate the final average scores for students 101, 1001, 201, and 2001.

Author: Aceld
GitHub: https://github.com/aceld

KisFlow Open Source Project Address: https://github.com/aceld/kis-flow

Document: https://github.com/aceld/kis-flow/wiki

Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection

Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines


This content originally appeared on DEV Community and was authored by Aceld


Print Share Comment Cite Upload Translate Updates
APA

Aceld | Sciencx (2024-07-09T01:53:20+00:00) Case (III) – KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines. Retrieved from https://www.scien.cx/2024/07/09/case-iii-kisflow-golang-stream-real-application-of-kisflow-in-multi-goroutines/

MLA
" » Case (III) – KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines." Aceld | Sciencx - Tuesday July 9, 2024, https://www.scien.cx/2024/07/09/case-iii-kisflow-golang-stream-real-application-of-kisflow-in-multi-goroutines/
HARVARD
Aceld | Sciencx Tuesday July 9, 2024 » Case (III) – KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines., viewed ,<https://www.scien.cx/2024/07/09/case-iii-kisflow-golang-stream-real-application-of-kisflow-in-multi-goroutines/>
VANCOUVER
Aceld | Sciencx - » Case (III) – KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2024/07/09/case-iii-kisflow-golang-stream-real-application-of-kisflow-in-multi-goroutines/
CHICAGO
" » Case (III) – KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines." Aceld | Sciencx - Accessed . https://www.scien.cx/2024/07/09/case-iii-kisflow-golang-stream-real-application-of-kisflow-in-multi-goroutines/
IEEE
" » Case (III) – KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines." Aceld | Sciencx [Online]. Available: https://www.scien.cx/2024/07/09/case-iii-kisflow-golang-stream-real-application-of-kisflow-in-multi-goroutines/. [Accessed: ]
rf:citation
» Case (III) – KisFlow-Golang Stream Real- Application of KisFlow in Multi-Goroutines | Aceld | Sciencx | https://www.scien.cx/2024/07/09/case-iii-kisflow-golang-stream-real-application-of-kisflow-in-multi-goroutines/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.