October 18, 2021
Hình 1. Sơ đồ thực hiện

Cloud Pub/Sub – Sử dụng trong Python như thế nào?

Ở bài demo lần trước Cloud Pub/sub Demo, mình cũng đã có trình bày cách hoạt động của Cloud PubSub và sử dụng nó từ Web UI và Command Line. Nhưng khi xây dựng ứng dụng cụ thể thì Command Line không phải là sự lựa chọn hiệu quả. Nay chúng ta sẽ tìm hiểu xem cách sử dụng Cloud PubSub trong ngôn ngữ lập trình cụ thể để dễ dàng tích hợp vào ứng dụng của mình. Để hỗ trợ người dùng làm việc với các service của GCP, Google cũng đưa ra một số API cụ thể như:

  • Service API : những service được public ra dưới dạng REST API và RPC API.
  • API Client : những api được đóng gói thành những gói thư viện (libraries) cho từng ngôn ngữ lập trình cụ thể: java,C#, python, php, ruby,..

Trong phạm vi bài viết này, mình xử dụng API Client với ngôn ngữ lập trình là python – version 3.x. . Nếu bạn không quen xài python hoặc muốn thử một ngôn ngữ khác thì có thể xem ở đây.

Bài này cũng dựa trên ý tưởng là từ application – muốn gửi message lên Cloud Pubsub giống giống như cloud pubsub demo nhưng mình sử dụng python để xử lý việc push và pull message.

Hình 1: Sơ đồ thực hiện.

Theo như Hình 1, thì chúng ta sẽ thực hiện khá nhiều công việc, nhưng nhìn kỹ lại thì mình chỉ thấy 2 cái chính đó là push message và pull message. Vậy mình sẽ có 2 câu hỏi cần giải quyết:

  • Làm thế nào để push message lên pubsub bằng python code?
  • Làm thế nào để pull message từ pubsub về bằng python code?

Bài viết này đa số là coding và trình tự thực hiện như sau:

  • Tạo service account và cấp role có quyền edit cloud pubsub. (hướng dẫn ở đây).
  • Authentication & Authorization. ( Hướng dẫn ở đây).
  • Đọc dữ liệu ( file .csv) và Push từng dòng lên PubSub. Dữ liệu này là dữ liệu bất kỳ miễn sao nội dung của file có định dạng là .csv là ok. ( Do code của mình đọc từ file csv nhé ).
  • Pull dữ liệu về và gửi lại ACK cho PubSub.
  • Kiểm tra kết quả thực hiện.

Cài đặt thư viện

pip install google-auth google-auth-httplib2 google-api-python-client
pip install --upgrade google-cloud-pubsub

Thiết lập một số thông tin cần thiết.

import os
import datetime
import sys
import time
import json
import  math
import random as rd
import pandas as pd
from google.oauth2 import service_account
from google.cloud import pubsub_v1
from google.cloud import bigquery

ROOT = 'D:machine_learning'
ROOT_CERT = os.path.join(ROOT, 'certificates')
ROOT_DATA = os.path.join(ROOT, 'datas')

project_id = "seminar-demo"
topic_name = "demo"
#//D:machine_learnincertificatesseminar-demo-c0b16d489728.json
credentials = service_account.Credentials.from_service_account_file(
    os.path.join(ROOT_CERT, 'seminar-demo-c0b16d489728.json'))

scoped_credentials = credentials.with_scopes(
    ['https://www.googleapis.com/auth/pubsub',
     'https://www.googleapis.com/auth/cloud-platform'])

Tạo Sender

class Sender:
    def __init__(self, data):
        self.__data = data
        self.__publisher = pubsub_v1.PublisherClient(credentials=credentials)
        # The `topic_path` method creates a fully qualified identifier
        # in the form `projects/{project_id}/topics/{topic_name}`
        self.__topic_path = self.__publisher.topic_path(project_id, topic_name)
    def __publish(self, message):
        future = self.__publisher.publish(self.__topic_path, data=message)
        return future

    def run(self):
        for idx, item in self.__data.iterrows():
            if idx > 0 and idx % 100 == 0:
                time.sleep(30)
            data = json.dumps(item.to_dict())
            data = data.encode('utf-8')
            # When you publish a message, the client returns a future.
            future = self.__publish(data)
            print('Published {} of message ID {}.'.format(data, future.result()))

Tạo Receiver

class Receiver:
    def __init__(self):
        self.__subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
        self.__subscription_name = 'sub_demo'
        # The `subscription_path` method creates a fully qualified identifier
        # in the form `projects/{project_id}/subscriptions/{subscription_name}`
        self.__subscription_path = self.__subscriber.subscription_path(
            project_id, self.__subscription_name)

    def __callback(self, message):
        print('Received message: {}'.format(message))
        message.ack()
    def run(self):
        self.__subscriber.subscribe(self.__subscription_path, callback=self.__callback)
        # The subscriber is non-blocking. We must keep the main thread from
        # exiting to allow it to process messages asynchronously in the background.
        print('Listening for messages on {}'.format(self.__subscription_path))
        while True:
            time.sleep(60)

Trong đó :

self.__publisher = pubsub_v1.PublisherClient(credentials=credentials)

self.__subscriber = pubsub_v1.SubscriberClient(credentials=credentials)

=> chính là bước Authentication & Authorization với google với credentials là file .json chứa thông tin service account .
Khi sử dụng Client API của Google đều bắt buộc phải Authentication & Authorization. Do đó, sau khi thực hiện được Authentication & Authorization của ví dụ Cloud Pubsub này thì các bạn vẫn sử dụng
cho các dịch vụ khác của Google. Các bạn cũng có thể xem qua các cách để Authentication & Authorization ở đây để lựa chọn cho mình cách phù hợp, Goole khuyến khích sử dụng OAuth 2.0 nhưng quyền quyết định là của bạn.

Sender Main Class

def main():
    # read file csv
    # example : D:machine_learningdatasdata.csv
    data = pd.read_csv(os.path.join(ROOT_DATA, 'data.csv'), encoding='unicode_escape')
    sender = Sender(data)
    sender.run()

Receiver Main Class

def main():
    receiver = Receiver()
    receiver.run()

Kết quả thực hiện:

Google Pub/Sub Kết quả Push message.
Google Pub/Sub Kết quả Push message.
Google Pub/Sub Kết quả Push message.

Do Cloud PubSub là bất đồng bộ, do đó bạn để chạy sender để push message hoặc receiver để pull message trước hay sau đều được. Nếu chưa có message để pull thì nó sẽ chờ cho đến khi có message gửi tới thì nó sẽ pull về.

Hy vọng bài viết này giúp chúng ta có cái nhìn mới về Cloud PubSub cũng như cách ứng dụng của nó vào trong các ứng dụng gửi nhận dữ liệu bất đồng bộ và realtime.

Khi thực hành có chỗ nào chưa hiểu, cần support, các bạn hãy liên hệ với các chuyên gia – Cloud Ace Việt Nam – để được hỗ trợ tốt hơn.

Bao Vuong

Vương hiện là Cloud Engineer của Cloud Ace Vietnam.+6 năm kinh nghiệm phát triển backend +2 năm kinh nghiệm Data Analytics +1 năm kinh nghiệm Machine Learning

View all posts by Bao Vuong →

2 thoughts on “Cloud Pub/Sub – Sử dụng trong Python như thế nào?

  1. Chào bạn !
    Root_Cert : đường dẫn chưa file certificate, Trong demo này, mình sử dung service account và file certificate mình đặt tên nó là ‘seminar-demo-c0b16d489728.json’.

    ví dụ biến ROOT = ‘D:\machine_learning’
    và ROOT_CERT = os.path.join(ROOT, ‘certificates’) thì có nghĩa là ROOT_CERT =’D:\machine_learning\certificates\’
    // bạn có thể tìm hiều về cách hoạt động của os.path.join ở đây (https://docs.python.org/3/library/os.path.html)

    Root_Data : là thư mục chứa dữ liệu để ứng dụng đọc file .csv để push lên pubsub. Tường tự như trên: ROOT_DATA = os.path.join(ROOT, ‘datas’) # ROOT_DATA sẽ có giá trị à ‘D:\machine_learning\datas\’

    // để biết thêm thông tin về service account, vui lòng xem ở đường dẫn này :
    //https://cloud.google.com/iam/docs/creating-managing-service-account-keys

    Thân chào bạn.!

Comments are closed.