Google BigQuery: Динамический UPSERT с помощью EXECUTE IMMEDIATE

Обычный паттерн обновления данных в BigQuery — добавление новых записей, даже если это означает дублирование данных. А затем обрабатывать дубликаты при чтении с помощью группировки или оконных функций, или удаление записей, и последующая повторная запись одних и тех же данных, но уже с новыми значениями. В любом случае все эти варианты не являются лучшей практикой.

Иногда вам нужно выполнить UPSERT операцию (обновление или вставку) данных одним запросом. Возможно вы хотите «исправить» некоторые записи, или внести какие-то обновления в них (например обновить статусы транзакций), либо хотите сохранить чистую таблицу без дубликатов для своего коллеги-аналитика или специалиста по данным.

UPDATE — оператор языка SQL, позволяющий обновить значения в заданных столбцах таблицы.

С недавних пор в BigQuery есть оператор MERGE, который мы можем использовать для реализации логики UPSERT. Как обсуждалось, например, в этой ветке StackOverflow .

MERGE `project.merge_example.table_data` T
USING `project.merge_example.table_changes` S
ON T.id = S.id
WHEN MATCHED THEN
  UPDATE SET value = s.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES(id, value)

Данный запрос обновит значения поля value, в записях таблицы project.merge_example.table_data, которые будут найдены в таблице project.merge_example.table_changes по полю id. Записи, id которых есть в таблице project.merge_example.table_changes, но нет в project.merge_example.table_data, будут добавлены в таблицу project.merge_example.table_data.

Проблема тут в следующем, хорошо, когда необходимо обновить значение одного или нескольких полей, в нашем примере поля value, но, если вам надо обновить значения в сотне столбцов, то вам придётся перечислить каждое это поле в блоке UPDATE SET.

К счастью, мы можем превратить приведенный выше пример в динамический запрос UPSERT:

DECLARE fields STRING;
DECLARE updates STRING;
EXECUTE IMMEDIATE (
     "SELECT STRING_AGG(column_name) FROM `project.merge_example`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'table_data'"
  ) INTO fields;
EXECUTE IMMEDIATE (
    """WITH t AS (SELECT column_name FROM `project.merge_example`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'table_data')
       SELECT STRING_AGG("t."||column_name ||" = "|| "s."||column_name) from t join t as s using(column_name)"""
  ) INTO updates;
EXECUTE IMMEDIATE """
  MERGE `project.merge_example.table_data` T
  USING `project.merge_example.table_changes` S
    ON T.id = S.id
  WHEN MATCHED THEN 
    UPDATE SET """||updates||"""
  WHEN NOT MATCHED THEN
    INSERT ("""||fields||""") VALUES ("""||fields||""")"""

Давайте разберём этот запрос:

DECLARE создает переменные, которые мы позже заполняем значениями.

EXECUTE IMMEDIATE позволяет нам динамически создавать SQL запрос и записывать результат в объявленную переменную.

Чтобы получить разделенную запятыми строку полей в нашей таблице, мы запрашиваем метаданные набора данных:

SELECT STRING_AGG(column_name) FROM `project.merge_example`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'table_data'

Чтобы получить строку со списком полей, разделенных запятыми, для обновления, то есть SET t.value1 = s.value1, t.value2 = s.value2 мы снова запрашиваем метаданные и добавляем некоторые конкатенации в функцию STRING_AGG:SELECT STRING_AGG(“t.”||column_name ||” = “|| “s.”||column_name) from t join t as s using(column_name).

Наконец, мы создаем оператор UPSERT, динамически заполняющий столбцы для обновления и значения для вставки из переменных, объявленных выше:

EXECUTE IMMEDIATE """
  MERGE `project.merge_example.table_data` T
  USING `project.merge_example.table_changes` S
    ON T.id = S.id
  WHEN MATCHED THEN 
    UPDATE SET """||updates||"""
  WHEN NOT MATCHED THEN
    INSERT ("""||fields||""") VALUES ("""||fields||""")"""

Чтобы сделать этот пример еще более универсальным, мы можем завернуть описанный выше динамический запрос в ХРАНИМУЮ ПРОЦЕДУРУ, которая будет принимать имена таблиц в качестве аргументов и выполнять наш код UPSERT.


Код процедуры будет выглядеть примерно так:

CREATE OR REPLACE PROCEDURE `project.merge_example.upsert`(table_data STRING, table_changes STRING, project_dataset STRING)
BEGIN
DECLARE fields STRING;
DECLARE updates STRING;EXECUTE IMMEDIATE (
     "SELECT STRING_AGG(column_name) FROM `"||project_dataset||"`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '"||table_data||"'"
  ) INTO fields;EXECUTE IMMEDIATE (
    """WITH t AS (SELECT column_name FROM `"""||project_dataset||"""`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '"""||table_data||"""')
       SELECT STRING_AGG("t."||column_name ||" = "|| "s."||column_name) from t join t as s using(column_name)"""
  ) INTO updates;EXECUTE IMMEDIATE """
  MERGE `"""||project_dataset||"""."""||table_data||"""` T
  USING `"""||project_dataset||"""."""||table_changes||"""` S
    ON T.id = S.id
  WHEN MATCHED THEN 
    UPDATE SET """||updates||"""
  WHEN NOT MATCHED THEN
    INSERT ("""||fields||""") VALUES ("""||fields||""")""";
END;

Данная статья является свободным переводом статьи «BigQuery dynamic UPSERT with EXECUTE IMMEDIATE».

Оставьте комментарий

Этот сайт использует Akismet для борьбы со спамом. Узнайте, как обрабатываются ваши данные комментариев.

Создайте бесплатный сайт или блог на WordPress.com.

Вверх ↑