
    Building a Data Pipeline with Prefect



    Image by Author | Canva

     

    In this tutorial, we will learn about Prefect, a modern workflow orchestration tool. We will start by building a data pipeline with Pandas and then compare it with a Prefect workflow to gain a better understanding. In the end, we will deploy our workflow and view the run logs on the dashboard.

     

    What Is Prefect?

     

    Prefect is a workflow management system designed to orchestrate and manage complex data workflows, including machine learning (ML) pipelines. It provides a framework for building, scheduling, and monitoring workflows, making it an essential tool for managing ML operations (MLOps).

    Prefect offers task and flow management, allowing users to define dependencies and execute workflows efficiently. With features like state management and observability, Prefect provides insights into task status and history, aiding debugging and optimization. It comes with a highly interactive dashboard that lets you schedule and monitor your workflows and integrate various other features that will improve your MLOps pipeline. You can even set up notifications and integrate other ML frameworks with a few clicks.

    Prefect is available both as an open-source framework and as a managed cloud service, simplifying your workflow even more.
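    To make this concrete, here is a minimal sketch of what a Prefect task and flow look like; the `say_hello` task and `hello_flow` names are purely illustrative, and we will build a full pipeline step by step below.

    from prefect import task, flow
    
    @task
    def say_hello(name: str) -> str:
        # A task is a single unit of work whose state Prefect tracks
        return f"Hello, {name}!"
    
    @flow
    def hello_flow(name: str = "Prefect"):
        # A flow groups tasks together and records their run states
        print(say_hello(name))
    
    if __name__ == "__main__":
        hello_flow()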

     

    Building a Data Pipeline with Pandas

     

    We will replicate the data pipeline that I used in a previous tutorial (Building Data Science Pipelines Using Pandas – KDnuggets) to give you an idea of how each task works within a pipeline and how to combine them. I am mentioning it here so that you can clearly compare how proper data pipelines differ from normal ones.

    import pandas as pd
    
    def load_data(path):
        return pd.read_csv(path)
    
    def data_cleaning(data):
        data = data.drop_duplicates()
        data = data.dropna()
        data = data.reset_index(drop=True)
        return data
    
    def convert_dtypes(data, types_dict=None):
        data = data.astype(dtype=types_dict)
        ## convert the date column to datetime
        data["Date"] = pd.to_datetime(data["Date"])
        return data
    
    def data_analysis(data):
        data["month"] = data["Date"].dt.month
        new_df = data.groupby("month")["Units Sold"].mean()
        return new_df
    
    def data_visualization(new_df, vis_type="bar"):
        new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
        return new_df
    
    path = "Online Sales Data.csv"
    df = (
        pd.DataFrame()
        .pipe(lambda x: load_data(path))
        .pipe(data_cleaning)
        .pipe(convert_dtypes, {"Product Category": "str", "Product Name": "str"})
        .pipe(data_analysis)
        .pipe(data_visualization, "line")
    )

     

    When we run the above code, each task runs sequentially and generates the data visualization. Apart from that, it does not do anything else: we cannot schedule it, view the run logs, or integrate third-party tools for notification or monitoring.

     


     

    Building a Data Pipeline with Prefect

     

    Now we will build the same pipeline with the same dataset (Online Sales Dataset – Popular Marketplace Data), but with Prefect. First, we will install the Prefect library using the pip command:
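    pip install prefect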

     

    If you compare the code below, you will notice that nothing has really changed. The functions are the same, but with the addition of Python decorators. Each step in the pipeline has the `@task` decorator, and the pipeline combining these steps has the `@flow` decorator. Additionally, we are saving the generated figure too.

    import pandas as pd
    import matplotlib.pyplot as plt
    from prefect import task, flow
    
    @task
    def load_data(path):
        return pd.read_csv(path)
    
    @task
    def data_cleaning(data):
        data = data.drop_duplicates()
        data = data.dropna()
        data = data.reset_index(drop=True)
        return data
    
    @task
    def convert_dtypes(data, types_dict=None):
        data = data.astype(dtype=types_dict)
        data["Date"] = pd.to_datetime(data["Date"])
        return data
    
    @task
    def data_analysis(data):
        data["month"] = data["Date"].dt.month
        new_df = data.groupby("month")["Units Sold"].mean()
        return new_df
    
    @task
    def data_visualization(new_df, vis_type="bar"):
        new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
        plt.savefig("average_units_sold_by_month.png")
        return new_df
    
    @flow(name="Data Pipeline")
    def data_pipeline(path: str):
        df = load_data(path)
        df_cleaned = data_cleaning(df)
        df_converted = convert_dtypes(
            df_cleaned, {"Product Category": "str", "Product Name": "str"}
        )
        analysis_result = data_analysis(df_converted)
        new_df = data_visualization(analysis_result, "line")
        return new_df
    
    # Run the flow!
    if __name__ == "__main__":
        new_df = data_pipeline("Online Sales Data.csv")
        print(new_df)

     

    We will run our data pipeline by providing the CSV file location. It will perform all the steps in sequence and generate logs with the run states.

    14:18:48.649 | INFO    | prefect.engine - Created flow run 'enlightened-dingo' for flow 'Data Pipeline'
    14:18:48.816 | INFO    | Flow run 'enlightened-dingo' - Created task run 'load_data-0' for task 'load_data'
    14:18:48.822 | INFO    | Flow run 'enlightened-dingo' - Executing 'load_data-0' immediately...
    14:18:48.990 | INFO    | Task run 'load_data-0' - Finished in state Completed()
    14:18:49.052 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_cleaning-0' for task 'data_cleaning'
    14:18:49.053 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_cleaning-0' immediately...
    14:18:49.226 | INFO    | Task run 'data_cleaning-0' - Finished in state Completed()
    14:18:49.283 | INFO    | Flow run 'enlightened-dingo' - Created task run 'convert_dtypes-0' for task 'convert_dtypes'
    14:18:49.288 | INFO    | Flow run 'enlightened-dingo' - Executing 'convert_dtypes-0' immediately...
    14:18:49.441 | INFO    | Task run 'convert_dtypes-0' - Finished in state Completed()
    14:18:49.506 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_analysis-0' for task 'data_analysis'
    14:18:49.510 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_analysis-0' immediately...
    14:18:49.684 | INFO    | Task run 'data_analysis-0' - Finished in state Completed()
    14:18:49.753 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_visualization-0' for task 'data_visualization'
    14:18:49.760 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_visualization-0' immediately...
    14:18:50.087 | INFO    | Task run 'data_visualization-0' - Finished in state Completed()
    14:18:50.144 | INFO    | Flow run 'enlightened-dingo' - Finished in state Completed()

     

    In the end, you will get the transformed data frame and the visualization.

     


     

    Deploying the Prefect Pipeline

     

    In order to deploy the Prefect pipeline, we need to start by moving our code into the Python file `data_pipe.py`. After that, we will modify how we run our pipeline: we will use the `.serve` method to deploy the pipeline and pass the CSV file as an argument to the function.

    data_pipe.py:

    import pandas as pd
    import matplotlib.pyplot as plt
    from prefect import task, flow
    
    @task
    def load_data(path: str) -> pd.DataFrame:
        return pd.read_csv(path)
    
    @task
    def data_cleaning(data: pd.DataFrame) -> pd.DataFrame:
        data = data.drop_duplicates()
        data = data.dropna()
        data = data.reset_index(drop=True)
        return data
    
    @task
    def convert_dtypes(data: pd.DataFrame, types_dict: dict = None) -> pd.DataFrame:
        data = data.astype(dtype=types_dict)
        data["Date"] = pd.to_datetime(data["Date"])
        return data
    
    @task
    def data_analysis(data: pd.DataFrame) -> pd.DataFrame:
        data["month"] = data["Date"].dt.month
        new_df = data.groupby("month")["Units Sold"].mean()
        return new_df
    
    @task
    def data_visualization(new_df: pd.DataFrame, vis_type: str = "bar") -> pd.DataFrame:
        new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
        plt.savefig("average_units_sold_by_month.png")
        return new_df
    
    @task
    def save_to_csv(df: pd.DataFrame, filename: str):
        df.to_csv(filename, index=False)
        return filename
    
    @flow(name="Data Pipeline")
    def run_pipeline(path: str):
        df = load_data(path)
        df_cleaned = data_cleaning(df)
        df_converted = convert_dtypes(
            df_cleaned, {"Product Category": "str", "Product Name": "str"}
        )
        analysis_result = data_analysis(df_converted)
        data_visualization(analysis_result, "line")
        save_to_csv(analysis_result, "average_units_sold_by_month.csv")
    
    # Run the flow
    if __name__ == "__main__":
        run_pipeline.serve(
            name="pass-params-deployment",
            parameters=dict(path="Online Sales Data.csv"),
        )
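
    As a side note, the same `.serve()` call can also attach a schedule to the deployment. Here is a minimal sketch, assuming the `cron` argument of `serve()`; the deployment name below is purely illustrative.

    # Illustrative: serve the flow with a daily cron schedule (runs at 9 AM)
    run_pipeline.serve(
        name="daily-sales-report",  # hypothetical deployment name
        parameters=dict(path="Online Sales Data.csv"),
        cron="0 9 * * *",
    )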

     

     

    When we run the Python file, we will receive a message saying that to run the deployed pipeline, we have to use the following command:

     


     

    Launch a new terminal window and type the command to trigger the run for this flow.

    $ prefect deployment run 'Data Pipeline/pass-params-deployment'

     

    As we can see, the flow run has been initiated, meaning the pipeline is running in the background. We can always return to the first terminal window to view the logs.

     


     

    To view the logs in the dashboard, we have to launch the Prefect dashboard by typing the following command:
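    $ prefect server start

    This starts the local Prefect server and prints the dashboard URL in the terminal.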

     

    Click on the dashboard link to open the dashboard in your web browser.

     


     

    The dashboard consists of various tabs and information related to your pipeline, workflows, and runs. To view the current run, navigate to the “Flow Runs” tab and select the latest flow run.

     


     

    All of the source code, data, and information are available in the Kingabzpro/Data-Pipeline-with-Prefect GitHub repository. Please don't forget to star ⭐ it.

     

    Conclusion

     

    Building a pipeline with the right tools is essential if you want to scale your data workflows and avoid unnecessary hiccups. By using Prefect, you can schedule your runs, debug the pipeline, and integrate it with the multiple third-party tools that you are already using. It is easy to use and comes with tons of features that you will love. If you are new to Prefect, I highly recommend checking out Prefect Cloud. They offer free hours for users to experience the cloud platform and become familiar with the workflow management system.
     
     

    Abid Ali Awan (@1abidaliawan) is a certified data scientist professional who loves building machine learning models. Currently, he is focusing on content creation and writing technical blogs on machine learning and data science technologies. Abid holds a Master's degree in technology management and a bachelor's degree in telecommunication engineering. His vision is to build an AI product using a graph neural network for students struggling with mental illness.
