Notebook sample for the write flow

This sample shows three scenarios:

  1. how Fybrik prevents writing a new asset due to governance restrictions.

  2. how to write data generated by the workload to an object store.

  3. how to read data from a dataset stored in an object store.

In this sample you play multiple roles:

  • As a data governance officer you set up data governance policies.

  • As a data user you specify your data usage requirements and use a notebook to write and read the data.

Create an account in object storage

Create an account in object storage of your choice such as AWS S3, IBM Cloud Object Storage or Ceph. Make a note of the service endpoint and access credentials. You will need them later.
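
For example, you might capture these values in environment variables so that later commands can reference them. This is only an illustrative sketch; the variable names and placeholder values are not required by Fybrik:

export ACCESS_KEY="<your-access-key>"
export SECRET_KEY="<your-secret-key>"
export ENDPOINT="<your-object-storage-endpoint>"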

Set up localstack

For experimentation you can install localstack to your cluster instead of using a cloud service.

  1. Define variables for access key and secret key
    export ACCESS_KEY="myaccesskey"
    export SECRET_KEY="mysecretkey"
    
  2. Install localstack to the currently active namespace and wait for it to be ready:
    helm repo add localstack-charts https://localstack.github.io/helm-charts
    helm install localstack localstack-charts/localstack \
         --set startServices="s3" \
         --set service.type=ClusterIP \
         --set livenessProbe.initialDelaySeconds=25
    kubectl wait --for=condition=ready --all pod -n fybrik-notebook-sample --timeout=120s
    
    Create a port-forward to communicate with the localstack server:
    kubectl port-forward svc/localstack 4566:4566 &
    
  3. Use the AWS CLI to configure the localstack server (a quick verification follows after this list):
    aws configure set aws_access_key_id ${ACCESS_KEY} && aws configure set aws_secret_access_key ${SECRET_KEY}
    
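To confirm that the S3 API is reachable through the port-forward before continuing, you can list the (initially empty) set of buckets with the AWS CLI; the localhost URL assumes the port-forward created above:

aws --endpoint-url=http://localhost:4566 s3 ls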

Deploy resources for write scenarios

Deploy Datashim:

kubectl apply -f https://raw.githubusercontent.com/fybrik/fybrik/master/third_party/datashim/dlf.yaml

For more deployment options of Datashim based on your environment, please refer to the Datashim site.

Register the credentials required for accessing the object storage. Replace the values of access_key and secret_key with the credentials of the object storage service that you are using, and run:

cat << EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: bucket-creds
  namespace: fybrik-system
type: Opaque
stringData:
  access_key: "${ACCESS_KEY}"
  accessKeyID: "${ACCESS_KEY}"
  secret_key: "${SECRET_KEY}"
  secretAccessKey: "${SECRET_KEY}"
EOF
Then, register two storage accounts: one in theshire and one in neverland. Replace the value of endpoint with the endpoint of the object storage service that you are using, and run:

cat << EOF | kubectl apply -f -
apiVersion: app.fybrik.io/v1beta1
kind: FybrikStorageAccount
metadata:
  name: theshire-storage-account
  namespace: fybrik-system
spec:
  id: theshire-object-store
  region: theshire
  endpoint: "http://localstack.fybrik-notebook-sample.svc.cluster.local:4566"
  secretRef: bucket-creds
EOF
cat << EOF | kubectl apply -f -
apiVersion: app.fybrik.io/v1beta1
kind: FybrikStorageAccount
metadata:
  name: neverland-storage-account
  namespace: fybrik-system
spec:
  id: neverland-object-store
  region: neverland
  endpoint: "http://localstack.fybrik-notebook-sample.svc.cluster.local:4566"
  secretRef: bucket-creds
EOF

Note that, for evaluation purposes, the same object store is used for both regions in the storage accounts.
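
To confirm that both storage accounts were registered, you can list them (assuming fybrikstorageaccounts is the plural resource name of the FybrikStorageAccount CRD):

kubectl get fybrikstorageaccounts -n fybrik-system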

Scenario one: write is forbidden due to governance restrictions

Define data governance policies for write

Define an OpenPolicyAgent policy that forbids writing sensitive data to the neverland and theshire regions for datasets tagged with finance. This policy prevents the write in this sample because the FybrikStorageAccount resources deployed above are located in neverland and theshire.

Below is the policy (written in Rego language):

package dataapi.authz

rule[{"policy": description}] {
  description := "Forbid writing sensitive data in `theshire` and `neverland` storage accounts in datasets tagged with `finance`"
  input.action.actionType == "write"
  input.resource.metadata.tags.finance
  input.action.destination != "theshire"
  input.action.destination != "neverland"
  input.resource.metadata.columns[i].tags.sensitive
}

For more details on OPA policies, please refer to the Using OPA for Data Governance task.

Copy the policy to a file named sample-policy-write.rego and then run:

kubectl -n fybrik-system create configmap sample-policy-write --from-file=sample-policy-write.rego
kubectl -n fybrik-system label configmap sample-policy-write openpolicyagent.org/policy=rego
while [[ $(kubectl get cm sample-policy-write -n fybrik-system -o 'jsonpath={.metadata.annotations.openpolicyagent\.org/policy-status}') != '{"status":"ok"}' ]]; do echo "waiting for policy to be applied" && sleep 5; done

Create a FybrikApplication resource to write the new asset

Create a FybrikApplication resource to register the notebook workload to the control plane of Fybrik:

cat <<EOF | kubectl apply -f -
apiVersion: app.fybrik.io/v1beta1
kind: FybrikApplication
metadata:
  name: my-notebook-write
  namespace: fybrik-notebook-sample
  labels:
    app: my-notebook-write
spec:
  selector:
    clusterName: thegreendragon
    workloadSelector:
      matchLabels:
        app: my-notebook-write
  appInfo:
    intent: Fraud Detection
  data:
    - dataSetID: 'new-data'
      flow: write
      requirements:
        flowParams:
          isNewDataSet: true
          catalog: fybrik-notebook-sample
          metadata:
            tags:
              finance: true
            columns:
              - name: nameOrig
                tags:
                  PII: true
              - name: oldbalanceOrg
                tags:
                  sensitive: true
        interface:
          protocol: fybrik-arrow-flight
EOF

Notice that:

  • The selector field matches the labels of our Jupyter notebook workload.
  • The data field includes a dataSetID; because this is a new asset, it is a name chosen for the dataset rather than the identifier of an existing catalog entry.
  • The protocol indicates that the developer wants to access the data using Apache Arrow Flight. For some protocols a dataformat can be specified as well (e.g., s3 protocol and parquet format; see the sketch after this list).
  • The isNewDataSet field indicates that this is a new asset.
  • The catalog field holds the catalog id. It will be used by Fybrik to register the new asset in the catalog.
  • The metadata field specifies the dataset tags. These attributes can later be used in policies.
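
For illustration, if a module supporting it were deployed, the requirements could instead request the new data in S3 with Parquet format. The snippet below is only a sketch of the interface portion; the field values are assumptions based on the note above:

        interface:
          protocol: s3
          dataformat: parquet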

Run the following command to wait until the FybrikApplication status is updated:

while [[ $(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.ready}') != "true" ]]; do echo "waiting for FybrikApplication" && sleep 5; done
while [[ $(kubectl get fybrikapplication my-notebook-write -n fybrik-notebook-sample -o 'jsonpath={.status.assetStates.new-data.conditions[?(@.type == "Deny")].status}') != "True" ]]; do echo "waiting for my-notebook-write asset" && sleep 5; done

We expect the asset's state in FybrikApplication.status to show a Deny condition due to the policy defined above. Next, a new policy will be applied that allows writing to the theshire object store.
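
To see why the request was denied, you can print the Deny condition's message; the jsonpath below follows the same status structure used in the wait commands above and assumes the condition carries a standard message field:

kubectl get fybrikapplication my-notebook-write -n fybrik-notebook-sample -o 'jsonpath={.status.assetStates.new-data.conditions[?(@.type == "Deny")].message}'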

Cleanup scenario one

Before proceeding to scenario two, the OPA policy and the FybrikApplication resource should be deleted:

kubectl delete cm sample-policy-write -n fybrik-system
kubectl delete fybrikapplications.app.fybrik.io my-notebook-write -n fybrik-notebook-sample

Notice that the FybrikStorageAccount resources remain deployed after the cleanup.

Scenario two: write new data

To write the new data, a new policy should be defined.

Define data access policies for writing the data

Define an OpenPolicyAgent policy that, for write requests on datasets tagged with finance whose destination is not neverland, returns the list of column names tagged as sensitive. These columns are passed to the FybrikModule together with a RedactAction when Fybrik deploys the module. Below is the policy (written in Rego language):

package dataapi.authz

rule[{"action": {"name":"RedactAction","columns": column_names}, "policy": description}] {
  description := "Redact written columns tagged as sensitive in datasets tagged with finance = true. The data should not be stored in `neverland` storage account"
  input.action.actionType == "write"
  input.resource.metadata.tags.finance
  input.action.destination != "neverland"
  column_names := [input.resource.metadata.columns[i].name | input.resource.metadata.columns[i].tags.sensitive]
}

Copy the policy to a file named sample-policy-write.rego and then run:

kubectl -n fybrik-system create configmap sample-policy-write --from-file=sample-policy-write.rego
kubectl -n fybrik-system label configmap sample-policy-write openpolicyagent.org/policy=rego
while [[ $(kubectl get cm sample-policy-write -n fybrik-system -o 'jsonpath={.metadata.annotations.openpolicyagent\.org/policy-status}') != '{"status":"ok"}' ]]; do echo "waiting for policy to be applied" && sleep 5; done

Deploy a Jupyter notebook

In this sample a Jupyter notebook is used as the user workload, and its business logic requires writing the new asset. Deploy a notebook to your cluster by following the instructions in the Deploy a Jupyter notebook section of the notebook sample for the read flow.
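
As a minimal sketch, assuming the jupyter/base-notebook image, a deployment could look like the commands below. The deployment name is chosen so that the pods get the label app=my-notebook-write expected by the FybrikApplication workload selector; the read-flow sample remains the authoritative reference:

kubectl create deployment my-notebook-write --image=jupyter/base-notebook --port=8888 -- start.sh jupyter lab --LabApp.token=''
kubectl wait --for=condition=available --timeout=120s deployment/my-notebook-write
kubectl port-forward deployment/my-notebook-write 8888:8888 &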

Create a FybrikApplication resource associated with the notebook

Re-apply the FybrikApplication resource as defined in scenario one.

Run the following command to wait until the FybrikApplication status is ready:

while [[ $(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.ready}') != "true" ]]; do echo "waiting for FybrikApplication" && sleep 5; done
while [[ $(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.assetStates.new-data.conditions[?(@.type == "Ready")].status}') != "True" ]]; do echo "waiting for new-data asset" && sleep 5; done

Run the following command to extract the ID of the newly cataloged asset from the FybrikApplication status. This asset ID will be used in the third scenario when we read the new asset.

CATALOGED_ASSET=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.assetStates.new-data.catalogedAsset}')

Write the data from the notebook

This sample uses the Synthetic Financial Datasets For Fraud Detection dataset1 as the data that the notebook needs to write. Download and extract the file to your machine. You should now see a file named PS_20174392719_1491204439457_log.csv. Alternatively, use a sample of 100 lines of the same dataset by downloading PS_20174392719_1491204439457_log.csv from GitHub.

In your terminal, run the following commands to print the endpoint to use for writing the data. They fetch the endpoint details from the FybrikApplication resource:

ENDPOINT_SCHEME=$(kubectl get fybrikapplication my-notebook-write -o jsonpath={.status.assetStates.new-data.endpoint.fybrik-arrow-flight.scheme})
ENDPOINT_HOSTNAME=$(kubectl get fybrikapplication my-notebook-write -o jsonpath={.status.assetStates.new-data.endpoint.fybrik-arrow-flight.hostname})
ENDPOINT_PORT=$(kubectl get fybrikapplication my-notebook-write -o jsonpath={.status.assetStates.new-data.endpoint.fybrik-arrow-flight.port})
printf "\n${ENDPOINT_SCHEME}://${ENDPOINT_HOSTNAME}:${ENDPOINT_PORT}\n\n"
The next steps use the endpoint to write the data from a Python notebook:

  1. Insert a new notebook cell to install needed packages:
    %pip install pyarrow==7.0.*
    
  2. Insert a new notebook cell to write data using the endpoint value extracted from the FybrikApplication in the previous step:
    import pyarrow.flight as fl
    import json
    from pyarrow import csv
    
    # Create a Flight client
    client = fl.connect('<ENDPOINT>')
    
    # Prepare the request
    request = {
        "asset": "new-data",
        # To request specific columns add to the request a "columns" key with a list of column names
        # "columns": [...]
    }
    
    # write the new dataset
    file_path = "/path/to/PS_20174392719_1491204439457_log.csv"
    my_table = csv.read_csv(file_path)
    writer, _ = client.do_put(fl.FlightDescriptor.for_command(json.dumps(request)),
                              my_table.schema)
    # Note that we do not indicate the data store nor allocate a bucket in which 
    # to write the dataset. This is all done by Fybrik.
    writer.write_table(my_table)
    writer.close()
    

Cleanup scenario two

kubectl delete cm sample-policy-write -n fybrik-system
kubectl delete fybrikapplications.app.fybrik.io my-notebook-write -n fybrik-notebook-sample

Scenario three: read the newly written data

Define data access policies to read the new data

Define an OpenPolicyAgent policy to allow reading the data. Below is the policy (written in Rego language):

package dataapi.authz

rule[{"policy": description}] {
  description := "allow read datasets"
  input.action.actionType == "read"
}

Copy the policy to a file named sample-policy-read.rego and then run:

kubectl -n fybrik-system create configmap sample-policy-read --from-file=sample-policy-read.rego
kubectl -n fybrik-system label configmap sample-policy-read openpolicyagent.org/policy=rego
while [[ $(kubectl get cm sample-policy-read -n fybrik-system -o 'jsonpath={.metadata.annotations.openpolicyagent\.org/policy-status}') != '{"status":"ok"}' ]]; do echo "waiting for policy to be applied" && sleep 5; done

Create a FybrikApplication resource to read the data for the notebook

Create a FybrikApplication resource to register the notebook workload to the control plane of Fybrik:

cat <<EOF | kubectl apply -f -
apiVersion: app.fybrik.io/v1beta1
kind: FybrikApplication
metadata:
  name: my-notebook-read
  namespace: fybrik-notebook-sample
  labels:
    app: my-notebook-read
spec:
  selector:
    clusterName: thegreendragon
    workloadSelector:
      matchLabels:
        app: my-notebook-read
  appInfo:
    intent: Fraud Detection
  data:
    - dataSetID: fybrik-notebook-sample/${CATALOGED_ASSET}
      flow: read
      requirements:
        interface:
          protocol: fybrik-arrow-flight
EOF

Run the following command to wait until the FybrikApplication is ready:

while [[ $(kubectl get fybrikapplication my-notebook-read -o 'jsonpath={.status.ready}') != "true" ]]; do echo "waiting for FybrikApplication" && sleep 5; done
while [[ $(kubectl get fybrikapplication my-notebook-read -o "jsonpath={.status.assetStates.fybrik-notebook-sample/${CATALOGED_ASSET}.conditions[?(@.type == 'Ready')].status}") != "True" ]]; do echo "waiting for fybrik-notebook-sample/${CATALOGED_ASSET} asset" && sleep 5; done

Read the dataset from the notebook

In your terminal, run the following commands to print the endpoint to use for reading the data. They fetch the endpoint details from the FybrikApplication resource:

ENDPOINT_SCHEME=$(kubectl get fybrikapplication my-notebook-read -o jsonpath={.status.assetStates.fybrik-notebook-sample/${CATALOGED_ASSET}.endpoint.fybrik-arrow-flight.scheme})
ENDPOINT_HOSTNAME=$(kubectl get fybrikapplication my-notebook-read -o jsonpath={.status.assetStates.fybrik-notebook-sample/${CATALOGED_ASSET}.endpoint.fybrik-arrow-flight.hostname})
ENDPOINT_PORT=$(kubectl get fybrikapplication my-notebook-read -o jsonpath={.status.assetStates.fybrik-notebook-sample/${CATALOGED_ASSET}.endpoint.fybrik-arrow-flight.port})
printf "\n${ENDPOINT_SCHEME}://${ENDPOINT_HOSTNAME}:${ENDPOINT_PORT}\n\n"
The next steps use the endpoint to read the data from a Python notebook:

  1. Insert a new notebook cell to install pandas and pyarrow packages:
    %pip install pandas pyarrow==7.0.*
    
  2. Insert a new notebook cell to read the data. Note that <ENDPOINT> and <CATALOGED_ASSET> should be replaced with the values extracted from the FybrikApplication as described in the previous steps.
    import json
    import pyarrow.flight as fl
    import pandas as pd
    
    # Create a Flight client
    client = fl.connect('<ENDPOINT>')
    
    # Prepare the request
    request = {
        "asset": "fybrik-notebook-sample/<CATALOGED_ASSET>",
        # To request specific columns add to the request a "columns" key with a list of column names
        # "columns": [...]
    }
    # show all columns
    pd.set_option('display.max_columns', None)
    # Send request and fetch result as a pandas DataFrame
    info = client.get_flight_info(fl.FlightDescriptor.for_command(json.dumps(request)))
    reader: fl.FlightStreamReader = client.do_get(info.endpoints[0].ticket)
    df_read: pd.DataFrame = reader.read_pandas()
    
  3. Insert a new notebook cell with the following command to visualize the result:
    df_read
    
  4. Execute all notebook cells and notice that the oldbalanceOrg column does not appear in the output because it was redacted when the data was written (a quick programmatic check follows this list).

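As a quick programmatic check, you can add one more notebook cell. This is a sketch that assumes the df_read DataFrame from the cell above and follows the expectation stated in step 4 that the redacted column is absent:

    # Verify that the column tagged as sensitive was removed by the write-time redaction
    assert "oldbalanceOrg" not in df_read.columns
    print("oldbalanceOrg was redacted as expected")
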
  1. Created by NTNU and shared under the CC BY-SA 4.0 license.