VMware Tanzu Greenplum

Mastering Greenplum Streaming Server: Simplifying Complex JSON Ingestion in Greenplum Database

By Pranav Tendolkar posted Feb 25, 2026 05:50 AM

  

Ingesting data from Kafka or RabbitMQ into Greenplum often feels straightforward until you hit a real-world "catch" in the payload format. Recently, I spent significant time solving a challenge involving bank settlement data where a single Kafka message contained a JSON array of hundreds of transactions. Additionally, compliance rules dictated that raw account numbers could never be stored on disk; they had to be hashed immediately.

 While many developers default to adding a preprocessing microservice (like Kafka Streams or a Python consumer), Greenplum Streaming Server (GPSS) can handle this directly using UDF input transformers.

 

The Core Strategy: SQL-Powered Transformation

GPSS allows you to attach a SQL function to a streaming job. This function runs inside Greenplum on every incoming Kafka message before the data hits the target table. This approach allows you to:

  • Parse JSON payloads.

  • Hash sensitive fields for compliance.

  • Explode arrays into multiple rows.

  • Add metadata without maintaining extra preprocessing services.

Let's Start Simple: Hashing Account Numbers

Before dealing with arrays, it helps to look at a basic transformer.

Target Table:

CREATE TABLE transactions_hashed (
    transaction_id TEXT,
    account_from_hash TEXT,
    account_to_hash TEXT,
    amount NUMERIC(15,2),
    currency TEXT
) DISTRIBUTED BY (transaction_id);

Assume Kafka is sending a single transaction per message: 

{
  "transaction_id": "TXN-20260105-001234",
  "account_from": "4532123456789012",
  "account_to": "4532987654321098",
  "amount": 1500.00,
  "currency": "USD"
}

 

We want to store this, but without the raw account numbers; one option is to store them in hashed form. That is not easy to do with default values alone, but a SQL function handles it.

 

Transformer Functions:

 

GPSS transformer functions look like this at the top level:

function_name(s anyelement, properties json)

Here, the first parameter, s anyelement, is a row from the GPSS staging table; the columns it has depend entirely on your YAML configuration. The second parameter, properties json, carries any custom properties you choose to pass in.

 

The return type is flexible and can be: 

  • RETURNS TABLE(...): returns a table with named columns.

  • RETURNS SETOF <table_name>: returns a set of rows matching an existing table type.



If your source GPSS job config looks like:

    value_content:
      json:
        column:
          name: payload
          type: json

 In your custom function, s.payload will contain your incoming JSON object.

 

Coming back to our example, here is the account number hashing transformer function:

CREATE OR REPLACE FUNCTION hash_account_numbers(s anyelement, properties json)
RETURNS TABLE(
    transaction_id TEXT,
    account_from_hash TEXT,
    account_to_hash TEXT,
    amount NUMERIC(15,2),
    currency TEXT
)
AS $$
    SELECT
        (s.payload->>'transaction_id')::TEXT,
        md5(s.payload->>'account_from'),
        md5(s.payload->>'account_to'),
        (s.payload->>'amount')::NUMERIC(15,2),
        (s.payload->>'currency')::TEXT
$$ LANGUAGE SQL;
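Before wiring the function into a job, it can help to check the expressions by hand. The following is a minimal sketch, not part of the actual job: it runs the same expressions in psql against the sample message as a literal. The inline subquery and its alias s are stand-ins for the GPSS staging row, used here purely for illustration.

```sql
-- Stand-in for the staging row: one row with a json column named payload.
SELECT
    s.payload->>'transaction_id'            AS transaction_id,
    md5(s.payload->>'account_from')         AS account_from_hash,
    md5(s.payload->>'account_to')           AS account_to_hash,
    (s.payload->>'amount')::NUMERIC(15,2)   AS amount,
    s.payload->>'currency'                  AS currency
FROM (
    SELECT '{"transaction_id":"TXN-20260105-001234",
             "account_from":"4532123456789012",
             "account_to":"4532987654321098",
             "amount":1500.00,
             "currency":"USD"}'::json AS payload
) AS s;
```

Each account column should come back as a 32-character hexadecimal string rather than the raw number.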

 

For the hash, we use MD5 built into Greenplum. It’s deterministic, fast, and good enough for our use case.
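One caveat worth weighing: account numbers are short and numeric, so an unsalted MD5 can in principle be reversed by brute force. If your compliance rules are stricter, one option is a keyed hash. The sketch below assumes the pgcrypto extension is available in your Greenplum database (it is not used in the job described here), and the key shown is a placeholder; in practice the key would come from a secured source rather than a literal.

```sql
-- Assumption: pgcrypto can be installed in this database.
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- hmac() returns bytea, so encode() converts it to a hex string
-- that still fits a TEXT hash column (64 characters for SHA-256).
SELECT encode(
           hmac('4532123456789012'::text,
                'replace-with-a-secret-key'::text,  -- placeholder key
                'sha256'),
           'hex') AS account_from_hash;
```

Inside the transformer, each md5(...) call would be swapped for the same encode(hmac(...), 'hex') expression.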

 

Job Configuration:

 

The GPSS YAML for the job:

 

version: v3
targets:
- gpdb:
    host: localhost
    port: 5432
    user: gpadmin
    database: postgres
    tables:
      - table: transactions_hashed
        schema: public
        mode:
          insert: {}
        transformer:
          transform: hash_account_numbers

sources:
- kafka:
    brokers: localhost:9092
    topic: transactions_raw
    fallback_offset: earliest
    value_content:
      json:
        column:
          name: payload
          type: json
    task:
      batch_size:
        max_count: 100

 

We specify the transformer function name under targets.gpdb.tables.transformer.transform.

value_content.json.column.name is what creates s.payload in the transformer. This references our incoming data.

Running the job, we see that the raw account numbers are obfuscated in the target table.

SELECT * FROM transactions_hashed;

   transaction_id    |        account_from_hash         |         account_to_hash          | amount  | currency
---------------------+----------------------------------+----------------------------------+---------+----------
 TXN-20260105-001235 | 8658d1d6e381dffe0a2a9b014776d45c | 582228b00bea04e8bc248d27dec13c27 |  750.50 | EUR

The Real Problem: JSON Arrays per Message

Now for the actual headache. Instead of one transaction, each Kafka message looked like this:

[
  {"transaction_id":"TXN-001","account_from":"4532...","account_to":"4532...","amount":500.00,"currency":"USD"},
  {"transaction_id":"TXN-002","account_from":"4532...","account_to":"4532...","amount":1250.75,"currency":"USD"},
  {"transaction_id":"TXN-003","account_from":"4532...","account_to":"4532...","amount":89.99,"currency":"EUR"}
]

 

The incoming message contains an array of transactions, and each transaction needs to become its own row. Going through the documentation, this doesn't look easy.

This is where RETURNS SETOF comes in.

 

First, we create our target table. This will serve as the table type that the function will return. 

CREATE TABLE settlement_transactions (
    transaction_id TEXT,
    account_from_hash TEXT,
    account_to_hash TEXT,
    amount NUMERIC(15,2),
    currency TEXT
) DISTRIBUTED BY (transaction_id);

Our new transformer UDF:

CREATE OR REPLACE FUNCTION expand_and_hash_accounts(s anyelement, properties json)
RETURNS SETOF settlement_transactions
AS $$
    SELECT
        (elem->>'transaction_id')::TEXT,
        md5(elem->>'account_from'),
        md5(elem->>'account_to'),
        (elem->>'amount')::NUMERIC(15,2),
        (elem->>'currency')::TEXT
    FROM json_array_elements(s.batch_data::json) AS elem;
$$ LANGUAGE SQL STABLE;

 

Note that s.batch_data corresponds to the column name defined in the YAML configuration's value_content.json.column.name field. For this example, you would set that name to batch_data (with type: json) instead of payload.

Here, json_array_elements turns the JSON array into a set of rows. GPSS simply inserts whatever the function returns.

 

If one Kafka message contains 300 transactions, this function returns 300 rows. GPSS inserts all of them in one pass.
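To see the expansion in isolation, the sketch below calls json_array_elements on a literal array in psql. The array is a trimmed version of the sample message (account fields omitted), so nothing here depends on Kafka or GPSS.

```sql
-- Each array element becomes one row; elem is the json value of that element.
SELECT
    elem->>'transaction_id'            AS transaction_id,
    (elem->>'amount')::NUMERIC(15,2)   AS amount,
    elem->>'currency'                  AS currency
FROM json_array_elements(
    '[{"transaction_id":"TXN-001","amount":500.00,"currency":"USD"},
      {"transaction_id":"TXN-002","amount":1250.75,"currency":"USD"},
      {"transaction_id":"TXN-003","amount":89.99,"currency":"EUR"}]'::json
) AS elem;
```

This returns three rows, one per array element, which is the same shape the transformer hands back to GPSS.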

 

NOTE:  Batch size matters. When each Kafka message expands into hundreds of rows, batch_size.max_count stops meaning what you think it means. Ten messages might turn into several thousand rows in memory. So keep this conservative.

task:
  batch_size:
    max_count: 10
    max_time: 5
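A rough way to size the batch is to multiply max_count by the typical array length. The sketch below assumes you have a representative message at hand; it uses a three-element literal as a stand-in and json_array_length to count its elements.

```sql
-- 10 is the max_count value from the task settings above.
SELECT
    json_array_length(msg)       AS rows_per_message,
    json_array_length(msg) * 10  AS rows_per_batch
FROM (
    SELECT '[{"transaction_id":"TXN-001"},
             {"transaction_id":"TXN-002"},
             {"transaction_id":"TXN-003"}]'::json AS msg
) AS sample;
```

With 300 transactions per message, the same arithmetic gives 3,000 rows per batch.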

Adding this function name to our job configuration and starting the job with the array data topic, we get:

 postgres=# select * from settlement_transactions;

 transaction_id |        account_from_hash         |         account_to_hash          | amount  | currency
----------------+----------------------------------+----------------------------------+---------+----------
 TXN-002        | 1a1f402b5724014d710da007cc627f55 | 9bf72fe15eb02142e444707c1931296e | 1250.75 | USD
 TXN-003        | 4a64cf10e395d7dc952397fe94ed6e28 | ac85e96dd1df77931ca077313b2fc4b9 |   89.99 | EUR
 TXN-001        | b1d1edcf672bf5bd7fff3a25b938a41d | 817e4b7ff1bf557e38d692dad0bded89 |  500.00 | USD
 TXN-004        | dd4372f33ee50b43fc10593d13f11f7a | 7828bca1fdad7bc43c43f9ddbae1d36c |  200.00 | GBP
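A quick post-load check, sketched under the assumption that every hash column should hold a 32-character lowercase hex MD5 digest: count the rows that are missing a hash or do not match that pattern. A result of zero suggests no raw account numbers slipped through.

```sql
-- md5() yields NULL when the source field is absent, so NULLs are counted too.
SELECT count(*) AS suspicious_rows
FROM settlement_transactions
WHERE account_from_hash IS NULL
   OR account_to_hash   IS NULL
   OR account_from_hash !~ '^[0-9a-f]{32}$'
   OR account_to_hash   !~ '^[0-9a-f]{32}$';
```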

Passing Metadata via Properties

Now imagine we need to tag the source of the data with a source_bank column. 

Instead of writing a separate function for each bank, we can do this by passing the source_bank as a property to the transformer function.

This lets us control the property during job configuration.



 Transformer UDF:

CREATE OR REPLACE FUNCTION expand_with_source_bank(s anyelement, properties json)
RETURNS TABLE(
    source_bank TEXT,
    transaction_id TEXT,
    account_from_hash TEXT,
    account_to_hash TEXT,
    amount NUMERIC(15,2),
    currency TEXT
)
AS $$
    SELECT
        COALESCE(properties->>'source_bank', 'UNKNOWN')::TEXT,
        (elem->>'transaction_id')::TEXT,
        md5(elem->>'account_from'),
        md5(elem->>'account_to'),
        (elem->>'amount')::NUMERIC(15,2),
        (elem->>'currency')::TEXT
    FROM json_array_elements(s.batch_data::json) AS elem;
$$ LANGUAGE SQL STABLE;
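The target table for this job needs a matching source_bank column, since GPSS inserts whatever the function returns. A possible definition is sketched below; the table name settlement_transactions_tagged is chosen purely for illustration, and the column order simply mirrors the function's return columns.

```sql
-- Hypothetical target table; the name is an assumption, not from the job above.
CREATE TABLE settlement_transactions_tagged (
    source_bank TEXT,
    transaction_id TEXT,
    account_from_hash TEXT,
    account_to_hash TEXT,
    amount NUMERIC(15,2),
    currency TEXT
) DISTRIBUTED BY (transaction_id);
```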

 

YAML:

transformer:
  transform: expand_with_source_bank
  properties:
    source_bank: "PARTNER_BANK_A"

 

And the output:

  source_bank   | transaction_id |        account_from_hash         |         account_to_hash          | amount  | currency
----------------+----------------+----------------------------------+----------------------------------+---------+----------
 PARTNER_BANK_A | TXN-004        | dd4372f33ee50b43fc10593d13f11f7a | 7828bca1fdad7bc43c43f9ddbae1d36c |  200.00 | GBP
 PARTNER_BANK_A | TXN-005        | e7a3665e6de4268b992b76eeb59fe835 | 416a608bcad011a224383ad864ff8e81 |  999.99 | JPY

 

The same UDF can be used with different job configurations. When a new bank shows up, we just change the job YAML.
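The COALESCE fallback in the function can be checked on its own. The sketch below uses two literal JSON objects as stand-ins for the properties argument, one with source_bank set and one without.

```sql
-- props plays the role of the properties argument passed by GPSS.
SELECT
    props::text                                 AS properties,
    COALESCE(props->>'source_bank', 'UNKNOWN')  AS source_bank
FROM (
    SELECT '{"source_bank":"PARTNER_BANK_A"}'::json AS props
    UNION ALL
    SELECT '{}'::json
) AS t;
```

The first row resolves to PARTNER_BANK_A and the second falls back to UNKNOWN, which is what a job without the property would write.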

 

Final Thoughts

GPSS brings a lot of transformation capabilities to the table. Simple column mapping is just the tip of the iceberg. 

From masking sensitive fields to flattening JSON arrays, transformers are a powerful way to process your data without the need for additional services.

 

Microservices are expensive. GPSS is already there.
