Flow#

The most basic ConverSight object is Flow. It is represented in Python as a single function.

Flows behave like functions: they can accept inputs, perform work, and return results. In fact, by applying the @flow decorator you can turn any function into a flow. When a function becomes a flow, its behaviour changes, granting it the following benefits (see the sketch after this list):

  • Input argument types can be validated.

  • Retries can be performed on failure.

  • Timeouts can be enforced to prevent unintentionally long-running workflows.
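
Below is a minimal sketch of the decorator form. It assumes the flow decorator is importable from conversight alongside task and that a decorated flow can be called like an ordinary Python function; the exact keyword options for retries and timeouts are omitted because they may differ between versions.

from conversight import flow, task

@task
def addNumbers(a: int, b: int) -> int:
    '''Simple task that adds two numbers'''
    return a + b

@flow
def additionFlow(a: int, b: int) -> int:
    '''Flow that invokes the task and returns its result'''
    return addNumbers(a, b)

result = additionFlow(2, 3)  # behaves like a regular function call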

When you need several tasks to accomplish your requirements, all of them can be invoked from a flow. A flow is composed entirely of tasks.

The code snippet below shows how a flow is created from tasks.

import pandas as pd
from conversight import Dataset, Flow, Parameter, SmartAnalytics, task, ProactiveInsights, TaskLibrary, FlowLibrary
tsk = TaskLibrary()
flw = FlowLibrary()
@task
def resolveQuery(dataSetID: str, query: str) -> dict:
    '''Task to resolve the query'''
    try:
        import os
        import requests
        from conversight import Context

        ctx = Context()
        dataEngineURL = "{}/formatQuery".format(os.getenv("DATAENGINE_SERVICE"))
        ctx.log.info("Connected to the data engine at {}".format(dataEngineURL))

        body = {"query": query, "dataSetID": dataSetID}
        response = requests.post(url=dataEngineURL, json=body)
        status = {401: "Unauthorized", 500: "Internal Server Error", 400: "Bad Request"}
        if response.status_code in status:
            print("Error from data engine: {}".format(status[response.status_code]))
            ctx.log.critical("Error from data engine")
            return {"error": status[response.status_code]}
        res = response.json()
        if res is not None:
            print("Resolved query ====> {}".format(res["query"]))
            ctx.log.info("Query is resolved")
            return res
        else:
            print("Error from resolved query ====> {}".format("Error from data engine"))
            return {"error": "Empty response from data engine"}
    except Exception as e:
        print("Error from resolved query ====> {}".format(str(e)))
        return {"error": str(e)}
description = "Creating flow"
with Flow(name="Flowname", description=description) as flow:
    from conversight import SmartAnalytics , FlowLibrary , Dataset
    datasetIdEffectiveness = Parameter("data set Id", "63a160ce-RVxc3Q54i")
    Query = Parameter("Query", """Select   @RetailSales.sto as store_dat , @RetailSales.rscost , @RetailSales.revenue , @RetailSales.buyer from #RetailSales  limit 100""")
    objectName = Parameter("objectName", "SalesAnalytics")
    isArrow = Parameter("arrowData",False)
    isOverWrite = Parameter("overwriteTable",False)
    isPublish = Parameter("publishDataset",True)
    resolvedForecastQuery = tsk.taskLibraryName.Taskname(datasetIdEffectiveness, Query)
    forecast = tsk.taskLibraryName.Taskname(resolvedForecastQuery)
    smartAnalytics = tsk.taskLibraryName.Taskname(objectName, datasetIdEffectiveness,forecast,isArrow,isOverWrite)
    published = tsk.taskLibraryName.Taskname(datasetIdEffectiveness,smartAnalytics,isPublish)

Flow has access to functions like register(), run(), and promote().

flow.register(libraryName="FlowLibraryName", flowName="Flowname", description="Creating flow")
flw.FlowLibraryName.Flowname.run()

Additional functions such as getVersions(), setVersion(), deleteVersion(), and delete() behave the same way as the corresponding Task functions.

flw.FlowLibraryName.Flowname.getVersions()
# this will list the versions available for the flow "Flowname"
flw.FlowLibraryName.Flowname.setVersion()
# this will set the flow to the version specified within ()
flw.FlowLibraryName.Flowname.deleteVersion()
# this will delete the version of the flow specified within ()
flw.FlowLibraryName.Flowname.promote()
# this will promote the latest version of the flow to the "O", "U" or "P" level, as specified
flw.FlowLibraryName.Flowname.delete()
# this will delete the flow "Flowname"

The only new function is serve().

serve(): Deploys the flow on the cluster and returns a deployment ID, which can then be used in a POST request; the body of that request contains the parameters you pass to the flow. It accepts the following arguments:

  • clusterId (str): The host address of the cluster on which to deploy the flow. Defaults to "localhost" if no cluster ID is given.

  • clusterPort (str): The port of the cluster on which to deploy the flow. Defaults to "10001" if no port is given.
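
A minimal sketch of deploying and invoking a served flow is shown below. It assumes the request body keys match the Parameter names defined in the flow above, and the endpoint URL built from the deployment ID is a hypothetical placeholder; take the actual invocation URL from your ConverSight environment.

import requests

# Deploy the flow on the cluster; serve() returns a deployment ID.
# clusterId and clusterPort default to "localhost" and "10001" when omitted.
deploymentId = flw.FlowLibraryName.Flowname.serve(clusterId="localhost", clusterPort="10001")

# Invoke the deployed flow over HTTP; the body arguments are the flow parameters.
# NOTE: the URL format below is illustrative only.
body = {
    "data set Id": "63a160ce-RVxc3Q54i",
    "Query": "Select @RetailSales.sto as store_dat from #RetailSales limit 100",
    "objectName": "SalesAnalytics",
    "arrowData": False,
    "overwriteTable": False,
    "publishDataset": True
}
response = requests.post("http://localhost:10001/{}".format(deploymentId), json=body)
print(response.json())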