
How To Set Up An SSH Tunnel In Google Cloud Dataflow To An External Database Server?

I am having trouble getting my Apache Beam pipeline to work on Cloud Dataflow with the DataflowRunner. The first step of the pipeline is to connect to an external PostgreSQL server hosted…

Solution 1:

Problem solved! I can't believe I've spent two full days on this... I was looking completely in the wrong direction.

The issue was not with any Dataflow or GCP networking configuration, and as far as I can tell, the statement

You have full control to make any type of connection that you choose, so long as the firewall rules you set up within your project/network allow it

is true.

The problem was, of course, in my code: it was only revealed in a distributed environment. I had made the mistake of opening the tunnel from the main pipeline processor instead of from the workers. So the SSH tunnel was up, but not between the workers and the target server, only between the main pipeline and the target!
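To make the mistake concrete, here is a minimal sketch of that broken layout, with hypothetical names (run_broken, QueryViaLocalPortDoFn, the hosts and credentials are all placeholders): the tunnel is opened on the machine that launches the pipeline, so the forwarded local port simply does not exist on the Dataflow workers that execute the DoFn.

import apache_beam as beam
from sshtunnel import open_tunnel


class QueryViaLocalPortDoFn(beam.DoFn):
    """Hypothetical DoFn that connects to host:port from whatever worker runs it."""

    def __init__(self, host, port):
        self.host = host
        self.port = port

    def process(self, query):
        # On Dataflow this executes on a remote worker, where 127.0.0.1:<port>
        # is not forwarded to the database, so any connection attempt fails.
        yield "would run {} via {}:{}".format(query, self.host, self.port)


def run_broken(options, query):
    # The tunnel lives only on the driver/launcher machine...
    with open_tunnel(
        ("bastion.example.com", 22),               # hypothetical SSH host
        ssh_username="tunnel_user",
        ssh_password="secret",
        remote_bind_address=("db.example.com", 5432),
    ) as tunnel:
        with beam.Pipeline(options=options) as p:
            # ...but the DoFn below runs on the workers, not on this machine.
            (p
             | beam.Create([query])
             | beam.ParDo(QueryViaLocalPortDoFn("127.0.0.1",
                                                tunnel.local_bind_port)))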

To fix this, I had to change my requesting DoFn to wrap the query execution with the tunnel:

import logging

# Imports assumed for this snippet: open_tunnel comes from the sshtunnel
# package; the pysql_beam import path may differ depending on your version.
from sshtunnel import open_tunnel
from pysql_beam.sources import sql


class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
    """Wraps SQLSourceDoFn in an SSH tunnel."""

    def __init__(self, *args, **kwargs):
        self.dbport = kwargs["port"]
        self.dbhost = kwargs["host"]
        self.args = args
        self.kwargs = kwargs
        super().__init__(*args, **kwargs)

    def process(self, query, *args, **kwargs):
        # Remote side of the SSH tunnel: the actual database host and port
        remote_address = (self.dbhost, self.dbport)
        ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
        with open_tunnel(
            ssh_tunnel,
            ssh_username=self.kwargs["ssh_user"],
            ssh_password=self.kwargs["ssh_password"],
            remote_bind_address=remote_address,
            set_keepalive=10.0
        ) as tunnel:
            # Point the SQL source at the locally forwarded port on this worker
            forwarded_port = tunnel.local_bind_port
            self.kwargs["port"] = forwarded_port
            source = sql.SQLSource(*self.args, **self.kwargs)
            sql.SQLSouceInput._build_value(source, source.runtime_params)
            logging.info("Processing - {}".format(query))
            for records, schema in source.client.read(query):
                for row in records:
                    yield source.client.row_as_dict(row, schema)

As you can see, I had to override some bits of the pysql_beam library.
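For completeness, here is a rough sketch of how this DoFn might be wired into the pipeline. Only the host/port and ssh_* keys are the ones the DoFn above actually reads; the remaining SQLSource arguments (credentials, database name, ...) depend on your pysql_beam version, and every host name and secret below is a placeholder.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions()   # plus your Dataflow-specific options

db_kwargs = dict(
    host="db.example.com", port=5432,              # the remote PostgreSQL server
    ssh_host="bastion.example.com", ssh_port=22,   # SSH endpoint (placeholder)
    ssh_user="tunnel_user", ssh_password="secret",
    # ...plus whatever SQLSource itself expects (user, password, database, ...)
)

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | "Queries" >> beam.Create(["SELECT * FROM my_table"])
     | "ReadViaSshTunnel" >> beam.ParDo(TunnelledSQLSourceDoFn(**db_kwargs)))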

In the end, each worker opens its own tunnel for each request. It's probably possible to optimize this behavior, but it's enough for my needs.
