Monthly Archives: March 2010

Writing efficient SQL

The other day I gave a presentation on ‘Efficient SQL’. It was the first of a number of presentations, so I started with explaining some basic concepts.

Maybe it will be interesting for other people too. So here is the summary of it.

 

MAIN RULES OF EFFICIENT SQL

 

The main rules of efficient SQL are:

·         Less is better

·         More is better

That seems easy, since everything is better. But let me explain.

Less is better: The less I/O generated the better your statement will perform. Even though somebody might be able to think of some exceptions. It is safe to keep this as a rule of thumb.

Avoid unnecessary work for your query. You can do that by selecting only the rows that you need. (That sounds obvious, but I’ll give an example soon). In complex queries, make sure that you select the smallest possible set in every part of your query. Rather than collecting a huge amount of data and then selecting what you need, select the smallest set possible before you join.

Also select only the columns that you need. One argument is that it might give Oracle a chance to skip the table access, and use an index-only access.

Think about the use of ‘select *’. Most often this is a complete waste of resources. In packaged software, it can easily lead to bugs when the table definition changes. In all situations, it will cost extra resources to collect the data, send them to the client and then filter out the data that is not needed.

 

More is better: This is of course not about I/O. It is about the information that you give Oracle about your data and your query. Add as many predicates as possible, since it can help the optimizer do a better job. When 2 tables are joined on an ID-column. But you know, from your knowledge of the data, that another column can also be used as a join-condition then use both join conditions.

It will help the optimizer work out the relationship between the tables, and select the optimal plan. If 2 columns have related data, and the predicate on one column means that the other column is restricted too, put a predicate on both columns.

 

Consider the following:

SQL> create table Xxx_inner

  2  as select * from dba_objects

  3  where object_type='TABLE';

 

Table created.

 

SQL> create index xxx_inner_n1 on xxx_inner(object_id,object_type);

 

Index created.

 

SQL> create index xxx_inner_n2 on xxx_inner(Object_type);

 

Index created.

 

SQL> create table xxx_outer

  2  as select * from dba_objects;

 

Table created.

 

SQL> insert into  xxx_outer

  2  select * from dba_objects;

 

71136 rows created.

 

SQL> create index xxx_outer_n1 on xxx_outer(Object_id,object_type);

 

Index created.

 

SQL> create index xxx_outer_n2 on xxx_outer(object_type);

 

Index created.

 

SQL> analyze table xxx_inner compute statistics;

 

Table analyzed.

 

SQL> analyze table xxx_outer compute statistics;

 

Table analyzed.

 

SQL> set autotrace traceonly;

SQL> select o.*

  2  from xxx_outer o

  3  where o.object_id in (select object_id from xxx_inner i)

  4  and o.object_type='PROCEDURE';

 

no rows selected

 

Statistics

———————————————————-

        113  consistent gets

       1064  bytes sent via SQL*Net to client

 

Consistent gets is the number of times data was read from the buffer cache into our session memory. It is therefore a good measure of the amount of work a session needed to do while executing a query.

SQL> select o.*

  2  from xxx_outer o

  3  where o.object_id in (select object_id

    from xxx_inner i

    where i.object_type=o.object_type)

  4  and o.object_type='PROCEDURE';

 

no rows selected

 

Statistics

———————————————————-

         15  consistent gets

       1064  bytes sent via SQL*Net to client

 

SQL> select o.object_id

  2  from xxx_outer o

  3  where o.object_id in (select object_id

    from xxx_inner i

    where i.object_type=o.object_type)

  4  and o.object_type='PROCEDURE';

 

no rows selected

 

Statistics

———————————————————-

         15  consistent gets

        254  bytes sent via SQL*Net to client

 

 

Here we see two tables created from ‘DBA_OBJECTS’. We join these tables on ‘OBJECT_ID’. In this case, we have some information that the Oracle Optimizer does not have. The object_id,object_type combination is the same in both tables. Unaware of this fact, Oracle has to search all object_id’s for ‘XXX_INNER’. When we tell Oracle that the object_type is the same, it can skip most of the records resulting in less I/O.

When we finally tell Oracle that we’re only interested in the object_id, we also reduce the network traffic by 75%.

 

The concepts

 

For every query, Oracle will have to collect some data. And in most cases, it will have to join one or more data-sets. Let’s see what options Oracle has for collecting data and joining the data together. In this presentation we only look at the basic options, not the more sophisticated features. So don’t expect this list to be complete. We will see how data can be retrieved from the database, and how tables / data-sets can be joined together. We will not yet go into the most efficient way to do it, because the most efficient way to retrieve data depends on many factors. Only when you have a basic understanding of the concepts, we can start thinking about the most efficient way to do things.

 

Collecting data

 

To gather data from a table, Oracle can use 3 different options, called ‘Access Paths’:

·         Full Table Scan

·         Index Scan / Table Access by Rowid

·         Index Scan

The Full Table Scan (FTS) is exactly as the name implies, a full scan of the table. All the records of the table are read into memory, and checked against the predicates in the query (where clause or Join condition). The blocks of the table don’t have to be read in any particular order. Oracle will just try to get the blocks of the table as quickly as possible.

When a sizable part of the table needs to be selected, this will be the most efficient access path. It will be the only one available, if there is no predicate available that matches (part of) an index.

The ‘Index Scan / Table Access by Rowid’, will use an index to decide which rows need to be read into memory. Then based on the rowid in the index, the correct row will be retrieved. The rowid refers directly to the position on disk where the row is located.

And the last option is the ‘Index Scan’. The difference with the previous option is that Oracle does not need to get the table data anymore. The data in the index is sufficient to answer the query.

 

So which one of these is the most efficient?

As usual, it depends. Many people think a FTS is less efficient than an Index Scan. But consider this:

SQL> create table xxx_access as select * from dba_objects;

 

Table created.

 

SQL> create index xxx_access_n1 on xxx_access (object_id);

 

Index created.

 

SQL> select object_id,object_name from xxx_access where object_id>0;

 

71139 rows selected.

 

Elapsed: 00:00:08.02

 

Execution Plan

———————————————————-

   0      SELECT STATEMENT Optimizer=ALL_ROWS (Cost=287 Card=78799 Bytes=6225121)

   1    0   TABLE ACCESS (FULL) OF 'XXX_ACCESS' (TABLE) (Cost=287 Card=78799 Bytes=6225121)

 

Statistics

———————————————————-

       5803  consistent gets

    2727442  bytes sent via SQL*Net to client

      71139  rows processed

 

SQL> select /*+ INDEX (xxx_access) */ object_id, object_name from xxx_access where object_id>0;

 

71139 rows selected.

 

Elapsed: 00:00:10.05

 

Execution Plan

———————————————————-

0     SELECT STATEMENT Optimizer=ALL_ROWS (Cost=1290 Card=78799 Bytes=6225121)

1   0   TABLE ACCESS (BY INDEX ROWID) OF 'XXX_ACCESS' (TABLE) (Cost=1290 Card=78799 By=6225121)

2   1     INDEX (RANGE SCAN) OF 'XXX_ACCESS_N1' (INDEX) (Cost=161 Card=78799)

 

Statistics

———————————————————-

      10771  consistent gets

    2727442  bytes sent via SQL*Net to client

      71139  rows processed

 

The FTS needs 5803 I/O operations. While the Index Scan takes almost double at 10771 I/O’s and 2 seconds more. (out of 10 sec’s!)

Imagine what will happen, when you try this with a multi-million row table.

 

But for sure an Index Scan will be the most efficient when it can be done? Again, it depends. Consider this:

 

SQL> create table test1 (l number, txt varchar2(500));

Table created.

 

SQL> create index test1_idx on test1(l,txt);

Index created.

 

SQL> Begin

  2  For I In (Select  Level L, Rpad('ABC',500,To_char(Level)) Txt

  3            From Dual Connect By Level <= 50000) Loop

  4    Insert Into Test1 Values (I.L,I.Txt);

  5    If Mod(I.L,5)!=0 Then

  6       Delete From Test1 Where L=I.L;

  7    End If;

  8  End Loop;

  9  Commit;

 10  end;

 11  /

PL/SQL procedure successfully completed.

 

SQL> analyze table test1 compute statistics;

Table analyzed.

 

SQL> set autotrace traceonly;

SQL> select l from test1 t where l>0;

10000 rows selected.

 

Execution Plan

———————————————————-

   0      SELECT STATEMENT Optimizer=ALL_ROWS (Cost=773 Card=10000 Bytes=40000)

   1    0   TABLE ACCESS (FULL) OF 'TEST1' (TABLE) (Cost=773 Card=10000 Bytes=40000)

 

Statistics

———————————————————-

       1393  consistent gets

     140549  bytes sent via SQL*Net to client

      10000  rows processed

 

SQL> select /*+ INDEX(t,test1_idx) */ l from test1 t where l>0;

10000 rows selected.

 

Execution Plan

———————————————————-

   0      SELECT STATEMENT Optimizer=ALL_ROWS (Cost=3338 Card=10000 Bytes=40000)

   1    0   INDEX (RANGE SCAN) OF 'TEST1_IDX' (INDEX) (Cost=3338 Card=10000 Bytes=40000)

Statistics

———————————————————-

       4003  consistent gets

     140549  bytes sent via SQL*Net to client

      10000  rows processed

 

 

 

 

How can the Full Table Scan be more efficient?

 

Good question. To answer it, we have to look at the amount of work that Oracle has to do to get the data. We start with the FTS:

As mentioned, a FTS needs to scan all the blocks in a table. In my system, the table XXX_ACCESS was created with 1152 blocks. So basically Oracle will read 1152 blocks. However Oracle has optimized this process. One of the most noticeable optimizations is the db_file_multiblock_readcount, which tells Oracle to read multiple blocks in one I/O operation. So instead of doing 1152 reads, Oracle reads 4 (on my system) blocks at a time in 288 reads. (Take a look at the explain plan above again, and notice the cost of the Full Table Scan!).

Now how much work does an Index Scan / Table Access rowid need to do?

To answer that question, you need to know the structure of an index. An index in Oracle is a B-Tree structure. It looks like an inverted tree, with the top being the ‘Root-block’. The lowest level contains the ‘Leaf blocks’, that hold the index keys and the matching rowid’s. The index keys in the leaf blocks are sorted. So the lowest value will be in the utter-left block, the highest in the far right block.

All upper level blocks show the ranges that the lower level blocks contain.

To find a range of data, Oracle starts at the root block, and follows the pointers to the first leaf block containing an index key within the range. From here Oracle can walk the leaf blocks from left to right (or right to left, if needed).

See the following picture:

That means that Oracle has to walk the ‘height’ of the index (Index level). Then read a number of leaf blocks. And for every entry in the leaf-block, it needs to retrieve the data from the table.

In the worst case scenario, the data is spread throughout the table. And for every index key, Oracle has to retrieve a different table block. In that case, the amount of work is: (index level – 1) + number of index leaf blocks + (Number of keys * Number of table blocks).

It will be obvious that is a lot more I/O than just reading the table once. The formula is not completely correct, because Oracle does not read the same table block twice, when the next rowid from the index is in the same block.

But the formula should make it clear that with more data being retrieved, the cost of the index access / Table Rowid, is increasing faster than the cost of the Full Table Scan.

 

What happened to Index Only Access?

 

The example showed that a Full Table Scan is still more efficient than using only an index. Even though Oracle would only have to walk through the index to retrieve all the data.

This is caused by a feature of the index structure and the way the data was entered into the table. This caused the index to grow bigger than the actual table:

SQL> select table_name,num_rows,blocks from dba_Tables where table_name='TEST1';

 

TABLE_NAME             NUM_ROWS     BLOCKS

——————– ———- ———-

TEST1                     10000        772

 

SQL> select index_name, distinct_keys, leaf_blocks from dba_indexes where index_name='TEST1_IDX';

 

INDEX_NAME           DISTINCT_KEYS LEAF_BLOCKS

——————– ————- ———–

TEST1_IDX                    10000        3334

 

This situation can occur when a lot of inserts and deletes are taking place in the same transaction. The space for the deleted table rows can be reused immediately. But the space for the deleted index keys only comes available after the commit. NOTE: The space in the index will be reusable after the commit!

Another situation where this can happen is with an index key based on a sequence. So new data is only inserted at the end of the range. When you delete a lot of values on the lower end of the range (but not all). Richard Foote has some excellent material on his website about Deleted Index Keys:

http://en.wordpress.com/tag/index-delete-operations/

 

Join Mechanisms

 

I hope you’re not yet in despair. Because so far we have only retrieved data from single tables. In most cases, we need to join tables together to get our data. Now we will look at 3 basic forms of joining data-sets together. Note that it is not necessarily only tables that we join. It can also be a result set from an earlier part of your query. For example we might collect some data from an Index Only Access and then join it to data from a Full Table Scan.

Oracle has 3 mechanisms to perform Joins: Nested Loops, Hash Join, Merge Join. Let’s take a look at them.

Nested Loops

 

The Nested Loop join loops over a smaller data set, and for every record in that set, it searches a matching record in the second data set. Visually, it looks like this:

 

You can see that the first set is fully scanned. This is the Outer or Driving Set. For every record in this set, we look up a matching record in the second set. Usually this will be done through an index, even though this is not mandatory.

The efficiency of this mechanism depends on the size of Set A, and the number of records we need to  retrieve from Set B.

We will see some situations where a Nested Loops join is more or less efficient.

The second join mechanism is the

Hash Join

 

For a Hash Join, Oracle builds a hash table from a (preferably) smaller data set (Build set), where the key is a hash value derived from the join columns. Then Oracle reads the second data set (Probe set), derives the hash value on the join columns and probes the hash table for a match.

Schematically, it looks like this:

For the Hash Join, it is not relevant if the data is retrieved through an index, or from a Table scan. Both data sets need to be read fully. One of the features of hashing is that collisions may occur. A collision means that 2 different values result in the same hash value. Therefore, the full data set is stored in the hash table. When a match is found on the hash values, Oracle double-checks the actual values of the join columns to see if they match.

There is a major drawback for this mechanism. When the hash table does not fit into the available memory (hash_area_size). Then Oracle will dump parts of the hash table to disk (Temp tablespace). A bitmap of all possible hash values is kept in memory with a bit indicating if a hash-value is used or not.

So while scanning the Probe set, Oracle can check if there is a possible match. If that part of the hash table is not in memory, Oracle will set aside the records from the probe table. When finished with the probe set, the next part of the hash table is loaded into memory, and the records that were set aside are tested again. This is called a ‘Multipass Hash Join’, instead of the ‘Onepass Hash Join’ where the entire hash table is held in memory.

You can imagine that the hash join can be very efficient even without index access. However it can deteriorate quickly when a ‘Multipass Hash Join’  is needed.

That brings us to the

Merge Join

 

A Merge Join is possible when 2 datasets are both ordered on their join columns. The datasets can then be read in an alternating way. You start reading the first dataset, then you read the second dataset until you find a matching value, or exceed the value. If it is a matching value, you can start building your result set. If you pass the join value, you continue reading the first dataset again.

Schematically, it looks like this:

 The main requirement is of course that both data sets are ordered, before the join takes place. If that is the case, this is probably the most efficient join mechanism. It can handle all kinds of join comparisons, including range comparisons.

Back to efficient SQL

 

Now it is time to go back to the main focus of this article. How to write efficient SQL.

We have seen how Oracle can access data and join it together. It is the job of the Oracle Optimizer to find out the most efficient way to do that, for a given query. It does this based on statistics on the tables and indexes. It is not in the scope of this article to discuss how to gather statistics. But we will see the use of several of the statistics. These statistics include (but are not limited to) the number of rows in the table, the number of blocks, the average row length, etc. For an index, they include (but are not limited to) the number of leaf blocks, the number of keys per leaf block, the number of datablocks in the table per key, etc.

In an ideal world, we would be able to trust the optimizer to do the right thing all the time.

However, we live in an imperfect world, and the Oracle Optimizer does not always have perfect information about the data or your query. Either the statistics might not be up to date, or they might not include some dependencies within the data. Also your query might not give the optimizer all the information that you have. (see the first example in this article, where we can give the optimizer extra information by adding a predicate).

Consider this query:

Select * From

Tiny  T, –7089 rec

Small S   — 71151 Rec

Where t.Object_id = s.Object_id;

 

Which explain plan is more efficient:

 

0 SELECT STATEMENT  

1  HASH JOIN        

2   TABLE ACCESS FULL  TINY

3   TABLE ACCESS FULL  SMALL

 

Or:

 

0 SELECT STATEMENT                      

1  TABLE ACCESS BY INDEX ROWID SMALL     

2   NESTED LOOPS                       

3    TABLE ACCESS FULL         TINY    

4    INDEX RANGE SCAN          SMALL_IDX

 

The first explain plan uses a Hash Join, with Tiny as the Build Table. The second uses a Nested Loops Join, with Tiny as the Inner Table.

I hope you have decided that you don’t know which one is the most efficient. Because you cannot know based on the information you have. If I selected 2 non-overlapping sets of data, the Nested Loop Join might be more efficient.  It would scan Tiny for 7089 records, then do 7089 index lookups on Small (without result), so it would need to read approximately 7089+7089=14178 blocks.

 

If the 2 sets would have a 1-n relationship, and every occurrence of object_id in Small is also in Tiny, both tables would need to be read fully and the Hash Join would be more efficient.

 

That means that to write efficient SQL, you’ll need to have an understanding of the data. And you must have considered the optimal execution plan for your query. When you write your query, keep in mind the mantra in first part of this article. Less is Better, More is Better. You want to give Oracle as much information as possible, and you want to get as small as possible result sets.

 

The last part that we will do in this part, is to look at the

Explain plan

 

To see what Oracle will do when executing a query, you can make an explain plan. Many tools have built-in options to show explain plans on queries. Alternatively, you can use the Oracle commands:

SQL> Explain plan for

  2  Select * From

  3  Tiny  T, –7089 rec

  4  Small S   — 71151 Rec

  5  Where t.Object_id = s.Object_id;

 

Explained.

 

SQL> select * from table( dbms_xplan.display() );

 

PLAN_TABLE_OUTPUT

——————————————————————————–

Plan hash value: 803478362

 

—————————————————————————-

| Id  | Operation          | Name  | Rows  | Bytes | Cost (%CPU)| Time     |

—————————————————————————-

|   0 | SELECT STATEMENT   |       |  7089 |  1287K|  1264   (1)| 00:00:16 |

|*  1 |  HASH JOIN         |       |  7089 |  1287K|  1264   (1)| 00:00:16 |

|   2 |   TABLE ACCESS FULL| TINY  |  7089 |   650K|   213   (0)| 00:00:03 |

|   3 |   TABLE ACCESS FULL| SMALL | 71151 |  6392K|  1049   (1)| 00:00:13 |

—————————————————————————-

 

 

PLAN_TABLE_OUTPUT

——————————————————————————–

Predicate Information (identified by operation id):

—————————————————

 

   1 – access("T"."OBJECT_ID"="S"."OBJECT_ID")

 

15 rows selected.

 

I took the previous query, and ran an explain plan on it. First I should tell you how I created tiny and small, so you can verify:

 

create table tiny as select * from dba_objects where mod(object_id,10)=0;

 

create table small as select * from dba_objects;

 

create index tiny_idx on tiny(object_id);

 

create index small_idx on small(object_id);

 

Analyze Table Tiny Compute Statistics;

 

analyze table small compute statistics;

 

You can see that both tables were created from dba_objects and tiny is a subset of small. Therefore a Hash Join makes most sense.

 

Let’s take a closer look at the explain plan now. Keep in mind that all the numbers are estimates from the optimizer. They are based on the statistics available, and the runtime numbers might be completely different.

The execution starts at the rows furthest right. And top down. In this case that means that we start with a Full Table Scan of Tiny. Then we do a Full Table Scan of Small, and finally we Hash join the 2 sets together.

The result of the Hash Join is returned, and becomes the result of the query (Select).

 

Let’s take a closer look at the Full Table Scans. After the name of the table, we see ‘Rows’,’Bytes’,’Cost’ and ‘Time’.

The ‘Time’ is an estimate of the amount of time it will take to complete this step. It is useful in query tuning  to see which step will take the most time.

The ‘Bytes’ are an estimate of the amount of data used to complete this step. I find it useful when a hash join is involved. Because when it exceeds my hash_area_size, Oracle will need a ‘Multi-Pass Hash Join’.

The ‘Cost’ is often used for query tuning, and people will try to ‘tune down’ the cost. This makes sense, because the cost is an estimate for the amount of I/O Oracle needs to do for this step. However, there is a logical trap in this.

Oracle has used the cost of different explain plans for the original query to decide the most efficient one. Now when we change the query, the cost can no longer be compared to the original query. After all, it is a different query. It will return a different result. Unless, of course there is some information about the data we are selecting, that we didn’t give the optimizer initially.

 

To me, it makes more sense to look at the ‘Rows’. This is the number of records Oracle expects from that step in the query. If this is very different from the number you are expecting, something is wrong. Either the statistics are outdated, or the Optimizer is missing some information that you have, or you are not selecting the data you are expecting.

 

In the query above, we see that Oracle has made the perfect assumptions. Tiny will return 7089 rows, Small will return 71151 rows and there is a 1-1 relationship  between them. So the join will return one record for each record in Tiny. No use looking for a more efficient plan here. Unless again, we did not query what we are looking for.

 

 

 

Oracle Advanced Queues (AQ)

Oracle Advanced Queuing (AQ)

 

A customer asked me to do a presentation about AQ (Advanced Queuing). This article is a rewrite of that presentation. We’ll be getting some hands-on experience with AQ, and then some tips on the issues that might occur.

As always, your comments to this article are more than welcome. If you enjoy this article, find it useful or maybe not at all, please let me know by leaving a comment on the site.

The article is also available in PDF format from here.

 

Queuing: I don’t want to queue.

 

The title might be true in many cases, but there are also situations where a queue is very convenient. For example in the case of batch processing where a batch process handles multiple incoming messages from an online process. Or when 2 processes need inter-process communication, but still need to function independently of each other.

In eBS we use queues for the workflow system. (Deferred items, notifications for the workflow mailer and the Business Event System). Some more queues are found for concurrent processing and SFM.

 

So whether you like it or not, you’ll have to queue. The trick is to manage these queues to get optimal performance for your system.

 

(Advanced) Queue design

 

Before we can start building queues, there are some things to consider.

AQ supports both point-to-point queues and publish-subscribe-queues (also called multi-consumer queues).

 

Point-to-point queues are emptied by only one specific process. One or more processes can enqueue messages on the queue, but only one process can dequeue them.

 

In contrast, a publish-subscribe queue can have many processes reading the messages in the queue. Either the messages are broadcasted, or the receivers have to subscribe to a certain kind of messages.

Of course the publish-subscribe queue has some very interesting properties. But we’ll start our item with the point-to-point queue.

 

So you’ll first have to decide who the senders and receivers of the queue data will be. In this article, we start with using a point-to-point queue. After that we start using multi-consumer queues.

Another thing to consider is the payload of the message. Of course, the messages will need some content to give it a meaning to the receiver. This content is called the payload. And you can either use a custom type (including XML), or a raw type.

During this article, we’ll see some more features of AQ. But when we decide on the type of queue and the payload type, we can build our own queues. All queues are built on queue-tables. These tables hold the data in the queue. On top of these tables, the actual queue and some management views are built.

To build a queue-table, we use the dbms_aqadm package:

dbms_aqadm.create_queue_table(queue_table =>’<table_name>’

                          ,queue_payload_type => [‘RAW’|<custom_type>]);

 

This creates the queue table including a LOB segment for the payload, some indexes, and an ‘Error queue’:

 

Begin

dbms_aqadm.create_queue_table(queue_table=>’xxx_test’

                           ,queue_payload_type=>’RAW’);

End;

 

Select object_name,object_type from dba_objects where created>sysdate-1/24;

 

OBJECT_NAME                 OBJECT_TYPE

—————————–         ———–

SYS_C0011768                INDEX

XXX_TEST                    TABLE

SYS_LOB0000073754C00029$$   LOB

SYS_LOB0000073754C00028$$   LOB

AQ$_XXX_TEST_T              INDEX

AQ$_XXX_TEST_I              INDEX

AQ$_XXX_TEST_E              QUEUE

AQ$_XXX_TEST_F              VIEW

AQ$XXX_TEST                 VIEW

 

This created the base-table for a point-to-point queue. The table is a regular heap-oriented table. And you are free to create extra indexes on it, if you feel the urge. The necessary indexes have been created already.

 

The queue that is created now is the default error queue. Messages that failed dequeuing will be set on this queue.

 

Now it’s time to create the actual queue. The queue-tables are the infrastructure for storing the messages and related information. The queue can now be created to control the queuing and dequeuing of messages.

 

For both point-to-point as publish-subscriber queues, the command is:

 

dbms_aqadm.create_queue (queue_table =>’<table_name>’

                      queue_name => ‘<queue_name>’);

 

 

So for us we run:

Begin

dbms_aqadm.create_queue (queue_name => 'xxx_test_q'

  ,queue_table => 'xxx_mc_test');

End;

 

 

This creates an object of type QUEUE. This is the object that will control the contents of the underlying tables / IOT’s.

 

Before we can start using our queues, we also have to ‘start’ them. On starting, we indicate whether the queue is available for queuing, dequeuing or both:

 

Begin

 dbms_aqadm.start_queue(queue_name=>’xxx_test_q’

 ,enqueue=>TRUE

 ,dequeue=>TRUE);

End;

 

Our queue is now enabled for both queueing and dequeuing. Let’s first verify if things are working correctly.

 

To enqueue (or dequeue) a message, we use the dbms_aq package. It has an enqueue and dequeue procedure. Both with their own parameters. The parameters include en-/dequeue options, message properties, a message_id and of course the message itself:

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_enq_options Dbms_aq.Enqueue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

V_payload:=Utl_raw.Cast_to_raw('Hello world!');

Dbms_aq.Enqueue(Queue_name=>'xxx_test_q'

,Message_properties=>V_msg_properties

,Enqueue_options=>V_enq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(rawtohex(V_msgid));

end;

 

This enqueues a ‘Hello world!’ message, and returns the message id. If everything works correctly, you’ll see the msgid as a hexadecimal string. (Don’t forget to set serveroutput on).

 

We created 2 extra parameters: v_enq_options with the options used to enqueue this message. And v_msg_properties to set additional information about the message.

V_enq_options is of type ‘dbms_aq.enqueue_options_t’. This is a record of:

 

Visibility   BINARY_INTEGER  –Options are: dbms_aq.on_commit and dbms_aq.immediate. This indicates whether the enqueue is part of the current transaction, or done autonomously.

Relative_msgid      RAW(16)             –If the message needs to be enqueued at a specific position, it will be relative to this msgid.

Sequence_deviation BINARY_INTEGER –-Options are: DBMS_AQ.BEFORE, DBMS_AQ.TOP or NULL (default). If before then the message is before the relative_msgid. If top, the message will be the first to be dequeued.

 

V_msg_properties is of type ‘dbms_aq.message_properties_t’. This is a record of:

 

priority        BINARY_INTEGER  — Any integer, to set the priority. Smaller is higher priority. The default is 1.

delay           BINARY_INTEGER  — If the message needs to be delayed before it can be dequeued, set the time in seconds here. The default is dbms_aq.no_delay.

expiration      BINARY_INTEGER  — For messages that need to expire after a certain time, set the expiration time in seconds. (Offset from the delay). The default is dbms_aq.never.

correlation     VARCHAR2(128)   — A free text field that can be used to identify groups of messages.

attempts        BINARY_INTEGER  — Number of failed attempts to dequeue, before the message will be failed and marked as expired.

recipient_list  DBMS_AQ.AQ$_RECIPIENT_LIST_T –- Only valid for multi-consumer queues. Sets the designated recipients.

exception_queue VARCHAR2(51)    — The exception queue to use, when it is not the default.

enqueue_time    DATE                — Set automatically during enqueue

state           BINARY_INTEGER  — Automatically maintained by AQ, to indicate the status of the message.

 

Let’s see if the dequeue also works. For this the procedure dequeue is used, with similar parameters.

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_deq_options Dbms_aq.dequeue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

Dbms_aq.dequeue(Queue_name=>'xxx_test_q'

,Message_properties=>V_msg_properties

              ,dequeue_options=>V_deq_options

              ,Payload=>V_payload

             ,Msgid=>V_msgid);

Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_payload));

End;

 

This time, our message should be displayed.

 

For the dequeue, we used v_deq_options of type ‘dbms_aq.dequeue_options_t’. This is a record of:

 

consumer_name  VARCHAR2(30)       — Indicates the consumer for multi-consumer queues.

dequeue_mode   BINARY_INTEGER –- How to dequeue the messages. Either leave it on the queue, or remove it. Either dbms_aq.browse and dbms_aq.remove (default).

navigation     BINARY_INTEGER –- Indicate where to start dequeuing. Dbms_aq.next_message (default), to continue from the previous dequeue. Dbms_aq.first_message to start at the top of the queue. Dbms_aq.next_transaction to skip the rest of this message group.

visibility     BINARY_INTEGER –- same as dbms_aq.enqueue_options_t.visibility.

wait           BINARY_INTEGER –- The time (in seconds) the package should wait if no message is available. Default is dbms_aq.forever.

msgid          RAW(16)     — When specified, only the message with this msgid will be dequeued.

correlation    VARCHAR2(128)  — Only messages with this correlation will be dequeued (may include wildcards).

 

Note how message_properties and payload are now out-parameters.

 

This is probably the simplest queue possible. We enqueued and dequeued a raw message. We didn’t specify the visibility. So your session still needs to commit these actions.

 

Now let’s enqueue our message again, and see how it works behind the curtain.

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_enq_options Dbms_aq.Enqueue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

V_payload:=Utl_raw.Cast_to_raw('Hello world!');

Dbms_aq.Enqueue(Queue_name=>'xxx_test_q'

,Message_properties=>V_msg_properties

,Enqueue_options=>V_enq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line('Msg_id: '||rawtohex(V_msgid));

end;

 

Msg_id: 499CE4809F2641E1BFBC8AFBC8DB5AFA

 

The queue table is an ordinary heap-table, so we can query it.

 

select q_name, rawtohex(msgid) msg_id, priority, state, enq_time, enq_uid

from   xxx_test;

 

q_name     msg_id                         priority state enq_time                   enq_uid

XXX_TEST_Q  499CE4809F2641E1BFBC8AFBC8DB5AFA     1     0 21-03-10 17:24:01,876000000 SYSTEM

 

 

We see our msgid again. A priority flag. A state flag, the time of enqueueing the message, and the user that enqueued the message. The message is also in the table, but since it is a blob, we won’t bother selecting from it yet.

 

There are more columns in the table, that control the order and by who the messages are dequeued. Most of them are still null, so we will see them when needed.

 

A useful alternative to the table is to query the queue-view aq$<table_name>. This will show the translated values of the state. (0 = READY). And especially when using multi-consumer queues, it will use a join to select a more complete picture of the queue.

 

When we dequeue the message, it will disappear from the queue. (And be deleted from the queue table). However, this can be controlled by the retention parameter of the queue.

 

Let’s set this parameter, so we can check the data after the dequeue.

We set the retention time to 240 (seconds):

 

begin

DBMS_AQADM.ALTER_QUEUE(queue_name =>'xxx_test_q'

,retention_time => 240);

end;

 

Now when we dequeue the message, it will remain in the queue:

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_deq_options Dbms_aq.dequeue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

Dbms_aq.dequeue(Queue_name=>'xxx_test_q'

,Message_properties=>V_msg_properties

              ,dequeue_options=>V_deq_options

              ,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_payload));

End;

 

select       queue

,      rawtohex(msg_id)msg_id

,      msg_priority

,      msg_state

,      enq_timestamp

,      enq_user_id

,      deq_timestamp

,      deq_user_id

from   aq$xxx_test

 

QUEU     MSG_ID                         MSG_PRIO MSG_STATE ENQ_TIMESTAMP  ENQ_USER DEQ_TIMESTAMP     DEQ_USER_ID

XXX_TEST_Q AEC2CD2E34514363B6739969E8E8D353    1 PROCESSED 19-03-10 18:31:40 SYSTEM 19-03-10 21:26:45 SYSTEM

 

Now the message has been set to state ‘PROCESSED’, and some dequeue information has been added.

 

It’s time to start navigating queues when there are multiple messages in the queue.

 

Messages are by default dequeued in the order in which they are enqueued. On creation of the queue table, you can set other dequeue orders. But it is also possible to dequeue messages in a different order by navigating the queues, or using filter-criteria.

 

To show the dequeueing order we enqueue 10 different messages.

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_enq_options Dbms_aq.Enqueue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

V_enq_options.visibility := dbms_aq.immediate;

for i in 1..10 loop

    V_payload:=Utl_raw.Cast_to_raw('This is the '||to_char(to_date(i,'J'),'jspth')||' message');

    Dbms_aq.Enqueue(Queue_name=>'xxx_test_q'

    ,Message_properties=>V_msg_properties

                  ,Enqueue_options=>V_enq_options

                  ,Payload=>V_payload

                        ,Msgid=>V_msgid);

    Dbms_output.Put_line(rawtohex(V_msgid));

end loop;

end;

 

This enqueues the text ‘This is the first message’ till ‘This is the tenth message’. On dequeuing, the messages come out in the same order:

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_deq_options Dbms_aq.dequeue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

for i in 1..10 loop

    Dbms_aq.dequeue(Queue_name=>'xxx_test_q'

                 ,Message_properties=>V_msg_properties

                  ,dequeue_options=>V_deq_options

                   ,Payload=>V_payload

                 ,Msgid=>V_msgid);

    Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_payload));

end loop;

End;

 

This is the first message

This is the second message

………

This is the tenth message

 

 

When we created the queue table, we choose the default sort order. This is by enqueue_time. We can also build a queue that uses priority dequeuing. First we create a queue:

 

begin

dbms_aqadm.create_queue_table(queue_table=>'xxx_test_prio'

                             ,sort_list => 'PRIORITY,ENQ_TIME'

                             ,queue_payload_type=>'RAW');

dbms_aqadm.create_queue(queue_name=>'xxx_test_prio_q'

                       ,queue_table=>'xxx_test_prio');

dbms_aqadm.start_queue(queue_name=>'xxx_test_prio_q');

end;

 

We indicated a sort_list now. The options are ‘ENQ_TIME’ (default), ‘ENQ_TIME,PRIORITY’,‘PRIORITY’,’PRIORITY,ENQ_TIME‘. Now we enqueue some messages with reversed priorities:

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_enq_options Dbms_aq.Enqueue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

V_enq_options.visibility := dbms_aq.immediate;

for i in 1..10 loop

    V_payload:=Utl_raw.Cast_to_raw('This is the '||to_char(to_date(i,'J'),'jspth')||' message');

    v_msg_properties.priority:=11-i;

    Dbms_aq.Enqueue(Queue_name=>'xxx_test_prio_q'

    ,Message_properties=>V_msg_properties

                  ,Enqueue_options=>V_enq_options

                  ,Payload=>V_payload

                 ,Msgid=>V_msgid);

    Dbms_output.Put_line(rawtohex(V_msgid));

end loop;

end;

 

And we dequeue them again:

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_deq_options Dbms_aq.dequeue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

for i in 1..10 loop

    Dbms_aq.dequeue(Queue_name=>'xxx_test_prio_q'

                 ,Message_properties=>V_msg_properties

                  ,dequeue_options=>V_deq_options

                  ,Payload=>V_payload

                 ,Msgid=>V_msgid);

    Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_payload));

end loop;

End;

 

This is the tenth message

This is the ninth message

…………

This is the second message

This is the first message

Now it’s time to look at queueing navigation. It is possible to dequeue specific messages from the queue. You can select messages with a specific msg_id, correlation or recipient_list (for mc-queueus).

We’ll first search for a specific correlation and then a message_id. We enqueue ten messages, with different correlations:

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_enq_options Dbms_aq.Enqueue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

V_enq_options.visibility := dbms_aq.immediate;

for i in 1..10 loop

    V_payload:=Utl_raw.Cast_to_raw('This is the '||to_char(to_date(i,'J'),'jspth')||' message');

    v_msg_properties.correlation:=to_char('Corr'||i);

    Dbms_aq.Enqueue(Queue_name=>'xxx_test_q'

                 ,Message_properties=>V_msg_properties

                  ,Enqueue_options=>V_enq_options

                  ,Payload=>V_payload

                 ,Msgid=>V_msgid);

    dbms_output.Put_line('Msg_id: '||rawtohex(V_msgid)||' Correlation: Corr'||i);

end loop;

end;

 

Msg_id: E8BE83A2A2A04F1EA74863B4A7C78DAF Correlation: Corr1

Msg_id: 7159B80BC3194C7AAA6910AB10E753C5 Correlation: Corr2

Msg_id: 4AF3693CF7EE4994B0F78830371437B9 Correlation: Corr3

Msg_id: 44DBC0CB09C94BB98DF2D7E48971849C Correlation: Corr4

Msg_id: 98F3E119041E47F5BF46604E014120BF Correlation: Corr5

Msg_id: B71B7F097A9E4EDBA696958326BF6300 Correlation: Corr6

Msg_id: C4F5050B02904EEEAD2842405A0BDE2A Correlation: Corr7

Msg_id: E4D923A4CB4B4DF2B64B8421A88FFC42 Correlation: Corr8

Msg_id: BE199053188648AE8FA238A01A5C9CD1 Correlation: Corr9

Msg_id: 8991E793D2DB41F5B3F9D00D283B6F6D Correlation: Corr10

 

Now we can dequeue the 5th (correlation) and 8th (msg_id) message:

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_deq_options Dbms_aq.dequeue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

v_deq_options.correlation:='Corr5';

Dbms_aq.dequeue(Queue_name=>'xxx_test_q'

,Message_properties=>V_msg_properties

,dequeue_options=>V_deq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_payload));

v_deq_options.correlation:=NULL;

v_deq_options.msgid:='E4D923A4CB4B4DF2B64B8421A88FFC42';

Dbms_aq.dequeue(Queue_name=>'xxx_test_q'

,Message_properties=>V_msg_properties

,dequeue_options=>V_deq_options

              ,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_payload));

End;

 

This is the fifth message

This is the eighth message

 

Note how we have to set the correlation back to NULL for the second dequeue. Otherwise we would be trying to dequeue a message with correlation ‘Corr5’ and the specified msg_id. Since that message does not exist, our procedure will just wait for the message to appear.

 

By default when you dequeue from an empty queue, or try to dequeue a non-available message, the dequeue will wait indefinitely for a message to appear. You can control this behavior with the dequeue options.

 

V_deq_options.wait := 10; — to wait 10 seconds. Any number of 0 or higher is allowed.

V_deq_options.wait := dbms_aq.no_wait; — not waiting for the message.

V_deq_options.wait := dbms_aq.forever; — wait indefinitely 

Do note that when the time-out is reached an ’ORA-25228: timeout in dequeue from <queue> while waiting for a message’ raised. So you will need to handle the exception.

One more feature to consider is the browsing mode. So far we have seen the messages that we dequeued were removed from the queue (or at least got status ‘Processed’). By setting the dequeue options, we can first inspect messages before dequeuing them. Consider the following. We have 8 messages left in our queue:

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_deq_options Dbms_aq.dequeue_options_t;

v_deq_options_rm dbms_aq.dequeue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

v_deq_options.wait:=dbms_aq.no_wait;

v_deq_options.dequeue_mode:=DBMS_AQ.BROWSE;

for i in 1..10 loop

begin

 

   Dbms_aq.dequeue(Queue_name=>'xxx_test_q'

   ,Message_properties=>V_msg_properties

   ,dequeue_options=>V_deq_options

   ,Payload=>V_payload

   ,Msgid=>V_msgid);

   Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_payload));

   dbms_output.put_line(v_msg_properties.correlation);

   if v_msg_properties.correlation='Corr6' then

       v_deq_options.dequeue_mode:=DBMS_AQ.REMOVE;

       v_deq_options.msgid:=v_msgid;

       Dbms_aq.dequeue(Queue_name=>'xxx_test_q'

                      ,Message_properties=>V_msg_properties

                      ,dequeue_options=>V_deq_options

                     ,Payload=>V_payload

                      ,Msgid=>V_msgid);

      Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_payload));

      v_deq_options.dequeue_mode:=DBMS_AQ.BROWSE;

      v_deq_options.msgid:=NULL;

   end if;

   exception

    when others then

      null;

   end;

end loop;

End;

 

This removed only the 6th message from the queue, and left the others intact.

 

There are more options to the queuing / dequeuing like retrying failed attempts (rollback after a dequeue is considered a failed attempt) and queuing with a delay or an expiration time. But I think the information so far will allow you to test these options on a need-by basis.

 

Multi-consumer or publish-subscribe queues

 

Both ‘publish-subscribe’ and ‘multi-consumer’ are used for these queues. I think ‘multi-consumer’ is most often used informally. That will also be the one I will use in this article (even though ‘publish-subscribe’ is more accurate).

 

We build multi-consumer queues with dbms_aqadm again. But on creating the queue-table, we say that it has to be a multi-consumer queue-table:

 

Begin

dbms_aqadm.create_queue_table (queue_table=>’xxx_mc_test’

                           ,multiple_consumers=>TRUE

                           ,queue_payload_type=>’RAW’);

End;

 

Now we see more objects being created. The most important ones are:

Xxx_mc_test                      The queue table itself.

Table aq$_xxx_mc_test_s with information about the subscribers to the queue

Table aq$_xxx_mc_test_r with information about the rules for the subscriptions

IOT aq$_xxx_mc_test_h with historic information about dequeuing

IOT aq$_xxx_mc_test_i with dequeuing information

 

As you can see, a lot more information is stored for multi-consumer queues. In part this information has to do with the subscription and subscriber mechanism. But there is also the need to keep a history of the dequeuing, to know when a message has been dequeued by all subscribers.

 

We will be seeing the use of all the objects in a few minutes, when we start queuing and dequeuing messages.

 

When we try to enqueue messages on this queue now, we receive an ORA-24033: no recipients for message. This means we need to set up subscribers first. If we enqueue without a recipient list, the message will be made available for all subscribers.

 

To add a subscriber, we use the dbms_aqadm package and a new object_type: sys.aq$_agent.

This type is defined as an object of name , address and protocol. The last 2 are used in inter-system communication only.

 

We can just call the following procedure:

 

DECLARE

V_agent sys.aq$_agent;

BEGIN

   V_agent:= sys.aq$_agent('Agent1',NULL,NULL);

   DBMS_AQADM.ADD_SUBSCRIBER(queue_name=>'xxx_mc_test_q'

      ,subscriber=>v_agent);

END;

 

We can see the subscribers from the view aq$xxx_mc_test_s (or the underlying table: aq$_xxx_mc_test_s):

 

select * from aq$xxx_mc_test_s;

 

QUEUE        NAME   ADDRESS      PROTOCOL TRANSFORMATION

————– —— ——-     ——– ————–

XXX_MC_TEST_Q AGENT1              0    

 

Now let’s enqueue a message:

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_enq_options Dbms_aq.enqueue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

v_payload := utl_raw.cast_to_raw('Hello world, again!');

Dbms_aq.enqueue(Queue_name=>'xxx_mc_test_q'

             ,Message_properties=>V_msg_properties

              ,enqueue_options=>V_enq_options

,Payload=>V_payload

             ,Msgid=>V_msgid);

Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_msgid));

End;

 

Now when we look at the queue-view, we can see that a subscriber has been selected:

 

select queue,rawtohex(msg_id) msg_id,msg_state,consumer_name from aq$xxx_mc_test;

 

QUEUE        MSG_ID                        MSG_STATE CONSUMER_NAME

————– ——————————– ——— ————-

XXX_MC_TEST_Q BC4C48AC659946428F38F8BC3AB02184 READY     AGENT1

 

Now to dequeue the message, we also need to set the consumer_name in the dequeue_options. When enqueuing a message without a subscriber_name, it can be dequeued by all subscribers. But on dequeueing, the subscriber needs to identify itself.

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_deq_options Dbms_aq.dequeue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

v_deq_options.consumer_name:='Agent1';

Dbms_aq.dequeue(Queue_name=>'xxx_mc_test_q'

,Message_properties=>V_msg_properties

,dequeue_options=>V_deq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_payload));

End;

 

Hello world, again!

 

Now when we check the queue contents, we see that the message is still there. Even after a commit, the message has been retained. Maybe you won’t see it on your system immediately. But then run:

 

Begin

Dbms_aqadm.stop_time_manager;

End;

 

And enqueue/dequeue a message again. Now when you look in xxx_mc_test or aq$xxx_mc_test, you will see the message being retained (with status ‘PROCESSED’). When you start the time_manager again, the message will disappear after some time.

 

The reason for this, is that Oracle enhances concurrency by using a separate table (IOT) for the dequeuing. When we enqueue a message again:

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_enq_options Dbms_aq.enqueue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

v_payload := utl_raw.cast_to_raw('Hello world');

Dbms_aq.enqueue(Queue_name=>'xxx_mc_test_q'

,Message_properties=>V_msg_properties

,enqueue_options=>V_enq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_msgid));

End;

 

We can see the data in the dequeue-IOT:

 

select subscriber#, queue#, msg_enq_time, msgid from Aq$_xxx_mc_test_i;

 

SUBSCRIBER# QUEUE# MSG_ENQ_TIME                MSG_ID

———– —— ————————— ——————————–

          1      0 21-03-10 14:16:00,252000000 75E41875D957455B84D80B55AE06F81C       

 

Here the basic information about our message is recorded. After a subscriber dequeues the message it’s version of the record is deleted only from this table (Please try this yourself, to confirm). The queue-monitors are responsible for cleaning up the queue-table after all subscribers have dequeued the message.

 

Now let’s see what happens when we add a second subscriber for our queue:

 

DECLARE

V_agent sys.aq$_agent;

BEGIN

   V_agent:= sys.aq$_agent('Agent2',NULL,NULL);

   DBMS_AQADM.ADD_SUBSCRIBER(queue_name=>'xxx_mc_test_q'

      ,subscriber=>v_agent);

END;

 

Any messages that were enqueued already, won’t be available for this new subscriber. It can only dequeue messages enqueued after the subscriber was added.

 

Also you can’t just change subscribers in an existing session. If you try, you will get an ORA-25242: Cannot change subscriber name from string to string without FIRST_MESSAGE option.

 

As the message describes further, you need to change the navigation of the dequeue. The default navigation is next_message, which means that Oracle will read the queue in a read-consistent and ordered way. It will take a snapshot of the queue when the first message is dequeued, and will dequeue the messages in that order. Messages that were enqueued after the first dequeue, will be read after reading all the messages in the queue. Even if priority ordering means they are enqueued earlier.

 

An alternative navigation is ‘first_message’. When the navigation is set to ‘first_message’, Oracle will take a new snapshot before every dequeue, and start with the first message eligible for dequeuing.

Because we change subscribers, we need to set navigation to ‘First_message’, to force Oracle to take a new snapshot.

 

(Btw. If you would try ‘first_message’ with dequeue_mode ‘Browse’, you would never get beyond the first message. Try it!)

(Btw2. The same goes for changing the filter options like correlation.)

 

Let’s start a new session, and enqueue a new message:

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_enq_options Dbms_aq.enqueue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

v_payload := utl_raw.cast_to_raw('Hello agents!');

Dbms_aq.enqueue(Queue_name=>'xxx_mc_test_q'

             ,Message_properties=>V_msg_properties

,enqueue_options=>V_enq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_msgid));

End;

 

The message is still the same in xxx_mc_test:

 

select q_name, rawtohex(msgid) msg_id,state,enq_time,enq_uid from xxx_mc_test

 

Q_NAME        MSG_ID                           STATE ENQ_TIME                    ENQ_UID             

————- ——————————– —– ————————— ——-

XXX_MC_TEST_Q 45F11423444747B99600BCD8E9B3141E     0 21-03-10 14:33:23,783000000 SYSTEM

 

But in the queue view, we now see 2 records:

 

select queue,msg_id,msg_state,enq_time,enq_user_id,consumer_name from aq$xxx_mc_test;

 

QUEUE          MSG_ID                           STATE ENQ_TIME          ENQ_USER_ID CONSUMER_NAME

————– ——————————– —– —————– ———– ————-

XXX_MC_TEST_Q 45F11423444747B99600BCD8E9B3141E READY 21-03-10 14:33:24 SYSTEM      AGENT1

XXX_MC_TEST_Q 45F11423444747B99600BCD8E9B3141E READY 21-03-10 14:33:24 SYSTEM      AGENT2

 

One record for each subscriber. We can see the same in the dequeue_iot and in the history table:

 

select subscriber#,queue#,msg_enq_time,msgid from aq$_xxx_mc_test_i;

 

SUBSCRIBER# QUEUE# MSG_ENQ_TIME                MSGID

———– —— ————————— ——————————–

          1      0 21-03-10 14:33:23,783000000 45F11423444747B99600BCD8E9B3141E

         21      0 21-03-10 14:33:23,783000000 45F11423444747B99600BCD8E9B3141E

 

Select Msgid,Subscriber#,Name,Dequeue_time,Dequeue_user From Aq$_xxx_mc_test_h;

 

MSGID                            SUBSCRIBER# NAME DEQUEUE_TIME DEQUEUE_USER

——————————– ———– —- ———— ————

45F11423444747B99600BCD8E9B3141E           1    0          

45F11423444747B99600BCD8E9B3141E          21    0          

 

Now when we dequeue the message, the queue table is not updated:

Select Rawtohex(Msgid) Msg_id,State,Enq_time,Enq_uid,deq_time,deq_uid From Xxx_mc_test;

 

MSG_ID                           STATE ENQ_TIME                    ENQ_UID DEQ_TIME DEQ_UID             

——————————– —– ————————— ——- ——– ——-

45F11423444747B99600BCD8E9B3141E     0 21-03-10 14:33:23,783000000 SYSTEM

 

However, the queue view reflects that the message has been dequeued by one subscriber.

 

Select Queue,Msg_id,Msg_state,Enq_time,Enq_user_id,Consumer_name From Aq$xxx_mc_test;

 

QUEUE         MSG_ID                           MSG_STATE ENQ_TIME          ENQ_USER CONSUMER_NAME

————- ——————————– ——— —————– ——– ————-

XXX_MC_TEST_Q 45F11423444747B99600BCD8E9B3141E PROCESSED 21-03-10 14:33:24 SYSTEM   AGENT1

XXX_MC_TEST_Q 45F11423444747B99600BCD8E9B3141E READY     21-03-10 14:33:24 SYSTEM   AGENT2

 

The record for Agent1 has been deleted from the dequeue-IOT:

 

select subscriber#,queue#,msg_enq_time,msgid from aq$_xxx_mc_test_i;

 

SUBSCRIBER# QUEUE# MSG_ENQ_TIME                MSGID

———– —— ————————— ——————————–

         21      0 21-03-10 14:33:23,783000000 45F11423444747B99600BCD8E9B3141E

 

And the history table also shows the dequeue:

 

Select msgid, subscriber#, Dequeue_time,Dequeue_user From Aq$_xxx_mc_test_h;

 

MSGID                            SUBSCRIBER# DEQUEUE_TIME                DEQUEUE_USER

45F11423444747B99600BCD8E9B3141E          21   21-03-10 14:33:23,783000000 SYSTEM

45F11423444747B99600BCD8E9B3141E           1

 

To dequeue the message for ‘Agent2’. We of course need to set the navigation to ‘First_message’:

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_deq_options Dbms_aq.dequeue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

V_deq_options.Wait := Dbms_aq.No_wait;

V_deq_options.Navigation:=Dbms_aq.First_message;

v_deq_options.consumer_name:='Agent2';

Dbms_aq.dequeue(Queue_name=>'xxx_mc_test_q'

,Message_properties=>V_msg_properties

,dequeue_options=>V_deq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_payload));

End;

 

Now after the QMON has processed the queue, the records will be deleted from all queues. (When a retention time has been set, the records will of course be retained for that time).

 

Rules for multi-consumer queues

 

So far we have seen different kinds of filtering for dequeuing messages. A new option comes with multi-consumer queues, where different subscribers can put a filter on their subscriptions. These filters (rules) can take the form of (complex) predicates that return a Boolean value. The rule can reference both message_properties as payload. To reference the payload, use a qualifier of ‘tab.user_data’.

 

Let’s build a new queue. To make optimal use of the ‘rule’-functionality we’ll use a custom type that can be referred to in the ‘rules’. The type that we’ll use is loosely based on the emp table.

 

create type t_emp as object

(empno   number

,ename   varchar2(10)

,job     varchar2(9)

);

 

BEGIN

DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => 'xxx_rule_test'

,queue_payload_type => 't_emp');

DBMS_AQADM.CREATE_QUEUE(queue_name => 'xxx_rule_test_q'

 ,queue_table => 'xxx_rule_test');

DBMS_AQADM.START_QUEUE (queue_name => 'xxx_rule_test_q');

END;

 

We add 2 subscribers to this queue.

 

DECLARE

V_agent sys.aq$_agent;

BEGIN

V_agent:= sys.aq$_agent('HR_President',NULL,NULL);

DBMS_AQADM.ADD_SUBSCRIBER(queue_name=>'xxx_rule_test_q'

   ,subscriber=>v_agent

   ,rule=>'tab.user_data.job=''President''');

V_agent:= sys.aq$_agent('HR_Employee',NULL,NULL);

DBMS_AQADM.ADD_SUBSCRIBER(queue_name=>'xxx_rule_test_q'

   ,subscriber=>v_agent);

END;

 

Note how the agent ‘HR_President’ has a rule added to its subscription. Only messages where the job attribute of the payload is ‘President’ are eligible for dequeuing by this agent. Let’s enqueue some messages on this queue.

 

Declare

V_payload t_emp;

V_msgid Raw(200);

V_enq_options Dbms_aq.enqueue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

v_payload := t_emp(1,'Jones','Manager');

Dbms_aq.enqueue(Queue_name=>'xxx_rule_test_q'

             ,Message_properties=>V_msg_properties

,enqueue_options=>V_enq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_msgid));

v_payload := t_emp(2,'King','President');

Dbms_aq.enqueue(Queue_name=>'xxx_rule_test_q'

             ,Message_properties=>V_msg_properties

,enqueue_options=>V_enq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(utl_raw.cast_to_varchar2(V_msgid));

End;

 

Now we have 2 messages. Only one of which matches the rule for the ‘HR_President’ subscriber. The ‘HR_Employee’ subscriber does not have any rule, and is thus eligible for all messages. We can see this when we query the queue-view:

 

select queue,rawtohex(msg_id) msg_id,msg_state,consumer_name from aq$xxx_rule_test;

 

QUEUE        MSG_ID                              MSG_STATE CONSUMER_NAME

————— ——————————– ——— ————-

XXX_RULE_TEST_Q 4D0FF7A800834559809AD90AFCA81444 READY     HR_EMPLOYEE

XXX_RULE_TEST_Q E5A2FDFD8EE942349E9BC9DEE88CEB10 READY     HR_EMPLOYEE

XXX_RULE_TEST_Q E5A2FDFD8EE942349E9BC9DEE88CEB10 READY     HR_PRESIDENT

 

We see that both messages are enqueued for the ‘HR_Employee’. But only the message with the job ‘President’ is enqueued for the the ‘HR_President’.

 

Let’s dequeue the messages as ‘HR_President’ first, then as ‘HR_Employee’.

 

Declare

V_payload t_emp;

V_msgid Raw(200);

V_deq_options Dbms_aq.dequeue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

v_deq_options.wait := dbms_aq.no_wait;

v_deq_options.consumer_name:='HR_President';

Dbms_aq.dequeue(Queue_name=>'xxx_rule_test_q'

,Message_properties=>V_msg_properties

,dequeue_options=>V_deq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(v_payload.empno||' '||v_payload.ename);

end;

 

2 King

 

This time the first message enqueued was ignored for this subscriber. Only the message that met its rule was dequeued. When dequeuing as the ‘HR_Employee’ both messages will be dequeued.

 

Declare

V_payload t_emp;

V_msgid Raw(200);

V_deq_options Dbms_aq.dequeue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

v_deq_options.wait := dbms_aq.no_wait;

v_deq_options.navigation := dbms_aq.first_message;

v_deq_options.consumer_name:='HR_Employee';

Dbms_aq.dequeue(Queue_name=>'xxx_rule_test_q'

,Message_properties=>V_msg_properties

,dequeue_options=>V_deq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(v_payload.empno||' '||v_payload.ename);

v_deq_options.navigation := dbms_aq.next_message;

Dbms_aq.dequeue(Queue_name=>'xxx_rule_test_q'

,Message_properties=>V_msg_properties

,dequeue_options=>V_deq_options

,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(v_payload.empno||' '||v_payload.ename);

end;

 

1 Jones

2 King

 

After these dequeues, the queue is empty for these subscribers. The only message eligible for ‘HR_President’ was the message with ‘2,King,President’. ‘HR_Employee’ was eligible for both messages.

Remember that the ‘Rule’ must evaluate to a Boolean value. Valid references are to ‘tab.user_data.’, for object_type payloads. Also columns like ‘priority’ or ‘correlation’ from the message properties can be referenced in the rule.

 

Common issues with queues and troubleshooting

 

Above we already saw several error messages related to queues. Most of them can be expected, and should be handled in the code.

 

The most common issues with queues are from queues not started, or not started for enqueuing or dequeuing. The error messages for this should be quite clear, and you can just start the queue with the ‘dbms_aqadm.start_queue’ package. Note that when the queue is started for enqueuing or dequeuing only, you need to stop it first, then start again with the correct options enabled.

 

Another issue may occur because of the AQ error handling system. A dequeue with dequeue_mode ‘REMOVE’ that needs to roll back afterwards, is considered a failed attempt. When the number of failed attempts exceed the retry count of the queue, the message will be moved to the Exception queue. The message remains in the queue table, but with status 3: Expired. The exception_queue field will be set to the name of the exception queue.

 

These messages are not available for dequeuing anymore. They must be dequeued from the exception queue.

 

To dequeue from an exception queue, it first needs to be enabled for dequeuing. (It cannot be enabled for enqueuing). Also no subscriber_name is allowed for the dequeue.

 

Begin

Dbms_aqadm.Start_queue(Queue_name=>'aq$_xxx_mc_test_e',Enqueue=>False,Dequeue=>True);

end;

 

Declare

V_payload Raw(200);

V_msgid Raw(200);

V_deq_options Dbms_aq.dequeue_options_t;

v_msg_properties dbms_aq.message_properties_t;

Begin

V_deq_options.Wait := Dbms_aq.No_wait;

Dbms_aq.dequeue(Queue_name=>'AQ$_XXX_MC_TEST_E'

,Message_properties=>V_msg_properties

,dequeue_options=>V_deq_options

       ,Payload=>V_payload

,Msgid=>V_msgid);

Dbms_output.Put_line(Utl_raw.Cast_to_varchar2(V_payload));

End;

 

The last issue to note with queues (especially in eBS systems) is described in Metalink note  267137.1. If multi-consumer queues are created in an ASSM tablespace, or when using freelist groups, QMON will not perform space management on the IOT’s. This will result in ever growing IOT’s and eventually in deteriorating performance.