Notifikasi Airflow Melalui Workplace

Adam WB
Grow at Warung Pintar
4 min readNov 25, 2019

--

Dalam cerita kali ini saya membagikan sebuah kisah unik sewaktu saya berhadapan dengan airflow. Ya kita sebagai tim Data Engineer di Warung Pintar menggunakan airflow melalui google cloud composer sebagai sarana untuk melakukan scheduling dan monitoring workflow khususnya terkait kebutuhan ETL. Untuk dokumentasi airflow lebih lengkap ada di sini .

Setelah mengetahui lebih tentang airflow, maka tibalah saatnya kisah unik itu terjadi. Airflow memiliki sebuah fungsi untuk dapat memberikan notifikasi ketika sebuah job pada DAG (Directed Acyclic Graph) error ataupun sukses. Notifikasi itu dikirim melalui email. Sehingga pada setting args secara umum untuk DAG adalah sebagai berikut :

default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

Namun yang terjadi ketika saya mencoba untuk melakukan set up email, banyak kondisi yang membuat email kita tidak dapat terbaca oleh airflow dan pesan error tidak dapat dikirim. Hal ini membuat kita harus selalu memantau airflow dashboard untuk melihat jika ada pesan error disana. Oleh karena email tidak dapat digunakan, kita memanfaatkan sebuah platform yang sudah kita gunakan untuk komunikasi secara internal di Warung Pintar, workplace. Sebenarnya ada beberapa cara untuk mengirimkan pesan error airflow, salah satunya menggunakan slack api operator ( yang sudah di sediakan oleh airflow ). Namun sekarang kita akan mencoba hal baru dimana kita menggunakan workplace untuk mengirim pesan error.

dokumentasi pribadi

Satu-satunya cara agar dapat mengirim pesan melalui workplace adalah menggunakan bantuan workchat bot yang merupakan salah satu fitur yang ada di workplace.

Sebelum menuju ke airflow, ada baiknya memahami dokumentasi workplace chatbot yang telah dibuat oleh tim engineer Warung Pintar disini. Berkat mereka lah akhirnya saya dapat mengirim pesan bot ini melalui workplace ( Engineering Life is Awesome hahaha )

Setelah chat selesai maka satu-satunya cara agar airflow dapat mengirimkan pesan adalah membuat operator sendiri. Dimana saya membuat workplace_operator. Untuk mempermudah, saya melihat dokumentasi SlackApiOperator

class WorkplaceAPIOperator(BaseOperator):
@apply_defaults
def __init__(self,
recipentId,
type_user,
token,
*args, **kwargs):
self.token = token
super(WorkplaceAPIOperator, self).__init__(*args, **kwargs)

if self.token is None:
raise AirflowException('No valid workplace token supplied.')

def execute(self,context):
ti=context.get('task_instance')
task=context.get('task_instance').task_id
dag=context.get('task_instance').dag_id
exec_date=context.get('execution_date')
log_url=context.get('task_instance').log_url

message = '''🔴 Oops Task failed ''' + str(ti) + '''\\n\\n'''
message += '''➡ *Task*: ''' + str(task) +'''\\n'''
message += '''➡ *DAG* : ''' + str(dag) + '''\\n'''
message += '''➡ *Execution Time*: ''' + str(exec_date) + '''\\n'''
message += '''➡ *Log Url*: ''' + str(log_url) + '''\\n'''

session_requests = requests.session()
WARBOT = "https://your api workchat bot"
session_requests.post(WARBOT, data=json.dumps({"message": message}))

def execute_success(self,context):
dag=context.get('task_instance').dag_id
exec_date=context.get('execution_date')

message = '''✅ Congratulation, your DAG succeeded \\n\\n'''
message += '''➡ *DAG* : ''' + str(dag) + '''\\n'''
message += '''➡ *Execution Time*: ''' + str(exec_date) + '''\\n\\n'''
message += '''Akhirnya bisa tidur dengan nyenyak 🛌 😴'''

session_requests = requests.session()
WARBOT = "WARBOT = "https://your api workchat bot"
session_requests.post(WARBOT, data=json.dumps({"message": message}))

def check_params(self):
return print(self.token + self.recipentId + self.type_user + self.text)

Pada workplace_operator tersebut saya memiliki 2 fungsi untuk mengirimkan jenis pesan dari airflow, yaitu ketika job DAG berhasil atau error. Dengan begitu kita akan tahu kondisi DAG kita.

Setelah selesai membuat operator, langkah berikutnya kita perlu setting beberapa parameter pada file DAG. Berikut contohnya :

from airflow import DAG
from airflow import models
from airflow.operators.bash_operator import BashOperator
from dependencies import workplace_operator
with DAG(
dag_id='workchat_notification_airflow',
concurrency=1,
on_success_callback=on_dag_success,
default_args=args,
schedule_interval='@once') as dag:

py_task = BashOperator(task_id='py_task',
on_failure_callback=on_dag_failure,
bash_command='python /data/dag_script/warbot_private_adam.py')
  • on_success_callback merupakan sebuah paramater airflow yang digunakan ketika sebuah task selesai, maka dia akan menjalakan fungsi on_dag_success. Perlu diperhatikan bahwa ketika ada on_success berarti ada on_failure hahaha. Disini lah kita menggunakan fungsi on_dag_failure. Untuk masing-masing operator yang digunakan tergantung kebutuhan. Disini saya menggunakan bashOperator untuk menjalankan file python langsung dari GCS.

Setelah semua siap, apakah sudah selesai ? tentu belum hahaha

kita perlu memanggil workplace_operator. Berikut fungsinya

def on_dag_failure(context):
print_task_instances(context)

def print_task_instances(context):
failed_alert = workplace_operator.WorkplaceAPIPostOperator(
task_id = 'failed_task',
recipentId= models.Variable.get('your workplace id'),
token= models.Variable.get('your token'),
type_user='user type on workplace'
)
return failed_alert.execute(context=context)
def on_dag_success(context):
print_task_success(context)

def print_task_success(context):
success_alert = workplace_operator.WorkplaceAPIPostOperator(
task_id = 'success_task',
recipentId= models.Variable.get('your workplace id'),
token= models.Variable.get('your token'),
type_user='user type on workplace'
)
return success_alert.execute_success(context=context)

Perlu diperhatikan disini bahwa workplace ID dan token saya simpan pada variable airflow. Hal ini mempermudah dalam melakukan perubahan pada target chat notifikasi tanpa harus merubah DAG file. Setelah selesai, maka kalian bisa mencobanya. Berikut hasilnya

Dokumentasi pribadi

Bagaimana ? sangat menarik dan sangat mempermudah kita dalam melakukan monitoring airflow

Selain itu, jika kita menggunakan email akan memakan banyak dokumen yang tersimpan. Mengingat sekarang kita sering menggunakan platform chat, maka notifikasi melalui chat ini sangatlah membantu.

  • Catatan
Ada beberapa kondisi disini bahwa airflow dengan versi 1.9 memiliki setting variable on_failure_callback dan on_success_callback berbeda dengan airflow 1.10. Pada airflow 1.10 kita dapat memasang parameter tersebut ketika kita mendefinisikan DAG. 

Itulah yang dapat saya bagikan pada cerita kali ini. Mungkin ada beberapa bagian yang dapat ditingkatkan namun hal yang perlu dicatat adalah kita memerlukan jalan lain untuk dapat mencapai tujuan tertentu.

Jika kalian ingin berdiskusi ataupun ada masalah terkait ini, kalian dapat menghubungi melalui email atau pun menulis di komentar.

--

--