Skip to main content

Parallel Operations With pl/pgSQL


Hi,

I am pretty sure that, there will be a right heading for this post. For now, i am going with this. If you could suggest me proper heading, i will update it :-)

OK. let me explain the situation. Then will let you know what i am trying to do here, and how i did it.

Situation here is,

We have a table, which we need to run update on “R” no.of records. The update query is using some joins to get the desired result, and do update the table. 
 To process these “R” no.of records, it is taking “H” no.of hours. That too, it’s giving load on the production server. So, we planned to run this UPDATE as batch process. 
Per a batch process, we took “N” no.or records. To process this batch UPDATE, it is taking “S” no.of seconds.
With the above batch process, production server is pretty stable, and doing great. So, we planned to run these Batch updates parallel. 
 I mean, “K” sessions, running different record UPDATEs. Of-course, we can also increase the Batch size here. 
But we want to use much cpus to complete all these UPDATES as soon as possible.
Problem here is, So, as i said, we need to run multiple UPDATEs on multiple records in parallel. But, how can one session is going to communicate with other sessions on this batch records.
I mean, If one session is running updates on 1 to 1000, how could the second session knows that the other session was processing from 1 to 1000.
If the second session knows this information, this will start from 1001 to 2000 in parallel. This is the problem i am trying to solve here.

I am not sure whether this is the optimal solution, but as per my requirement it’s working. :-)  Let me know if you see any problems in it.

Object Definitions
                      Table "public.test"
 Column |  Type   |                     Modifiers
--------+---------+----------------------------------------------------
 t      | text    |
 i      | boolean |
 seq    | bigint  | not null default nextval('test_seq_seq'::regclass)

postgres=# INSERT INTO test VALUES(generate_series(1, 9000), false, generate_series(1, 9000));
INSERT 0 9000

postgres=# \ds testing
           List of relations
 Schema |  Name   |   Type   |  Owner
--------+---------+----------+----------
 public | testing | sequence | postgres
(1 row)


CREATE OR REPLACE FUNCTION public.update_test_parallel(batch integer)
 RETURNS void
 LANGUAGE plpgsql
AS $function$
DECLARE
VAR BIGINT;
DUMMY TEXT;
BEGIN

-- Adding this for Demo
--

SELECT pg_sleep(10) INTO DUMMY;

SELECT pg_advisory_lock(-1234) INTO DUMMY;

        SELECT nextval('testing') INTO VAR;
        EXECUTE E'SELECT nextval(\'testing\') FROM generate_series('||VAR||','||VAR+BATCH||')';

        -- We need to decrease the sequence value by one, since we executed nextval expression once
        -- Otherwise, it will affect the other session''s  execution.
        --
        SELECT setval('testing', currval('testing')-1) INTO DUMMY;

SELECT pg_advisory_unlock(-1234) INTO DUMMY;

        -- I want to update the test table of the column "I" with value "true".
        --
UPDATE test SET I=true WHERE SEQ BETWEEN VAR AND (VAR+BATCH);


RAISE NOTICE 'VAR IS %, VAR+BATCH IS %', VAR, (VAR+BATCH);
RAISE NOTICE 'CURRENT SEQ VALUE IS %', currval('testing');

EXCEPTION WHEN OTHERS THEN

        -- If there is an exception, we need to reset the sequence to it''s start position again.
        -- So that, the other sessions, will try with the same sequence numbers.
        --
        SELECT setval('testing', VAR-1) INTO DUMMY;
        SELECT pg_advisory_unlock(-1234) INTO DUMMY;
        RAISE EXCEPTION '%', SQLERRM;
END;
$function$;
Session 1
 
postgres=# SELECT public.update_test_parallel(3000);
NOTICE:  VAR IS 1, VAR+BATCH IS 3001
NOTICE:  CURRENT SEQ VALUE IS 3000
update_test_parallel
----------------------

(1 row)
Session 2
 
postgres=# SELECT public.update_test_parallel(3000);
NOTICE:  VAR IS 3001, VAR+BATCH IS 6001
NOTICE:  CURRENT SEQ VALUE IS 6000
update_test_parallel
----------------------

(1 row)
Session 3
 
postgres=# SELECT public.update_test_parallel(3000);
NOTICE:  VAR IS 6001, VAR+BATCH IS 9001
NOTICE:  CURRENT SEQ VALUE IS 9000
update_test_parallel
----------------------

(1 row)
Desired result
 
postgres=# SELECT COUNT(*) FROM test WHERE i is true;
count
-------
  9000
(1 row)

In the above implementation, i took "sequence" for the session's parallel execution with the help of advisory locks. Hope this helps to others as well.

Thanks as always for reading it, and welcome your inputs.
 --Dinesh Kumar

Comments

  1. Essentially what you're looking for is a Queuing mechanism in PostgreSQL. In the future (PostgreSQL 9.5+) this could be easily implemented using the SKIP LOCKED feature:
    http://michael.otacoo.com/postgresql-2/postgres-9-5-feature-highlight-skip-locked-row-level/

    ReplyDelete
    Replies
    1. Yes True. I was trying this in PG <9.5 versions.

      Delete

Post a Comment

Popular posts from this blog

Pgpool Configuration & Failback

I would like to share the pgpool configuration, and it's failback mechanism in this post.

Hope it will be helpful to you in creating pgpool and it's failback setup.

Pgpool Installation & Configuration

1. Download the pgpool from below link(Latest version is 3.2.1).
    http://www.pgpool.net/mediawiki/index.php/Downloads


2. Untart the pgpool-II-3.2.1.tar.gz and goto pgpool-II-3.2.1 directory.

3. Install the pgpool by executing the below commands:

./configure ­­prefix=/opt/PostgreSQL92/ ­­--with­-pgsql­-includedir=/opt/PostgreSQL92/include/ --with­-pgsql­-libdir=/opt/PostgreSQL92/lib/ make make install 4. You can see the pgpool files in /opt/PostgreSQL92/bin location.
/opt/PostgreSQL92/bin $ ls clusterdb   droplang  pcp_attach_node  pcp_proc_count pcp_systemdb_info  pg_controldata  pgpool pg_test_fsync pltcl_loadmod  reindexdb createdb    dropuser  pcp_detach_node  pcp_proc_info createlang  ecpg      pcp_node_count   pcp_promote_node oid2name  pcp_pool_status  pcp_stop_pgpool  …

pgBucket - A new concurrent job scheduler

Hi All,

I'm so excited to announce about my first contribution tool for postgresql. I have been working with PostgreSQL from 2011 and I'm really impressed with such a nice database.

I started few projects in last 2 years like pgHawk[A beautiful report generator for Openwatch] , pgOwlt [CUI monitoring. It is still under development, incase you are interested to see what it is, attaching the image here for you ],


pgBucket [Which I'm gonna talk about] and learned a lot and lot about PostgreSQL/Linux internals.

Using pgBucket we can schedule jobs easily and we can also maintain them using it's CLI options. We can update/insert/delete jobs at online. And here is its architecture which gives you a basic idea about how it works.


Yeah, I know there are other good job schedulers available for PostgreSQL. I haven't tested them and not comparing them with this, as I implemented it in my way.
Features are: OS/DB jobsCron style sytaxOnline job modificationsRequired cli options

pgBucket v1.0 is ready

pgBucket v1.0 pgBucket v1.0 (concurrent job scheduler for PostgreSQL) is released. This version is more stable and fixed the issues which was observed in the previous beta releases.
Highlights of this tool are Schedule OS/DB level jobsCron style syntax {Schedule up to seconds}On fly job modificationsInstant daemon status by retrieving live job queue, job hashEnough cli options to deal with all the configured/scheduled job Here is the URL for the pgBucket build/usage instructions. https://bitbucket.org/dineshopenscg/pgbucket
I hope this tool will be helpful for the PostgreSQL users to get things done in the scheduled time. Note: This tool requires c++11{gcc version >= 4.9.3} to compile.
--Dinesh