
Exploring Azure Schema Registry with Spark

The basic premise behind the need for a Schema Registry in a data pipeline is:

To define the explicit “contract” - i.e. the Schema - between a Producer and Consumer in a manner that is decoupled from the Data, to enforce Schema Compatibility and enable Schema Evolution.

Azure Schema Registry allows us to achieve this, by enabling Producer and Consumer Applications to exchange data (e.g. with Event Hub & Kafka) without explicitly defining the Schema as part of the Application logic. This not only allows for Schema Evolution, but also establishes a central governance framework for reusable schemas across applications.

Put simply, this means we can create Consumer Spark Applications that deserialize Event Data (e.g. Avro) without explicitly defining the schema inside the Spark Pipeline. We'll demonstrate this using both the Azure Event Hubs Connector for Apache Spark and the Kafka Surface for Event Hub.

To read more about Azure Schema Registry - click here.

The Pipeline

The diagram below summarizes the end-to-end data pipeline for this article:

Data Pipeline

The pipeline consists of:

  1. Producer: Python script 1.produce-events that generates random events against a schema dictated by the source application (e.g. business logic). It persists event data to the "Data" Event Hub (using the EH Python Library) and registers the schema to the "Schema" Event Hub (using the SR Python Library).

    Note: This application does not use any Spark components. We run it in Databricks purely for convenience - it could just as well run as a console application anywhere else.

    At this point, both data and schema are available in Event Hub.

  2. Consumer 1: Spark application 1.consume-events-eh that connects to the "Data" Event Hub using the native Spark Connector from Maven, while connecting to the "Schema" Event Hub using the jar compiled in the next section.

  3. Consumer 2: Spark application 2.consume-events-kafka that's similar to Consumer 1, except it connects to the "Data" Event Hub using the Kafka Surface instead - for demonstration purposes.

Compiling jar with Maven

At the time of writing, the azure-schemaregistry-spark library is a work-in-progress beta. The project code from here can be compiled into a jar using Maven:

mvn clean compile assembly:single

Like so:

Compile JAR with Maven

A pre-compiled jar, azure-schemaregistry-spark-avro-1.0.0-beta.5-jar-with-dependencies.jar, is available on GitHub as well.

Pre-Requisites & Environment Setup

App Registration/Service Principal

Create a Service Principal, and make note of:

  • clientId
  • tenantId
  • clientSecret

To register a resource provider, view the step-by-step guide here.

Deploy Azure Resources

Here are the Azure services we need to deploy. Any settings that differ from the defaults are highlighted below as well.

Azure Resources

All resources are deployed with default settings.

Configure Azure Resources

Assign Service Principal permissions

Assign the Service Principal the Schema Registry Contributor (Preview) role on the Event Hub Namespace hosting the Schema Registry.

Cluster Setup on Databricks

We create 3 single-node Clusters on the Databricks 7.4 runtime (Apache Spark 3.0.1, Scala 2.12, Python 3):

  1. single-sender-01: for sending data to Event Hub with 1.produce-events.py
  2. single-receiver-eh-01: for consuming data via the Event Hub native connector with 1.consume-events-eh.scala
  3. single-receiver-kafka-01: for consuming data via the Event Hub Kafka Surface with 2.consume-events-kafka.scala

The table below captures the libraries installed on each Cluster:

Cluster Name             | Library                                               | Source
single-sender-01         | azure-schemaregistry-avroserializer                   | PyPI
single-sender-01         | azure-identity                                        | PyPI
single-sender-01         | azure.eventhub                                        | PyPI
single-receiver-eh-01    | com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17 | Maven
single-receiver-eh-01    | com.azure:azure-data-schemaregistry:1.0.0-beta.4      | Maven
single-receiver-eh-01    | com.azure:azure-data-schemaregistry-avro:1.0.0-beta.4 | Maven
single-receiver-eh-01    | com.azure:azure-identity:1.1.3                        | Maven
single-receiver-eh-01    | azure_schemaregistry_spark_avro_1_0_0_beta_5....jar   | Jar
single-receiver-kafka-01 | com.azure:azure-data-schemaregistry:1.0.0-beta.4      | Maven
single-receiver-kafka-01 | com.azure:azure-data-schemaregistry-avro:1.0.0-beta.4 | Maven
single-receiver-kafka-01 | com.azure:azure-identity:1.1.3                        | Maven
single-receiver-kafka-01 | azure_schemaregistry_spark_avro_1_0_0_beta_5....jar   | Jar

Cluster libraries

Schema Group setup

We create a Schema Group user_schemagroup:

Schema Group Creation

Pipeline Walkthrough

Producer

Our Pipeline starts with the 1.produce-events Python application that writes randomly generated, schema-enforced data to the Event Hub.

The following schema is registered with Schema Registry:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "user_name", "type": "string" },
    { "name": "user_score", "type": ["int", "null"] },
    { "name": "user_id", "type": ["string", "null"] }
  ]
}
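To make "randomly generated, schema-enforced data" concrete, here is a minimal sketch of how events matching this schema could be generated and checked using only the Python standard library. The helper names (random_value, random_user, conforms) are hypothetical and are not taken from 1.produce-events, where the Avro serializer is what enforces the schema.

```python
import json
import random

# The User schema from above, parsed into a Python dict.
USER_SCHEMA = json.loads("""
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "user_name", "type": "string" },
    { "name": "user_score", "type": ["int", "null"] },
    { "name": "user_id", "type": ["string", "null"] }
  ]
}
""")

# Python types accepted for each Avro primitive used by the User schema.
PY_TYPES = {"string": str, "int": int, "null": type(None)}


def random_value(avro_type, rng):
    """Return a random value for the few Avro types this sketch covers."""
    if isinstance(avro_type, list):  # union: pick one branch at random
        return random_value(rng.choice(avro_type), rng)
    if avro_type == "string":
        return "".join(rng.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(8))
    if avro_type == "int":
        return rng.randint(0, 100)
    if avro_type == "null":
        return None
    raise ValueError(f"type not covered by this sketch: {avro_type!r}")


def random_user(rng=random):
    """Build one random event with exactly the fields the schema declares."""
    return {f["name"]: random_value(f["type"], rng) for f in USER_SCHEMA["fields"]}


def conforms(record, schema=USER_SCHEMA):
    """Check field names and value types of a record against the schema."""
    fields = schema["fields"]
    if set(record) != {f["name"] for f in fields}:
        return False
    for f in fields:
        branches = f["type"] if isinstance(f["type"], list) else [f["type"]]
        if not any(type(record[f["name"]]) is PY_TYPES[b] for b in branches):
            return False
    return True
```

Because user_score and user_id are unions with "null", a generated event may legitimately carry None for either of them, while user_name must always be a string.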

The application commits the transaction data to Event Hub at the end:

with eventhub_producer, avro_serializer:
    send_event_data_batch(eventhub_producer, avro_serializer)

And the following confirmation message is seen after each transaction is sent:

Producer confirmation message

We also see the registered schema in Schema Registry:

Schema Registry

Consumer(s)

Now on the Consumer side, we begin both pipelines by first defining the property object props containing our Schema Registry endpoint and Service Principal information - for authenticating with Schema Registry:

import java.util.HashMap

// Schema Registry endpoint and Service Principal credentials
val props: HashMap[String, String] = new HashMap()
props.put("schema.registry.url", "http://<your-event-hub>.servicebus.windows.net")
props.put("schema.registry.tenant.id", "<your-azure-ad-tenant-id>")
props.put("schema.registry.client.id", "<your-client-id>")
props.put("schema.registry.client.secret", "<your-client-secret>")

For Consumer 1 we initiate the Spark Dataframe using the Event Hub configuration:

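import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}

// Connection string for the "Data" Event Hub
val connectionString = ConnectionStringBuilder("Endpoint=sb://<your-event-hub>.servicebus.windows.net/;SharedAccessKeyName=<name>;SharedAccessKey=<your-access-key>;EntityPath=<your-topic>")
  .setEventHubName("<your-topic>")
  .build

// Read from the start of the stream so earlier events are picked up too
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromStartOfStream)

// Streaming DataFrame backed by the Event Hub
val df = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()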

And for Consumer 2 we leverage standard Kafka syntax:

val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "60000")
    .option("failOnDataLoss", "false")
    .option("startingOffsets", "earliest")
    .load()

In both cases, this defines our connection to the Data Event Hub, as the Dataframe df.
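The Kafka snippet references TOPIC, BOOTSTRAP_SERVERS and EH_SASL without showing how they are built. As a rough sketch, the values can be derived from an Event Hub connection string as below. It is written in Python for brevity, and the same strings apply in the Scala notebook. The helper name kafka_settings is hypothetical. The sketch assumes the Event Hubs Kafka endpoint on port 9093 and the "kafkashaded." class prefix that Databricks runtimes use for their bundled Kafka client; set shaded=False if your runtime does not shade it.

```python
def kafka_settings(connection_string: str, shaded: bool = True) -> dict:
    """Derive Kafka options for Event Hubs from a connection string (sketch)."""
    # Parse "Key=Value;Key=Value" pairs; split only on the first '=' because
    # access keys are base64 and may themselves end with '='.
    pairs = [p for p in connection_string.strip().strip(";").split(";") if p]
    parts = dict(p.split("=", 1) for p in pairs)

    # "sb://<namespace>.servicebus.windows.net/" -> "<namespace>.servicebus.windows.net"
    host = parts["Endpoint"].replace("sb://", "").strip("/")

    # Event Hubs authenticates Kafka clients with SASL PLAIN, using the literal
    # username "$ConnectionString" and the connection string as the password.
    prefix = "kafkashaded." if shaded else ""
    jaas = (
        f"{prefix}org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{connection_string}";'
    )

    return {
        "BOOTSTRAP_SERVERS": f"{host}:9093",
        "EH_SASL": jaas,
        # The Event Hub name doubles as the Kafka topic; it is None when the
        # connection string is namespace-level and has no EntityPath.
        "TOPIC": parts.get("EntityPath"),
    }
```

For example, passing the same connection string used for Consumer 1 yields a bootstrap server of <your-event-hub>.servicebus.windows.net:9093 and a topic equal to the EntityPath value.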

Once that's done, we deserialize the Avro data from the Event Hub using the from_avro() function, connecting to our Schema Registry via the props object we defined earlier:

// Event Hub
val parsed_df = df.select(from_avro($"body", "<your-schema-guid>", props))

// Kafka
val parsed_df = df.select(from_avro($"value", "<your-schema-guid>", props))
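Why does the consumer need the schema at all? Avro's binary encoding carries no field names or type tags, so the bytes can only be interpreted by walking the schema field by field. The sketch below illustrates this with a hand-rolled decoder for the User schema, following the Avro specification's binary encoding (zigzag varints, length-prefixed strings, index-prefixed unions). It is not how from_avro() is implemented, the function names are hypothetical, and it assumes a bare Avro datum: messages written by the Schema Registry serializer may wrap the datum in extra framing that references the schema, which this sketch ignores.

```python
import io

# Only the field list is needed for decoding; this mirrors the User schema.
USER_SCHEMA = {
    "fields": [
        {"name": "user_name", "type": "string"},
        {"name": "user_score", "type": ["int", "null"]},
        {"name": "user_id", "type": ["string", "null"]},
    ]
}


def read_long(buf):
    """Read one zigzag-encoded variable-length integer (Avro int/long)."""
    shift, acc = 0, 0
    while True:
        chunk = buf.read(1)
        if not chunk:
            raise EOFError("truncated Avro datum")
        acc |= (chunk[0] & 0x7F) << shift
        if not chunk[0] & 0x80:  # high bit clear marks the last byte
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1)  # undo zigzag


def read_value(buf, avro_type):
    """Decode one value of the few Avro types this sketch covers."""
    if isinstance(avro_type, list):  # union: branch index first, then the value
        return read_value(buf, avro_type[read_long(buf)])
    if avro_type == "null":  # null occupies zero bytes
        return None
    if avro_type == "int":
        return read_long(buf)
    if avro_type == "string":  # byte length, then UTF-8 bytes
        return buf.read(read_long(buf)).decode("utf-8")
    raise ValueError(f"type not covered by this sketch: {avro_type!r}")


def decode_record(payload, schema=USER_SCHEMA):
    """Decode a record by reading its fields in schema order."""
    buf = io.BytesIO(payload)
    return {f["name"]: read_value(buf, f["type"]) for f in schema["fields"]}


# Hand-built datum: "ana" (length 3 -> 0x06), union branch 0 + int 42 (0x54),
# union branch 1 (0x02) selecting null, which has no bytes of its own.
payload = b"\x06ana\x00\x54\x02"
user = decode_record(payload)
```

Here user comes out as a dict with user_name "ana", user_score 42 and user_id None. Without the schema, the same seven bytes would be uninterpretable, which is the gap the registry lookup inside from_avro() fills.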

Spark is able to immediately pull in the schema from Schema Registry and define the Dataframe schema:

Schema inferred from Schema Registry

And we can display our streaming data using the Databricks display(df) command.

We demonstrate both Consumer 1 (left) and Consumer 2 (right) here side by side, while generating events with our Producer application (not shown):

Consumer 1 and 2 Live Action

Wrap Up

We explored the convenience and robustness of using Azure Schema Registry in data pipelines, and the flexibility of being able to leverage both the Event Hub native connector and Kafka surface in a Spark application using the from_avro() function for deserializing messages.

Get in touch 👋

If you have any questions or suggestions, feel free to open an issue on GitHub!

© 2021 Raki Rahman.