Notebook sample for the write flow

This sample shows three scenarios:

  1. how Fybrik prevents writing a new asset due to governance restrictions.

  2. how to write data generated by the workload to an object store.

  3. how to read data from a dataset stored in an object store.

In this sample you play multiple roles:

  • As a data governance officer you set up data governance policies.

  • As a data user you specify your data usage requirements and use a notebook to write and read the data.

Create an account in object storage

Create an account in object storage of your choice such as AWS S3, IBM Cloud Object Storage or Ceph. Make a note of the service endpoint and access credentials. You will need them later.

Setup localstack

For experimentation you can install localstack to your cluster instead of using a cloud service.

  1. Define variables for access key and secret key
    export ACCESS_KEY="myaccesskey"
    export SECRET_KEY="mysecretkey"
    
  2. Install localstack to the currently active namespace and wait for it to be ready. Use one of the following two variants.

    Without persistent storage:
    helm repo add localstack-charts https://localstack.github.io/helm-charts
    helm install localstack localstack-charts/localstack \
         --version 0.4.3 \
         --set image.tag="1.2.0" \
         --set startServices="s3" \
         --set service.type=ClusterIP \
         --set livenessProbe.initialDelaySeconds=25
    kubectl wait --for=condition=ready --all pod -n fybrik-notebook-sample --timeout=120s

    With persistent storage (this variant uses the ibmc-file-gold-gid storage class):
    helm repo add localstack-charts https://localstack.github.io/helm-charts
    helm install localstack localstack-charts/localstack \
         --version 0.4.3 \
         --set image.tag="1.2.0" \
         --set startServices="s3" \
         --set service.type=ClusterIP \
         --set livenessProbe.initialDelaySeconds=25 \
         --set persistence.enabled=true \
         --set persistence.storageClass=ibmc-file-gold-gid \
         --set "persistence.accessModes[0]=ReadWriteMany"
    kubectl wait --for=condition=ready --all pod -n fybrik-notebook-sample --timeout=120s

    Create a port-forward to communicate with the localstack server:
    kubectl port-forward svc/localstack 4566:4566 &

  3. Use the AWS CLI to configure the localstack server:
    export REGION=theshire
    aws configure set aws_access_key_id ${ACCESS_KEY}
    aws configure set aws_secret_access_key ${SECRET_KEY}
    aws configure set region ${REGION}

Deploy resources for write scenarios

Register the credentials required for accessing the object storage. Replace the values for access_key and secret_key with the values from the object storage service that you used and run:

cat << EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: bucket-creds
  namespace: fybrik-system
type: Opaque
stringData:
  access_key: "${ACCESS_KEY}"
  secret_key: "${SECRET_KEY}"
EOF
Then, register two storage accounts: one in theshire and one in neverland. Replace the value for endpoint with the value from the object storage service that you used and run:

cat << EOF | kubectl apply -f -
apiVersion:   app.fybrik.io/v1beta2
kind:         FybrikStorageAccount
metadata:
  name: theshire-storage-account
  namespace: fybrik-system
spec:
  id: theshire-object-store
  type: s3
  geography: theshire
  s3:
    endpoint: "http://localstack.fybrik-notebook-sample.svc.cluster.local:4566"
  secretRef:  bucket-creds
EOF
cat << EOF | kubectl apply -f -
apiVersion:   app.fybrik.io/v1beta2
kind:         FybrikStorageAccount
metadata:
  name: neverland-storage-account
  namespace: fybrik-system
spec:
  id: neverland-object-store
  geography: neverland
  type: s3
  s3:
    endpoint: "http://localstack.fybrik-notebook-sample.svc.cluster.local:4566"
  secretRef:  bucket-creds
EOF

Note that for evaluation purposes the same object store is used for different regions in the storage accounts.
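
The two storage account resources above differ only in their geography. As a minimal sketch, the following Python snippet generates both manifests from one template. The helper name storage_account_manifest is hypothetical and not part of Fybrik; the field values are copied from the resources above.

```python
import json


def storage_account_manifest(geography: str, endpoint: str,
                             secret_name: str = "bucket-creds") -> dict:
    """Build a FybrikStorageAccount manifest as a plain dict (hypothetical helper)."""
    return {
        "apiVersion": "app.fybrik.io/v1beta2",
        "kind": "FybrikStorageAccount",
        "metadata": {
            "name": f"{geography}-storage-account",
            "namespace": "fybrik-system",
        },
        "spec": {
            "id": f"{geography}-object-store",
            "type": "s3",
            "geography": geography,
            "s3": {"endpoint": endpoint},
            "secretRef": secret_name,
        },
    }


# Endpoint taken from the sample; replace it with your own object store endpoint.
ENDPOINT = "http://localstack.fybrik-notebook-sample.svc.cluster.local:4566"
manifests = [storage_account_manifest(g, ENDPOINT) for g in ("theshire", "neverland")]

# Wrap both manifests in a v1 List and print them as JSON.
print(json.dumps({"apiVersion": "v1", "kind": "List", "items": manifests}, indent=2))
```

Since kubectl accepts JSON as well as YAML, the printed output could be piped to kubectl apply -f - instead of applying the two resources one by one.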

Scenario one: write is forbidden due to governance restrictions

Define data governance policies for write

Define an OpenPolicyAgent policy that forbids writing personal data to the neverland and theshire regions in datasets tagged with Purpose.finance. Because the deployed FybrikStorageAccount resources are located only in neverland and theshire, this policy leaves no allowed destination and the write is denied.

Below is the policy (written in Rego language):

package dataapi.authz

# If the conditions between the curly braces are true then Fybrik will get an object for the "Deny" action.
rule[{"action": {"name":"Deny"}, "policy": description}] {
    description := "Forbid writing sensitive data to theshire object-stores in datasets tagged with `finance`"
    # this condition is true if it is a write operation
    input.action.actionType == "write"
    # this condition is true if the asset has "Purpose.finance" tag
    input.resource.metadata.tags["Purpose.finance"]
    # this condition is true if write destination is theshire object-stores
    input.action.destination == "theshire"
    # this condition is true if one of the columns has "PersonalData.Personal" tag
    input.resource.metadata.columns[i].tags["PersonalData.Personal"]
}

# If the conditions between the curly braces are true then Fybrik will get an object for the "Deny" action.
rule[{"action": {"name":"Deny"}, "policy": description}] {
    description := "Forbid writing sensitive data to neverland object-stores in datasets tagged with `finance`"
    # this condition is true if it is a write operation
    input.action.actionType == "write"
    # this condition is true if the asset has "Purpose.finance" tag
    input.resource.metadata.tags["Purpose.finance"]
    # this condition is true if write destination is neverland object-stores
    input.action.destination == "neverland"
    # this condition is true if one of the columns has "PersonalData.Personal" tag
    input.resource.metadata.columns[i].tags["PersonalData.Personal"]
}

For more details on OPA policies, refer to the Using OPA for Data Governance task.
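
To make the four conditions of the Deny rules concrete, here is a rough Python model of how they combine. It is an illustration only: the real evaluation is done by OPA, the function names are hypothetical, and the input layout is assumed to follow the paths referenced in the Rego policy (input.action and input.resource.metadata).

```python
def is_set(value) -> bool:
    """Approximate Rego truthiness: only undefined (None) and false fail."""
    return value is not None and value is not False


def deny_decisions(request: dict) -> list:
    """Return one Deny decision per rule whose conditions all hold."""
    action = request.get("action", {})
    metadata = request.get("resource", {}).get("metadata", {})

    is_write = action.get("actionType") == "write"
    is_finance = is_set(metadata.get("tags", {}).get("Purpose.finance"))
    has_personal = any(
        is_set(column.get("tags", {}).get("PersonalData.Personal"))
        for column in metadata.get("columns", [])
    )

    decisions = []
    # One rule per region, mirroring the two Rego rules.
    for region in ("theshire", "neverland"):
        if is_write and is_finance and has_personal and action.get("destination") == region:
            decisions.append({
                "action": {"name": "Deny"},
                "policy": f"Forbid writing sensitive data to {region} object-stores "
                          "in datasets tagged with `finance`",
            })
    return decisions


# Hand-written input that mirrors the metadata used in this sample.
sample = {
    "action": {"actionType": "write", "destination": "theshire"},
    "resource": {"metadata": {
        "tags": {"Purpose.finance": True},
        "columns": [
            {"name": "nameOrig", "tags": {"PII.Sensitive": True}},
            {"name": "oldbalanceOrg", "tags": {"PersonalData.Personal": True}},
        ],
    }},
}
print(deny_decisions(sample))
```

Changing the destination in the sample to a region other than theshire or neverland yields an empty list, which is why the write is denied only for the two registered storage accounts.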

Copy the policy to a file named sample-policy-write.rego and then run:

kubectl -n fybrik-system create configmap sample-policy-write --from-file=sample-policy-write.rego
kubectl -n fybrik-system label configmap sample-policy-write openpolicyagent.org/policy=rego
while [[ $(kubectl get cm sample-policy-write -n fybrik-system -o 'jsonpath={.metadata.annotations.openpolicyagent\.org/policy-status}') != '{"status":"ok"}' ]]; do echo "waiting for policy to be applied" && sleep 5; done

Create a FybrikApplication resource to write the new asset

Create a FybrikApplication resource to register the notebook workload to the control plane of Fybrik:

cat <<EOF | kubectl apply -f -
apiVersion: app.fybrik.io/v1beta1
kind: FybrikApplication
metadata:
  name: my-notebook-write
  namespace: fybrik-notebook-sample
  labels:
    app: my-notebook-write
spec:
  selector:
    clusterName: thegreendragon
    workloadSelector:
      matchLabels:
        app: my-notebook-write
  appInfo:
    intent: Fraud Detection
  data:
    - dataSetID: 'new-data'
      flow: write
      requirements:
        flowParams:
          isNewDataSet: true
          catalog: fybrik-notebook-sample
          metadata:
            tags:
              Purpose.finance: true
            columns:
              - name: nameOrig
                tags:
                  PII.Sensitive: true
              - name: oldbalanceOrg
                tags:
                  PersonalData.Personal: true
        interface:
          protocol: fybrik-arrow-flight
EOF

Notice that:

  • The selector field matches the labels of our Jupyter notebook workload.
  • The data field includes a dataSetID that matches the asset identifier in the catalog.
  • The protocol indicates that the developer wants to consume the data using Apache Arrow Flight. For some protocols a dataformat can be specified as well (e.g., s3 protocol and parquet format).
  • The isNewDataSet field indicates that this is a new asset.
  • The catalog field holds the catalog id. It will be used by Fybrik to register the new asset in the catalog.
  • The metadata field specifies the dataset tags. These attributes can later be used in policies.

Run the following command to wait until the FybrikApplication status is updated:

while [[ $(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.ready}') != "true" ]]; do echo "waiting for FybrikApplication" && sleep 5; done
while [[ $(kubectl get fybrikapplication my-notebook-write -n fybrik-notebook-sample -o 'jsonpath={.status.assetStates.new-data.conditions[?(@.type == "Deny")].status}') != "True" ]]; do echo "waiting for my-notebook-write asset" && sleep 5; done

The asset's Deny condition in FybrikApplication.status is expected to be True because of the policy defined above. Scenario two applies a new policy that allows writing to the theshire object store.
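
The jsonpath filter in the wait loop selects the status of one condition of one asset. The sketch below does the same lookup in Python on the JSON that kubectl get fybrikapplication my-notebook-write -o json would return. The function name is hypothetical and the sample status is hand-written for illustration; only the path status.assetStates.<asset>.conditions[].type/status is taken from the command above.

```python
import json


def condition_status(app: dict, asset: str, cond_type: str):
    """Return the status string of the given condition type, or None if absent."""
    conditions = (
        app.get("status", {})
        .get("assetStates", {})
        .get(asset, {})
        .get("conditions", [])
    )
    for condition in conditions:
        if condition.get("type") == cond_type:
            return condition.get("status")
    return None


# Hand-written example, not real Fybrik output.
sample_app = json.loads("""
{
  "status": {
    "assetStates": {
      "new-data": {
        "conditions": [
          {"type": "Ready", "status": "False"},
          {"type": "Deny", "status": "True"}
        ]
      }
    }
  }
}
""")

print(condition_status(sample_app, "new-data", "Deny"))
```

The helper returns None when the asset or the condition is missing, which corresponds to the empty string that the jsonpath query prints while the status is not yet populated.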

Cleanup scenario one

Before proceeding to scenario two, delete the OPA policy and the FybrikApplication:

kubectl delete cm sample-policy-write -n fybrik-system
kubectl delete fybrikapplications.app.fybrik.io my-notebook-write -n fybrik-notebook-sample

Notice that FybrikStorageAccount resources are still applied after the cleanup.

Scenario two: write new data

To write the new data a new policy should be defined.

Define data access policies for writing the data

Define an OpenPolicyAgent policy that, when the actionType is write and the destination is not neverland, returns the names of the columns tagged as PersonalData.Personal. When Fybrik deploys the FybrikModule, it passes these columns to the module together with the RedactAction. Below is the policy (written in Rego language):

package dataapi.authz

rule[{"action": {"name":"RedactAction","columns": column_names}, "policy": description}] {
  description := "Redact written columns tagged as PersonalData.Personal in datasets tagged with Purpose.finance = true. The data should not be stored in `neverland` storage account"
  input.action.actionType == "write"
  input.resource.metadata.tags["Purpose.finance"]
  input.action.destination != "neverland"
  column_names := [input.resource.metadata.columns[i].name | input.resource.metadata.columns[i].tags["PersonalData.Personal"]]
}
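
As an illustration of what this rule produces, the following Python sketch models its conditions and the comprehension that collects the column names. It is an approximation rather than OPA itself: the function names are hypothetical, and the input layout is assumed to match the paths used in the Rego policy.

```python
def is_set(value) -> bool:
    """Approximate Rego truthiness: only undefined (None) and false fail."""
    return value is not None and value is not False


def redact_decisions(request: dict) -> list:
    """Return the RedactAction decision if all conditions of the rule hold."""
    action = request.get("action", {})
    metadata = request.get("resource", {}).get("metadata", {})

    if action.get("actionType") != "write":
        return []
    if not is_set(metadata.get("tags", {}).get("Purpose.finance")):
        return []
    # In Rego an undefined destination makes the comparison undefined, so the rule fails.
    if "destination" not in action or action["destination"] == "neverland":
        return []

    # Same idea as the Rego comprehension: names of columns tagged PersonalData.Personal.
    column_names = [
        column["name"]
        for column in metadata.get("columns", [])
        if is_set(column.get("tags", {}).get("PersonalData.Personal"))
    ]
    return [{"action": {"name": "RedactAction", "columns": column_names}}]


# Hand-written input that mirrors the metadata used in this sample.
sample = {
    "action": {"actionType": "write", "destination": "theshire"},
    "resource": {"metadata": {
        "tags": {"Purpose.finance": True},
        "columns": [
            {"name": "nameOrig", "tags": {"PII.Sensitive": True}},
            {"name": "oldbalanceOrg", "tags": {"PersonalData.Personal": True}},
        ],
    }},
}
print(redact_decisions(sample))
```

For a write to theshire the model selects the oldbalanceOrg column, which matches the redacted column observed when the data is read back in scenario three.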

Copy the policy to a file named sample-policy-write.rego and then run:

kubectl -n fybrik-system create configmap sample-policy-write --from-file=sample-policy-write.rego
kubectl -n fybrik-system label configmap sample-policy-write openpolicyagent.org/policy=rego
while [[ $(kubectl get cm sample-policy-write -n fybrik-system -o 'jsonpath={.metadata.annotations.openpolicyagent\.org/policy-status}') != '{"status":"ok"}' ]]; do echo "waiting for policy to be applied" && sleep 5; done

Deploy a Jupyter notebook

In this sample a Jupyter notebook is used as the user workload and its business logic requires writing the new asset. To deploy the notebook to your cluster, follow the instructions in the Deploy a Jupyter notebook section of the notebook sample for the read flow.

Create a FybrikApplication resource associated with the notebook

Re-apply the FybrikApplication defined in scenario one.

Run the following command to wait until the FybrikApplication status is ready:

while [[ $(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.ready}') != "true" ]]; do echo "waiting for FybrikApplication" && sleep 5; done
while [[ $(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.assetStates.new-data.conditions[?(@.type == "Ready")].status}') != "True" ]]; do echo "waiting for new-data asset" && sleep 5; done

Although the dataset has not yet been written to the object storage, a data asset has already been created in the data catalog. We will need the name of the cataloged asset in Scenario 3, where we will read the contents of the dataset. Obtaining the name of the asset depends on the data catalog with which Fybrik is configured to work.

Run the following commands to extract the new cataloged asset id and the provisioned bucket from the FybrikApplication status. The asset id will be used in the third scenario when we try to read the new asset. Two variants follow; use the one that matches your data catalog.

Variant that escapes the dots in the asset id so that it can be used inside a jsonpath expression:

CATALOGED_ASSET=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.assetStates.new-data.catalogedAsset}')
CATALOGED_ASSET_MODIFIED=$(echo "$CATALOGED_ASSET" | sed 's/\./\\\./g')
BUCKET=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.provisionedStorage.new-data.details.connection.s3.bucket}')

Variant that prefixes the asset id with the fybrik-notebook-sample namespace:

CATALOGED_ASSET=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.assetStates.new-data.catalogedAsset}')
CATALOGED_ASSET=fybrik-notebook-sample/${CATALOGED_ASSET}
CATALOGED_ASSET_MODIFIED=${CATALOGED_ASSET}
BUCKET=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.provisionedStorage.new-data.details.connection.s3.bucket}')
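
The sed expression used above (s/\./\\\./g) puts a backslash in front of every dot, because an unescaped dot inside a jsonpath expression would be read as a path separator. A minimal Python equivalent is sketched below. Both function names and the example asset id are hypothetical.

```python
def escape_dots(asset_id: str) -> str:
    """Prefix every dot with a backslash, like sed 's/\\./\\\\\\./g'."""
    return asset_id.replace(".", "\\.")


def asset_state_jsonpath(asset_id: str, field: str) -> str:
    """Build a jsonpath expression that addresses one field of an asset state."""
    return "{.status.assetStates." + escape_dots(asset_id) + "." + field + "}"


# Hypothetical asset id containing dots, for illustration only.
example_id = "catalog.service.new-data"
print(escape_dots(example_id))                     # catalog\.service\.new-data
print(asset_state_jsonpath(example_id, "catalogedAsset"))
```

An id without dots passes through unchanged, so the escaped form can be used in either case.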

Write the data from the notebook

This sample uses the Synthetic Financial Datasets For Fraud Detection dataset [1] as the data that the notebook needs to write. Download and extract the file to your machine. You should now see a file named PS_20174392719_1491204439457_log.csv. Alternatively, use a sample of 100 lines of the same dataset by downloading PS_20174392719_1491204439457_log.csv from GitHub.

To reference PS_20174392719_1491204439457_log.csv from Jupyter notebook cells as shown later in this section do the following:

Jupyter notebook has an Upload Files button that can be used to upload PS_20174392719_1491204439457_log.csv to the notebook from the local machine. When referencing PS_20174392719_1491204439457_log.csv in the notebook cell the following should be used:

file_path = "PS_20174392719_1491204439457_log.csv"

Alternatively, in your terminal, run the following commands to copy PS_20174392719_1491204439457_log.csv file from your local machine into /tmp directory in the Jupyter notebook pod:

export FILEPATH="/path/to/PS_20174392719_1491204439457_log.csv"
export NOTEBOOK_POD_NAME=$(kubectl get pods | grep notebook |awk '{print $1}')
kubectl cp $FILEPATH $NOTEBOOK_POD_NAME:/tmp

In that case, when referencing PS_20174392719_1491204439457_log.csv in the notebook cell, /tmp/ directory should be specified, for example:

file_path = "/tmp/PS_20174392719_1491204439457_log.csv"

In your terminal, run the following commands to print the endpoint to use for writing the data. They fetch the endpoint details from the FybrikApplication resource:

ENDPOINT_SCHEME=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.assetStates.new-data.endpoint.fybrik-arrow-flight.scheme}')
ENDPOINT_HOSTNAME=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.assetStates.new-data.endpoint.fybrik-arrow-flight.hostname}')
ENDPOINT_PORT=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.assetStates.new-data.endpoint.fybrik-arrow-flight.port}')
printf "\n${ENDPOINT_SCHEME}://${ENDPOINT_HOSTNAME}:${ENDPOINT_PORT}\n\n"
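
The three queries above read the scheme, hostname and port of the asset's fybrik-arrow-flight endpoint and join them into one URL. The same assembly can be sketched in Python on the JSON returned by kubectl get fybrikapplication my-notebook-write -o json. The function name is hypothetical, and the sample values (in particular the hostname) are invented for illustration.

```python
def flight_endpoint(app: dict, asset: str,
                    protocol: str = "fybrik-arrow-flight") -> str:
    """Join scheme, hostname and port of an asset endpoint into a URL."""
    endpoint = app["status"]["assetStates"][asset]["endpoint"][protocol]
    return f"{endpoint['scheme']}://{endpoint['hostname']}:{endpoint['port']}"


# Hand-written example status; the values are assumptions, not real Fybrik output.
sample_app = {
    "status": {
        "assetStates": {
            "new-data": {
                "endpoint": {
                    "fybrik-arrow-flight": {
                        "scheme": "grpc",
                        "hostname": "example-arrow-flight.example-namespace",
                        "port": "80",
                    }
                }
            }
        }
    }
}

print(flight_endpoint(sample_app, "new-data"))
```

The resulting string is the value to substitute for the ENDPOINT placeholder in the notebook cells.
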

The next steps use the endpoint to write the data in a Python notebook.

  1. Insert a new notebook cell to install needed packages:
    %pip install pyarrow==7.0.*
    
  2. Insert a new notebook cell to write data using the endpoint value extracted from the FybrikApplication in the previous step:
    import pyarrow.flight as fl
    import json
    from pyarrow import csv
    
    # Create a Flight client
    client = fl.connect('<ENDPOINT>')
    
    # Prepare the request
    request = {
        "asset": "new-data",
        # To request specific columns add to the request a "columns" key with a list of column names
        # "columns": [...]
    }
    
    # write the new dataset
    file_path = "/path/to/PS_20174392719_1491204439457_log.csv"
    my_table = csv.read_csv(file_path)
    writer, _ = client.do_put(fl.FlightDescriptor.for_command(json.dumps(request)),
                              my_table.schema)
    # Note that we do not indicate the data store nor allocate a bucket in which 
    # to write the dataset. This is all done by Fybrik.
    writer.write_table(my_table)
    writer.close()
    

View new asset through OpenMetadata UI

If Fybrik is configured to work with the OpenMetadata data catalog, then the newly created asset is registered in OpenMetadata and can be viewed through the OpenMetadata UI. A tutorial on working with the OpenMetadata UI can be found here. It begins with an explanation of how to connect to the UI and log in. Once you are logged in, choose Tables in the menu on the left to see all the registered assets.

Cleanup scenario two

kubectl delete cm sample-policy-write -n fybrik-system
kubectl delete fybrikapplications.app.fybrik.io my-notebook-write -n fybrik-notebook-sample

Scenario three: read the newly written data

Create a FybrikApplication resource to read the data for the notebook

Create a FybrikApplication resource to register the notebook workload to the control plane of Fybrik:

cat <<EOF | kubectl apply -f -
apiVersion: app.fybrik.io/v1beta1
kind: FybrikApplication
metadata:
  name: my-notebook-read
  namespace: fybrik-notebook-sample
  labels:
    app: my-notebook-read
spec:
  selector:
    clusterName: thegreendragon
    workloadSelector:
      matchLabels:
        app: my-notebook-read
  appInfo:
    intent: Fraud Detection
  data:
    - dataSetID: ${CATALOGED_ASSET}
      flow: read
      requirements:
        interface:
          protocol: fybrik-arrow-flight
EOF

Run the following command to wait until the FybrikApplication is ready:

while [[ $(kubectl get fybrikapplication my-notebook-read -o 'jsonpath={.status.ready}') != "true" ]]; do echo "waiting for FybrikApplication" && sleep 5; done
while [[ $(kubectl get fybrikapplication my-notebook-read -o "jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.conditions[?(@.type == 'Ready')].status}") != "True" ]]; do echo "waiting for ${CATALOGED_ASSET} asset" && sleep 5; done

Read the dataset from the notebook

In your terminal, run the following commands to print the endpoint to use for reading the data. They fetch the endpoint details from the FybrikApplication resource:

ENDPOINT_SCHEME=$(kubectl get fybrikapplication my-notebook-read -o "jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.endpoint.fybrik-arrow-flight.scheme}")
ENDPOINT_HOSTNAME=$(kubectl get fybrikapplication my-notebook-read -o "jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.endpoint.fybrik-arrow-flight.hostname}")
ENDPOINT_PORT=$(kubectl get fybrikapplication my-notebook-read -o "jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.endpoint.fybrik-arrow-flight.port}")
printf "\n${ENDPOINT_SCHEME}://${ENDPOINT_HOSTNAME}:${ENDPOINT_PORT}\n\n"

The next steps use the endpoint to read the data in a Python notebook.

  1. Insert a new notebook cell to install pandas and pyarrow packages:
    %pip install pandas pyarrow==7.0.*
    
  2. Insert a new notebook cell to read the data. Notice: ENDPOINT and CATALOGED_ASSET should be replaced with the values extracted from the FybrikApplication as described in previous steps.
    import json
    import pyarrow.flight as fl
    import pandas as pd
    
    # Create a Flight client
    client = fl.connect('<ENDPOINT>')
    
    # Prepare the request
    request = {
        "asset": "<CATALOGED_ASSET>",
        # To request specific columns add to the request a "columns" key with a list of column names
        # "columns": [...]
    }
    # show all columns
    pd.set_option('display.max_columns', None)
    # Send request and fetch result as a pandas DataFrame
    info = client.get_flight_info(fl.FlightDescriptor.for_command(json.dumps(request)))
    reader: fl.FlightStreamReader = client.do_get(info.endpoints[0].ticket)
    df_read: pd.DataFrame = reader.read_pandas()
    
  3. Insert a new notebook cell with the following command to visualize the result:
    df_read
    
  4. Execute all notebook cells and notice that data in the oldbalanceOrg column was redacted.
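
The request passed to the Flight client is a small JSON document with an asset key and an optional columns key, as the comments in the cells above indicate. A minimal sketch of a helper that builds it follows. The name build_request is hypothetical, and the asset id in the usage lines is a placeholder.

```python
import json


def build_request(asset: str, columns=None) -> str:
    """Serialize a request for one asset, optionally limited to some columns."""
    request = {"asset": asset}
    if columns:
        # Only add the key when specific columns are requested.
        request["columns"] = list(columns)
    return json.dumps(request)


# Request every column of an asset (placeholder asset id).
print(build_request("<CATALOGED_ASSET>"))
# Request only two columns of the same asset.
print(build_request("<CATALOGED_ASSET>", ["nameOrig", "oldbalanceOrg"]))
```

The returned string can be passed to fl.FlightDescriptor.for_command in place of json.dumps(request) in the cells above.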

Cleanup

You can use the AWS CLI to remove the bucket and objects created in this sample.

To list all the created objects, run:

aws --endpoint-url=http://localhost:4566 s3api list-objects --bucket=${BUCKET}

The output should look something like:

{
    "Contents": [
        {
            "Key": "new-data22fb16f0c0/",
            "LastModified": "2023-03-02T10:02:26+00:00",
            "ETag": "\"d41d8cd98f00b204e9800998ecf8427e\"",
            "Size": 0,
            "StorageClass": "STANDARD",
            "Owner": {
                "DisplayName": "webfile",
                "ID": "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a"
            }
        },
        {
            "Key": "new-data22fb16f0c0/part-2023-03-02-10-02-19-979068-0.parquet",
            "LastModified": "2023-03-02T10:02:26+00:00",
            "ETag": "\"a91aefdb4bf09a1a94254a9c8b6ba473-1\"",
            "Size": 8396,
            "StorageClass": "STANDARD",
            "Owner": {
                "DisplayName": "webfile",
                "ID": "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a"
            }
        }
    ]
}

Given the object keys returned by the previous command, run:

aws --endpoint-url=http://localhost:4566 s3api delete-objects --bucket=${BUCKET} --delete='{"Objects": [{"Key": "new-data22fb16f0c0/"}, {"Key": "new-data22fb16f0c0/part-2023-03-02-10-02-19-979068-0.parquet"}]}'

Be sure to replace the keys in the previous command with those returned by the AWS list-objects command above.
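
Instead of copying the keys by hand, the value of the --delete option can be derived from the list-objects output. The sketch below assumes that the output has been saved and parsed as JSON; the function name delete_payload is hypothetical, and the sample input is trimmed to the Key fields of the output shown above.

```python
import json


def delete_payload(list_objects_output: dict) -> str:
    """Build the JSON value for --delete from the output of list-objects."""
    keys = [obj["Key"] for obj in list_objects_output.get("Contents", [])]
    return json.dumps({"Objects": [{"Key": key} for key in keys]})


# Trimmed copy of the sample output above; your keys will differ.
listing = {
    "Contents": [
        {"Key": "new-data22fb16f0c0/"},
        {"Key": "new-data22fb16f0c0/part-2023-03-02-10-02-19-979068-0.parquet"},
    ]
}

print(delete_payload(listing))
```

The printed string has the same shape as the argument of the delete-objects command above and can be passed to it in single quotes.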

Finally, remove the bucket by running:

aws --endpoint-url=http://localhost:4566 s3api delete-bucket --bucket=${BUCKET}

Removing the dataset object does not remove the corresponding entry from OpenMetadata. To do so, go to the OpenMetadata UI through your browser. Choose your data asset table. At the top right, press the three vertical dots and choose Delete. Type DELETE into the form to confirm deletion.


  1. Created by NTNU and shared under the CC BY-SA 4.0 license.