In this post I’ll demonstrate one of many use cases of Presto technology, that you might overlooked – How to troubleshoot distributed transactions which are very common these days as a result of a complex Microservices architecture.

In the following SELECT statement I’ll combine three different data sources:

  • Oracle
  • Postgres
  • Kafka

by using good old SQL language instead of domain specific language – DSL.

 

select description, 'oracle_source' as data_source from test_table1 where entity_id = 123
union all  
select description, 'postgres_source' as data_source from test_table2 where entity_id = 123
union all 
select cast(json_extract(_message, '$.DESCRIPTION') as varchar) as description, 'kafka_source' as data_source from test_topic 
where 
cast(json_extract(_message, '$.ENTITY_ID') as decimal(38,0)) = 123
and cast(json_extract(_message, '$.table') as varchar) = 'TEST_TABLE';   

 

 

Result of SELECT over relational and Kafka data sources

 

As you can see, I can get records from various sources displayed in a simple table format by using powerfull and efficient SQL language.

To filter out the same record from the Kafka source, I need to cast value of ENTITY_ID column.

For that reason, SQL on Kafka is a bit more complicated, but nothing special.

For those familiar with EXPLAIN plan of relational databases, Presto uses completely different technology which you can see on the code below:

Fragment 0 [SINGLE]
    Output layout: [description, data_source]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    Output[description, data_source]
    │   Layout: [description:varchar, data_source:varchar(15)]
    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
    └─ RemoteSource[1,2,3]
           Layout: [description:varchar, data_source:varchar(15)]

Fragment 1 [SOURCE]
    Output layout: [expr_60, expr_3]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    ScanProject[table = test_table1 test_table columns=[DESCRIPTION:varchar(200):VARCHAR2], grouped = false]
        Layout: [expr_3:varchar(15), expr_60:varchar]
        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        expr_3 := CAST('oracle_source' AS varchar(15))
        expr_60 := CAST("description_2" AS varchar)
        description_2 := DESCRIPTION:varchar(200):VARCHAR2

Fragment 2 [SOURCE]
    Output layout: [expr_62, expr_55]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    ScanProject[table = test_table2 test_table2 columns=[DESCRIPTION:varchar(200):varchar], grouped = false]
        Layout: [expr_55:varchar(15), expr_62:varchar]
        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        expr_55 := 'postgres_source'
        expr_62 := CAST("description_16" AS varchar)
        description_16 := DESCRIPTION:varchar(200):varchar

Fragment 3 [SOURCE]
    Output layout: [expr_57, expr_59]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    ScanFilterProject[table = kafka_test_topic:KafkaTableHandle{schemaName=default, tableName=test_topic, topicName=test_topic, keyDataFormat=dummy, messageDataFormat=dummy, keyDataSchemaLocation=Optional.empty, messageDataSchemaLocation=Optional.empty, columns=[KafkaColumnHandle{name=_partition_id, type=bigint, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_partition_offset, type=bigint, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_message_corrupt, type=boolean, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_message, type=varchar, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_headers, type=map(varchar, array(varbinary)), mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_message_length, type=bigint, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_key_corrupt, type=boolean, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_key, type=varchar, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_key_length, type=bigint, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}, KafkaColumnHandle{name=_timestamp, type=timestamp(3), mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}], constraint=TupleDomain{ALL}}, grouped = false, filterPredicate = ((CAST("@json_extract@52QIVV94NOQG805290E1OS079R27R3KI8MRIGSCR434GPAUDODO0QVTEEBAVV1V70E6L3Q2IJFQ13V1K1IT4N59K1T6PD0R4GNUJI8KI8MJCMAC7406C78BU3BHSIOK05O3V1SS695L5HS02RMMUDSKGGOVME181DLNR7VBS2ER2BP405QJHOF8NQGPUHMEUK8HCL7890O0EIAAF1D9QDRJL1BMN4MNF4TJ1JNBL4DCG===="("_message", "@$literal$@52QIVV94KFMG600I90DHR01759M1UP8ITNJ760LIK4MPJ03M1IKBAE851AL8A299U01P2KB4MFRCH5HS8M83LGRL4GH8I3GUQ1SVDQDB134S1C1954GEMA9IVRKEA6MG5DSA3JHMI5KOLNMJ17R05CJ10372746Q2RQUJH3IHATHDONAP1E1SKI7DLTQIB3M50201QSHPC2V7NEJTJC9KUJBNPJ1M==="("@from_base64@52QIVV94MBIG605IOSCHL06715I78N1PMUQKOG1C549KPLRK3G257TPC9587AVUHC5KC7N6IKSU4B43Q9ER384QD4S2UIF2E2N2O280IUOA1QNPCLH2S63FLSOQ9JDPC7RULIL465GD9OPT0VDEETKO1G5M8G44OCQAQM9S3F943RTPPQI2RAGO10O0FDI1EO5E82PHC2F204V7BTFA07G3FFHOG===="('DgAAAFZBUklBQkxFX1dJRFRIAQAAAAsAAAAACwAAACQuRU5USVRZX0lE'))) AS decimal(38, 0)) = CAST(DECIMAL '105393763' AS decimal(38, 0))) AND (CAST("@json_extract@52QIVV94NOQG805290E1OS079R27R3KI8MRIGSCR434GPAUDODO0QVTEEBAVV1V70E6L3Q2IJFQ13V1K1IT4N59K1T6PD0R4GNUJI8KI8MJCMAC7406C78BU3BHSIOK05O3V1SS695L5HS02RMMUDSKGGOVME181DLNR7VBS2ER2BP405QJHOF8NQGPUHMEUK8HCL7890O0EIAAF1D9QDRJL1BMN4MNF4TJ1JNBL4DCG===="("_message", "@$literal$@52QIVV94KFMG600I90DHR01759M1UP8ITNJ760LIK4MPJ03M1IKBAE851AL8A299U01P2KB4MFRCH5HS8M83LGRL4GH8I3GUQ1SVDQDB134S1C1954GEMA9IVRKEA6MG5DSA3JHMI5KOLNMJ17R05CJ10372746Q2RQUJH3IHATHDONAP1E1SKI7DLTQIB3M50201QSHPC2V7NEJTJC9KUJBNPJ1M==="("@from_base64@52QIVV94MBIG605IOSCHL06715I78N1PMUQKOG1C549KPLRK3G257TPC9587AVUHC5KC7N6IKSU4B43Q9ER384QD4S2UIF2E2N2O280IUOA1QNPCLH2S63FLSOQ9JDPC7RULIL465GD9OPT0VDEETKO1G5M8G44OCQAQM9S3F943RTPPQI2RAGO10O0FDI1EO5E82PHC2F204V7BTFA07G3FFHOG===="('DgAAAFZBUklBQkxFX1dJRFRIAQAAAAcAAAAABwAAACQudGFibGU='))) AS varchar) = CAST('TEST_TABLE' AS varchar)))]
        Layout: [expr_59:varchar(15), expr_57:varchar]
        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        expr_59 := CAST('kafka_source' AS varchar(15))
        expr_57 := CAST(json_extract("_message", "$literal$"(from_base64('DgAAAFZBUklBQkxFX1dJRFRIAQAAABQAAAAAFAAAACQuVEFSSUZGX0RFU0NSSVBUSU9O'))) AS varchar)
        _message := KafkaColumnHandle{name=_message, type=varchar, mapping=null, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=true}

It is out of scope of this article to explain every step, but as you can see,  plan is very different from what I’ll get from the relational databases.

There is also a web console which can provide a nice details of various aspect of distributed query engine.

On the following slides I’ll provide a couple of screenshots to see what you can expect.

Basic info related to the query
Session info
Additional statistics
Detail statistics for each phase of execution
Graphical layout of execution plan
Task details
Timeline of query execution

 

Wrap it up:

I hope that you, by now, get the feeling how powerful distributed engine is, and I’m sure you’ll find many use cases for implement it in your environment.



Get notified when a new post is published!

Loading

Comments

There are no comments yet. Why not start the discussion?

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.