Notebook sample for the write flow
This sample shows three scenarios:
- how Fybrik prevents writing a new asset due to governance restrictions.
- how to write data generated by the workload to an object store.
- how to read data from a dataset stored in an object store.
In this sample you play multiple roles:
- As a data governance officer you set up data governance policies.
- As a data user you specify your data usage requirements and use a notebook to write and read the data.
Create an account in object storage
Create an account in object storage of your choice such as AWS S3, IBM Cloud Object Storage or Ceph. Make a note of the service endpoint and access credentials. You will need them later.
Setup localstack
For experimentation you can install localstack to your cluster instead of using a cloud service.
- Define variables for access key and secret key
export ACCESS_KEY="myaccesskey"
export SECRET_KEY="mysecretkey"
- Install localstack to the currently active namespace and wait for it to be ready:
helm repo add localstack-charts https://localstack.github.io/helm-charts
helm install localstack localstack-charts/localstack \
--version 0.4.3 \
--set image.tag="1.2.0" \
--set startServices="s3" \
--set service.type=ClusterIP \
--set livenessProbe.initialDelaySeconds=25
kubectl wait --for=condition=ready --all pod -n fybrik-notebook-sample --timeout=120s
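Alternatively, to give localstack persistent storage, install it with persistence enabled. The storage class shown is the one used in this sample; use one that is available in your cluster:
helm repo add localstack-charts https://localstack.github.io/helm-charts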
helm install localstack localstack-charts/localstack \
--version 0.4.3 \
--set image.tag="1.2.0" \
--set startServices="s3" \
--set service.type=ClusterIP \
--set livenessProbe.initialDelaySeconds=25 \
--set persistence.enabled=true \
--set persistence.storageClass=ibmc-file-gold-gid \
--set persistence.accessModes[0]=ReadWriteMany
kubectl wait --for=condition=ready --all pod -n fybrik-notebook-sample --timeout=120s
- Create a port-forward to communicate with the localstack server:
kubectl port-forward svc/localstack 4566:4566 &
export REGION=theshire
aws configure set aws_access_key_id ${ACCESS_KEY}
aws configure set aws_secret_access_key ${SECRET_KEY}
aws configure set region ${REGION}
Deploy resources for write scenarios
Register the credentials required for accessing the object storage. Replace the values for access_key and secret_key with the values from the object storage service that you used and run:
cat << EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: bucket-creds
  namespace: fybrik-system
type: Opaque
stringData:
  access_key: "${ACCESS_KEY}"
  secret_key: "${SECRET_KEY}"
EOF
Then, register two storage accounts: one in theshire and one in neverland. Replace the value for endpoint with the value from the object storage service that you used and run:
cat << EOF | kubectl apply -f -
apiVersion: app.fybrik.io/v1beta2
kind: FybrikStorageAccount
metadata:
  name: theshire-storage-account
  namespace: fybrik-system
spec:
  id: theshire-object-store
  type: s3
  geography: theshire
  s3:
    endpoint: "http://localstack.fybrik-notebook-sample.svc.cluster.local:4566"
  secretRef: bucket-creds
EOF
cat << EOF | kubectl apply -f -
apiVersion: app.fybrik.io/v1beta2
kind: FybrikStorageAccount
metadata:
  name: neverland-storage-account
  namespace: fybrik-system
spec:
  id: neverland-object-store
  geography: neverland
  type: s3
  s3:
    endpoint: "http://localstack.fybrik-notebook-sample.svc.cluster.local:4566"
  secretRef: bucket-creds
EOF
Note that for evaluation purposes the same object store is used for different regions in the storage accounts.
Scenario one: write is forbidden due to governance restrictions
Define data governance policies for write
Define an OpenPolicyAgent policy to forbid the writing of personal data to regions neverland and theshire in datasets tagged with Purpose.finance. This policy prevents the write because the deployed FybrikStorageAccount resources are all in neverland or theshire, so no permitted destination remains.
Below is the policy (written in Rego language):
package dataapi.authz

# If the conditions between the curly braces are true then Fybrik will get an object for the "Deny" action.
rule[{"action": {"name":"Deny"}, "policy": description}] {
    description := "Forbid writing sensitive data to theshire object-stores in datasets tagged with `finance`"
    # this condition is true if it is a write operation
    input.action.actionType == "write"
    # this condition is true if the asset has "Purpose.finance" tag
    input.resource.metadata.tags["Purpose.finance"]
    # this condition is true if write destination is theshire object-stores
    input.action.destination == "theshire"
    # this statement is true if one of the columns has "PersonalData.Personal" tag
    input.resource.metadata.columns[i].tags["PersonalData.Personal"]
}

# If the conditions between the curly braces are true then Fybrik will get an object for the "Deny" action.
rule[{"action": {"name":"Deny"}, "policy": description}] {
    description := "Forbid writing sensitive data to neverland object-stores in datasets tagged with `finance`"
    # this condition is true if it is a write operation
    input.action.actionType == "write"
    # this condition is true if the asset has "Purpose.finance" tag
    input.resource.metadata.tags["Purpose.finance"]
    # this condition is true if write destination is neverland object-stores
    input.action.destination == "neverland"
    # this statement is true if one of the columns has "PersonalData.Personal" tag
    input.resource.metadata.columns[i].tags["PersonalData.Personal"]
}
For more details on OPA policies please refer to Using OPA for Data Governance task.
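To make the conditions of the two Deny rules easier to follow, here is a rough Python analogue. It is only an illustrative sketch: the function name deny_reasons and the sample request are hypothetical, the input shape is inferred from the fields the Rego policy reads, and Python truthiness only approximates how OPA evaluates the rules.

```python
def deny_reasons(request, restricted_regions=("theshire", "neverland")):
    """Return one description per restricted region whose deny conditions all hold."""
    action = request["action"]
    metadata = request["resource"]["metadata"]

    # Mirrors: input.action.actionType == "write"
    is_write = action.get("actionType") == "write"
    # Mirrors: input.resource.metadata.tags["Purpose.finance"]
    is_finance = bool(metadata.get("tags", {}).get("Purpose.finance"))
    # Mirrors: input.resource.metadata.columns[i].tags["PersonalData.Personal"]
    has_personal_column = any(
        column.get("tags", {}).get("PersonalData.Personal")
        for column in metadata.get("columns", [])
    )

    reasons = []
    for region in restricted_regions:
        # Mirrors: input.action.destination == "<region>"
        targets_region = action.get("destination") == region
        if is_write and is_finance and has_personal_column and targets_region:
            reasons.append(
                f"Forbid writing sensitive data to {region} object-stores "
                "in datasets tagged with `finance`"
            )
    return reasons


# Hypothetical request resembling the asset written in this sample.
sample_request = {
    "action": {"actionType": "write", "destination": "theshire"},
    "resource": {
        "metadata": {
            "tags": {"Purpose.finance": True},
            "columns": [
                {"name": "nameOrig", "tags": {"PII.Sensitive": True}},
                {"name": "oldbalanceOrg", "tags": {"PersonalData.Personal": True}},
            ],
        }
    },
}

print(deny_reasons(sample_request))
```

Because both storage accounts in this sample are in restricted regions, every candidate destination produces a Deny result, which is why the write cannot be placed anywhere.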
Copy the policy to a file named sample-policy-write.rego and then run:
kubectl -n fybrik-system create configmap sample-policy-write --from-file=sample-policy-write.rego
kubectl -n fybrik-system label configmap sample-policy-write openpolicyagent.org/policy=rego
while [[ $(kubectl get cm sample-policy-write -n fybrik-system -o 'jsonpath={.metadata.annotations.openpolicyagent\.org/policy-status}') != '{"status":"ok"}' ]]; do echo "waiting for policy to be applied" && sleep 5; done
Create a FybrikApplication resource to write the new asset
Create a FybrikApplication resource to register the notebook workload to the control plane of Fybrik:
cat <<EOF | kubectl apply -f -
apiVersion: app.fybrik.io/v1beta1
kind: FybrikApplication
metadata:
  name: my-notebook-write
  namespace: fybrik-notebook-sample
  labels:
    app: my-notebook-write
spec:
  selector:
    clusterName: thegreendragon
    workloadSelector:
      matchLabels:
        app: my-notebook-write
  appInfo:
    intent: Fraud Detection
  data:
    - dataSetID: 'new-data'
      flow: write
      requirements:
        flowParams:
          isNewDataSet: true
          catalog: fybrik-notebook-sample
          metadata:
            tags:
              Purpose.finance: true
            columns:
              - name: nameOrig
                tags:
                  PII.Sensitive: true
              - name: oldbalanceOrg
                tags:
                  PersonalData.Personal: true
        interface:
          protocol: fybrik-arrow-flight
EOF
Notice that:
- The selector field matches the labels of our Jupyter notebook workload.
- The data field includes a dataSetID that matches the asset identifier in the catalog.
- The protocol indicates that the developer wants to consume the data using Apache Arrow Flight. For some protocols a dataformat can be specified as well (e.g., s3 protocol and parquet format).
- The isNewDataSet field indicates that this is a new asset.
- The catalog field holds the catalog id. It will be used by Fybrik to register the new asset in the catalog.
- The metadata field specifies the dataset tags. These attributes can later be used in policies.
Run the following commands to wait until the FybrikApplication status is updated:
while [[ $(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.ready}') != "true" ]]; do echo "waiting for FybrikApplication" && sleep 5; done
while [[ $(kubectl get fybrikapplication my-notebook-write -n fybrik-notebook-sample -o 'jsonpath={.status.assetStates.new-data.conditions[?(@.type == "Deny")].status}') != "True" ]]; do echo "waiting for my-notebook-write asset" && sleep 5; done
We expect the asset's status in FybrikApplication.status to be denied due to the policy defined above. Next, a new policy will be applied which will allow the writing to theshire object store.
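The second wait loop above checks one condition of one asset in the FybrikApplication status. As a minimal sketch of the same lookup in Python, the helper below walks the structure implied by the JSONPath expression. The function name condition_status and the trimmed-down sample object are hypothetical, and the real status contains more fields than shown.

```python
def condition_status(application, asset_id, condition_type):
    """Return the status string of one condition of one asset, or None if absent."""
    asset_states = application.get("status", {}).get("assetStates", {})
    for condition in asset_states.get(asset_id, {}).get("conditions", []):
        if condition.get("type") == condition_type:
            return condition.get("status")
    return None


# Hypothetical, trimmed-down FybrikApplication object for illustration only.
sample_application = {
    "status": {
        "assetStates": {
            "new-data": {
                "conditions": [
                    {"type": "Ready", "status": "False"},
                    {"type": "Deny", "status": "True"},
                ]
            }
        }
    }
}

print(condition_status(sample_application, "new-data", "Deny"))
```

In practice the dictionary could be obtained by parsing the output of kubectl get fybrikapplication my-notebook-write -n fybrik-notebook-sample -o json with json.loads.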
Cleanup scenario one
Before proceeding to scenario two, delete the OPA policy and the FybrikApplication:
kubectl delete cm sample-policy-write -n fybrik-system
kubectl delete fybrikapplications.app.fybrik.io my-notebook-write -n fybrik-notebook-sample
Notice that the FybrikStorageAccount resources are still applied after the cleanup.
Scenario two: write new data
To write the new data a new policy should be defined.
Define data access policies for writing the data
Define an OpenPolicyAgent policy that returns the list of column names tagged as PersonalData.Personal when the actionType is write and the destination is not neverland. The columns are passed to the FybrikModule together with RedactAction upon deployment of the module by Fybrik.
Below is the policy (written in Rego language):
package dataapi.authz

rule[{"action": {"name":"RedactAction","columns": column_names}, "policy": description}] {
    description := "Redact written columns tagged as PersonalData.Personal in datasets tagged with Purpose.finance = true. The data should not be stored in `neverland` storage account"
    input.action.actionType == "write"
    input.resource.metadata.tags["Purpose.finance"]
    input.action.destination != "neverland"
    column_names := [input.resource.metadata.columns[i].name | input.resource.metadata.columns[i].tags["PersonalData.Personal"]]
}
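As an illustration of what this rule computes, the following Python sketch approximates its logic. The function name redact_action and the sample request are hypothetical, the input shape is inferred from the fields referenced in the Rego policy, and this is not how OPA itself evaluates the rule.

```python
def redact_action(request):
    """Return a RedactAction with the personal columns, or None if the rule does not apply."""
    action = request["action"]
    metadata = request["resource"]["metadata"]

    # Mirrors: input.action.actionType == "write"
    if action.get("actionType") != "write":
        return None
    # Mirrors: input.resource.metadata.tags["Purpose.finance"]
    if not metadata.get("tags", {}).get("Purpose.finance"):
        return None
    # Mirrors: input.action.destination != "neverland" (an undefined destination does not match)
    destination = action.get("destination")
    if destination is None or destination == "neverland":
        return None

    # Mirrors the comprehension that collects names of columns tagged PersonalData.Personal
    column_names = [
        column["name"]
        for column in metadata.get("columns", [])
        if column.get("tags", {}).get("PersonalData.Personal")
    ]
    return {"name": "RedactAction", "columns": column_names}


# Hypothetical request resembling the asset written in this sample.
sample_request = {
    "action": {"actionType": "write", "destination": "theshire"},
    "resource": {
        "metadata": {
            "tags": {"Purpose.finance": True},
            "columns": [
                {"name": "nameOrig", "tags": {"PII.Sensitive": True}},
                {"name": "oldbalanceOrg", "tags": {"PersonalData.Personal": True}},
            ],
        }
    },
}

print(redact_action(sample_request))
```

With the metadata used in this sample, only oldbalanceOrg carries the PersonalData.Personal tag, so it is the only column selected for redaction.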
Copy the policy to a file named sample-policy-write.rego and then run:
kubectl -n fybrik-system create configmap sample-policy-write --from-file=sample-policy-write.rego
kubectl -n fybrik-system label configmap sample-policy-write openpolicyagent.org/policy=rego
while [[ $(kubectl get cm sample-policy-write -n fybrik-system -o 'jsonpath={.metadata.annotations.openpolicyagent\.org/policy-status}') != '{"status":"ok"}' ]]; do echo "waiting for policy to be applied" && sleep 5; done
Deploy a Jupyter notebook
In this sample a Jupyter notebook is used as the user workload and its business logic requires writing the new asset. To deploy a notebook to your cluster, follow the instructions in the Deploy a Jupyter notebook section of the notebook sample for the read flow.
Create a FybrikApplication resource associated with the notebook
Re-apply FybrikApplication as defined in scenario 1.
Run the following commands to wait until the FybrikApplication status is ready:
while [[ $(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.ready}') != "true" ]]; do echo "waiting for FybrikApplication" && sleep 5; done
while [[ $(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.assetStates.new-data.conditions[?(@.type == "Ready")].status}') != "True" ]]; do echo "waiting for new-data asset" && sleep 5; done
Although the dataset has not yet been written to the object storage, a data asset has already been created in the data catalog. We will need the name of the cataloged asset in Scenario 3, where we will read the contents of the dataset. Obtaining the name of the asset depends on the data catalog with which Fybrik is configured to work.
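Run the following commands to extract the new cataloged asset id and the provisioned bucket name from the FybrikApplication status. The asset id will be used in the third scenario when we try to read the new asset. CATALOGED_ASSET_MODIFIED holds the same id with its dots escaped so that it can be used inside JSONPath expressions: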
CATALOGED_ASSET=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.assetStates.new-data.catalogedAsset}')
CATALOGED_ASSET_MODIFIED=$(echo $CATALOGED_ASSET | sed 's/\./\\\./g')
BUCKET=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.provisionedStorage.new-data.details.connection.s3.bucket}')
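The sed command above backslash-escapes every dot in the asset id, because an unescaped dot in a kubectl JSONPath expression is read as a path separator rather than as part of a key. A minimal Python equivalent, useful if you prepare the id inside a script or notebook, might look like this. The function name escape_dots and the example id are hypothetical.

```python
def escape_dots(asset_id):
    # Put a backslash in front of every dot, as the sed substitution does.
    return asset_id.replace(".", "\\.")


# Hypothetical asset id containing dots, for illustration only.
example_id = "service.database.new-data"
print(escape_dots(example_id))
```

An id without dots is returned unchanged.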
Alternatively, if your data catalog identifies the asset as the catalog id followed by the asset name, run the following commands instead. They prefix the asset id with the catalog id fybrik-notebook-sample and do not escape dots:
CATALOGED_ASSET=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.assetStates.new-data.catalogedAsset}')
CATALOGED_ASSET=fybrik-notebook-sample/${CATALOGED_ASSET}
CATALOGED_ASSET_MODIFIED=${CATALOGED_ASSET}
BUCKET=$(kubectl get fybrikapplication my-notebook-write -o 'jsonpath={.status.provisionedStorage.new-data.details.connection.s3.bucket}')
Write the data from the notebook
This sample uses the Synthetic Financial Datasets For Fraud Detection dataset [1] as the data that the notebook needs to write. Download and extract the file to your machine. You should now see a file named PS_20174392719_1491204439457_log.csv. Alternatively, use a sample of 100 lines of the same dataset by downloading PS_20174392719_1491204439457_log.csv from GitHub.
To reference PS_20174392719_1491204439457_log.csv from Jupyter notebook cells as shown later in this section, use one of the following two options.
Jupyter notebook has an Upload Files button that can be used to upload PS_20174392719_1491204439457_log.csv to the notebook from the local machine. When referencing PS_20174392719_1491204439457_log.csv in the notebook cell, use the following:
file_path = "PS_20174392719_1491204439457_log.csv"
Alternatively, in your terminal, run the following commands to copy the PS_20174392719_1491204439457_log.csv file from your local machine into the /tmp directory in the Jupyter notebook pod:
export FILEPATH="/path/to/PS_20174392719_1491204439457_log.csv"
export NOTEBOOK_POD_NAME=$(kubectl get pods | grep notebook |awk '{print $1}')
kubectl cp $FILEPATH $NOTEBOOK_POD_NAME:/tmp
In that case, when referencing PS_20174392719_1491204439457_log.csv in the notebook cell, the /tmp/ directory should be specified, for example:
file_path = "/tmp/PS_20174392719_1491204439457_log.csv"
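Before sending the file, it can help to confirm that it contains the columns named in the FybrikApplication metadata (nameOrig and oldbalanceOrg). The sketch below uses only the Python standard library. The function name missing_columns is hypothetical, and the in-memory sample stands in for the real file, whose full header is not reproduced here.

```python
import csv
import io


def missing_columns(handle, expected=("nameOrig", "oldbalanceOrg")):
    """Return the expected column names that are absent from the CSV header."""
    header = next(csv.reader(handle), [])
    return [name for name in expected if name not in header]


# Hypothetical two-line CSV used only to demonstrate the check.
sample_csv = io.StringIO("nameOrig,oldbalanceOrg\nexample-name,100.0\n")
print(missing_columns(sample_csv))
```

For the real file, open it with open(file_path, newline="") and pass the resulting handle to the function.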
In your terminal, run the following commands to print the endpoint to use for writing the data. They fetch the endpoint details from the status of the FybrikApplication resource:
ENDPOINT_SCHEME=$(kubectl get fybrikapplication my-notebook-write -o jsonpath={.status.assetStates.new-data.endpoint.fybrik-arrow-flight.scheme})
ENDPOINT_HOSTNAME=$(kubectl get fybrikapplication my-notebook-write -o jsonpath={.status.assetStates.new-data.endpoint.fybrik-arrow-flight.hostname})
ENDPOINT_PORT=$(kubectl get fybrikapplication my-notebook-write -o jsonpath={.status.assetStates.new-data.endpoint.fybrik-arrow-flight.port})
printf "\n${ENDPOINT_SCHEME}://${ENDPOINT_HOSTNAME}:${ENDPOINT_PORT}\n\n"
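The commands above assemble the endpoint from three fields of the FybrikApplication status. As a hedged sketch, the same assembly in Python could look as follows. The function name flight_endpoint is hypothetical, the nesting follows the JSONPath expressions used above, and the scheme, hostname and port in the sample are placeholder values rather than real output.

```python
def flight_endpoint(application, asset_id, protocol="fybrik-arrow-flight"):
    """Build scheme://hostname:port from the endpoint entry of one asset."""
    endpoint = application["status"]["assetStates"][asset_id]["endpoint"][protocol]
    return f'{endpoint["scheme"]}://{endpoint["hostname"]}:{endpoint["port"]}'


# Placeholder values for illustration only; your cluster will report its own.
sample_application = {
    "status": {
        "assetStates": {
            "new-data": {
                "endpoint": {
                    "fybrik-arrow-flight": {
                        "scheme": "grpc",
                        "hostname": "example-module.example-namespace",
                        "port": "80",
                    }
                }
            }
        }
    }
}

print(flight_endpoint(sample_application, "new-data"))
```

The resulting string is what replaces the ENDPOINT placeholder in the notebook cell.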
- Insert a new notebook cell to install needed packages:
%pip install pyarrow==7.0.*
- Insert a new notebook cell to write data using the endpoint value extracted from the FybrikApplication in the previous step:
import pyarrow.flight as fl
import json
from pyarrow import csv

# Create a Flight client
client = fl.connect('<ENDPOINT>')

# Prepare the request
request = {
    "asset": "new-data",
    # To request specific columns add to the request a "columns" key with a list of column names
    # "columns": [...]
}

# write the new dataset
file_path = "/path/to/PS_20174392719_1491204439457_log.csv"
my_table = csv.read_csv(file_path)
writer, _ = client.do_put(fl.FlightDescriptor.for_command(json.dumps(request)), my_table.schema)
# Note that we do not indicate the data store nor allocate a bucket in which
# to write the dataset. This is all done by Fybrik.
writer.write_table(my_table)
writer.close()
View new asset through OpenMetadata UI
If Fybrik is configured to work with the OpenMetadata data catalog, then the newly-created asset is registered in OpenMetadata and can be viewed through the OpenMetadata UI. A tutorial on working with the OpenMetadata UI can be found here. It begins with an explanation of how to connect to the UI and log in. Once you are logged in, choose Tables on the menu on the left and you will see all the registered assets.
Cleanup scenario two
kubectl delete cm sample-policy-write -n fybrik-system
kubectl delete fybrikapplications.app.fybrik.io my-notebook-write -n fybrik-notebook-sample
Scenario three: read the newly written data
Create a FybrikApplication resource to read the data for the notebook
Create a FybrikApplication resource to register the notebook workload to the control plane of Fybrik:
cat <<EOF | kubectl apply -f -
apiVersion: app.fybrik.io/v1beta1
kind: FybrikApplication
metadata:
  name: my-notebook-read
  namespace: fybrik-notebook-sample
  labels:
    app: my-notebook-read
spec:
  selector:
    clusterName: thegreendragon
    workloadSelector:
      matchLabels:
        app: my-notebook-read
  appInfo:
    intent: Fraud Detection
  data:
    - dataSetID: ${CATALOGED_ASSET}
      flow: read
      requirements:
        interface:
          protocol: fybrik-arrow-flight
EOF
Run the following commands to wait until the FybrikApplication is ready:
while [[ $(kubectl get fybrikapplication my-notebook-read -o 'jsonpath={.status.ready}') != "true" ]]; do echo "waiting for FybrikApplication" && sleep 5; done
while [[ $(kubectl get fybrikapplication my-notebook-read -o "jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.conditions[?(@.type == 'Ready')].status}") != "True" ]]; do echo "waiting for ${CATALOGED_ASSET} asset" && sleep 5; done
Read the dataset from the notebook
In your terminal, run the following commands to print the endpoint to use for reading the data. They fetch the endpoint details from the status of the FybrikApplication resource:
ENDPOINT_SCHEME=$(kubectl get fybrikapplication my-notebook-read -o jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.endpoint.fybrik-arrow-flight.scheme})
ENDPOINT_HOSTNAME=$(kubectl get fybrikapplication my-notebook-read -o jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.endpoint.fybrik-arrow-flight.hostname})
ENDPOINT_PORT=$(kubectl get fybrikapplication my-notebook-read -o jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.endpoint.fybrik-arrow-flight.port})
printf "\n${ENDPOINT_SCHEME}://${ENDPOINT_HOSTNAME}:${ENDPOINT_PORT}\n\n"
- Insert a new notebook cell to install pandas and pyarrow packages:
%pip install pandas pyarrow==7.0.*
- Insert a new notebook cell to read the data. Notice: ENDPOINT and CATALOGED_ASSET should be replaced with the values extracted from the FybrikApplication as described in previous steps.
import json
import pyarrow.flight as fl
import pandas as pd

# Create a Flight client
client = fl.connect('<ENDPOINT>')

# Prepare the request
request = {
    "asset": "<CATALOGED_ASSET>",
    # To request specific columns add to the request a "columns" key with a list of column names
    # "columns": [...]
}

# show all columns
pd.set_option('display.max_columns', None)

# Send request and fetch result as a pandas DataFrame
info = client.get_flight_info(fl.FlightDescriptor.for_command(json.dumps(request)))
reader: fl.FlightStreamReader = client.do_get(info.endpoints[0].ticket)
df_read: pd.DataFrame = reader.read_pandas()
- Insert a new notebook cell with the following command to visualize the result:
df_read
- Execute all notebook cells and notice that data in the oldbalanceOrg column was redacted.
Cleanup
You can use the AWS CLI to remove the bucket and objects created in this sample.
To list all the created objects, run:
aws --endpoint-url=http://localhost:4566 s3api --bucket=${BUCKET} list-objects
The output should look something like:
{
"Contents": [
{
"Key": "new-data22fb16f0c0/",
"LastModified": "2023-03-02T10:02:26+00:00",
"ETag": "\"d41d8cd98f00b204e9800998ecf8427e\"",
"Size": 0,
"StorageClass": "STANDARD",
"Owner": {
"DisplayName": "webfile",
"ID": "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a"
}
},
{
"Key": "new-data22fb16f0c0/part-2023-03-02-10-02-19-979068-0.parquet",
"LastModified": "2023-03-02T10:02:26+00:00",
"ETag": "\"a91aefdb4bf09a1a94254a9c8b6ba473-1\"",
"Size": 8396,
"StorageClass": "STANDARD",
"Owner": {
"DisplayName": "webfile",
"ID": "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a"
}
}
]
}
Given the object keys returned by the previous command, run:
aws --endpoint-url=http://localhost:4566 s3api --bucket=${BUCKET} delete-objects --delete='{"Objects": [{"Key": "new-data22fb16f0c0/"}, {"Key": "new-data22fb16f0c0/part-2023-03-02-10-02-19-979068-0.parquet"}]}'
Be sure to replace the keys in the previous command with those returned by the AWS list-objects command above.
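Rather than copying the keys by hand, the value for the --delete option can be derived from the list-objects output. The following is a minimal sketch using only the Python standard library. The function name delete_payload is hypothetical, and the sample input is a shortened version of the output shown above.

```python
import json


def delete_payload(list_objects_output):
    """Turn the JSON printed by list-objects into the JSON expected by --delete."""
    listing = json.loads(list_objects_output)
    keys = [entry["Key"] for entry in listing.get("Contents", [])]
    return json.dumps({"Objects": [{"Key": key} for key in keys]})


# Shortened copy of the list-objects output shown above.
sample_output = """
{
  "Contents": [
    {"Key": "new-data22fb16f0c0/", "Size": 0},
    {"Key": "new-data22fb16f0c0/part-2023-03-02-10-02-19-979068-0.parquet", "Size": 8396}
  ]
}
"""

print(delete_payload(sample_output))
```

The printed string can be passed, wrapped in single quotes, as the value of --delete in the delete-objects command.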
Finally, remove the bucket by running:
aws --endpoint-url=http://localhost:4566 s3api --bucket=${BUCKET} delete-bucket
Removing the dataset object does not remove the corresponding entry from OpenMetadata. To do so, go to the OpenMetadata UI through your browser. Choose your data asset table. At the top right, press the three vertical dots and choose Delete. Type DELETE into the form to confirm deletion.
[1] Created by NTNU and shared under the CC BY-SA 4.0 license.