← Back to team overview

maria-discuss team mailing list archive

Re: stored programs

 

One other thing (and this is the last thing, I promise!): some functions,
like cume_dist, don't take a framing clause, but others, like sum, do.

Here is how I handle it:
$state->windows[$num]['mode']   // 'RANGE' or 'ROWS'
$state->windows[$num]['start']  // false means UNBOUNDED PRECEDING; otherwise
a numeric offset relative to the current row (negative = PRECEDING, positive =
FOLLOWING, 0 = CURRENT ROW)
$state->windows[$num]['end']    // false means UNBOUNDED FOLLOWING; otherwise
a numeric offset relative to the current row (negative = PRECEDING, positive =
FOLLOWING, 0 = CURRENT ROW)

Here is the code for wf_sum, which uses it:
/**
 * Computes SUM() for window function $num and stores each row's result in
 * column wf{$num} of the temporary table $state->table_name.
 *
 * Without an ORDER BY, every row of a partition receives the partition total,
 * computed in SQL. With an ORDER BY, the partition's rows are fetched in
 * window order and each row's value is the sum over the frame returned by
 * frame_window(), or NULL when that frame holds only NULLs (or is empty).
 *
 * @param int    $num   index of the window function in $state->windows
 * @param object $state query state (DAL, table_name, windows, winfunc_sql,
 *                      winfunc_group)
 * @return bool  true on success; false when a query fails (the error text is
 *               appended to $this->errors) or get_all_rows() returns no rows
 */
protected function wf_sum($num,$state) {
    $win = $state->windows[$num];
    $colref = "wf{$num}";

    if($win['order_by'] == "") {
      // No ORDER BY: the frame is the whole partition.
      $sql = "SELECT distinct wf{$num}_hash h from " . $state->table_name;
      $stmt = $state->DAL->my_query($sql);
      if($err = $state->DAL->my_error()) {
        $this->errors[] = $err;
        return false;
      }
      while($row = $state->DAL->my_fetch_assoc($stmt)) { /* one partition */
        $sql = "select sum($colref) as s from (select " .
          $state->winfunc_sql . " WHERE wf{$num}_hash = '{$row['h']}') sq";
        $stmt2 = $state->DAL->my_query($sql);
        if($err = $state->DAL->my_error()) {
          $this->errors[] = $err;
          return false;
        }
        $row2 = $state->DAL->my_fetch_assoc($stmt2);
        // SUM() over only NULLs is NULL. Write the SQL keyword, because an
        // empty PHP value would produce "SET wfN =  WHERE" (invalid SQL).
        $sum = isset($row2['s']) ? $row2['s'] : "NULL";
        $sql = "UPDATE " . $state->table_name . " SET wf$num = $sum" .
          " WHERE wf{$num}_hash = '{$row['h']}'";
        $state->DAL->my_query($sql);
        if($err = $state->DAL->my_error()) {
          $this->errors[] = $err;
          return false;
        }
      }
      return true;
    }

    // With ORDER BY: evaluate the framing clause row by row in each partition.
    $sql = "SELECT distinct wf{$num}_hash h from " . $state->table_name .
      " ORDER BY " . $win['order_by'];
    $stmt = $state->DAL->my_query($sql);
    if($err = $state->DAL->my_error()) {
      $this->errors[] = $err;
      return false;
    }
    while($row = $state->DAL->my_fetch_assoc($stmt)) { /* one partition */
      $sql = "SELECT *," . $state->winfunc_sql . " where wf{$num}_hash='" .
        $row['h'] . "'" . $state->winfunc_group . " ORDER BY " . $win['order_by'];
      $partition_rows = $this->get_all_rows($sql, $state);
      if(!$partition_rows) return false;

      // Fix the loop bound up front: only the rows actually fetched get an UPDATE.
      $row_count = count($partition_rows);
      for($i = 0; $i < $row_count; ++$i) {
        // frame_window() takes its rows by reference; hand it a scratch copy
        // so nothing it writes can leak into $partition_rows or later frames.
        $scratch = $partition_rows;
        $frame = $this->frame_window($scratch, $win, $i, $colref,
          "wf{$num}_obhash");

        if($this->all_null($frame)) { // also true for an empty frame
          $sum = "NULL";
        } else {
          $sum = array_sum($frame);
        }

        $sql = "UPDATE " . $state->table_name . " SET wf{$num} = {$sum}" .
          " WHERE wf_rownum = {$partition_rows[$i]['wf_rownum']}";
        $state->DAL->my_query($sql);
        if($err = $state->DAL->my_error()) {
          $this->errors[] = $err;
          return false;
        }
      }
    }
    return true;
}

/**
 * Calculates the 'frame' for a window: the list of $key values that the
 * window's framing clause selects for the row at position $cur.
 *
 * $win['start'] / $win['end'] are false for UNBOUNDED PRECEDING / FOLLOWING,
 * otherwise an offset relative to $cur (0 = CURRENT ROW, negative = PRECEDING,
 * positive = FOLLOWING). Positions that fall before the first row or after the
 * last row still occupy a slot in the frame and contribute $default, on both
 * sides alike. A frame whose start lies after its end is empty.
 *
 * When $win['mode'] is not 'ROWS' (i.e. RANGE), the frame is extended past its
 * end row through the following rows sharing the end row's $ob_key value (its
 * ORDER BY peers).
 *
 * @param array  $rows    partition rows in window order, indexed 0..n-1. Taken
 *                        by reference for interface compatibility only; it is
 *                        never modified.
 * @param array  $win     window definition ('start', 'end', 'mode')
 * @param int    $cur     position of the current row within $rows
 * @param string $key     column whose values make up the frame (leading commas
 *                        are stripped)
 * @param string $ob_key  column holding the ORDER BY hash used to find peers
 * @param mixed  $default value used for positions outside $rows
 * @return array frame values, in window order
 */
  protected function &frame_window(&$rows,$win, $cur=0,$key = "wf_rownum",
    $ob_key="",$default=null) {
    $key = ltrim($key,",");
    $count = count($rows);

    // Resolve the bounds to absolute positions within $rows. An offset of 0
    // is the current row itself, so cur + offset covers CURRENT ROW too.
    $first = ($win['start'] === false) ? 0 : $cur + $win['start'];
    $last = ($win['end'] === false) ? $count - 1 : $cur + $win['end'];
    $peers = ($win['mode'] != 'ROWS');

    $vals = array();
    if($first > $last) {
      return $vals; // e.g. "1 FOLLOWING AND 1 PRECEDING": empty frame
    }

    // Out-of-range positions read as $default. They are never written into
    // $rows, so the caller's array (and its count) stays untouched.
    for($i = $first; $i <= $last; ++$i) {
      $vals[] = ($i >= 0 && $i < $count) ? $rows[$i][$key] : $default;
    }

    // RANGE frames also take in the peers that follow the end row. A padded
    // (out-of-range) end position has no peers.
    if($peers && $last >= 0 && $last < $count) {
      $end_sort = isset($rows[$last][$ob_key]) ? $rows[$last][$ob_key] : null;
      for($n = $last + 1; $n < $count; ++$n) {
        // NOTE(review): when the end row has no ORDER BY value, every remaining
        // row is kept (as the previous loop did) -- confirm this is intended.
        if(!empty($end_sort)) {
          $sort = isset($rows[$n][$ob_key]) ? $rows[$n][$ob_key] : '';
          if((string)$sort !== (string)$end_sort) break;
        }
        $vals[] = $rows[$n][$key]; // the peer's own value
      }
    }

    return $vals;
  }





On Tue, Mar 3, 2015 at 2:20 PM, Justin Swanhart <greenlion@xxxxxxxxx> wrote:

> Hi,
>
> One other thing, in that example I sent, there is $state->winfunc_sql.
> For those window functions that take column input, it will be the column to
> input into the function, and is always of form "expr$[0-9]+ as wf[0-9]+"
> For example, for the following query it is "expr$0 as wf0 "
> SELECT salary, sum(salary) OVER (ORDER BY salary) as ss FROM empsalary;
>
> The SQL looks like this:
>  1610 Query     SELECT NULL as wf_rownum, salary AS expr$0,NULL  as
> wf0,SHA1(CONCAT_WS('#','ONE_PARTITION')) as
> wf0_hash,salary,SHA1(CONCAT_WS('#',salary)) as wf0_obhash
> FROM empsalary  AS `empsalary`  WHERE  1=1
>                  1611 Query     INSERT INTO `aggregation_tmp_86564985`
> VALUES
> (NULL,5200,NULL,'3dce9641066f316f334c8008fd3e364274470068',5200,'2fad4efb8e6aaaa53ecdf638137d390e002f9783'),(NULL,4200,NULL,'3dce9641066f316f334c8008fd3e364274470068',4200,'c79fce75b1583ddd36a96178757e0d8d0ac91228'),(NULL,4500,NULL,'3dce9641066f316f334c8008fd3e364274470068',4500,'97a87b470fe9ed5ff51ff9b8543e937e6016d48c'),(NULL,6000,NULL,'3dce9641066f316f334c8008fd3e364274470068',6000,'31d9ddeaa80bc88c1f3117b9724726ebcc7fc72d'),(NULL,5200,NULL,'3dce9641066f316f334c8008fd3e364274470068',5200,'2fad4efb8e6aaaa53ecdf638137d390e002f9783'),(NULL,3500,NULL,'3dce9641066f316f334c8008fd3e364274470068',3500,'65609286cc04ece831a844984a6bc9eb80450cf7'),(NULL,3900,NULL,'3dce9641066f316f334c8008fd3e364274470068',3900,'5446569e8572251dcac168152d6c37074427eae3'),(NULL,4800,NULL,'3dce9641066f316f334c8008fd3e364274470068',4800,'98e9f55262269a05dd4f1ed788626580fdef2e95'),(NULL,5000,NULL,'3dce9641066f316f334c8008fd3e364274470068',5000,'f8237d8959e03355010bb85cc3dc46a46fb31110'),(NULL,4800,NULL,'3dce9641066f316f334c8008fd3e364274470068',4800,'98e9f55262269a05dd4f1ed788626580fdef2e95')
>
>                  1607 Query     SELECT distinct wf0_hash h from
> aggregation_tmp_86564985 ORDER BY salary asc
>                  1607 Query     SELECT *,expr$0 as wf0
> FROM `aggregation_tmp_86564985` where
> wf0_hash='3dce9641066f316f334c8008fd3e364274470068' ORDER BY salary asc
>                  1607 Query     UPDATE aggregation_tmp_86564985 SET wf0 =
> 3500 WHERE wf_rownum = 6
>                  1607 Query     UPDATE aggregation_tmp_86564985 SET wf0 =
> 7400 WHERE wf_rownum = 7
>                  1607 Query     UPDATE aggregation_tmp_86564985 SET wf0 =
> 11600 WHERE wf_rownum = 2
>                  1607 Query     UPDATE aggregation_tmp_86564985 SET wf0 =
> 16100 WHERE wf_rownum = 3
>                  1607 Query     UPDATE aggregation_tmp_86564985 SET wf0 =
> 25700 WHERE wf_rownum = 8
>                  1607 Query     UPDATE aggregation_tmp_86564985 SET wf0 =
> 25700 WHERE wf_rownum = 10
>                  1607 Query     UPDATE aggregation_tmp_86564985 SET wf0 =
> 30700 WHERE wf_rownum = 9
>                  1607 Query     UPDATE aggregation_tmp_86564985 SET wf0 =
> 41100 WHERE wf_rownum = 1
>                  1607 Query     UPDATE aggregation_tmp_86564985 SET wf0 =
> 41100 WHERE wf_rownum = 5
>                  1607 Query     UPDATE aggregation_tmp_86564985 SET wf0 =
> 47100 WHERE wf_rownum = 4
>
>
>
>
> The cumulative distribution (cume_dist) example doesn't use any column input,
> so in the SQL examples you see "NULL as wf0" for the window SQL.
>
> --Justin
>
> On Tue, Mar 3, 2015 at 1:40 PM, Justin Swanhart <greenlion@xxxxxxxxx>
> wrote:
>
>> Hi,
>>
>> I was a little confusing.  The query orders by the ORDER BY columns
>> (salary is projected out into the temporary table), but later ob_hash is
>> used.  This is because the frame extends to all the rows whose ORDER BY
>> values are the same (peers), and comparing multiple columns is hard, so a
>> hash is used instead.
>>
>> --Justin
>>
>> On Tue, Mar 3, 2015 at 1:28 PM, Justin Swanhart <greenlion@xxxxxxxxx>
>> wrote:
>>
>>> Hi,
>>>
>>> Well, here is how Shard-Query does them.  I assume you would do something
>>> very similar internally with a temp table.
>>>
>>> a) it creates a temporary table for the query, reserving NULL columns for
>>> the window functions
>>> b) it adds to the temporary table a unique id for each row of the
>>> resultset. This is used for framing.
>>> c) it adds a hash of the ORDER BY columns of the window function, for
>>> ordering
>>> d) it adds a hash of the partition columns, for partitioning
>>>
>>> After the regular resultset is stored in the temp table, a function
>>> sweeps the table for
>>> each window function, calculating the result of the function based on
>>> the framing clause,
>>> then the column in the resultset is updated to reflect the computed
>>> value.
>>>
>>> Finally, the resultset is returned to the client.
>>>
>>> Here is the SQL log of the following query:
>>> mysql> call shard_query.sq_helper("SELECT depname, empno, salary,
>>> cume_dist() OVER (PARTITION BY depname ORDER by salary rows between 1
>>> following and 1 following) ss FROM empsalary", "", 'test','testtab',1,1);
>>>  +-----------+-------+--------+------------------+
>>> | depname   | empno | salary | ss               |
>>> +-----------+-------+--------+------------------+
>>> | develop   | 7     | 4200   | 0.2              |
>>> | develop   | 9     | 4500   | 0.4              |
>>> | develop   | 11    | 5200   | 0.8              |
>>> | develop   | 10    | 5200   | 0.8              |
>>> | develop   | 8     | 6000   | 1                |
>>> | sales     | 3     | 4800   | 0.66666666666667 |
>>> | sales     | 4     | 4800   | 0.66666666666667 |
>>> | sales     | 1     | 5000   | 1                |
>>> | personnel | 5     | 3500   | 0.5              |
>>> | personnel | 2     | 3900   | 1                |
>>> +-----------+-------+--------+------------------+
>>> 10 rows in set (0.08 sec)
>>>
>>> Query OK, 0 rows affected (0.09 sec)
>>>
>>> 150303 13:12:05 1533 Query call shard_query.sq_helper("SELECT depname,
>>> empno, salary, cume_dist() OVER (PARTITION BY depname ORDER by salary rows
>>> between 1 following and 1 following) ss FROM empsalary", "",
>>> 'test','testtab',1,1)
>>>
>>> -- get metadata for resultset (notice LIMIT 0 at the end of the query)
>>>  1553 Query SELECT NULL as wf_rownum, depname AS expr$0,empno AS
>>> expr$1,salary AS expr$2,NULL as wf0,depname,SHA1(CONCAT_WS('#',depname)) as
>>> wf0_hash,salary,SHA1(CONCAT_WS('#',salary)) as wf0_obhash
>>> FROM empsalary  AS `empsalary`  WHERE  1=1   LIMIT 0
>>>
>>> -- create temp table
>>>  1553 Query CREATE TABLE aggregation_tmp_45403179 (wf_rownum bigint
>>> auto_increment primary key,expr$0 VARCHAR(255),expr$1 VARCHAR(255),expr$2
>>> VARCHAR(255),wf0 VARCHAR(255),depname VARCHAR(255),wf0_hash
>>> VARCHAR(255),salary VARCHAR(255),wf0_obhash VARCHAR(255)) ENGINE=MYISAM
>>>
>>> -- get resultset
>>>  1556 Query SELECT NULL as wf_rownum, depname AS expr$0,empno AS
>>> expr$1,salary AS expr$2,NULL as wf0,depname,SHA1(CONCAT_WS('#',depname)) as
>>> wf0_hash,salary,SHA1(CONCAT_WS('#',salary)) as wf0_obhash
>>> FROM empsalary  AS `empsalary`  WHERE  1=1
>>>
>>> -- store resultset
>>>  1557 Query INSERT INTO `aggregation_tmp_45403179` VALUES
>>> (NULL,'develop',11,5200,NULL,'develop','418a6bc4deccf0f7d5182192d51a54e504b3f3c9',5200,'2fad4efb8e6aaaa53ecdf638137d390e002f9783'),(NULL,'develop',7,4200,NULL,'develop','418a6bc4deccf0f7d5182192d51a54e504b3f3c9',4200,'c79fce75b1583ddd36a96178757e0d8d0ac91228'),(NULL,'develop',9,4500,NULL,'develop','418a6bc4deccf0f7d5182192d51a54e504b3f3c9',4500,'97a87b470fe9ed5ff51ff9b8543e937e6016d48c'),(NULL,'develop',8,6000,NULL,'develop','418a6bc4deccf0f7d5182192d51a54e504b3f3c9',6000,'31d9ddeaa80bc88c1f3117b9724726ebcc7fc72d'),(NULL,'develop',10,5200,NULL,'develop','418a6bc4deccf0f7d5182192d51a54e504b3f3c9',5200,'2fad4efb8e6aaaa53ecdf638137d390e002f9783'),(NULL,'personnel',5,3500,NULL,'personnel','fc08ce9ebee8734c2dc883c0dbd607686bdce8f3',3500,'65609286cc04ece831a844984a6bc9eb80450cf7'),(NULL,'personnel',2,3900,NULL,'personnel','fc08ce9ebee8734c2dc883c0dbd607686bdce8f3',3900,'5446569e8572251dcac168152d6c37074427eae3'),(NULL,'sales',3,4800,NULL,'sales','59248c4dae276a021cb296d2ee0e6a0c962a8d7f',4800,'98e9f55262269a05dd4f1ed788626580fdef2e95'),(NULL,'sales',1,5000,NULL,'sales','59248c4dae276a021cb296d2ee0e6a0c962a8d7f',5000,'f8237d8959e03355010bb85cc3dc46a46fb31110'),(NULL,'sales',4,4800,NULL,'sales','59248c4dae276a021cb296d2ee0e6a0c962a8d7f',4800,'98e9f55262269a05dd4f1ed788626580fdef2e95')
>>>
>>> -- handle window function 0 (the only one in this case)
>>>
>>> -- get the hashes for each partition
>>>  1553 Query SELECT distinct wf0_hash h from aggregation_tmp_45403179
>>> ORDER BY salary asc
>>>
>>> -- compute the values for each partition (three in this case)
>>>
>>> -- process the window and update the temp table (see code at the end for
>>> wf_cume) for each partition in turn.
>>> -- as you can see there is a select followed by updates
>>>
>>>  1553 Query SELECT *,NULL as wf0
>>> FROM `aggregation_tmp_45403179` where
>>> wf0_hash='fc08ce9ebee8734c2dc883c0dbd607686bdce8f3' ORDER BY salary asc
>>>
>>>
>>>  1553 Query UPDATE aggregation_tmp_45403179 SET wf0 = 0.5 WHERE
>>> wf_rownum in (6)
>>>  1553 Query UPDATE aggregation_tmp_45403179 SET wf0 = 1 WHERE wf_rownum
>>> in (7)
>>>  1553 Query SELECT *,NULL as wf0
>>> FROM `aggregation_tmp_45403179` where
>>> wf0_hash='59248c4dae276a021cb296d2ee0e6a0c962a8d7f' ORDER BY salary asc
>>>  1553 Query UPDATE aggregation_tmp_45403179 SET wf0 = 0.66666666666667
>>> WHERE wf_rownum in (8,10)
>>>  1553 Query UPDATE aggregation_tmp_45403179 SET wf0 = 1 WHERE wf_rownum
>>> in (9)
>>>  1553 Query SELECT *,NULL as wf0
>>> FROM `aggregation_tmp_45403179` where
>>> wf0_hash='418a6bc4deccf0f7d5182192d51a54e504b3f3c9' ORDER BY salary asc
>>>  1553 Query UPDATE aggregation_tmp_45403179 SET wf0 = 0.2 WHERE
>>> wf_rownum in (2)
>>>  1553 Query UPDATE aggregation_tmp_45403179 SET wf0 = 0.4 WHERE
>>> wf_rownum in (3)
>>>  1553 Query UPDATE aggregation_tmp_45403179 SET wf0 = 0.8 WHERE
>>> wf_rownum in (1,5)
>>>  1553 Query UPDATE aggregation_tmp_45403179 SET wf0 = 1 WHERE wf_rownum
>>> in (4)
>>>
>>> -- return resultset to client
>>>
>>>  1559 Query SELECT expr$0 AS `depname`,expr$1 AS `empno`,expr$2 AS
>>> `salary`,wf0 as `ss`
>>> FROM `aggregation_tmp_45403179`   ORDER BY wf0_hash,salary asc
>>>
>>> -- remove temp table
>>>
>>>  1559 Query DROP TABLE IF EXISTS aggregation_tmp_45403179
>>>  1559 Quit
>>>  1550 Quit
>>>
>>> 150303 13:12:12 1533 Query set global general_log=0
>>>
>>>
>>>
>>>  protected function wf_cume_dist($num,$state) {
>>>     // CUME_DIST(): for each row, (number of rows up to and including its
>>>     // last ORDER BY peer) / (number of rows in its partition). Results go
>>>     // into column wf{$num} of $state->table_name. Returns false on a query
>>>     // error (error text appended to $this->errors), true otherwise.
>>>     $win = $state->windows[$num];
>>>     if(empty($win['order']) || empty($win['order_by'])) {
>>>       // Without an ORDER BY every row is a peer of every other row in its
>>>       // partition, so CUME_DIST() is 1 for all of them.
>>>       $sql = "update " . $state->table_name . " set wf{$num}=1";
>>>       $state->DAL->my_query($sql);
>>>       if($err = $state->DAL->my_error()) {
>>>         $this->errors[] = $err;
>>>         return false;
>>>       }
>>>       return true;
>>>     }
>>>
>>>     $sql = "SELECT distinct wf{$num}_hash h from " .
>>>       $state->table_name . " ORDER BY " . $win['order_by'];
>>>     $stmt = $state->DAL->my_query($sql);
>>>     if($err = $state->DAL->my_error()) {
>>>       $this->errors[] = $err;
>>>       return false;
>>>     }
>>>     $obcol = "wf{$num}_obhash";
>>>     while($row = $state->DAL->my_fetch_assoc($stmt)) { /* one partition */
>>>       $sql = "SELECT *," . $state->winfunc_sql . " where wf{$num}_hash='" .
>>>         $row['h'] . "' ORDER BY " . $win['order_by'];
>>>       $stmt2 = $state->DAL->my_query($sql);
>>>       if($err = $state->DAL->my_error()) {
>>>         $this->errors[] = $err;
>>>         return false;
>>>       }
>>>       $rows = array();
>>>       while($row2 = $state->DAL->my_fetch_assoc($stmt2)) {
>>>         $rows[] = $row2;
>>>       }
>>>       $total = count($rows);
>>>
>>>       $i = 0;
>>>       while($i < $total) {
>>>         // Collect the current row plus every following peer (same ORDER BY
>>>         // hash); they all share one CUME_DIST value.
>>>         $ob_hash = $rows[$i][$obcol];
>>>         $rowlist = $rows[$i]['wf_rownum'];
>>>         while($i + 1 < $total && $rows[$i + 1][$obcol] == $ob_hash) {
>>>           ++$i;
>>>           $rowlist .= "," . $rows[$i]['wf_rownum'];
>>>         }
>>>         // $i is now the last peer, so $i + 1 rows precede-or-equal it.
>>>         $dist = ($i + 1) / $total;
>>>         $sql = "UPDATE " . $state->table_name . " SET wf{$num} = " .
>>>           "{$dist} WHERE wf_rownum in ({$rowlist})";
>>>         $state->DAL->my_query($sql);
>>>         if($err = $state->DAL->my_error()) {
>>>           $this->errors[] = $err;
>>>           return false;
>>>         }
>>>         ++$i;
>>>       }
>>>     }
>>>     return true;
>>>   }
>>>
>>>
>>>
>>> On Tue, Mar 3, 2015 at 12:55 PM, Igor Babaev <igor@xxxxxxxxxxxx> wrote:
>>>
>>>> On 03/03/2015 10:25 AM, Sergei Golubchik wrote:
>>>> > Hi, Igor!
>>>> >
>>>> > On Mar 03, Igor Babaev wrote:
>>>> >>>
>>>> >>>> I'd also like to discuss window functions too.  I've implemented
>>>> them
>>>> >>>> in shard-query and have ideas about how to implement them in the
>>>> >>>> server, but pluggable parser would be really useful here.
>>>> >>>
>>>> >>> Window functions have a good chance of being in 10.2, it's
>>>> MDEV-6115.
>>>> >>> But I don't think that somebody is working on MDEV-6115 yet.
>>>> >>
>>>> >> I started working on MDEV-6115 some time ago.
>>>> >
>>>> > Ah, great. Sorry, I didn't know it.
>>>> > Could you then discuss it with Justin, please?
>>>> > See above, he has some ideas about the implementation.
>>>>
>>>>
>>>> Justin,
>>>> How do you prefer discussing your ideas?
>>>> On IRC (#maria) or by phone? (I don't have skype at the moment)
>>>>
>>>> Regards,
>>>> Igor.
>>>>
>>>> >
>>>> > Regards,
>>>> > Sergei
>>>>
>>>>
>>>> _______________________________________________
>>>> Mailing list: https://launchpad.net/~maria-discuss
>>>> Post to     : maria-discuss@xxxxxxxxxxxxxxxxxxx
>>>> Unsubscribe : https://launchpad.net/~maria-discuss
>>>> More help   : https://help.launchpad.net/ListHelp
>>>>
>>>
>>>
>>
>

References