Velocity – MLOps and DataOps
Velocity
Tackling the velocity of data is a legitimate billion-dollar problem. Even today, the biggest video streamers struggle to deliver livestream data consistently. There are many reasons for this, but the core issue is that few solutions combine the budget and the quality needed to be consistent all of the time. We can get pretty close, though.
In this exercise, we will be using what a lot of people call the future, and perhaps the present, of data streaming: Apache Flink. This is a stream and batch processing framework maintained by the Apache Software Foundation and built for smooth, fast data flow. Unlike many Apache projects, it was not created by a company and later open sourced for easier maintenance; it was developed with the express intent of being maintained within the Apache community.
Flink itself does not offer any data storage; instead, it processes incoming data and writes the results to a storage location of your choice. It has APIs in Java, Python, and Scala, and it is supported on all of the major cloud platforms.
To start with Python, you will need to install pyflink using the following command:
pip install apache-flink
Also install pandas if you have not:
pip install pandas
Alright, now let’s write some code to stream data from a bunch of JSON rows to a CSV table. This is just a sample program to show Flink’s workflow, but it does serve that purpose rather effectively:
import json

import pandas as pd
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


# Function to use
def flink_input(input_data):
    # Set up the Flink environment
    env = StreamExecutionEnvironment.get_execution_environment()
    # Filesystem sinks in streaming mode commit their files on checkpoints
    env.enable_checkpointing(1000)
    t_env = StreamTableEnvironment.create(env)

    # Define the CSV output along with a temporary table name; the filesystem
    # connector treats 'output.csv' as a directory of part files
    t_env.execute_sql("""
        CREATE TEMPORARY TABLE output_table (
            data STRING
        ) WITH (
            'connector' = 'filesystem',
            'path' = 'output.csv',
            'format' = 'csv'
        )
    """)

    # Convert multiple JSON values into string rows of a pandas DataFrame
    df = pd.DataFrame([json.dumps(json_obj) for json_obj in input_data],
                      columns=['data'])

    # Insert the rows into the output table, which in turn writes them to the CSV file
    t_env.from_pandas(df).execute_insert('output_table').wait()


input_data = [{'key1': 'value1'}, {'key2': 'value2'}, {'key3': 'value3'}]
flink_input(input_data)
In this code, you’ll see that the JSON objects are serialized to strings, collected into a pandas DataFrame, and inserted into a temporary table. Because that temporary table is backed by the CSV output location, inserting into it is what writes the rows out to the file.
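As a quick aside (not part of the example above), the filesystem connector can be read as well as written, so one way to verify what landed in output.csv is to point a second, batch-mode table at the same path and print it. The table name check_table here is just an illustrative choice:
from pyflink.table import EnvironmentSettings, TableEnvironment

# A separate batch-mode environment used only to inspect the output
check_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Define a table over the same path and format the sink used
check_env.execute_sql("""
    CREATE TEMPORARY TABLE check_table (
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'output.csv',
        'format' = 'csv'
    )
""")

# Print the rows that the streaming job wrote out
check_env.execute_sql("SELECT data FROM check_table").print()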
This is a deliberately simple illustration of what Flink does: a production job follows essentially the same pattern, but applies it to millions of streaming records at a time. A scaled-up version of this code looks much the same and performs the same function, just over far more data. Flink also supports a vast range of other operations (one of the reasons it is so popular); they all follow a similar pattern and integrate with most common data sources, as the short sketch below suggests.
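To give a flavour of those other operations, here is a minimal sketch using Flink’s DataStream API rather than the Table API used above. The element values and the map/filter logic are purely illustrative assumptions, but the overall shape, building a stream, chaining transformations, and executing the job, is the pattern Flink programs follow:
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

# The same execution environment as before, but transformations are chained directly on the stream
env = StreamExecutionEnvironment.get_execution_environment()

# Build a small bounded stream from an in-memory collection (illustrative values)
readings = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())

# Per-record operations: double every element, then keep only the larger values
doubled = readings.map(lambda x: x * 2, output_type=Types.INT())
large_only = doubled.filter(lambda x: x > 4)

# print() is a sink; execute() actually runs the job
large_only.print()
env.execute('DataStreamSketch')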
Now, we will move on to a complication that data presents far too often, and one that always needs to be handled in some form. The next section is about variety.