Parallel Operations With PL/pgSQL


Hi,

I am pretty sure there is a better heading for this post. For now, I am going with this one. If you can suggest a proper heading, I will update it :-)

OK, let me explain the situation first. Then I will describe what I am trying to do here and how I did it.

The situation is as follows.

We have a table on which we need to run an UPDATE over “R” records. The UPDATE query uses some joins to compute the desired result and then updates the table.
Processing these “R” records takes “H” hours, and on top of that it puts load on the production server. So we planned to run this UPDATE as a batch process.
For each batch we take “N” records, and one batch UPDATE takes “S” seconds.
With this batch process the production server stays stable. So we planned to run these batch updates in parallel:
“K” sessions, each updating a different set of records. Of course, we could also increase the batch size instead,
but we want to use more CPUs to complete all these UPDATEs as soon as possible.
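
As a rough back-of-the-envelope check of why parallel batches help, the sketch below estimates the wall-clock time from R, N, S and K. It is written in Python purely as a calculator. The numbers are hypothetical placeholders (this post gives no concrete values), the function name `estimate_seconds` is made up for illustration, and the model assumes that every batch takes the same S seconds and that sessions do not slow each other down, which a real server will only approximate.

```python
import math


def estimate_seconds(r, n, s, k=1):
    """Estimated wall-clock seconds to update r records in batches of n,
    where one batch takes s seconds and k sessions run in parallel.
    Assumes a uniform batch time and no contention between sessions."""
    batches = math.ceil(r / n)        # number of batches needed
    rounds = math.ceil(batches / k)   # each round runs up to k batches at once
    return rounds * s


# Hypothetical values, chosen only for illustration.
R, N, S = 9_000_000, 3_000, 2
serial = estimate_seconds(R, N, S)          # one session
parallel = estimate_seconds(R, N, S, k=4)   # four sessions
print(serial, parallel)
```

Under these assumptions the estimate shrinks roughly by a factor of K, which is the motivation for splitting the work across sessions rather than only growing the batch size.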
The problem is this: we need to run multiple UPDATEs on different sets of records in parallel, but how does one session tell the other sessions which batch of records it has taken?
I mean, if one session is running updates on rows 1 to 1000, how does the second session know that rows 1 to 1000 are already being processed?
If the second session has this information, it can start on rows 1001 to 2000 in parallel. This is the problem I am trying to solve here.

I am not sure whether the solution below is optimal, but it works for my requirement. :-)  Let me know if you see any problems with it.

Object Definitions
                      Table "public.test"
 Column |  Type   |                     Modifiers
--------+---------+----------------------------------------------------
 t      | text    |
 i      | boolean |
 seq    | bigint  | not null default nextval('test_seq_seq'::regclass)

postgres=# INSERT INTO test VALUES(generate_series(1, 9000), false, generate_series(1, 9000));
INSERT 0 9000

postgres=# \ds testing
           List of relations
 Schema |  Name   |   Type   |  Owner
--------+---------+----------+----------
 public | testing | sequence | postgres
(1 row)


CREATE OR REPLACE FUNCTION public.update_test_parallel(batch integer)
 RETURNS void
 LANGUAGE plpgsql
AS $function$
DECLARE
VAR BIGINT;
DUMMY TEXT;
BEGIN

-- Adding this sleep for the demo only.
--

SELECT pg_sleep(10) INTO DUMMY;

SELECT pg_advisory_lock(-1234) INTO DUMMY;

        SELECT nextval('testing') INTO VAR;
        EXECUTE E'SELECT nextval(\'testing\') FROM generate_series('||VAR||','||VAR+BATCH||')';

        -- We need to decrease the sequence value by one, since we already executed the nextval expression once above.
        -- Otherwise, it will affect the other sessions' execution.
        --
        SELECT setval('testing', currval('testing')-1) INTO DUMMY;

SELECT pg_advisory_unlock(-1234) INTO DUMMY;

        -- Set column "i" of the test table to true for the rows in this session's range.
        --
UPDATE test SET I=true WHERE SEQ BETWEEN VAR AND (VAR+BATCH);


RAISE NOTICE 'VAR IS %, VAR+BATCH IS %', VAR, (VAR+BATCH);
RAISE NOTICE 'CURRENT SEQ VALUE IS %', currval('testing');

EXCEPTION WHEN OTHERS THEN

        -- If there is an exception, we need to reset the sequence to its start position again,
        -- so that the other sessions will try again with the same sequence numbers.
        --
        SELECT setval('testing', VAR-1) INTO DUMMY;
        SELECT pg_advisory_unlock(-1234) INTO DUMMY;
        RAISE EXCEPTION '%', SQLERRM;
END;
$function$;

Session 1

postgres=# SELECT public.update_test_parallel(3000);
NOTICE:  VAR IS 1, VAR+BATCH IS 3001
NOTICE:  CURRENT SEQ VALUE IS 3000
update_test_parallel
----------------------

(1 row)

Session 2

postgres=# SELECT public.update_test_parallel(3000);
NOTICE:  VAR IS 3001, VAR+BATCH IS 6001
NOTICE:  CURRENT SEQ VALUE IS 6000
update_test_parallel
----------------------

(1 row)

Session 3

postgres=# SELECT public.update_test_parallel(3000);
NOTICE:  VAR IS 6001, VAR+BATCH IS 9001
NOTICE:  CURRENT SEQ VALUE IS 9000
update_test_parallel
----------------------

(1 row)

Desired result

postgres=# SELECT COUNT(*) FROM test WHERE i is true;
count
-------
  9000
(1 row)

In the above implementation, I used a sequence, guarded by advisory locks, to coordinate the parallel sessions. I hope this helps others as well.
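
The coordination pattern can also be illustrated outside the database. The following is a minimal Python simulation, not PostgreSQL code and not the function from this post: a shared counter plays the role of the sequence, a `threading.Lock` plays the role of the advisory lock, and each thread stands in for one session. The names `RangeAllocator`, `claim` and `run_sessions` are hypothetical, and the ranges here are exactly `batch` rows wide.

```python
import threading


class RangeAllocator:
    """Hands out consecutive, non-overlapping ranges of row numbers."""

    def __init__(self, start=1):
        self._next = start             # next unclaimed row number (the "sequence")
        self._lock = threading.Lock()  # stands in for the advisory lock

    def claim(self, batch):
        """Atomically reserve `batch` rows and return (first, last), inclusive."""
        with self._lock:
            first = self._next
            self._next += batch
        return first, first + batch - 1


def run_sessions(sessions, batch):
    """Start `sessions` threads that each claim one range; return the sorted ranges."""
    allocator = RangeAllocator()
    claimed = []

    def worker():
        # list.append is safe to call from several threads in CPython
        claimed.append(allocator.claim(batch))

    threads = [threading.Thread(target=worker) for _ in range(sessions)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(claimed)


print(run_sessions(sessions=3, batch=1000))
```

Whatever order the threads happen to run in, the three claimed ranges are 1 to 1000, 1001 to 2000 and 2001 to 3000, because only the read-and-advance step is serialized. The actual work on each range can then proceed in parallel.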

Thanks as always for reading, and I welcome your inputs.
 --Dinesh Kumar

Comments

  1. Essentially what you're looking for is a Queuing mechanism in PostgreSQL. In the future (PostgreSQL 9.5+) this could be easily implemented using the SKIP LOCKED feature:
    http://michael.otacoo.com/postgresql-2/postgres-9-5-feature-highlight-skip-locked-row-level/

    Replies
    1. Yes, true. I was trying this on PG versions older than 9.5.