Google BigQuery: Динамический UPSERT с помощью EXECUTE IMMEDIATE

Обычный паттерн обновления данных в BigQuery — добавление новых записей, даже если это означает дублирование данных. А затем обрабатывать дубликаты при чтении с помощью группировки или оконных функций, или удаление записей, и последующая повторная запись одних и тех же данных, но уже с новыми значениями. В любом случае все эти варианты не являются лучшей практикой.

Иногда вам нужно выполнить UPSERT операцию (обновление или вставку) данных одним запросом. Возможно вы хотите «исправить» некоторые записи, или внести какие-то обновления в них (например обновить статусы транзакций), либо хотите сохранить чистую таблицу без дубликатов для своего коллеги-аналитика или специалиста по данным.

UPDATE — оператор языка SQL, позволяющий обновить значения в заданных столбцах таблицы.

С недавних пор в BigQuery есть оператор MERGE, который мы можем использовать для реализации логики UPSERT. Как обсуждалось, например, в этой ветке StackOverflow .

MERGE `project.merge_example.table_data` T
USING `project.merge_example.table_changes` S
ON T.id = S.id
WHEN MATCHED THEN
  UPDATE SET value = s.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES(id, value)

Данный запрос обновит значения поля value, в записях таблицы project.merge_example.table_data, которые будут найдены в таблице project.merge_example.table_changes по полю id. Записи, id которых есть в таблице project.merge_example.table_changes, но нет в project.merge_example.table_data, будут добавлены в таблицу project.merge_example.table_data.

Проблема тут в следующем, хорошо, когда необходимо обновить значение одного или нескольких полей, в нашем примере поля value, но, если вам надо обновить значения в сотне столбцов, то вам придётся перечислить каждое это поле в блоке UPDATE SET.

К счастью, мы можем превратить приведенный выше пример в динамический запрос UPSERT:

DECLARE fields STRING;
DECLARE updates STRING;
EXECUTE IMMEDIATE (
     "SELECT STRING_AGG(column_name) FROM `project.merge_example`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'table_data'"
  ) INTO fields;
EXECUTE IMMEDIATE (
    """WITH t AS (SELECT column_name FROM `project.merge_example`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'table_data')
       SELECT STRING_AGG("t."||column_name ||" = "|| "s."||column_name) from t join t as s using(column_name)"""
  ) INTO updates;
EXECUTE IMMEDIATE """
  MERGE `project.merge_example.table_data` T
  USING `project.merge_example.table_changes` S
    ON T.id = S.id
  WHEN MATCHED THEN 
    UPDATE SET """||updates||"""
  WHEN NOT MATCHED THEN
    INSERT ("""||fields||""") VALUES ("""||fields||""")"""

Давайте разберём этот запрос:

DECLARE создает переменные, которые мы позже заполняем значениями.

EXECUTE IMMEDIATE позволяет нам динамически создавать SQL запрос и записывать результат в объявленную переменную.

Чтобы получить разделенную запятыми строку полей в нашей таблице, мы запрашиваем метаданные набора данных:

SELECT STRING_AGG(column_name) FROM `project.merge_example`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'table_data'

Чтобы получить строку со списком полей, разделенных запятыми, для обновления, то есть SET t.value1 = s.value1, t.value2 = s.value2 мы снова запрашиваем метаданные и добавляем некоторые конкатенации в функцию STRING_AGG:SELECT STRING_AGG(“t.”||column_name ||” = “|| “s.”||column_name) from t join t as s using(column_name).

Наконец, мы создаем оператор UPSERT, динамически заполняющий столбцы для обновления и значения для вставки из переменных, объявленных выше:

EXECUTE IMMEDIATE """
  MERGE `project.merge_example.table_data` T
  USING `project.merge_example.table_changes` S
    ON T.id = S.id
  WHEN MATCHED THEN 
    UPDATE SET """||updates||"""
  WHEN NOT MATCHED THEN
    INSERT ("""||fields||""") VALUES ("""||fields||""")"""

Чтобы сделать этот пример еще более универсальным, мы можем завернуть описанный выше динамический запрос в ХРАНИМУЮ ПРОЦЕДУРУ, которая будет принимать имена таблиц в качестве аргументов и выполнять наш код UPSERT.


Код процедуры будет выглядеть примерно так:

CREATE OR REPLACE PROCEDURE `project.merge_example.upsert`(table_data STRING, table_changes STRING, project_dataset STRING)
BEGIN
DECLARE fields STRING;
DECLARE updates STRING;EXECUTE IMMEDIATE (
     "SELECT STRING_AGG(column_name) FROM `"||project_dataset||"`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '"||table_data||"'"
  ) INTO fields;EXECUTE IMMEDIATE (
    """WITH t AS (SELECT column_name FROM `"""||project_dataset||"""`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '"""||table_data||"""')
       SELECT STRING_AGG("t."||column_name ||" = "|| "s."||column_name) from t join t as s using(column_name)"""
  ) INTO updates;EXECUTE IMMEDIATE """
  MERGE `"""||project_dataset||"""."""||table_data||"""` T
  USING `"""||project_dataset||"""."""||table_changes||"""` S
    ON T.id = S.id
  WHEN MATCHED THEN 
    UPDATE SET """||updates||"""
  WHEN NOT MATCHED THEN
    INSERT ("""||fields||""") VALUES ("""||fields||""")""";
END;

Данная статья является свободным переводом статьи «BigQuery dynamic UPSERT with EXECUTE IMMEDIATE».

Реклама

Добавить комментарий

Заполните поля или щелкните по значку, чтобы оставить свой комментарий:

Логотип WordPress.com

Для комментария используется ваша учётная запись WordPress.com. Выход /  Изменить )

Фотография Twitter

Для комментария используется ваша учётная запись Twitter. Выход /  Изменить )

Фотография Facebook

Для комментария используется ваша учётная запись Facebook. Выход /  Изменить )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.

Блог на WordPress.com. Тема: Baskerville 2, автор: Anders Noren.

Вверх ↑

%d такие блоггеры, как: