This project simulates building an Airbnb data pipeline & datamart using dbt + Postgres, including:
- Staging, core, and mart layers
- Snapshots
- Incremental models (backfill support)
- Data retention & partition management (simulated with a custom PL/pgSQL partition function; pg_partman can be used in practice)
- Generic tests, schema tests, and source freshness checks
.
├── README.md
├── analyses
├── dbt_project.yml
├── macros
│ ├── tests
│ │ └── no_future_dates.sql
│ └── utils
│ ├── avg_sentiment.sql
│ ├── generate_schema_name.sql
│ └── join_listing_to_host.sql
├── models
│ ├── core
│ │ ├── _core.yml
│ │ ├── dim_date.sql
│ │ ├── dim_host.sql
│ │ ├── dim_listing.sql
│ │ └── fact_review.sql
│ ├── mart
│ │ ├── _mart.yml
│ │ ├── adhoc_analysis.sql.BAK
│ │ ├── agg_daily_reviews.sql
│ │ ├── agg_host_performance.sql
│ │ ├── agg_reviews_by_listing.sql
│ │ └── agg_room_type_trends.sql
│ ├── sources.yml
│ └── src
│ ├── _src.yml
│ ├── stg_hosts.sql
│ ├── stg_listings.sql
│ └── stg_reviews.sql
├── package-lock.yml
├── packages.yml
├── seeds
│ └── date_spine.csv
├── snapshots
│ ├── host_snapshot.sql
│ └── listing_snapshot.sql
└── tests
Click to view DDL
-- Raw host records: one row per host, keyed by id.
CREATE TABLE airbnb_raw.hosts (
    id           bigint    NOT NULL,
    name         text,
    is_superhost boolean,
    created_at   timestamp,
    updated_at   timestamp,
    CONSTRAINT hosts_pkey PRIMARY KEY (id)
);
-- Raw listing records: one row per listing, keyed by id, with an optional
-- link to the owning host.
CREATE TABLE airbnb_raw.listings (
    id             int8 NOT NULL,
    listing_url    text NULL,
    "name"         text NULL,
    room_type      varchar(100) NULL,
    minimum_nights int4 NULL,
    host_id        int8 NULL,
    price          numeric(10, 2) NULL,
    created_at     timestamp NULL,
    updated_at     timestamp NULL,
    CONSTRAINT listings_pkey PRIMARY KEY (id),
    -- The referenced table is schema-qualified so the foreign key resolves to
    -- airbnb_raw.hosts regardless of the session's search_path.
    CONSTRAINT listings_host_id_fkey FOREIGN KEY (host_id) REFERENCES airbnb_raw.hosts (id)
);
-- Raw review records: one row per review, keyed by an auto-generated
-- review_id, with an optional link to the reviewed listing.
CREATE TABLE airbnb_raw.reviews (
    review_id     bigserial NOT NULL,
    listing_id    int8 NULL,
    review_date   date NULL,
    reviewer_name text NULL,
    "comments"    text NULL,
    sentiment     varchar(20) NULL,
    CONSTRAINT reviews_pkey PRIMARY KEY (review_id),
    -- The referenced table is schema-qualified so the foreign key resolves to
    -- airbnb_raw.listings regardless of the session's search_path.
    CONSTRAINT reviews_listing_id_fkey FOREIGN KEY (listing_id) REFERENCES airbnb_raw.listings (id)
);
-- B-tree indexes on the raw tables' created_at/updated_at and review_date columns.
CREATE INDEX idx_hosts_created_updated
    ON airbnb_raw.hosts USING btree (created_at, updated_at);
CREATE INDEX idx_listings_created_updated
    ON airbnb_raw.listings USING btree (created_at, updated_at);
CREATE INDEX idx_reviews_date
    ON airbnb_raw.reviews USING btree (review_date);
-- Keep the existing staging table under a new name, then recreate
-- stg_reviews as a table range-partitioned on review_date.
ALTER TABLE airbnb_stg.stg_reviews
    RENAME TO stg_reviews_old;

CREATE TABLE airbnb_stg.stg_reviews (
    review_id     bigint,
    listing_id    bigint,
    review_date   date,
    reviewer_name text,
    comments      text,
    sentiment     varchar(20)
)
PARTITION BY RANGE (review_date);
-- Creates one monthly range partition of a partitioned parent table for every
-- month from the month containing p_start_date through the month containing
-- p_end_date. Partitions that already exist are left untouched.
--
-- Parameters:
--   p_schema_name        schema of the parent table; partitions are created in the same schema
--   p_parent_table_name  name of the partitioned parent table
--   p_start_date         any date in the first month to cover (truncated to the 1st of that month)
--   p_end_date           last date to cover; the month containing it is included
-- Returns: nothing. Emits one NOTICE per processed month.
-- Raises:  an exception when any argument is NULL (NULL dates would otherwise
--          make the loop silently create nothing).
CREATE OR REPLACE FUNCTION airbnb_raw.create_dynamic_partitions(
    p_schema_name TEXT,
    p_parent_table_name TEXT,
    p_start_date DATE,
    p_end_date DATE
)
RETURNS VOID AS $$
DECLARE
    p_date DATE;               -- first day of the month currently being processed
    next_date DATE;            -- first day of the following month (exclusive upper bound)
    parent_fqn TEXT;           -- quoted, schema-qualified parent name: "schema"."table"
    partition_table_name TEXT; -- unqualified partition name, e.g. stg_reviews_201701
    partition_fqn TEXT;        -- quoted, schema-qualified partition name: "schema"."table_201701"
    sql_command TEXT;          -- DDL statement built for the current month
BEGIN
    -- Fail loudly on NULL input instead of silently skipping the loop.
    IF p_schema_name IS NULL
        OR p_parent_table_name IS NULL
        OR p_start_date IS NULL
        OR p_end_date IS NULL
    THEN
        RAISE EXCEPTION 'create_dynamic_partitions: all arguments must be non-NULL';
    END IF;

    -- 1. Build the fully qualified parent name; %I quotes each identifier safely.
    parent_fqn := format('%I.%I', p_schema_name, p_parent_table_name);

    -- 2. Snap the start date to the first day of its month.
    p_date := date_trunc('month', p_start_date)::DATE;

    WHILE p_date <= p_end_date LOOP
        next_date := (p_date + INTERVAL '1 month')::DATE;

        -- Bare partition name (no schema, no quoting): <parent>_<YYYYMM>,
        -- e.g. stg_reviews_201701.
        partition_table_name := format('%s_%s', p_parent_table_name, to_char(p_date, 'YYYYMM'));

        -- Fully qualified, quoted partition name, e.g. "airbnb_stg"."stg_reviews_201701".
        partition_fqn := format('%I.%I', p_schema_name, partition_table_name);

        -- CREATE TABLE ... PARTITION OF ... for the half-open range [p_date, next_date).
        -- %s is used for both names because they were already quoted with %I above;
        -- %L renders the date bounds as quoted literals.
        sql_command := format(
            'CREATE TABLE IF NOT EXISTS %s PARTITION OF %s '
            'FOR VALUES FROM (%L) TO (%L);',
            partition_fqn,
            parent_fqn,
            p_date,
            next_date
        );
        EXECUTE sql_command;

        -- Emitted for every month, including ones whose partition already existed.
        RAISE NOTICE 'Done created partition table: %', partition_fqn;

        p_date := next_date;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
-- Create the monthly partitions of airbnb_stg.stg_reviews covering 2017-01 through 2019-10.
-- The function is created in the airbnb_raw schema, so it is called with that
-- qualifier; the first argument is the schema that holds the partitioned table.
SELECT airbnb_raw.create_dynamic_partitions(
    'airbnb_stg',
    'stg_reviews',
    DATE '2017-01-01',
    DATE '2019-10-31'
);
-- Copy the rows of the renamed, unpartitioned table into the new partitioned table.
-- Columns are listed explicitly so the copy does not depend on the physical
-- column order of stg_reviews_old.
-- NOTE(review): rows whose review_date is NULL or falls outside the created
-- partitions have no target partition and will make the insert fail -- confirm
-- the partition range covers the data.
INSERT INTO airbnb_stg.stg_reviews (
    review_id,
    listing_id,
    review_date,
    reviewer_name,
    "comments",
    sentiment
)
SELECT
    review_id,
    listing_id,
    review_date,
    reviewer_name,
    "comments",
    sentiment
FROM airbnb_stg.stg_reviews_old;
-- 1. List the stg_reviews partition tables in schema 'airbnb_stg'
SELECT
    n.nspname AS schemaname,
    c.relname
FROM
    pg_class AS c
INNER JOIN
    pg_namespace AS n ON n.oid = c.relnamespace
WHERE
    n.nspname = 'airbnb_stg'
    -- "\_" matches a literal underscore, so only names such as
    -- stg_reviews_201701 match (stg_reviews_old does not).
    AND c.relname LIKE 'stg_reviews\_20%'
ORDER BY
    c.relname;
dbt debug
dbt deps
dbt source snapshot-freshness
dbt test --select source:airbnb
# 1. Full load host info (for snapshots)
dbt run --select stg_hosts
# 2. Load all remaining staging models (with backfill)
dbt run --select tag:staging --vars '{ "backfill_start_date": "2018-01-01" }' --exclude stg_hosts
# 3. Snapshot host information
dbt snapshot
# 4. Load core + mart models (incremental + backfill)
dbt run --vars '{ "backfill_start_date": "2018-01-01" }' --exclude stg_hosts
Access directly here: Live dbt Lineage Documentation
Another option: run `dbt docs generate` in a cloud CI/CD tool (e.g., Cloud Build on GCP), then push the generated docs artifacts directly to a GCS bucket.
- Fact & staging layers: 2 years of data
- Mart layer: 4 years of data
- In practice, pg_partman can be used to manage partitions & retention:
- Create partitions by day/month
- Auto-drop old partitions
- Auto-create new partitions
- Learn more about dbt in the docs
- Check out Discourse for commonly asked questions and answers
- Join the chat on Slack for live discussions and support
- Find dbt events near you
- Check out the blog for the latest news on dbt's development and best practices