Unable to Receive Events from Salesforce Pub/Sub API using Dart and gRPC #38

Open
josematute opened this issue Jun 8, 2023 · 0 comments

I'm trying to subscribe and listen to events from the Salesforce Pub/Sub API using Dart and gRPC. I've been able to successfully implement this in Python following the quick start guide, but I'm experiencing difficulties in Dart. Despite following a similar pattern, I'm not receiving any events in Dart while the Python script receives them in real time.

Here is the Dart code:

    import 'package:grpc/grpc.dart';
    import 'package:pubsub_testing/src/generated/salesforceProtoFile.pbgrpc.dart' as pb_grpc;
    import "package:pubsub_testing/src/generated/salesforceProtoFile.pb.dart" as pb2;
    
    // Session ID, instance URL, and tenant ID are provided.
    
    final authMetadata = CallOptions(metadata: {
      'accesstoken': sessionId,
      'instanceurl': instanceUrl,
      'tenantid': tenantId,
    });

    final channel = ClientChannel(
      'api.pubsub.salesforce.com',
      port: 7443,
    );
    
    final stub = pb_grpc.PubSubClient(channel);
    
    Stream<pb2.FetchRequest> fetchReqStream(String topic) async* {
      while (true) {
        yield pb2.FetchRequest(
          topicName: topic,
          replayPreset: pb2.ReplayPreset.LATEST,
          numRequested: 100,
        );
      }
    }
    
    Future<void> subscribe(String mySubTopic) async {
      print('Subscribing to $mySubTopic');
      try {
        final substream =
            stub.subscribe(fetchReqStream(mySubTopic), options: authMetadata);
        print("substream: $substream");
    
        await for (var event in substream) {
          print("Got an event!\n");
          if (event.events.isNotEmpty) {
            print("Number of events received: ${event.events.length}");
            var payloadbytes = event.events[0].event.payload;
            var schemaid = event.events[0].event.schemaId;
    
            var schema = await stub.getSchema(pb2.SchemaRequest(schemaId: schemaid),
                options: authMetadata);
    
            print("Got an event!\n");
          } else {
            print("[${DateTime.now()}] The subscription is active.");
          }
        }
      } catch (e) {
        print('An error occurred during subscription: $e');
      }
    }

The code runs without throwing an error, and it seems to successfully subscribe because the terminal does not close, indicating that it's waiting for events. However, no events are being printed to the console, even though I know they are being published because my Python script is receiving them.

I've checked the permissions, and the session ID I'm using has the necessary permissions to subscribe to the topic.

Does anyone have experience with this type of situation or could suggest possible solutions? I would appreciate any insights or assistance.

The following is the Python implementation that works (with sensitive information removed, of course). I also commented out some lines to keep the code as minimal as possible, so that it would be easier to replicate in Dart. This file works and receives events in real time:

from __future__ import print_function
import grpc  # required by grpc.secure_channel below
import requests
import threading
import io
import pubsub_api_pb2 as pb2
import pubsub_api_pb2_grpc as pb2_grpc
import avro.schema  # required by decode() below
import avro.io
import time
import certifi
import json

semaphore = threading.Semaphore(1)

latest_replay_id = None

# with open(certifi.where(), 'rb') as f:
#     creds = grpc.ssl_channel_credentials(f.read())

with grpc.secure_channel('api.pubsub.salesforce.com:7443', grpc.ssl_channel_credentials(None)) as channel:



    # Store Auth 
    sessionid = ''
    instanceurl = ''
    tenantid = ''
    authmetadata = (('accesstoken', sessionid),
    ('instanceurl', instanceurl),
    ('tenantid', tenantid))
    
    # Generate Stub
    stub = pb2_grpc.PubSubStub(channel)

    # Subscribe to event channel
    def fetchReqStream(topic):
        while True:
            semaphore.acquire()
            yield pb2.FetchRequest(
                topic_name = topic,
                replay_preset = pb2.ReplayPreset.LATEST,
                num_requested = 1)
    
    # Decode event message payload
    def decode(schema, payload):
        schema = avro.schema.parse(schema)
        buf = io.BytesIO(payload)
        decoder = avro.io.BinaryDecoder(buf)
        reader = avro.io.DatumReader(schema)
        ret = reader.read(decoder)
        return ret

    # Make the subscribe call
    mysubtopic = "/event/RS_L__ConversationEvent__e"
    print('Subscribing to ' + mysubtopic)
    substream = stub.Subscribe(fetchReqStream(mysubtopic),
            metadata=authmetadata   )
    for event in substream:
        if event.events:
            semaphore.release()
            print("Number of events received: ", len(event.events))
            payloadbytes = event.events[0].event.payload
            schemaid = event.events[0].event.schema_id
            schema = stub.GetSchema(
                    pb2.SchemaRequest(schema_id=schemaid),
                    metadata=authmetadata).schema_json
            decoded = decode(schema, payloadbytes)
            print("Got an event!", json.dumps(decoded), "\n")
            # print(f"payloadbytes: {payloadbytes}")
            # print(f"schemaid: {schemaid}")
        else:
            print("[", time.strftime('%b %d, %Y %l:%M%p %Z'),
            "] The subscription is active.")
        # latest_replay_id = event.latest_replay_id
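
One visible difference between the two listings is flow control: the Python generator blocks on a semaphore before each `FetchRequest`, while the Dart `fetchReqStream` yields in an unbounded loop. The sketch below isolates the Python pattern so it can be run without gRPC or Salesforce credentials. It is only an illustration under stated assumptions: `fake_subscribe` is a hypothetical stand-in for `stub.Subscribe` that drains the request iterator on a background thread and returns one response per request, and the topic name is a placeholder. Whether the missing gate explains the Dart behavior is an assumption that would still need to be verified against the real API.

```python
import queue
import threading


def gated_requests(topic, gate, sent):
    """Yield one request each time the gate is released (mirrors the semaphore in the listing)."""
    while True:
        gate.acquire()      # blocks until the consumer asks for more
        sent.append(topic)  # record that a request was produced
        yield {"topic_name": topic, "num_requested": 1}


def fake_subscribe(requests):
    """Hypothetical stand-in for stub.Subscribe: emits one response per request."""
    responses = queue.Queue()

    def pump():
        # Drain the request iterator on a background thread.
        for n, req in enumerate(requests):
            responses.put({"events": [f"event-{n}"], "topic": req["topic_name"]})

    threading.Thread(target=pump, daemon=True).start()
    while True:
        yield responses.get()


def run(max_events=3):
    gate = threading.Semaphore(1)  # one request is allowed up front
    sent, received = [], []
    topic = "/event/Example__e"    # placeholder topic, not a real channel
    for response in fake_subscribe(gated_requests(topic, gate, sent)):
        received.append(response["events"][0])
        if len(received) == max_events:
            break
        gate.release()             # allow exactly one more request
    return sent, received


if __name__ == "__main__":
    sent, received = run()
    print(len(sent), "requests sent for", len(received), "events received")
```

With the gate in place, the number of requests produced never runs ahead of the number of responses consumed. Removing the `gate.acquire()` call would let `pump` pull requests as fast as the loop can yield them, which is the situation the Dart generator appears to be in.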