# Metaflow Tutorials

# What and Why?

Basics:

Code is organized as a DAG.

Versioning:

Out of the box data and execution versioning. It does so by assigning an ID to each execution (every run command)

# CLI Utilities

python xxx.py show

Shows the DAG and the documentation on the code.

To run the flow

python xxx.py run

The command above first validates your flow and runs pylint to verify that the code is executable. If it's not executable, then it doesn't even start in the first place. At the end of the process it runs the code.

python xxx.py run 

Same as above but runs with defined parameters in the code. For example a function would be defined as

Class myFlow():
    """
    Some doco here
    """
    # Code here

    genre = Parameter('param_name',
                    help="Some Documentation Here",
                    default='default_value')
    # Code here

The above is an example of a parameter definition.

metaflow status

Shows the flows that are available.

python xxx.py run --help

Shows the relevant metadata for the specific python file. For example, if there are parameters, then this would show the parameter definition and documentation.

python xxx.py dump <run-id>

Allows to inspect elements in each run-id. This is more useful from a Notebook

python xxx.py dump --tag <>

Allows to tag a particular run

# Other Characteristics

# Persisiting States

Metaflow automatically persists the states and data in the internal store, removing the decision from the developer. There is no longer a need to store the data in a DB.

Metadata can be local storage or S3.

Data is generally not duplicated. If it doesn't change, it is not duplicated.

from metaflow import FlowSpec, step

Class HelloFlow(FlowSpec):

    @step
    def start(self):
        self.x = 1
        self.next(self.end)
    
    @step
    def end(self):
        self.x += 1
        print("x is",self.x)

if __name__ == '__main__':
    HelloFlow()

Even if the steps are executed in separated processess Metaflow automatically persists in the datastore as immutable artifacts. You can then inspect the values using code.

python xxx.py dump <run-id>

This gives the data.

# Resuming the code from failure

Let's assume there is an error in one of the steps. For example there is a step that does a 0 division.

from metaflow import FlowSpec, Parameter, step

class ParameterFlow(FlowSpec):
    alpha = Parameter('alpha',
                      help='Learning rate',
                      default=0.01)

    @step
    def start(self):
        print('alpha is %f' % self.alpha)
        self.next(self.end)

    @step
    def end(self):
        print('alpha is still %f' % (self.alpha / 0))

if __name__ == '__main__':
    ParameterFlow()

The code above will throw an error, you can fix the error and the run

python xxx.py resume

This clones the results from the previous run and then continues to run.

# Failure Handling (opens new window)

Decorators in each step will determine what the failure policy is.

retry decorator

By default it attempts to retry 3 times. Metaflow manages retries so that only artifacts from the last retry are visible - this means that there are no duplicate artifacts.

from metaflow import FlowSpec, step, retry

class RetryFlow(FlowSpec):

    @retry
    @step
    def start(self):
        import time
        if int(time.time()) % 2 == 0:
            raise Exception("Bad luck!")
        else:
            print("Lucky you!")
        self.next(self.end)

    @step
    def end(self):
        print("Phew!")

if __name__ == '__main__':
    RetryFlow()

catch decorator

Catches any exceptions that occur during the step and then continues execution of subsequent steps. Note that the code has to be able to be modified so that it can tolerate faulty steps. This might be useful for hyperparameter search where one failure might not matter.

from metaflow import FlowSpec, catch, step

class CatchFlow(FlowSpec):

    @step
    def start(self):
        self.params = range(3)
        self.next(self.compute, foreach='params')

    @catch(var='compute_failed')
    @step
    def compute(self):
        self.div = self.input
        self.x = 5 / self.div
        self.next(self.join)

    @step
    def join(self, inputs):
        for input in inputs:
            if input.compute_failed:
                print('compute failed for parameter: %d' % input.div)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    CatchFlow()

timeout decorator

Pretty self-explanatory - avoids the code getting stuck

from metaflow import FlowSpec, timeout, step
import time

class TimeoutFlow(FlowSpec):

    @timeout(seconds=5)
    @step
    def start(self):
        for i in range(100):
            print(i)
            time.sleep(1)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    TimeoutFlow()

# Dependency Management

Integrates out of the box with conda manager.

from metaflow import FlowSpec, step, conda

Class CondaExample(FlowSpec):

    @step
    def start(self):
        print("Hola")
        self.next(self.end)
    
    @conda(libraries = {'python-levenshtein':'0.12.0'})
    @step
    def end(self):
        import levenshtein
        print('distance',Levenshtein.distance('foo','moo'))
if __name__ == "__main__"
    CondaExample()