In this post I'll demonstrate one of the many use cases of Presto that you might have overlooked: how to troubleshoot distributed transactions, which are very common these days as a result of complex microservices architectures.
In the following SELECT statement I’ll combine three different data sources:
- Oracle
- Postgres
- Kafka
by using the good old SQL language instead of a domain-specific language (DSL).
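Presto exposes each of these systems as a separate catalog. Assuming the three connectors are already configured (connector setup is out of scope for this post), you can check that Presto sees them with:

show catalogs;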
select description, 'oracle_source' as data_source
from test_table1
where entity_id = 123
union all
select description, 'postgres_source' as data_source
from test_table2
where entity_id = 123
union all
select cast(json_extract(_message, '$.DESCRIPTION') as varchar) as description,
       'kafka_source' as data_source
from test_topic
where cast(json_extract(_message, '$.ENTITY_ID') as decimal(38,0)) = 123
  and cast(json_extract(_message, '$.table') as varchar) = 'TEST_TABLE';
As you can see, I can get records from various sources displayed in a simple table format by using the powerful and efficient SQL language.
To filter out the same record from the Kafka source, I need to extract the ENTITY_ID value from the JSON message and cast it. For that reason, the SQL against Kafka is a bit more complicated, but nothing special.
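If you only need scalar values, json_extract_scalar is a handy alternative: it returns a varchar directly, saving the cast on the description column. A sketch of the Kafka branch alone (keep in mind that json_extract_scalar returns NULL for non-scalar JSON values):

select json_extract_scalar(_message, '$.DESCRIPTION') as description,
       'kafka_source' as data_source
from test_topic
where cast(json_extract_scalar(_message, '$.ENTITY_ID') as decimal(38,0)) = 123
  and json_extract_scalar(_message, '$.table') = 'TEST_TABLE';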
For those familiar with EXPLAIN plans from relational databases, Presto's plan looks completely different.
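The plan below is the distributed plan; you can produce one like it with the distributed EXPLAIN variant, i.e. by prefixing the UNION ALL query from above:

explain (type distributed)
select description, 'oracle_source' as data_source from test_table1 where entity_id = 123
union all
select description, 'postgres_source' as data_source from test_table2 where entity_id = 123
union all
select cast(json_extract(_message, '$.DESCRIPTION') as varchar) as description, 'kafka_source' as data_source
from test_topic
where cast(json_extract(_message, '$.ENTITY_ID') as decimal(38,0)) = 123
  and cast(json_extract(_message, '$.table') as varchar) = 'TEST_TABLE';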
Fragment 0 [SINGLE]
Output layout: [description, data_source]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Output[description, data_source]
│ Layout: [description:varchar, data_source:varchar(15)]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
└─ RemoteSource[1,2,3]
Layout: [description:varchar, data_source:varchar(15)]
Fragment 1 [SOURCE]
Output layout: [expr_60, expr_3]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanProject[table = test_table1 test_table columns=[DESCRIPTION:varchar(200):VARCHAR2], grouped = false]
Layout: [expr_3:varchar(15), expr_60:varchar]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
expr_3 := CAST('oracle_source' AS varchar(15))
expr_60 := CAST("description_2" AS varchar)
description_2 := DESCRIPTION:varchar(200):VARCHAR2
Fragment 2 [SOURCE]
Output layout: [expr_62, expr_55]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanProject[table = test_table2 test_table2 columns=[DESCRIPTION:varchar(200):varchar], grouped = false]
Layout: [expr_55:varchar(15), expr_62:varchar]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
expr_55 := 'postgres_source'
expr_62 := CAST("description_16" AS varchar)
description_16 := DESCRIPTION:varchar(200):varchar
Fragment 3 [SOURCE]
Output layout: [expr_57, expr_59]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanFilterProject[table = kafka_test_topic:KafkaTableHandle{schemaName=default, tableName=test_topic, topicName=test_topic, keyDataFormat=dummy, messageDataFormat=dummy, keyDataSchemaLocation=Optional.empty, messageDataSchemaLocation=Optional.empty, columns=[KafkaColumnHandle{name=_partition_id, type=bigint, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_partition_offset, type=bigint, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_message_corrupt, type=boolean, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_message, type=varchar, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_headers, type=map(varchar, array(varbinary)), mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_message_length, type=bigint, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_key_corrupt, type=boolean, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_key, type=varchar, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_key_length, type=bigint, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_timestamp, type=timestamp(3), mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}], constraint=TupleDomain{ALL}}, grouped = false, filterPredicate = ((CAST("@json_extract@52QIVV94NOQG805290E1OS079R27R3KI8MRIGSCR434GPAUDODO0QVTEEBAVV1V70E6L3Q2IJFQ13V1K1IT4N59K1T6PD0R4GNUJI8KI8MJCMAC7406C78BU3BHSIOK05O3V1SS695L5HS02RMMUDSKGGOVME181DLNR7VBS2ER2BP405QJHOF8NQGPUHMEUK8HCL7890O0EIAAF1D9QDRJL1BMN4MNF4TJ1JNBL4DCG===="("_message", "@$literal$@52QIVV94KFMG600I90DHR01759M1UP8ITNJ760LIK4MPJ03M1IKBAE851AL8A299U01P2KB4MFRCH5HS8M83LGRL4GH8I3GUQ1SVDQDB134S1C1954GEMA9IVRKEA6MG5DSA3JHMI5KOLNMJ17R05CJ10372746Q2RQUJH3IHATHDONAP1E1SKI7DLTQIB3M50201QSHPC2V7NEJTJC9KUJBNPJ1M==="("@from_base64@52QIVV94MBIG605IOSCHL06715I78N1PMUQKOG1C549KPLRK3G257TPC9587AVUHC5KC7N6IKSU4B43Q9ER384QD4S2UIF2E2N2O280IUOA1QNPCLH2S63FLSOQ9JDPC7RULIL465GD9OPT0VDEETKO1G5M8G44OCQAQM9S3F943RTPPQI2RAGO10O0FDI1EO5E82PHC2F204V7BTFA07G3FFHOG===="('DgAAAFZBUklBQkxFX1dJRFRIAQAAAAsAAAAACwAAACQuRU5USVRZX0lE'))) AS decimal(38, 0)) = CAST(DECIMAL '105393763' AS decimal(38, 0))) AND (CAST("@json_extract@52QIVV94NOQG805290E1OS079R27R3KI8MRIGSCR434GPAUDODO0QVTEEBAVV1V70E6L3Q2IJFQ13V1K1IT4N59K1T6PD0R4GNUJI8KI8MJCMAC7406C78BU3BHSIOK05O3V1SS695L5HS02RMMUDSKGGOVME181DLNR7VBS2ER2BP405QJHOF8NQGPUHMEUK8HCL7890O0EIAAF1D9QDRJL1BMN4MNF4TJ1JNBL4DCG===="("_message", "@$literal$@52QIVV94KFMG600I90DHR01759M1UP8ITNJ760LIK4MPJ03M1IKBAE851AL8A299U01P2KB4MFRCH5HS8M83LGRL4GH8I3GUQ1SVDQDB134S1C1954GEMA9IVRKEA6MG5DSA3JHMI5KOLNMJ17R05CJ10372746Q2RQUJH3IHATHDONAP1E1SKI7DLTQIB3M50201QSHPC2V7NEJTJC9KUJBNPJ1M==="("@from_base64@52QIVV94MBIG605IOSCHL06715I78N1PMUQKOG1C549KPLRK3G257TPC9587AVUHC5KC7N6IKSU4B43Q9ER384QD4S2UIF2E2N2O280IUOA1QNPCLH2S63FLSOQ9JDPC7RULIL465GD9OPT0VDEETKO1G5M8G44OCQAQM9S3F943RTPPQI2RAGO10O0FDI1EO5E82PHC2F204V7BTFA07G3FFHOG===="('DgAAAFZBUklBQkxFX1dJRFRIAQAAAAcAAAAABwAAACQudGFibGU='))) AS varchar) = CAST('TEST_TABLE' AS varchar)))]
Layout: [expr_59:varchar(15), expr_57:varchar]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
expr_59 := CAST('kafka_source' AS varchar(15))
expr_57 := CAST(json_extract("_message", "$literal$"(from_base64('DgAAAFZBUklBQkxFX1dJRFRIAQAAABQAAAAAFAAAACQuVEFSSUZGX0RFU0NSSVBUSU9O'))) AS varchar)
_message := KafkaColumnHandle{name=_message, type=varchar, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}
It is out of the scope of this article to explain every step, but as you can see, the plan is very different from what you would get from a relational database.
There is also a web console that provides nice details about various aspects of the distributed query engine. Below I'll provide a couple of screenshots to show what you can expect.
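If you prefer SQL to clicking through the UI, much of the same runtime information is also queryable through Presto's system catalog. A minimal sketch (the exact column set varies between Presto versions):

-- recent queries and their state, similar to the console's query overview
select query_id, state, created, query
from system.runtime.queries
order by created desc
limit 10;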
To wrap it up:
I hope that by now you have a feeling for how powerful a distributed query engine is, and I'm sure you'll find many use cases for implementing it in your own environment.