# FybrikModule chaining sample
This sample shows how to implement a use case where, based on the data source and governance policies, the Fybrik manager determines that it must deploy two FybrikModules to allow a workload access to a dataset. One FybrikModule handles reading the data and the second does the data transformation. Data is passed between the FybrikModules without writing to intermediate storage.
The data read in this example is the [userdata dataset](https://github.com/Teradata/kylo/blob/master/samples/sample-data/parquet/userdata2.parquet), a Parquet file from the Teradata kylo repository. Two FybrikModules are available to the Fybrik control plane: the arrow-flight-module and the airbyte-module. Only the airbyte-module can provide read access to the dataset, but it has no data transformation capabilities. Therefore, to satisfy all constraints, the Fybrik manager must deploy both modules: the airbyte-module for reading the dataset, and the arrow-flight-module for transforming it according to the governance policies.
To recreate this scenario, you will need a copy of the airbyte-module repository.
- Set the AIRBYTE_MODULE_DIR environment variable to the path of the airbyte-module directory:

  ```bash
  cd /tmp
  git clone https://github.com/fybrik/airbyte-module.git
  cd airbyte-module
  export AIRBYTE_MODULE_DIR=${PWD}
  ```
- Install the Airbyte module:

  ```bash
  kubectl apply -f https://github.com/fybrik/airbyte-module/releases/latest/download/module.yaml -n fybrik-system
  ```
- Install the arrow-flight module for transformations:

  ```bash
  kubectl apply -f https://github.com/fybrik/arrow-flight-module/releases/latest/download/module.yaml -n fybrik-system
  ```
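  To confirm that both modules are now registered with the control plane, you can list the FybrikModule resources (a quick sanity check; `fybrikmodules` is the plural form of the FybrikModule CRD):

  ```bash
  # Both airbyte-module and arrow-flight-module should be listed
  kubectl get fybrikmodules -n fybrik-system
  ```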
- Next, register the data asset itself in the data catalog. How to do this depends on the data catalog you are working with. For OpenMetadata, we use port-forwarding to send asset creation requests to the OpenMetadata connector:

  ```bash
  kubectl port-forward svc/openmetadata-connector -n fybrik-system 8081:8080 &
  cat << EOF | curl -X POST localhost:8081/createAsset -d @-
  {
    "destinationCatalogID": "openmetadata",
    "destinationAssetID": "userdata",
    "details": {
      "dataFormat": "parquet",
      "connection": {
        "name": "https",
        "https": {
          "url": "https://github.com/Teradata/kylo/raw/master/samples/sample-data/parquet/userdata2.parquet"
        }
      }
    },
    "resourceMetadata": {
      "name": "test data",
      "geography": "theshire",
      "tags": {
        "Purpose.finance": "true"
      },
      "columns": [
        {
          "name": "first_name",
          "tags": {
            "PII.Sensitive": "true"
          }
        },
        {
          "name": "last_name",
          "tags": {
            "PII.Sensitive": "true"
          }
        },
        {
          "name": "birthdate",
          "tags": {
            "PII.Sensitive": "true"
          }
        }
      ]
    }
  }
  EOF
  ```
  The response from the OpenMetadata connector should look like this:

  ```
  {"assetID":"openmetadata-https.default.openmetadata.userdata"}
  ```
  The asset is now registered in the catalog. Store the asset ID in a CATALOGED_ASSET variable:

  ```bash
  CATALOGED_ASSET="openmetadata-https.default.openmetadata.userdata"
  ```
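  If you have jq installed, you can also capture the returned asset ID directly instead of copying it by hand (a sketch, assuming the createAsset request body shown above has been saved to a hypothetical file named asset.json):

  ```bash
  # Hypothetical: asset.json holds the createAsset request body from above
  CATALOGED_ASSET=$(curl -s -X POST localhost:8081/createAsset -d @asset.json | jq -r '.assetID')
  echo ${CATALOGED_ASSET}
  ```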
  If you are working with the Katalog data catalog instead, register the asset by applying the sample asset definition from the airbyte-module repository, and store its asset ID in the CATALOGED_ASSET variable:

  ```bash
  kubectl apply -f $AIRBYTE_MODULE_DIR/fybrik/read-flow/asset.yaml
  CATALOGED_ASSET=fybrik-notebook-sample/userdata
  ```
- Before creating a policy for accessing the asset, make sure that you clean up previously existing access policies:

  ```bash
  NS="fybrik-system"; kubectl -n $NS get configmap | awk '/sample/{print $1}' | xargs kubectl delete -n $NS configmap
  ```
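  To preview what the cleanup will remove before running it, you can list the matching ConfigMaps first:

  ```bash
  # Lists every configmap in fybrik-system whose name contains "sample"
  kubectl -n fybrik-system get configmap | awk '/sample/{print $1}'
  ```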
- Create the policy to access the asset. We use a policy that requires redaction of PII.Sensitive columns (a sketch of such a policy appears after the commands):

  ```bash
  kubectl -n fybrik-system create configmap sample-policy --from-file=$AIRBYTE_MODULE_DIR/fybrik/sample-policy-restrictive.rego
  kubectl -n fybrik-system label configmap sample-policy openpolicyagent.org/policy=rego
  while [[ $(kubectl get cm sample-policy -n fybrik-system -o 'jsonpath={.metadata.annotations.openpolicyagent\.org/policy-status}') != '{"status":"ok"}' ]]; do echo "waiting for policy to be applied" && sleep 5; done
  ```
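  For reference, a redaction policy of this kind is typically a Rego rule that emits a RedactAction for each column tagged PII.Sensitive. The sketch below illustrates the general shape of such a rule; the authoritative version is sample-policy-restrictive.rego in the airbyte-module repository, whose exact conditions may differ:

  ```rego
  package dataapi.authz

  # Sketch: when a workload reads the asset, request redaction of
  # every column carrying the PII.Sensitive tag.
  rule[{"action": {"name": "RedactAction", "columns": column_names}, "policy": description}] {
    description := "Redact columns tagged as PII.Sensitive"
    input.action.actionType == "read"
    column_names := [input.resource.metadata.columns[i].name | input.resource.metadata.columns[i].tags["PII.Sensitive"]]
    count(column_names) > 0
  }
  ```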
- Create a FybrikApplication resource to register the workload with the Fybrik control plane. The value you place in the dataSetID field is your asset ID, as explained above:

  ```bash
  cat <<EOF | kubectl apply -f -
  apiVersion: app.fybrik.io/v1beta1
  kind: FybrikApplication
  metadata:
    name: my-app
    labels:
      app: my-app
  spec:
    selector:
      workloadSelector:
        matchLabels:
          app: my-app
    appInfo:
      intent: Fraud Detection
    data:
      - dataSetID: ${CATALOGED_ASSET}
        requirements:
          interface:
            protocol: fybrik-arrow-flight
  EOF
  ```
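  Before moving on, you can wait for the control plane to finish constructing the data path by polling the application's ready status, mirroring the polling pattern used for the policy above:

  ```bash
  while [[ $(kubectl get fybrikapplication my-app -o 'jsonpath={.status.ready}') != "true" ]]; do echo "waiting for FybrikApplication to be ready" && sleep 5; done
  ```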
- After the FybrikApplication is applied, the Fybrik control plane attempts to construct a data path for the application. Fybrik determines that the Airbyte module can give the application access to the userdata dataset, and that the arrow-flight module can provide the redaction transformation. It therefore deploys both modules in the fybrik-blueprints namespace. To verify that the Airbyte module and the arrow-flight module were indeed deployed, run:

  ```bash
  kubectl get pods -n fybrik-blueprints
  ```
  NOTE: On an OpenShift cluster the deployment will fail, because by default OpenShift does not allow privileged: true in the securityContext field. You should therefore add the service account of the module's deployment to the privileged SCC:

  ```bash
  oc adm policy add-scc-to-user privileged system:serviceaccount:fybrik-blueprints:<SERVICE_ACCOUNT_NAME>
  ```

  The deployment will then restart the failed pods, and the pods in the fybrik-blueprints namespace should start successfully.
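  To find the value to substitute for <SERVICE_ACCOUNT_NAME>, you can list each pod alongside its service account (one possible way; any equivalent inspection works):

  ```bash
  # Print each pod name and the service account it runs under
  kubectl get pods -n fybrik-blueprints -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.spec.serviceAccountName}{"\n"}{end}'
  ```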
  You should see pods with names similar to:

  ```
  NAME                                                              READY   STATUS    RESTARTS   AGE
  my-app60cf6ba8-a767-4313-aa39-b78495b625c7-airb-95c0f-airbz4rh8   2/2     Running   0          88s
  my-app60cf6ba8-a767-4313-aa39-b78495b625c7-arro-6f3f8-arro82dcb   1/1     Running   0          87s
  ```
- Wait for the FybrikModule pods to be ready:

  ```bash
  kubectl wait pod --all --for=condition=ready -n fybrik-blueprints --timeout 10m
  ```
- Run the following commands to set the CATALOGED_ASSET_MODIFIED and ENDPOINT_HOSTNAME environment variables:

  ```bash
  # Escape the dots in the asset ID so they are treated literally in the jsonpath query
  CATALOGED_ASSET_MODIFIED=$(echo $CATALOGED_ASSET | sed 's/\./\\\./g')
  export ENDPOINT_HOSTNAME=$(kubectl get fybrikapplication my-app -n fybrik-notebook-sample -o "jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.endpoint.fybrik-arrow-flight.hostname}")
  ```
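  As a quick sanity check, the variable should now hold the cluster-internal hostname of the virtual endpoint (the exact name is generated by Fybrik):

  ```bash
  echo ${ENDPOINT_HOSTNAME}
  ```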
- To verify that the Airbyte module gives access to the userdata dataset, run:

  ```bash
  cd $AIRBYTE_MODULE_DIR/helm/client
  ./deploy_airbyte_module_client_pod.sh
  kubectl exec -it my-shell -n default -- python3 /root/client.py --host ${ENDPOINT_HOSTNAME} --port 80 --asset ${CATALOGED_ASSET}
  ```

  You should see the following output, with the PII.Sensitive columns redacted as the policy requires:
  ```
        registration_dttm      id first_name last_name  email  ...     country birthdate     salary                     title comments
  0   2016-02-03T13:36:39     1.0      XXXXX     XXXXX  XXXXX  ...   Indonesia     XXXXX  140249.37  Senior Financial Analyst
  1   2016-02-03T00:22:28     2.0      XXXXX     XXXXX  XXXXX  ...       China     XXXXX        NaN
  2   2016-02-03T18:29:04     3.0      XXXXX     XXXXX  XXXXX  ...      France     XXXXX  236219.26                   Teacher
  3   2016-02-03T13:42:19     4.0      XXXXX     XXXXX  XXXXX  ...      Russia     XXXXX        NaN    Nuclear Power Engineer
  4   2016-02-03T00:15:29     5.0      XXXXX     XXXXX  XXXXX  ...      France     XXXXX   50210.02             Senior Editor
  ..                  ...     ...        ...       ...    ...  ...         ...       ...        ...                       ...      ...
  995 2016-02-03T13:36:49   996.0      XXXXX     XXXXX  XXXXX  ...       China     XXXXX  185421.82                         "
  996 2016-02-03T04:39:01   997.0      XXXXX     XXXXX  XXXXX  ...    Malaysia     XXXXX  279671.68
  997 2016-02-03T00:33:54   998.0      XXXXX     XXXXX  XXXXX  ...      Poland     XXXXX  112275.78
  998 2016-02-03T00:15:08   999.0      XXXXX     XXXXX  XXXXX  ...  Kazakhstan     XXXXX   53564.76        Speech Pathologist
  999 2016-02-03T00:53:53  1000.0      XXXXX     XXXXX  XXXXX  ...     Nigeria     XXXXX  239858.70

  [1000 rows x 13 columns]
  ```
- Alternatively, you can access the userdata dataset from a Jupyter notebook, as described in the notebook sample. To determine the virtual endpoint from which to access the dataset, run:

  ```bash
  CATALOGED_ASSET_MODIFIED=$(echo $CATALOGED_ASSET | sed 's/\./\\\./g')
  ENDPOINT_SCHEME=$(kubectl get fybrikapplication my-app -o jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.endpoint.fybrik-arrow-flight.scheme})
  ENDPOINT_HOSTNAME=$(kubectl get fybrikapplication my-app -o jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.endpoint.fybrik-arrow-flight.hostname})
  ENDPOINT_PORT=$(kubectl get fybrikapplication my-app -o jsonpath={.status.assetStates.${CATALOGED_ASSET_MODIFIED}.endpoint.fybrik-arrow-flight.port})
  printf "${ENDPOINT_SCHEME}://${ENDPOINT_HOSTNAME}:${ENDPOINT_PORT}"
  ```

  Note that the virtual endpoint determined from the FybrikApplication status points to the arrow-flight transform module, although this is transparent to the user.
  Insert a new notebook cell to install the pandas and pyarrow packages:

  ```python
  %pip install pandas pyarrow
  ```
  Finally, given the endpoint value determined above, insert the following into a new notebook cell, replacing <ENDPOINT> with that value:

  ```python
  import json
  import pyarrow.flight as fl

  # Create a Flight client connected to the virtual endpoint
  client = fl.connect('<ENDPOINT>')

  # Prepare the request; the asset ID must match the one registered in the catalog
  request = {
      "asset": "openmetadata-https.default.openmetadata.userdata",
  }

  # Send the request and fetch the result as a pandas DataFrame
  info = client.get_flight_info(fl.FlightDescriptor.for_command(json.dumps(request)))
  reader: fl.FlightStreamReader = client.do_get(info.endpoints[0].ticket)
  print(reader.read_pandas())
  ```
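  The arrow-flight-module's request format also accepts an optional columns list. Assuming the chained data path forwards the projection (an assumption; this sample does not verify it), you could request a subset of the columns:

  ```python
  # Hypothetical variant: ask the endpoint for a projection of the dataset.
  # Columns covered by the governance policy are still returned redacted.
  request = {
      "asset": "openmetadata-https.default.openmetadata.userdata",
      "columns": ["registration_dttm", "first_name", "salary"],
  }
  info = client.get_flight_info(fl.FlightDescriptor.for_command(json.dumps(request)))
  print(client.do_get(info.endpoints[0].ticket).read_pandas())
  ```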